X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=store.cc;h=cf3eea952f3f27524450765b1b8fc232594d0bca;hb=58a0d3f8749111c15e9afa9d929016d65ed32250;hp=804992f105f4ff9b6ab0989ce82d52be717ef1e5;hpb=51859528c5de1c90c553d1f174869005711f162a;p=cumulus.git diff --git a/store.cc b/store.cc index 804992f..cf3eea9 100644 --- a/store.cc +++ b/store.cc @@ -10,42 +10,118 @@ #include #include #include +#include +#include #include #include #include -#include +#include #include #include #include #include #include "store.h" +#include "ref.h" +using std::max; using std::list; using std::set; using std::string; -list TarSegmentStore::norefs; +/* Default filter program is bzip2 */ +const char *filter_program = "bzip2 -c"; +const char *filter_extension = ".bz2"; + +static void cloexec(int fd) +{ + long flags = fcntl(fd, F_GETFD); + + if (flags < 0) + return; + + fcntl(fd, F_SETFD, flags | FD_CLOEXEC); +} Tarfile::Tarfile(const string &path, const string &segment) : size(0), segment_name(segment) { - if (tar_open(&t, (char *)path.c_str(), NULL, O_WRONLY | O_CREAT, 0600, - TAR_VERBOSE | TAR_GNU) == -1) + real_fd = open(path.c_str(), O_WRONLY | O_CREAT, 0600); + if (real_fd < 0) + throw IOException("Error opening output file"); + + filter_fd = spawn_filter(real_fd); + + if (tar_fdopen(&t, filter_fd, (char *)path.c_str(), NULL, + O_WRONLY | O_CREAT, 0600, TAR_VERBOSE | TAR_GNU) == -1) throw IOException("Error opening Tarfile"); } Tarfile::~Tarfile() { - string checksum_list = checksums.str(); - internal_write_object(segment_name + "/checksums", - checksum_list.data(), checksum_list.size()); + /* Close the tar file... */ tar_append_eof(t); if (tar_close(t) != 0) throw IOException("Error closing Tarfile"); + + /* ...and wait for filter process to finish. */ + int status; + waitpid(filter_pid, &status, 0); + + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + throw IOException("Filter process error"); + } + + close(real_fd); +} + +/* Launch a child process which can act as a filter (compress, encrypt, etc.) + * on the TAR output. The file descriptor to which output should be written + * must be specified; the return value is the file descriptor which will be + * attached to the standard input of the filter program. */ +int Tarfile::spawn_filter(int fd_out) +{ + int fds[2]; + + /* Create a pipe for communicating with the filter process. */ + if (pipe(fds) < 0) { + throw IOException("Unable to create pipe for filter"); + } + + /* Create a child process which can exec() the filter program. */ + filter_pid = fork(); + if (filter_pid < 0) + throw IOException("Unable to fork filter process"); + + if (filter_pid > 0) { + /* Parent process */ + close(fds[0]); + cloexec(fds[1]); + } else { + /* Child process. Rearrange file descriptors. stdin is fds[0], stdout + * is fd_out, stderr is unchanged. */ + close(fds[1]); + + if (dup2(fds[0], 0) < 0) + exit(1); + close(fds[0]); + + if (dup2(fd_out, 1) < 0) + exit(1); + close(fd_out); + + /* Exec the filter program. */ + execlp("/bin/sh", "/bin/sh", "-c", filter_program, NULL); + + /* Should not reach here except for error cases. */ + fprintf(stderr, "Could not exec filter: %m\n"); + exit(1); + } + + return fds[1]; } void Tarfile::write_object(int id, const char *data, size_t len) @@ -55,13 +131,6 @@ void Tarfile::write_object(int id, const char *data, size_t len) string path = segment_name + "/" + buf; internal_write_object(path, data, len); - - // Compute a checksum for the data block, which will be stored at the end - // of the TAR file. - SHA1Checksum hash; - hash.process(data, len); - sprintf(buf, "%08x", id); - checksums << buf << " " << hash.checksum_str() << "\n"; } void Tarfile::internal_write_object(const string &path, @@ -103,11 +172,30 @@ void Tarfile::internal_write_object(const string &path, size += blocks * T_BLOCKSIZE; } +/* Estimate the size based on the size of the actual output file on disk. + * However, it might be the case that the filter program is buffering all its + * data, and might potentially not write a single byte until we have closed + * our end of the pipe. If we don't do so until we see data written, we have + * a problem. So, arbitrarily pick an upper bound on the compression ratio + * that the filter will achieve (128:1), and return a size estimate which is + * the larger of a) bytes actually seen written to disk, and b) input + * bytes/128. */ +size_t Tarfile::size_estimate() +{ + struct stat statbuf; + + if (fstat(real_fd, &statbuf) == 0) + return max((int64_t)statbuf.st_size, (int64_t)(size / 128)); + + /* Couldn't stat the file on disk, so just return the actual number of + * bytes, before compression. */ + return size; +} + static const size_t SEGMENT_SIZE = 4 * 1024 * 1024; -string TarSegmentStore::write_object(const char *data, size_t len, const - std::string &group, - const std::list &refs) +ObjectReference TarSegmentStore::write_object(const char *data, size_t len, + const std::string &group) { struct segment_info *segment; @@ -116,13 +204,10 @@ string TarSegmentStore::write_object(const char *data, size_t len, const if (segments.find(group) == segments.end()) { segment = new segment_info; - uint8_t uuid[16]; - char uuid_buf[40]; - uuid_generate(uuid); - uuid_unparse_lower(uuid, uuid_buf); - segment->name = uuid_buf; + segment->name = generate_uuid(); string filename = path + "/" + segment->name + ".tar"; + filename += filter_extension; segment->file = new Tarfile(filename, segment->name); segment->count = 0; @@ -139,20 +224,14 @@ string TarSegmentStore::write_object(const char *data, size_t len, const segment->file->write_object(id, data, len); segment->count++; - string full_name = segment->name + "/" + id_buf; - - // Store any dependencies this object has on other segments, so they can be - // written when the segment is closed. - for (list::const_iterator i = refs.begin(); i != refs.end(); ++i) { - segment->refs.insert(*i); - } + ObjectReference ref(segment->name, id_buf); // If this segment meets or exceeds the size target, close it so that // future objects will go into a new segment. if (segment->file->size_estimate() >= SEGMENT_SIZE) close_segment(group); - return full_name; + return ref; } void TarSegmentStore::sync() @@ -164,16 +243,6 @@ void TarSegmentStore::sync() void TarSegmentStore::close_segment(const string &group) { struct segment_info *segment = segments[group]; - fprintf(stderr, "Closing segment group %s (%s)\n", - group.c_str(), segment->name.c_str()); - - string reflist; - for (set::iterator i = segment->refs.begin(); - i != segment->refs.end(); ++i) { - reflist += *i + "\n"; - } - segment->file->internal_write_object(segment->name + "/references", - reflist.data(), reflist.size()); delete segment->file; segments.erase(segments.find(group)); @@ -194,18 +263,20 @@ LbsObject::~LbsObject() { } -void LbsObject::add_reference(const LbsObject *o) -{ - // TODO: Implement -} - void LbsObject::write(TarSegmentStore *store) { assert(data != NULL); assert(!written); - name = store->write_object(data, data_len, group); - + ref = store->write_object(data, data_len, group); written = true; - data = NULL; +} + +void LbsObject::checksum() +{ + assert(written); + + SHA1Checksum hash; + hash.process(data, data_len); + ref.set_checksum(hash.checksum_str()); }