X-Git-Url: http://git.vrable.net/?p=cumulus.git;a=blobdiff_plain;f=store.cc;h=a77d64af0a144ef197116aab55ac93f3e3be98bb;hp=0a833ab322cf32764fbef1171ab6037a48e55b92;hb=HEAD;hpb=15f7cef45e82e54a039e5486f08134c4f9a05471 diff --git a/store.cc b/store.cc index 0a833ab..a77d64a 100644 --- a/store.cc +++ b/store.cc @@ -1,342 +1,392 @@ -/* LBS: An LFS-inspired filesystem backup system - * Copyright (C) 2006 Michael Vrable +/* Cumulus: Efficient Filesystem Backup to the Cloud + * Copyright (C) 2008-2009 The Cumulus Developers + * See the AUTHORS file for a list of contributors. * - * Backup data is stored in a collection of objects, which are grouped together - * into segments for storage purposes. This file provides interfaces for - * reading and writing objects and segments. */ + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ -#include -#include +/* Backup data is stored in a collection of objects, which are grouped together + * into segments for storage purposes. This implementation of the object store + * represents segments as TAR files and objects as files within them. */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "hash.h" +#include "localdb.h" #include "store.h" - +#include "ref.h" +#include "util.h" + +using std::max; +using std::list; +using std::map; +using std::pair; +using std::set; using std::string; -OutputStream::OutputStream() - : bytes_written(0) -{ -} +/* Default filter program is bzip2 */ +const char *filter_program = "bzip2 -c"; +const char *filter_extension = ".bz2"; -void OutputStream::write(const void *data, size_t len) +Tarfile::Tarfile(RemoteFile *file, const string &segment) + : size(0), + segment_name(segment) { - write_internal(data, len); - bytes_written += len; -} + assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE); -void OutputStream::write_u8(uint8_t val) -{ - write(&val, 1); + this->file = file; + this->filter.reset(FileFilter::New(file->get_fd(), filter_program)); } -void OutputStream::write_u16(uint16_t val) +Tarfile::~Tarfile() { - unsigned char buf[2]; + char buf[TAR_BLOCK_SIZE]; - buf[0] = val & 0xff; - buf[1] = (val >> 8) & 0xff; - write(buf, 2); -} + /* Append the EOF marker: two blocks filled with nulls. */ + memset(buf, 0, sizeof(buf)); + tar_write(buf, TAR_BLOCK_SIZE); + tar_write(buf, TAR_BLOCK_SIZE); -void OutputStream::write_u32(uint32_t val) -{ - unsigned char buf[4]; + if (close(filter->get_wrapped_fd()) != 0) + fatal("Error closing Tarfile"); - buf[0] = val & 0xff; - buf[1] = (val >> 8) & 0xff; - buf[2] = (val >> 16) & 0xff; - buf[3] = (val >> 24) & 0xff; - write(buf, 4); + /* ...and wait for filter process to finish. */ + if (filter->wait() != 0) { + fatal("Filter process error"); + } } -void OutputStream::write_u64(uint64_t val) -{ - unsigned char buf[8]; - - buf[0] = val & 0xff; - buf[1] = (val >> 8) & 0xff; - buf[2] = (val >> 16) & 0xff; - buf[3] = (val >> 24) & 0xff; - buf[4] = (val >> 32) & 0xff; - buf[5] = (val >> 40) & 0xff; - buf[6] = (val >> 48) & 0xff; - buf[7] = (val >> 56) & 0xff; - write(buf, 8); -} +FileFilter::FileFilter(int raw, int wrapped, pid_t pid) + : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { } -/* Writes an integer to an output stream using a variable-sized representation: - * seven bits are written at a time (little-endian), and the eigth bit of each - * byte is set if more data follows. */ -void OutputStream::write_varint(uint64_t val) +FileFilter *FileFilter::New(int fd, const char *program) { - do { - uint8_t remainder = (val & 0x7f); - val >>= 7; - if (val) - remainder |= 0x80; - write_u8(remainder); - } while (val); -} + if (program == NULL || strlen(program) == 0) { + return new FileFilter(fd, fd, -1); + } -void OutputStream::write_uuid(const struct uuid &u) -{ - write(u.bytes, 16); + pid_t pid; + int wrapped_fd = spawn_filter(fd, program, &pid); + return new FileFilter(fd, wrapped_fd, pid); } -/* Write an arbitrary string by first writing out the length, followed by the - * data itself. */ -void OutputStream::write_string(const string &s) +int FileFilter::wait() { - size_t len = s.length(); - write_varint(len); - write(s.data(), len); -} + // No filter program was launched implies no need to wait. + if (pid == -1) + return 0; -void OutputStream::write_dictionary(const dictionary &d) -{ - size_t size = d.size(); - size_t written = 0; + // The raw file descriptor was held open to track the output file size, but + // is not needed any longer. + close(fd_raw); - write_varint(size); + int status; + if (waitpid(pid, &status, 0) < 0) { + fprintf(stderr, "Error waiting for filter process: %m\n"); + return -1; + } - for (dictionary::const_iterator i = d.begin(); i != d.end(); ++i) { - write_string(i->first); - write_string(i->second); - written++; + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status)); } - assert(written == size); + return status; } -StringOutputStream::StringOutputStream() - : buf(std::ios_base::out) +/* Launch a child process which can act as a filter (compress, encrypt, etc.) + * on the TAR output. The file descriptor to which output should be written + * must be specified; the return value is the file descriptor which will be + * attached to the standard input of the filter program. */ +int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid) { -} + int fds[2]; + pid_t pid; -void StringOutputStream::write_internal(const void *data, size_t len) -{ - buf.write((const char *)data, len); - if (!buf.good()) - throw IOException("error writing to StringOutputStream"); -} + /* Create a pipe for communicating with the filter process. */ + if (pipe(fds) < 0) { + fatal("Unable to create pipe for filter"); + } -FileOutputStream::FileOutputStream(FILE *file) -{ - f = file; -} + /* Create a child process which can exec() the filter program. */ + pid = fork(); + if (pid < 0) + fatal("Unable to fork filter process"); + + if (pid > 0) { + /* Parent process */ + close(fds[0]); + cloexec(fds[1]); + if (filter_pid != NULL) + *filter_pid = pid; + } else { + /* Child process. Rearrange file descriptors. stdin is fds[0], stdout + * is fd_out, stderr is unchanged. */ + close(fds[1]); + + if (dup2(fds[0], 0) < 0) + exit(1); + close(fds[0]); + + if (dup2(fd_out, 1) < 0) + exit(1); + close(fd_out); + + /* Exec the filter program. */ + execlp("/bin/sh", "/bin/sh", "-c", program, NULL); + + /* Should not reach here except for error cases. */ + fprintf(stderr, "Could not exec filter: %m\n"); + exit(1); + } -FileOutputStream::~FileOutputStream() -{ - fclose(f); + return fds[1]; } -void FileOutputStream::write_internal(const void *data, size_t len) +void Tarfile::tar_write(const char *data, size_t len) { - size_t res; + size += len; + + while (len > 0) { + int res = write(filter->get_wrapped_fd(), data, len); + + if (res < 0) { + if (errno == EINTR) + continue; + fprintf(stderr, "Write error: %m\n"); + fatal("Write error"); + } - res = fwrite(data, 1, len, f); - if (res != len) { - throw IOException("write error"); + len -= res; + data += res; } } -WrapperOutputStream::WrapperOutputStream(OutputStream &o) - : real(o) +void Tarfile::write_object(int id, const char *data, size_t len) { -} + struct tar_header header; + memset(&header, 0, sizeof(header)); -void WrapperOutputStream::write_internal(const void *data, size_t len) -{ - real.write(data, len); -} + char buf[64]; + sprintf(buf, "%08x", id); + string path = segment_name + "/" + buf; -/* Provide checksumming of a data stream. */ -ChecksumOutputStream::ChecksumOutputStream(OutputStream &o) - : real(o) -{ -} + assert(path.size() < 100); + memcpy(header.name, path.data(), path.size()); + sprintf(header.mode, "%07o", 0600); + sprintf(header.uid, "%07o", 0); + sprintf(header.gid, "%07o", 0); + sprintf(header.size, "%011o", (int)len); + sprintf(header.mtime, "%011o", (int)time(NULL)); + header.typeflag = '0'; + strcpy(header.magic, "ustar "); + strcpy(header.uname, "root"); + strcpy(header.gname, "root"); -void ChecksumOutputStream::write_internal(const void *data, size_t len) -{ - real.write(data, len); - csum.process(data, len); -} + memset(header.chksum, ' ', sizeof(header.chksum)); + int checksum = 0; + for (int i = 0; i < TAR_BLOCK_SIZE; i++) { + checksum += ((uint8_t *)&header)[i]; + } + sprintf(header.chksum, "%06o", checksum); -const uint8_t *ChecksumOutputStream::finish_and_checksum() -{ - return csum.checksum(); -} + tar_write((const char *)&header, TAR_BLOCK_SIZE); -/* Utility functions, for encoding data types to strings. */ -string encode_u16(uint16_t val) -{ - StringOutputStream s; - s.write_u16(val); - return s.contents(); -} + if (len == 0) + return; -string encode_u32(uint32_t val) -{ - StringOutputStream s; - s.write_u32(val); - return s.contents(); -} + tar_write(data, len); -string encode_u64(uint64_t val) -{ - StringOutputStream s; - s.write_u64(val); - return s.contents(); + char padbuf[TAR_BLOCK_SIZE]; + size_t blocks = (len + TAR_BLOCK_SIZE - 1) / TAR_BLOCK_SIZE; + size_t padding = blocks * TAR_BLOCK_SIZE - len; + memset(padbuf, 0, padding); + tar_write(padbuf, padding); } -string encode_objref(const struct uuid &segment, uint32_t object) +/* Estimate the size based on the size of the actual output file on disk. + * However, it might be the case that the filter program is buffering all its + * data, and might potentially not write a single byte until we have closed + * our end of the pipe. If we don't do so until we see data written, we have + * a problem. So, arbitrarily pick an upper bound on the compression ratio + * that the filter will achieve (128:1), and return a size estimate which is + * the larger of a) bytes actually seen written to disk, and b) input + * bytes/128. */ +size_t Tarfile::size_estimate() { - StringOutputStream s; - s.write_uuid(segment); - s.write_u32(object); - return s.contents(); -} + struct stat statbuf; -SegmentWriter::SegmentWriter(OutputStream *output, struct uuid u) - : raw_out(output), - id(u), - object_stream(NULL) -{ - /* All output data will be checksummed except the very last few bytes, - * which are the checksum itself. */ - out = new ChecksumOutputStream(*raw_out); - - /* Write out the segment header first. */ - static const char signature[] = "LBSSEG0\n"; - out->write(signature, strlen(signature)); - out->write_uuid(id); + if (fstat(filter->get_raw_fd(), &statbuf) == 0) + return max((int64_t)statbuf.st_size, (int64_t)(size / 128)); + + /* Couldn't stat the file on disk, so just return the actual number of + * bytes, before compression. */ + return size; } -SegmentWriter::~SegmentWriter() +static const size_t SEGMENT_SIZE = 4 * 1024 * 1024; + +/* Backup size summary: segment type -> (uncompressed size, compressed size) */ +static map > group_sizes; + +ObjectReference TarSegmentStore::write_object(const char *data, size_t len, + const std::string &group, + const std::string &checksum, + double age) { - if (object_stream) - finish_object(); - - // Write out the object table which gives the sizes and locations of all - // objects, and then add the trailing signature, which indicates the end of - // the segment and gives the offset of the object table. - int64_t index_offset = out->get_pos(); - - for (object_table::const_iterator i = objects.begin(); - i != objects.end(); ++i) { - out->write_s64(i->offset); - out->write_s64(i->size); - out->write(i->type, sizeof(i->type)); - } + struct segment_info *segment; - static const char signature2[] = "LBSEND"; - out->write(signature2, strlen(signature2)); - out->write_s64(index_offset); - out->write_u32(objects.size()); + // Find the segment into which the object should be written, looking up by + // group. If no segment exists yet, create one. + if (segments.find(group) == segments.end()) { + segment = new segment_info; - /* Finally, append a checksum to the end of the file, so that its integrity - * (against accidental, not malicious, corruption) can be verified. */ - const uint8_t *csum = out->finish_and_checksum(); - raw_out->write(csum, out->checksum_size()); + segment->name = generate_uuid(); + segment->group = group; + segment->basename = segment->name + ".tar"; + segment->basename += filter_extension; + segment->count = 0; + segment->data_size = 0; + segment->rf = remote->alloc_file(segment->basename, + group == "metadata" ? "segments0" + : "segments1"); + segment->file = new Tarfile(segment->rf, segment->name); - /* The SegmentWriter takes ownership of the OutputStream it is writing to, - * and destroys it automatically when done with the segment. */ - delete out; - delete raw_out; -} + segments[group] = segment; + } else { + segment = segments[group]; + } -OutputStream *SegmentWriter::new_object(int *id, const char *type) -{ - if (object_stream) - finish_object(); + int id = segment->count; + char id_buf[64]; + sprintf(id_buf, "%08x", id); - if (id != NULL) - *id = objects.size(); + segment->file->write_object(id, data, len); + segment->count++; + segment->data_size += len; - struct index_info info; - info.offset = out->get_pos(); - info.size = -1; // Will be filled in when object is finished - strncpy(info.type, type, sizeof(info.type)); - objects.push_back(info); + group_sizes[group].first += len; - object_stream = new WrapperOutputStream(*out); - return object_stream; + ObjectReference ref(segment->name, id_buf); + ref.set_range(0, len, true); + if (checksum.size() > 0) + ref.set_checksum(checksum); + if (db != NULL) + db->StoreObject(ref, age); + + // If this segment meets or exceeds the size target, close it so that + // future objects will go into a new segment. + if (segment->file->size_estimate() >= SEGMENT_SIZE) + close_segment(group); + + return ref; } -void SegmentWriter::finish_object() +void TarSegmentStore::sync() { - assert(object_stream != NULL); - - // Fill in object size, which could not be stored at start - objects.back().size = object_stream->get_pos(); + while (!segments.empty()) + close_segment(segments.begin()->first); +} - delete object_stream; - object_stream = NULL; +void TarSegmentStore::dump_stats() +{ + printf("Data written:\n"); + for (map >::iterator i = group_sizes.begin(); + i != group_sizes.end(); ++i) { + printf(" %s: %lld (%lld compressed)\n", i->first.c_str(), + (long long)i->second.first, (long long)i->second.second); + } } -struct uuid SegmentWriter::generate_uuid() +void TarSegmentStore::close_segment(const string &group) { - struct uuid u; + struct segment_info *segment = segments[group]; - uuid_generate(u.bytes); + delete segment->file; - return u; -} + if (db != NULL) { + struct stat stat_buf; + int disk_size = 0; + if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) { + disk_size = stat_buf.st_size; + group_sizes[segment->group].second += disk_size; + } -string SegmentWriter::format_uuid(const struct uuid u) -{ - // A UUID only takes 36 bytes, plus the trailing '\0', so this is safe. - char buf[40]; + string checksum + = Hash::hash_file(segment->rf->get_local_path().c_str()); - uuid_unparse_lower(u.bytes, buf); + db->SetSegmentMetadata(segment->name, segment->rf->get_remote_path(), + checksum, group, segment->data_size, disk_size); + } - return string(buf); -} + segment->rf->send(); -SegmentStore::SegmentStore(const string &path) - : directory(path) -{ + segments.erase(segments.find(group)); + delete segment; } -SegmentWriter *SegmentStore::new_segment() +string TarSegmentStore::object_reference_to_segment(const string &object) { - struct uuid id = SegmentWriter::generate_uuid(); - string filename = directory + "/" + SegmentWriter::format_uuid(id); - - FILE *f = fopen(filename.c_str(), "wb"); - if (f == NULL) - throw IOException("Unable to open new segment"); - - return new SegmentWriter(new FileOutputStream(f), id); + return object; } -SegmentPartitioner::SegmentPartitioner(SegmentStore *s) - : store(s), - segment(NULL), - object(NULL) +LbsObject::LbsObject() + : group(""), age(0.0), data(NULL), data_len(0), written(false) { - // Default target size is around 1 MB - target_size = 1024 * 1024; } -SegmentPartitioner::~SegmentPartitioner() +LbsObject::~LbsObject() { - if (segment) - delete segment; } -OutputStream *SegmentPartitioner::new_object(struct uuid *uuid, int *id, - const char *type) +void LbsObject::set_data(const char *d, size_t len, const char *checksum) { - if (segment != NULL && segment->get_size() > target_size) { - delete segment; - segment = NULL; - } + data = d; + data_len = len; - if (segment == NULL) - segment = store->new_segment(); + if (checksum != NULL) { + this->checksum = checksum; + } else { + Hash *hash = Hash::New(); + hash->update(data, data_len); + this->checksum = hash->digest_str(); + delete hash; + } +} - if (uuid != NULL) - *uuid = segment->get_uuid(); +void LbsObject::write(TarSegmentStore *store) +{ + assert(data != NULL); + assert(!written); - return segment->new_object(id, type); + ref = store->write_object(data, data_len, group, checksum, age); + written = true; }