X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=store.cc;h=57ee11ba163ded18af96b2a9206dd2b63508f7eb;hb=57890fff9773a368f241a25c89931a5d67fccb4e;hp=52304c2d8f484b3b99ad9fe9a55299170ea5482a;hpb=25b6639fb1783e0061affa177e6d6d2131c457f5;p=cumulus.git diff --git a/store.cc b/store.cc index 52304c2..57ee11b 100644 --- a/store.cc +++ b/store.cc @@ -1,290 +1,279 @@ /* LBS: An LFS-inspired filesystem backup system - * Copyright (C) 2006 Michael Vrable + * Copyright (C) 2007 Michael Vrable * * Backup data is stored in a collection of objects, which are grouped together - * into segments for storage purposes. This file provides interfaces for - * reading and writing objects and segments. */ + * into segments for storage purposes. This implementation of the object store + * is built on top of libtar, and represents segments as TAR files and objects + * as files within them. */ #include -#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include #include "store.h" +#include "ref.h" +using std::max; +using std::list; +using std::set; using std::string; -OutputStream::OutputStream() - : bytes_written(0) -{ -} +static char *const filter_program[] = {"bzip2", "-c", NULL}; -void OutputStream::write(const void *data, size_t len) +static void cloexec(int fd) { - write_internal(data, len); - bytes_written += len; -} + long flags = fcntl(fd, F_GETFD); -void OutputStream::write_u8(uint8_t val) -{ - write(&val, 1); + if (flags < 0) + return; + + fcntl(fd, F_SETFD, flags | FD_CLOEXEC); } -void OutputStream::write_u16(uint16_t val) +Tarfile::Tarfile(const string &path, const string &segment) + : size(0), + segment_name(segment) { - unsigned char buf[2]; + real_fd = open(path.c_str(), O_WRONLY | O_CREAT, 0600); + if (real_fd < 0) + throw IOException("Error opening output file"); + + filter_fd = spawn_filter(real_fd); - buf[0] = val & 0xff; - buf[1] = (val >> 8) & 0xff; - write(buf, 2); + if (tar_fdopen(&t, filter_fd, (char *)path.c_str(), NULL, + O_WRONLY | O_CREAT, 0600, TAR_VERBOSE | TAR_GNU) == -1) + throw IOException("Error opening Tarfile"); } -void OutputStream::write_u32(uint32_t val) +Tarfile::~Tarfile() { - unsigned char buf[4]; + /* Close the tar file... */ + tar_append_eof(t); - buf[0] = val & 0xff; - buf[1] = (val >> 8) & 0xff; - buf[2] = (val >> 16) & 0xff; - buf[3] = (val >> 24) & 0xff; - write(buf, 4); -} + if (tar_close(t) != 0) + throw IOException("Error closing Tarfile"); -void OutputStream::write_u64(uint64_t val) -{ - unsigned char buf[8]; - - buf[0] = val & 0xff; - buf[1] = (val >> 8) & 0xff; - buf[2] = (val >> 16) & 0xff; - buf[3] = (val >> 24) & 0xff; - buf[4] = (val >> 32) & 0xff; - buf[5] = (val >> 40) & 0xff; - buf[6] = (val >> 48) & 0xff; - buf[7] = (val >> 56) & 0xff; - write(buf, 8); -} + /* ...and wait for filter process to finish. */ + int status; + waitpid(filter_pid, &status, 0); -/* Writes an integer to an output stream using a variable-sized representation: - * seven bits are written at a time (little-endian), and the eigth bit of each - * byte is set if more data follows. */ -void OutputStream::write_varint(uint64_t val) -{ - do { - uint8_t remainder = (val & 0x7f); - val >>= 7; - if (val) - remainder |= 0x80; - write_u8(remainder); - } while (val); -} + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + throw IOException("Filter process error"); + } -/* Write an arbitrary string by first writing out the length, followed by the - * data itself. */ -void OutputStream::write_string(const string &s) -{ - size_t len = s.length(); - write_varint(len); - write(s.data(), len); + close(real_fd); } -void OutputStream::write_dictionary(const dictionary &d) +/* Launch a child process which can act as a filter (compress, encrypt, etc.) + * on the TAR output. The file descriptor to which output should be written + * must be specified; the return value is the file descriptor which will be + * attached to the standard input of the filter program. */ +int Tarfile::spawn_filter(int fd_out) { - size_t size = d.size(); - size_t written = 0; + int fds[2]; - write_varint(size); + /* Create a pipe for communicating with the filter process. */ + if (pipe(fds) < 0) { + throw IOException("Unable to create pipe for filter"); + } - for (dictionary::const_iterator i = d.begin(); i != d.end(); ++i) { - write_string(i->first); - write_string(i->second); - written++; + /* Create a child process which can exec() the filter program. */ + filter_pid = fork(); + if (filter_pid < 0) + throw IOException("Unable to fork filter process"); + + if (filter_pid > 0) { + /* Parent process */ + close(fds[0]); + cloexec(fds[1]); + } else { + /* Child process. Rearrange file descriptors. stdin is fds[0], stdout + * is fd_out, stderr is unchanged. */ + close(fds[1]); + + if (dup2(fds[0], 0) < 0) + exit(1); + close(fds[0]); + + if (dup2(fd_out, 1) < 0) + exit(1); + close(fd_out); + + /* Exec the filter program. */ + execvp(filter_program[0], filter_program); + + /* Should not reach here except for error cases. */ + fprintf(stderr, "Could not exec filter: %m\n"); + exit(1); } - assert(written == size); + return fds[1]; } -StringOutputStream::StringOutputStream() - : buf(std::ios_base::out) +void Tarfile::write_object(int id, const char *data, size_t len) { -} + char buf[64]; + sprintf(buf, "%08x", id); + string path = segment_name + "/" + buf; -void StringOutputStream::write_internal(const void *data, size_t len) -{ - buf.write((const char *)data, len); - if (!buf.good()) - throw IOException("error writing to StringOutputStream"); + internal_write_object(path, data, len); } -FileOutputStream::FileOutputStream(FILE *file) +void Tarfile::internal_write_object(const string &path, + const char *data, size_t len) { - f = file; -} + memset(&t->th_buf, 0, sizeof(struct tar_header)); -FileOutputStream::~FileOutputStream() -{ - fclose(f); -} + th_set_type(t, S_IFREG | 0600); + th_set_user(t, 0); + th_set_group(t, 0); + th_set_mode(t, 0600); + th_set_size(t, len); + th_set_mtime(t, time(NULL)); + th_set_path(t, const_cast(path.c_str())); + th_finish(t); -void FileOutputStream::write_internal(const void *data, size_t len) -{ - size_t res; + if (th_write(t) != 0) + throw IOException("Error writing tar header"); - res = fwrite(data, 1, len, f); - if (res != len) { - throw IOException("write error"); - } -} + size += T_BLOCKSIZE; -WrapperOutputStream::WrapperOutputStream(OutputStream &o) - : real(o) -{ -} + if (len == 0) + return; -void WrapperOutputStream::write_internal(const void *data, size_t len) -{ - real.write(data, len); -} + size_t blocks = (len + T_BLOCKSIZE - 1) / T_BLOCKSIZE; + size_t padding = blocks * T_BLOCKSIZE - len; -/* Provide checksumming of a data stream. */ -ChecksumOutputStream::ChecksumOutputStream(OutputStream &o) - : real(o) -{ -} + for (size_t i = 0; i < blocks - 1; i++) { + if (tar_block_write(t, &data[i * T_BLOCKSIZE]) == -1) + throw IOException("Error writing tar block"); + } -void ChecksumOutputStream::write_internal(const void *data, size_t len) -{ - real.write(data, len); - csum.process(data, len); -} + char block[T_BLOCKSIZE]; + memset(block, 0, sizeof(block)); + memcpy(block, &data[T_BLOCKSIZE * (blocks - 1)], T_BLOCKSIZE - padding); + if (tar_block_write(t, block) == -1) + throw IOException("Error writing final tar block"); -const uint8_t *ChecksumOutputStream::finish_and_checksum() -{ - return csum.checksum(); + size += blocks * T_BLOCKSIZE; } -/* Utility functions, for encoding data types to strings. */ -string encode_u16(uint16_t val) +/* Estimate the size based on the size of the actual output file on disk. + * However, it might be the case that the filter program is buffering all its + * data, and might potentially not write a single byte until we have closed + * our end of the pipe. If we don't do so until we see data written, we have + * a problem. So, arbitrarily pick an upper bound on the compression ratio + * that the filter will achieve (128:1), and return a size estimate which is + * the larger of a) bytes actually seen written to disk, and b) input + * bytes/128. */ +size_t Tarfile::size_estimate() { - StringOutputStream s; - s.write_u16(val); - return s.contents(); -} + struct stat statbuf; -string encode_u32(uint32_t val) -{ - StringOutputStream s; - s.write_u32(val); - return s.contents(); -} + if (fstat(real_fd, &statbuf) == 0) + return max((int64_t)statbuf.st_size, (int64_t)(size / 128)); -string encode_u64(uint64_t val) -{ - StringOutputStream s; - s.write_u64(val); - return s.contents(); + /* Couldn't stat the file on disk, so just return the actual number of + * bytes, before compression. */ + return size; } -SegmentWriter::SegmentWriter(OutputStream *output, struct uuid u) - : raw_out(output), - id(u), - object_stream(NULL) -{ - /* All output data will be checksummed except the very last few bytes, - * which are the checksum itself. */ - out = new ChecksumOutputStream(*raw_out); - - /* Write out the segment header first. */ - static const char signature[] = "LBSSEG0\n"; - out->write(signature, strlen(signature)); - out->write(id.bytes, sizeof(struct uuid)); -} +static const size_t SEGMENT_SIZE = 4 * 1024 * 1024; -SegmentWriter::~SegmentWriter() +ObjectReference TarSegmentStore::write_object(const char *data, size_t len, + const std::string &group) { - if (object_stream) - finish_object(); - - // Write out the object table which gives the sizes and locations of all - // objects, and then add the trailing signature, which indicates the end of - // the segment and gives the offset of the object table. - int64_t index_offset = out->get_pos(); - - for (object_table::const_iterator i = objects.begin(); - i != objects.end(); ++i) { - out->write_s64(i->first); - out->write_s64(i->second); - } + struct segment_info *segment; - static const char signature2[] = "LBSEND"; - out->write(signature2, strlen(signature2)); - out->write_s64(index_offset); - out->write_u32(objects.size()); + // Find the segment into which the object should be written, looking up by + // group. If no segment exists yet, create one. + if (segments.find(group) == segments.end()) { + segment = new segment_info; - /* Finally, append a checksum to the end of the file, so that its integrity - * (against accidental, not malicious, corruption) can be verified. */ - const uint8_t *csum = out->finish_and_checksum(); - raw_out->write(csum, out->checksum_size()); + segment->name = generate_uuid(); - /* The SegmentWriter takes ownership of the OutputStream it is writing to, - * and destroys it automatically when done with the segment. */ - delete out; - delete raw_out; -} + string filename = path + "/" + segment->name + ".tar.bz2"; + segment->file = new Tarfile(filename, segment->name); -OutputStream *SegmentWriter::new_object() -{ - if (object_stream) - finish_object(); + segment->count = 0; - object_start_offset = out->get_pos(); - object_stream = new WrapperOutputStream(*out); + segments[group] = segment; + } else { + segment = segments[group]; + } - return object_stream; -} + int id = segment->count; + char id_buf[64]; + sprintf(id_buf, "%08x", id); -void SegmentWriter::finish_object() -{ - assert(object_stream != NULL); + segment->file->write_object(id, data, len); + segment->count++; - // store (start, length) information for locating this object - objects.push_back(std::make_pair(object_start_offset, - object_stream->get_pos())); + ObjectReference ref(segment->name, id_buf); - delete object_stream; - object_stream = NULL; + // If this segment meets or exceeds the size target, close it so that + // future objects will go into a new segment. + if (segment->file->size_estimate() >= SEGMENT_SIZE) + close_segment(group); + + return ref; } -struct uuid SegmentWriter::generate_uuid() +void TarSegmentStore::sync() { - struct uuid u; + while (!segments.empty()) + close_segment(segments.begin()->first); +} - uuid_generate(u.bytes); +void TarSegmentStore::close_segment(const string &group) +{ + struct segment_info *segment = segments[group]; - return u; + delete segment->file; + segments.erase(segments.find(group)); + delete segment; } -string SegmentWriter::format_uuid(const struct uuid u) +string TarSegmentStore::object_reference_to_segment(const string &object) { - // A UUID only takes 36 bytes, plus the trailing '\0', so this is safe. - char buf[40]; - - uuid_unparse_lower(u.bytes, buf); + return object; +} - return string(buf); +LbsObject::LbsObject() + : group(""), data(NULL), data_len(0), written(false) +{ } -SegmentStore::SegmentStore(const string &path) - : directory(path) +LbsObject::~LbsObject() { } -SegmentWriter *SegmentStore::new_segment() +void LbsObject::write(TarSegmentStore *store) { - struct uuid id = SegmentWriter::generate_uuid(); - string filename = directory + "/" + SegmentWriter::format_uuid(id); + assert(data != NULL); + assert(!written); - FILE *f = fopen(filename.c_str(), "wb"); - if (f == NULL) - throw IOException("Unable to open new segment"); + ref = store->write_object(data, data_len, group); + written = true; +} + +void LbsObject::checksum() +{ + assert(written); - return new SegmentWriter(new FileOutputStream(f), id); + SHA1Checksum hash; + hash.process(data, data_len); + ref.set_checksum(hash.checksum_str()); }