X-Git-Url: http://git.vrable.net/?p=cumulus.git;a=blobdiff_plain;f=store.cc;h=a77d64af0a144ef197116aab55ac93f3e3be98bb;hp=49f002ae877559033b8ff1574b7f823fb6bfd8df;hb=HEAD;hpb=b1a0bfe834d45694851787317da2cd55add2d7bb diff --git a/store.cc b/store.cc index 49f002a..a77d64a 100644 --- a/store.cc +++ b/store.cc @@ -1,113 +1,266 @@ -/* LBS: An LFS-inspired filesystem backup system - * Copyright (C) 2007 Michael Vrable +/* Cumulus: Efficient Filesystem Backup to the Cloud + * Copyright (C) 2008-2009 The Cumulus Developers + * See the AUTHORS file for a list of contributors. * - * Backup data is stored in a collection of objects, which are grouped together + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + */ + +/* Backup data is stored in a collection of objects, which are grouped together * into segments for storage purposes. This implementation of the object store - * is built on top of libtar, and represents segments as TAR files and objects - * as files within them. */ + * represents segments as TAR files and objects as files within them. */ #include +#include #include +#include #include #include +#include +#include #include #include #include -#include +#include #include +#include #include #include #include +#include "hash.h" +#include "localdb.h" #include "store.h" +#include "ref.h" +#include "util.h" +using std::max; using std::list; +using std::map; +using std::pair; using std::set; using std::string; -list TarSegmentStore::norefs; +/* Default filter program is bzip2 */ +const char *filter_program = "bzip2 -c"; +const char *filter_extension = ".bz2"; -Tarfile::Tarfile(const string &path, const string &segment) +Tarfile::Tarfile(RemoteFile *file, const string &segment) : size(0), segment_name(segment) { - if (tar_open(&t, (char *)path.c_str(), NULL, O_WRONLY | O_CREAT, 0600, - TAR_VERBOSE | TAR_GNU) == -1) - throw IOException("Error opening Tarfile"); + assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE); + + this->file = file; + this->filter.reset(FileFilter::New(file->get_fd(), filter_program)); } Tarfile::~Tarfile() { - string checksum_list = checksums.str(); - internal_write_object(segment_name + "/checksums", - checksum_list.data(), checksum_list.size()); - tar_append_eof(t); + char buf[TAR_BLOCK_SIZE]; + + /* Append the EOF marker: two blocks filled with nulls. */ + memset(buf, 0, sizeof(buf)); + tar_write(buf, TAR_BLOCK_SIZE); + tar_write(buf, TAR_BLOCK_SIZE); - if (tar_close(t) != 0) - throw IOException("Error closing Tarfile"); + if (close(filter->get_wrapped_fd()) != 0) + fatal("Error closing Tarfile"); + + /* ...and wait for filter process to finish. */ + if (filter->wait() != 0) { + fatal("Filter process error"); + } } -void Tarfile::write_object(int id, const char *data, size_t len) +FileFilter::FileFilter(int raw, int wrapped, pid_t pid) + : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { } + +FileFilter *FileFilter::New(int fd, const char *program) { - char buf[64]; - sprintf(buf, "%08x", id); - string path = segment_name + "/" + buf; + if (program == NULL || strlen(program) == 0) { + return new FileFilter(fd, fd, -1); + } - internal_write_object(path, data, len); + pid_t pid; + int wrapped_fd = spawn_filter(fd, program, &pid); + return new FileFilter(fd, wrapped_fd, pid); +} - // Compute a checksum for the data block, which will be stored at the end - // of the TAR file. - SHA1Checksum hash; - hash.process(data, len); - sprintf(buf, "%08x", id); - checksums << buf << " " << hash.checksum_str() << "\n"; +int FileFilter::wait() +{ + // No filter program was launched implies no need to wait. + if (pid == -1) + return 0; + + // The raw file descriptor was held open to track the output file size, but + // is not needed any longer. + close(fd_raw); + + int status; + if (waitpid(pid, &status, 0) < 0) { + fprintf(stderr, "Error waiting for filter process: %m\n"); + return -1; + } + + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status)); + } + + return status; } -void Tarfile::internal_write_object(const string &path, - const char *data, size_t len) +/* Launch a child process which can act as a filter (compress, encrypt, etc.) + * on the TAR output. The file descriptor to which output should be written + * must be specified; the return value is the file descriptor which will be + * attached to the standard input of the filter program. */ +int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid) { - memset(&t->th_buf, 0, sizeof(struct tar_header)); + int fds[2]; + pid_t pid; - th_set_type(t, S_IFREG | 0600); - th_set_user(t, 0); - th_set_group(t, 0); - th_set_mode(t, 0600); - th_set_size(t, len); - th_set_mtime(t, time(NULL)); - th_set_path(t, const_cast(path.c_str())); - th_finish(t); + /* Create a pipe for communicating with the filter process. */ + if (pipe(fds) < 0) { + fatal("Unable to create pipe for filter"); + } + + /* Create a child process which can exec() the filter program. */ + pid = fork(); + if (pid < 0) + fatal("Unable to fork filter process"); + + if (pid > 0) { + /* Parent process */ + close(fds[0]); + cloexec(fds[1]); + if (filter_pid != NULL) + *filter_pid = pid; + } else { + /* Child process. Rearrange file descriptors. stdin is fds[0], stdout + * is fd_out, stderr is unchanged. */ + close(fds[1]); - if (th_write(t) != 0) - throw IOException("Error writing tar header"); + if (dup2(fds[0], 0) < 0) + exit(1); + close(fds[0]); + + if (dup2(fd_out, 1) < 0) + exit(1); + close(fd_out); + + /* Exec the filter program. */ + execlp("/bin/sh", "/bin/sh", "-c", program, NULL); + + /* Should not reach here except for error cases. */ + fprintf(stderr, "Could not exec filter: %m\n"); + exit(1); + } - size += T_BLOCKSIZE; + return fds[1]; +} + +void Tarfile::tar_write(const char *data, size_t len) +{ + size += len; + + while (len > 0) { + int res = write(filter->get_wrapped_fd(), data, len); + + if (res < 0) { + if (errno == EINTR) + continue; + fprintf(stderr, "Write error: %m\n"); + fatal("Write error"); + } + + len -= res; + data += res; + } +} + +void Tarfile::write_object(int id, const char *data, size_t len) +{ + struct tar_header header; + memset(&header, 0, sizeof(header)); + + char buf[64]; + sprintf(buf, "%08x", id); + string path = segment_name + "/" + buf; + + assert(path.size() < 100); + memcpy(header.name, path.data(), path.size()); + sprintf(header.mode, "%07o", 0600); + sprintf(header.uid, "%07o", 0); + sprintf(header.gid, "%07o", 0); + sprintf(header.size, "%011o", (int)len); + sprintf(header.mtime, "%011o", (int)time(NULL)); + header.typeflag = '0'; + strcpy(header.magic, "ustar "); + strcpy(header.uname, "root"); + strcpy(header.gname, "root"); + + memset(header.chksum, ' ', sizeof(header.chksum)); + int checksum = 0; + for (int i = 0; i < TAR_BLOCK_SIZE; i++) { + checksum += ((uint8_t *)&header)[i]; + } + sprintf(header.chksum, "%06o", checksum); + + tar_write((const char *)&header, TAR_BLOCK_SIZE); if (len == 0) return; - size_t blocks = (len + T_BLOCKSIZE - 1) / T_BLOCKSIZE; - size_t padding = blocks * T_BLOCKSIZE - len; + tar_write(data, len); - for (size_t i = 0; i < blocks - 1; i++) { - if (tar_block_write(t, &data[i * T_BLOCKSIZE]) == -1) - throw IOException("Error writing tar block"); - } + char padbuf[TAR_BLOCK_SIZE]; + size_t blocks = (len + TAR_BLOCK_SIZE - 1) / TAR_BLOCK_SIZE; + size_t padding = blocks * TAR_BLOCK_SIZE - len; + memset(padbuf, 0, padding); + tar_write(padbuf, padding); +} + +/* Estimate the size based on the size of the actual output file on disk. + * However, it might be the case that the filter program is buffering all its + * data, and might potentially not write a single byte until we have closed + * our end of the pipe. If we don't do so until we see data written, we have + * a problem. So, arbitrarily pick an upper bound on the compression ratio + * that the filter will achieve (128:1), and return a size estimate which is + * the larger of a) bytes actually seen written to disk, and b) input + * bytes/128. */ +size_t Tarfile::size_estimate() +{ + struct stat statbuf; - char block[T_BLOCKSIZE]; - memset(block, 0, sizeof(block)); - memcpy(block, &data[T_BLOCKSIZE * (blocks - 1)], T_BLOCKSIZE - padding); - if (tar_block_write(t, block) == -1) - throw IOException("Error writing final tar block"); + if (fstat(filter->get_raw_fd(), &statbuf) == 0) + return max((int64_t)statbuf.st_size, (int64_t)(size / 128)); - size += blocks * T_BLOCKSIZE; + /* Couldn't stat the file on disk, so just return the actual number of + * bytes, before compression. */ + return size; } static const size_t SEGMENT_SIZE = 4 * 1024 * 1024; -string TarSegmentStore::write_object(const char *data, size_t len, const - std::string &group, - const std::list &refs) +/* Backup size summary: segment type -> (uncompressed size, compressed size) */ +static map > group_sizes; + +ObjectReference TarSegmentStore::write_object(const char *data, size_t len, + const std::string &group, + const std::string &checksum, + double age) { struct segment_info *segment; @@ -116,16 +269,16 @@ string TarSegmentStore::write_object(const char *data, size_t len, const if (segments.find(group) == segments.end()) { segment = new segment_info; - uint8_t uuid[16]; - char uuid_buf[40]; - uuid_generate(uuid); - uuid_unparse_lower(uuid, uuid_buf); - segment->name = uuid_buf; - - string filename = path + "/" + segment->name + ".tar"; - segment->file = new Tarfile(filename, segment->name); - + segment->name = generate_uuid(); + segment->group = group; + segment->basename = segment->name + ".tar"; + segment->basename += filter_extension; segment->count = 0; + segment->data_size = 0; + segment->rf = remote->alloc_file(segment->basename, + group == "metadata" ? "segments0" + : "segments1"); + segment->file = new Tarfile(segment->rf, segment->name); segments[group] = segment; } else { @@ -138,21 +291,23 @@ string TarSegmentStore::write_object(const char *data, size_t len, const segment->file->write_object(id, data, len); segment->count++; + segment->data_size += len; - string full_name = segment->name + "/" + id_buf; + group_sizes[group].first += len; - // Store any dependencies this object has on other segments, so they can be - // written when the segment is closed. - for (list::const_iterator i = refs.begin(); i != refs.end(); ++i) { - segment->refs.insert(*i); - } + ObjectReference ref(segment->name, id_buf); + ref.set_range(0, len, true); + if (checksum.size() > 0) + ref.set_checksum(checksum); + if (db != NULL) + db->StoreObject(ref, age); // If this segment meets or exceeds the size target, close it so that // future objects will go into a new segment. if (segment->file->size_estimate() >= SEGMENT_SIZE) close_segment(group); - return full_name; + return ref; } void TarSegmentStore::sync() @@ -161,21 +316,39 @@ void TarSegmentStore::sync() close_segment(segments.begin()->first); } +void TarSegmentStore::dump_stats() +{ + printf("Data written:\n"); + for (map >::iterator i = group_sizes.begin(); + i != group_sizes.end(); ++i) { + printf(" %s: %lld (%lld compressed)\n", i->first.c_str(), + (long long)i->second.first, (long long)i->second.second); + } +} + void TarSegmentStore::close_segment(const string &group) { struct segment_info *segment = segments[group]; - fprintf(stderr, "Closing segment group %s (%s)\n", - group.c_str(), segment->name.c_str()); - string reflist; - for (set::iterator i = segment->refs.begin(); - i != segment->refs.end(); ++i) { - reflist += *i + "\n"; + delete segment->file; + + if (db != NULL) { + struct stat stat_buf; + int disk_size = 0; + if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) { + disk_size = stat_buf.st_size; + group_sizes[segment->group].second += disk_size; + } + + string checksum + = Hash::hash_file(segment->rf->get_local_path().c_str()); + + db->SetSegmentMetadata(segment->name, segment->rf->get_remote_path(), + checksum, group, segment->data_size, disk_size); } - segment->file->internal_write_object(segment->name + "/references", - reflist.data(), reflist.size()); - delete segment->file; + segment->rf->send(); + segments.erase(segments.find(group)); delete segment; } @@ -186,7 +359,7 @@ string TarSegmentStore::object_reference_to_segment(const string &object) } LbsObject::LbsObject() - : group(""), data(NULL), data_len(0), written(false) + : group(""), age(0.0), data(NULL), data_len(0), written(false) { } @@ -194,9 +367,19 @@ LbsObject::~LbsObject() { } -void LbsObject::add_reference(const LbsObject *o) +void LbsObject::set_data(const char *d, size_t len, const char *checksum) { - refs.insert(o->get_name()); + data = d; + data_len = len; + + if (checksum != NULL) { + this->checksum = checksum; + } else { + Hash *hash = Hash::New(); + hash->update(data, data_len); + this->checksum = hash->digest_str(); + delete hash; + } } void LbsObject::write(TarSegmentStore *store) @@ -204,13 +387,6 @@ void LbsObject::write(TarSegmentStore *store) assert(data != NULL); assert(!written); - list reflist; - for (set::iterator i = refs.begin(); i != refs.end(); ++i) { - reflist.push_back(*i); - } - - name = store->write_object(data, data_len, group, reflist); - + ref = store->write_object(data, data_len, group, checksum, age); written = true; - data = NULL; }