X-Git-Url: http://git.vrable.net/?p=cumulus.git;a=blobdiff_plain;f=store.cc;h=a77d64af0a144ef197116aab55ac93f3e3be98bb;hp=00d08780007dde3a8b2617ceb7e89d5d75afd5cd;hb=HEAD;hpb=52df48ca169e07caa5c726d51ed4ed83aed748a5 diff --git a/store.cc b/store.cc index 00d0878..a77d64a 100644 --- a/store.cc +++ b/store.cc @@ -1,13 +1,30 @@ -/* LBS: An LFS-inspired filesystem backup system - * Copyright (C) 2008 Michael Vrable +/* Cumulus: Efficient Filesystem Backup to the Cloud + * Copyright (C) 2008-2009 The Cumulus Developers + * See the AUTHORS file for a list of contributors. * - * Backup data is stored in a collection of objects, which are grouped together + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +/* Backup data is stored in a collection of objects, which are grouped together * into segments for storage purposes. This implementation of the object store * represents segments as TAR files and objects as files within them. */ #include #include #include +#include #include #include #include @@ -23,8 +40,11 @@ #include #include +#include "hash.h" +#include "localdb.h" #include "store.h" #include "ref.h" +#include "util.h" using std::max; using std::list; @@ -37,16 +57,6 @@ using std::string; const char *filter_program = "bzip2 -c"; const char *filter_extension = ".bz2"; -static void cloexec(int fd) -{ - long flags = fcntl(fd, F_GETFD); - - if (flags < 0) - return; - - fcntl(fd, F_SETFD, flags | FD_CLOEXEC); -} - Tarfile::Tarfile(RemoteFile *file, const string &segment) : size(0), segment_name(segment) @@ -54,8 +64,7 @@ Tarfile::Tarfile(RemoteFile *file, const string &segment) assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE); this->file = file; - real_fd = file->get_fd(); - filter_fd = spawn_filter(real_fd, filter_program, &filter_pid); + this->filter.reset(FileFilter::New(file->get_fd(), filter_program)); } Tarfile::~Tarfile() @@ -67,38 +76,70 @@ Tarfile::~Tarfile() tar_write(buf, TAR_BLOCK_SIZE); tar_write(buf, TAR_BLOCK_SIZE); - if (close(filter_fd) != 0) - throw IOException("Error closing Tarfile"); + if (close(filter->get_wrapped_fd()) != 0) + fatal("Error closing Tarfile"); /* ...and wait for filter process to finish. */ + if (filter->wait() != 0) { + fatal("Filter process error"); + } +} + +FileFilter::FileFilter(int raw, int wrapped, pid_t pid) + : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { } + +FileFilter *FileFilter::New(int fd, const char *program) +{ + if (program == NULL || strlen(program) == 0) { + return new FileFilter(fd, fd, -1); + } + + pid_t pid; + int wrapped_fd = spawn_filter(fd, program, &pid); + return new FileFilter(fd, wrapped_fd, pid); +} + +int FileFilter::wait() +{ + // No filter program was launched implies no need to wait. + if (pid == -1) + return 0; + + // The raw file descriptor was held open to track the output file size, but + // is not needed any longer. + close(fd_raw); + int status; - waitpid(filter_pid, &status, 0); + if (waitpid(pid, &status, 0) < 0) { + fprintf(stderr, "Error waiting for filter process: %m\n"); + return -1; + } if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { - throw IOException("Filter process error"); + fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status)); } - close(real_fd); + return status; } /* Launch a child process which can act as a filter (compress, encrypt, etc.) * on the TAR output. The file descriptor to which output should be written * must be specified; the return value is the file descriptor which will be * attached to the standard input of the filter program. */ -int spawn_filter(int fd_out, const char *program, pid_t *filter_pid) +int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid) { int fds[2]; pid_t pid; /* Create a pipe for communicating with the filter process. */ if (pipe(fds) < 0) { - throw IOException("Unable to create pipe for filter"); + fatal("Unable to create pipe for filter"); } /* Create a child process which can exec() the filter program. */ pid = fork(); if (pid < 0) - throw IOException("Unable to fork filter process"); + fatal("Unable to fork filter process"); if (pid > 0) { /* Parent process */ @@ -135,13 +176,13 @@ void Tarfile::tar_write(const char *data, size_t len) size += len; while (len > 0) { - int res = write(filter_fd, data, len); + int res = write(filter->get_wrapped_fd(), data, len); if (res < 0) { if (errno == EINTR) continue; fprintf(stderr, "Write error: %m\n"); - throw IOException("Write error"); + fatal("Write error"); } len -= res; @@ -163,7 +204,7 @@ void Tarfile::write_object(int id, const char *data, size_t len) sprintf(header.mode, "%07o", 0600); sprintf(header.uid, "%07o", 0); sprintf(header.gid, "%07o", 0); - sprintf(header.size, "%011o", len); + sprintf(header.size, "%011o", (int)len); sprintf(header.mtime, "%011o", (int)time(NULL)); header.typeflag = '0'; strcpy(header.magic, "ustar "); @@ -203,7 +244,7 @@ size_t Tarfile::size_estimate() { struct stat statbuf; - if (fstat(real_fd, &statbuf) == 0) + if (fstat(filter->get_raw_fd(), &statbuf) == 0) return max((int64_t)statbuf.st_size, (int64_t)(size / 128)); /* Couldn't stat the file on disk, so just return the actual number of @@ -217,7 +258,9 @@ static const size_t SEGMENT_SIZE = 4 * 1024 * 1024; static map > group_sizes; ObjectReference TarSegmentStore::write_object(const char *data, size_t len, - const std::string &group) + const std::string &group, + const std::string &checksum, + double age) { struct segment_info *segment; @@ -231,8 +274,10 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len, segment->basename = segment->name + ".tar"; segment->basename += filter_extension; segment->count = 0; - segment->size = 0; - segment->rf = remote->alloc_file(segment->basename, "segments"); + segment->data_size = 0; + segment->rf = remote->alloc_file(segment->basename, + group == "metadata" ? "segments0" + : "segments1"); segment->file = new Tarfile(segment->rf, segment->name); segments[group] = segment; @@ -246,11 +291,16 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len, segment->file->write_object(id, data, len); segment->count++; - segment->size += len; + segment->data_size += len; group_sizes[group].first += len; ObjectReference ref(segment->name, id_buf); + ref.set_range(0, len, true); + if (checksum.size() > 0) + ref.set_checksum(checksum); + if (db != NULL) + db->StoreObject(ref, age); // If this segment meets or exceeds the size target, close it so that // future objects will go into a new segment. @@ -272,7 +322,7 @@ void TarSegmentStore::dump_stats() for (map >::iterator i = group_sizes.begin(); i != group_sizes.end(); ++i) { printf(" %s: %lld (%lld compressed)\n", i->first.c_str(), - i->second.first, i->second.second); + (long long)i->second.first, (long long)i->second.second); } } @@ -283,17 +333,18 @@ void TarSegmentStore::close_segment(const string &group) delete segment->file; if (db != NULL) { - SHA1Checksum segment_checksum; - if (segment_checksum.process_file(segment->rf->get_local_path().c_str())) { - string checksum = segment_checksum.checksum_str(); - db->SetSegmentChecksum(segment->name, segment->basename, checksum, - segment->size); - } - struct stat stat_buf; + int disk_size = 0; if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) { - group_sizes[segment->group].second += stat_buf.st_size; + disk_size = stat_buf.st_size; + group_sizes[segment->group].second += disk_size; } + + string checksum + = Hash::hash_file(segment->rf->get_local_path().c_str()); + + db->SetSegmentMetadata(segment->name, segment->rf->get_remote_path(), + checksum, group, segment->data_size, disk_size); } segment->rf->send(); @@ -308,7 +359,7 @@ string TarSegmentStore::object_reference_to_segment(const string &object) } LbsObject::LbsObject() - : group(""), data(NULL), data_len(0), written(false) + : group(""), age(0.0), data(NULL), data_len(0), written(false) { } @@ -316,20 +367,26 @@ LbsObject::~LbsObject() { } -void LbsObject::write(TarSegmentStore *store) +void LbsObject::set_data(const char *d, size_t len, const char *checksum) { - assert(data != NULL); - assert(!written); + data = d; + data_len = len; - ref = store->write_object(data, data_len, group); - written = true; + if (checksum != NULL) { + this->checksum = checksum; + } else { + Hash *hash = Hash::New(); + hash->update(data, data_len); + this->checksum = hash->digest_str(); + delete hash; + } } -void LbsObject::checksum() +void LbsObject::write(TarSegmentStore *store) { - assert(written); + assert(data != NULL); + assert(!written); - SHA1Checksum hash; - hash.process(data, data_len); - ref.set_checksum(hash.checksum_str()); + ref = store->write_object(data, data_len, group, checksum, age); + written = true; }