Replace boost::scoped_ptr with std::unique_ptr.
[cumulus.git] / store.cc
index 1686798..a77d64a 100644 (file)
--- a/store.cc
+++ b/store.cc
-/* LBS: An LFS-inspired filesystem backup system
- * Copyright (C) 2006  Michael Vrable
+/* Cumulus: Efficient Filesystem Backup to the Cloud
+ * Copyright (C) 2008-2009 The Cumulus Developers
+ * See the AUTHORS file for a list of contributors.
  *
- * Backup data is stored in a collection of objects, which are grouped together
- * into segments for storage purposes.  This file provides interfaces for
- * reading and writing objects and segments. */
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
 
-#include <assert.h>
-#include <uuid/uuid.h>
+/* Backup data is stored in a collection of objects, which are grouped together
+ * into segments for storage purposes.  This implementation of the object store
+ * represents segments as TAR files and objects as files within them. */
 
+#include <assert.h>
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/resource.h>
+#include <sys/wait.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <time.h>
+
+#include <algorithm>
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+#include <iostream>
+
+#include "hash.h"
+#include "localdb.h"
 #include "store.h"
-
+#include "ref.h"
+#include "util.h"
+
+using std::max;
+using std::list;
+using std::map;
+using std::pair;
+using std::set;
 using std::string;
 
-OutputStream::OutputStream()
-    : bytes_written(0)
-{
-}
+/* Default filter program is bzip2 */
+const char *filter_program = "bzip2 -c";
+const char *filter_extension = ".bz2";
 
-void OutputStream::write(const void *data, size_t len)
+Tarfile::Tarfile(RemoteFile *file, const string &segment)
+    : size(0),
+      segment_name(segment)
 {
-    write_internal(data, len);
-    bytes_written += len;
-}
+    assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE);
 
-void OutputStream::write_u8(uint8_t val)
-{
-    write(&val, 1);
+    this->file = file;
+    this->filter.reset(FileFilter::New(file->get_fd(), filter_program));
 }
 
-void OutputStream::write_u16(uint16_t val)
+Tarfile::~Tarfile()
 {
-    unsigned char buf[2];
+    char buf[TAR_BLOCK_SIZE];
 
-    buf[0] = val & 0xff;
-    buf[1] = (val >> 8) & 0xff;
-    write(buf, 2);
-}
+    /* Append the EOF marker: two blocks filled with nulls. */
+    memset(buf, 0, sizeof(buf));
+    tar_write(buf, TAR_BLOCK_SIZE);
+    tar_write(buf, TAR_BLOCK_SIZE);
 
-void OutputStream::write_u32(uint32_t val)
-{
-    unsigned char buf[4];
+    if (close(filter->get_wrapped_fd()) != 0)
+        fatal("Error closing Tarfile");
 
-    buf[0] = val & 0xff;
-    buf[1] = (val >> 8) & 0xff;
-    buf[2] = (val >> 16) & 0xff;
-    buf[3] = (val >> 24) & 0xff;
-    write(buf, 4);
+    /* ...and wait for filter process to finish. */
+    if (filter->wait() != 0) {
+        fatal("Filter process error");
+    }
 }
 
-void OutputStream::write_u64(uint64_t val)
-{
-    unsigned char buf[8];
-
-    buf[0] = val & 0xff;
-    buf[1] = (val >> 8) & 0xff;
-    buf[2] = (val >> 16) & 0xff;
-    buf[3] = (val >> 24) & 0xff;
-    buf[4] = (val >> 32) & 0xff;
-    buf[5] = (val >> 40) & 0xff;
-    buf[6] = (val >> 48) & 0xff;
-    buf[7] = (val >> 56) & 0xff;
-    write(buf, 8);
-}
+FileFilter::FileFilter(int raw, int wrapped, pid_t pid)
+    : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { }
 
-/* Writes an integer to an output stream using a variable-sized representation:
- * seven bits are written at a time (little-endian), and the eigth bit of each
- * byte is set if more data follows. */
-void OutputStream::write_varint(uint64_t val)
+FileFilter *FileFilter::New(int fd, const char *program)
 {
-    do {
-        uint8_t remainder = (val & 0x7f);
-        val >>= 7;
-        if (val)
-            remainder |= 0x80;
-        write_u8(remainder);
-    } while (val);
-}
+    if (program == NULL || strlen(program) == 0) {
+        return new FileFilter(fd, fd, -1);
+    }
 
-/* Write an arbitrary string by first writing out the length, followed by the
- * data itself. */
-void OutputStream::write_string(const string &s)
-{
-    size_t len = s.length();
-    write_varint(len);
-    write(s.data(), len);
+    pid_t pid;
+    int wrapped_fd = spawn_filter(fd, program, &pid);
+    return new FileFilter(fd, wrapped_fd, pid);
 }
 
-void OutputStream::write_dictionary(const dictionary &d)
+int FileFilter::wait()
 {
-    size_t size = d.size();
-    size_t written = 0;
-
-    write_varint(size);
+    // No filter program was launched implies no need to wait.
+    if (pid == -1)
+        return 0;
+
+    // The raw file descriptor was held open to track the output file size, but
+    // is not needed any longer.
+    close(fd_raw);
+
+    int status;
+    if (waitpid(pid, &status, 0) < 0) {
+        fprintf(stderr, "Error waiting for filter process: %m\n");
+        return -1;
+    }
 
-    for (dictionary::const_iterator i = d.begin(); i != d.end(); ++i) {
-        write_string(i->first);
-        write_string(i->second);
-        written++;
+    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+        fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status));
     }
 
-    assert(written == size);
+    return status;
 }
 
-StringOutputStream::StringOutputStream()
-    : buf(std::ios_base::out)
+/* Launch a child process which can act as a filter (compress, encrypt, etc.)
+ * on the TAR output.  The file descriptor to which output should be written
+ * must be specified; the return value is the file descriptor which will be
+ * attached to the standard input of the filter program. */
+int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
 {
-}
+    int fds[2];
+    pid_t pid;
 
-void StringOutputStream::write_internal(const void *data, size_t len)
-{
-    buf.write((const char *)data, len);
-    if (!buf.good())
-        throw IOException("error writing to StringOutputStream");
-}
+    /* Create a pipe for communicating with the filter process. */
+    if (pipe(fds) < 0) {
+        fatal("Unable to create pipe for filter");
+    }
 
-FileOutputStream::FileOutputStream(FILE *file)
-{
-    f = file;
-}
+    /* Create a child process which can exec() the filter program. */
+    pid = fork();
+    if (pid < 0)
+        fatal("Unable to fork filter process");
+
+    if (pid > 0) {
+        /* Parent process */
+        close(fds[0]);
+        cloexec(fds[1]);
+        if (filter_pid != NULL)
+            *filter_pid = pid;
+    } else {
+        /* Child process.  Rearrange file descriptors.  stdin is fds[0], stdout
+         * is fd_out, stderr is unchanged. */
+        close(fds[1]);
+
+        if (dup2(fds[0], 0) < 0)
+            exit(1);
+        close(fds[0]);
+
+        if (dup2(fd_out, 1) < 0)
+            exit(1);
+        close(fd_out);
+
+        /* Exec the filter program. */
+        execlp("/bin/sh", "/bin/sh", "-c", program, NULL);
+
+        /* Should not reach here except for error cases. */
+        fprintf(stderr, "Could not exec filter: %m\n");
+        exit(1);
+    }
 
-FileOutputStream::~FileOutputStream()
-{
-    fclose(f);
+    return fds[1];
 }
 
-void FileOutputStream::write_internal(const void *data, size_t len)
+void Tarfile::tar_write(const char *data, size_t len)
 {
-    size_t res;
+    size += len;
 
-    res = fwrite(data, 1, len, f);
-    if (res != len) {
-        throw IOException("write error");
+    while (len > 0) {
+        int res = write(filter->get_wrapped_fd(), data, len);
+
+        if (res < 0) {
+            if (errno == EINTR)
+                continue;
+            fprintf(stderr, "Write error: %m\n");
+            fatal("Write error");
+        }
+
+        len -= res;
+        data += res;
     }
 }
 
-WrapperOutputStream::WrapperOutputStream(OutputStream &o)
-    : real(o)
+void Tarfile::write_object(int id, const char *data, size_t len)
 {
-}
+    struct tar_header header;
+    memset(&header, 0, sizeof(header));
+
+    char buf[64];
+    sprintf(buf, "%08x", id);
+    string path = segment_name + "/" + buf;
+
+    assert(path.size() < 100);
+    memcpy(header.name, path.data(), path.size());
+    sprintf(header.mode, "%07o", 0600);
+    sprintf(header.uid, "%07o", 0);
+    sprintf(header.gid, "%07o", 0);
+    sprintf(header.size, "%011o", (int)len);
+    sprintf(header.mtime, "%011o", (int)time(NULL));
+    header.typeflag = '0';
+    strcpy(header.magic, "ustar  ");
+    strcpy(header.uname, "root");
+    strcpy(header.gname, "root");
+
+    memset(header.chksum, ' ', sizeof(header.chksum));
+    int checksum = 0;
+    for (int i = 0; i < TAR_BLOCK_SIZE; i++) {
+        checksum += ((uint8_t *)&header)[i];
+    }
+    sprintf(header.chksum, "%06o", checksum);
 
-void WrapperOutputStream::write_internal(const void *data, size_t len)
-{
-    real.write(data, len);
-}
+    tar_write((const char *)&header, TAR_BLOCK_SIZE);
 
-/* Utility functions, for encoding data types to strings. */
-string encode_u16(uint16_t val)
-{
-    StringOutputStream s;
-    s.write_u16(val);
-    return s.contents();
-}
+    if (len == 0)
+        return;
 
-string encode_u32(uint32_t val)
-{
-    StringOutputStream s;
-    s.write_u32(val);
-    return s.contents();
-}
+    tar_write(data, len);
 
-string encode_u64(uint64_t val)
-{
-    StringOutputStream s;
-    s.write_u64(val);
-    return s.contents();
+    char padbuf[TAR_BLOCK_SIZE];
+    size_t blocks = (len + TAR_BLOCK_SIZE - 1) / TAR_BLOCK_SIZE;
+    size_t padding = blocks * TAR_BLOCK_SIZE - len;
+    memset(padbuf, 0, padding);
+    tar_write(padbuf, padding);
 }
 
-SegmentWriter::SegmentWriter(OutputStream *output, struct uuid u)
-    : out(output),
-      id(u),
-      object_stream(NULL)
+/* Estimate the size based on the size of the actual output file on disk.
+ * However, it might be the case that the filter program is buffering all its
+ * data, and might potentially not write a single byte until we have closed
+ * our end of the pipe.  If we don't do so until we see data written, we have
+ * a problem.  So, arbitrarily pick an upper bound on the compression ratio
+ * that the filter will achieve (128:1), and return a size estimate which is
+ * the larger of a) bytes actually seen written to disk, and b) input
+ * bytes/128. */
+size_t Tarfile::size_estimate()
 {
-    /* Write out the segment header first. */
-    static const char signature[] = "LBSSEG0\n";
-    out->write(signature, strlen(signature));
-    out->write(id.bytes, sizeof(struct uuid));
+    struct stat statbuf;
+
+    if (fstat(filter->get_raw_fd(), &statbuf) == 0)
+        return max((int64_t)statbuf.st_size, (int64_t)(size / 128));
+
+    /* Couldn't stat the file on disk, so just return the actual number of
+     * bytes, before compression. */
+    return size;
 }
 
-SegmentWriter::~SegmentWriter()
+static const size_t SEGMENT_SIZE = 4 * 1024 * 1024;
+
+/* Backup size summary: segment type -> (uncompressed size, compressed size) */
+static map<string, pair<int64_t, int64_t> > group_sizes;
+
+ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
+                                              const std::string &group,
+                                              const std::string &checksum,
+                                              double age)
 {
-    if (object_stream)
-        finish_object();
-
-    // Write out the object table which gives the sizes and locations of all
-    // objects, and then add the trailing signature, which indicates the end of
-    // the segment and gives the offset of the object table.
-    int64_t index_offset = out->get_pos();
-
-    for (object_table::const_iterator i = objects.begin();
-         i != objects.end(); ++i) {
-        out->write_s64(i->first);
-        out->write_s64(i->second);
+    struct segment_info *segment;
+
+    // Find the segment into which the object should be written, looking up by
+    // group.  If no segment exists yet, create one.
+    if (segments.find(group) == segments.end()) {
+        segment = new segment_info;
+
+        segment->name = generate_uuid();
+        segment->group = group;
+        segment->basename = segment->name + ".tar";
+        segment->basename += filter_extension;
+        segment->count = 0;
+        segment->data_size = 0;
+        segment->rf = remote->alloc_file(segment->basename,
+                                         group == "metadata" ? "segments0"
+                                                             : "segments1");
+        segment->file = new Tarfile(segment->rf, segment->name);
+
+        segments[group] = segment;
+    } else {
+        segment = segments[group];
     }
 
-    static const char signature2[] = "LBSEND";
-    out->write(signature2, strlen(signature2));
-    out->write_s64(index_offset);
-    out->write_u32(objects.size());
+    int id = segment->count;
+    char id_buf[64];
+    sprintf(id_buf, "%08x", id);
 
-    /* The SegmentWriter takes ownership of the OutputStream it is writing to,
-     * and destroys it automatically when done with the segment. */
-    delete out;
-}
+    segment->file->write_object(id, data, len);
+    segment->count++;
+    segment->data_size += len;
 
-OutputStream *SegmentWriter::new_object()
-{
-    if (object_stream)
-        finish_object();
+    group_sizes[group].first += len;
+
+    ObjectReference ref(segment->name, id_buf);
+    ref.set_range(0, len, true);
+    if (checksum.size() > 0)
+        ref.set_checksum(checksum);
+    if (db != NULL)
+        db->StoreObject(ref, age);
 
-    object_start_offset = out->get_pos();
-    object_stream = new WrapperOutputStream(*out);
+    // If this segment meets or exceeds the size target, close it so that
+    // future objects will go into a new segment.
+    if (segment->file->size_estimate() >= SEGMENT_SIZE)
+        close_segment(group);
 
-    return object_stream;
+    return ref;
 }
 
-void SegmentWriter::finish_object()
+void TarSegmentStore::sync()
 {
-    assert(object_stream != NULL);
-
-    // store (start, length) information for locating this object
-    objects.push_back(std::make_pair(object_start_offset,
-                                     object_stream->get_pos()));
+    while (!segments.empty())
+        close_segment(segments.begin()->first);
+}
 
-    delete object_stream;
-    object_stream = NULL;
+void TarSegmentStore::dump_stats()
+{
+    printf("Data written:\n");
+    for (map<string, pair<int64_t, int64_t> >::iterator i = group_sizes.begin();
+         i != group_sizes.end(); ++i) {
+        printf("    %s: %lld (%lld compressed)\n", i->first.c_str(),
+               (long long)i->second.first, (long long)i->second.second);
+    }
 }
 
-struct uuid SegmentWriter::generate_uuid()
+void TarSegmentStore::close_segment(const string &group)
 {
-    struct uuid u;
+    struct segment_info *segment = segments[group];
 
-    uuid_generate(u.bytes);
+    delete segment->file;
+
+    if (db != NULL) {
+        struct stat stat_buf;
+        int disk_size = 0;
+        if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
+            disk_size = stat_buf.st_size;
+            group_sizes[segment->group].second += disk_size;
+        }
+
+        string checksum
+            = Hash::hash_file(segment->rf->get_local_path().c_str());
+
+        db->SetSegmentMetadata(segment->name, segment->rf->get_remote_path(),
+                               checksum, group, segment->data_size, disk_size);
+    }
 
-    return u;
+    segment->rf->send();
+
+    segments.erase(segments.find(group));
+    delete segment;
 }
 
-string SegmentWriter::format_uuid(const struct uuid u)
+string TarSegmentStore::object_reference_to_segment(const string &object)
 {
-    // A UUID only takes 36 bytes, plus the trailing '\0', so this is safe.
-    char buf[40];
-
-    uuid_unparse_lower(u.bytes, buf);
+    return object;
+}
 
-    return string(buf);
+LbsObject::LbsObject()
+    : group(""), age(0.0), data(NULL), data_len(0), written(false)
+{
 }
 
-SegmentStore::SegmentStore(const string &path)
-    : directory(path)
+LbsObject::~LbsObject()
 {
 }
 
-SegmentWriter *SegmentStore::new_segment()
+void LbsObject::set_data(const char *d, size_t len, const char *checksum)
 {
-    struct uuid id = SegmentWriter::generate_uuid();
-    string filename = directory + "/" + SegmentWriter::format_uuid(id);
+    data = d;
+    data_len = len;
+
+    if (checksum != NULL) {
+        this->checksum = checksum;
+    } else {
+        Hash *hash = Hash::New();
+        hash->update(data, data_len);
+        this->checksum = hash->digest_str();
+        delete hash;
+    }
+}
 
-    FILE *f = fopen(filename.c_str(), "wb");
-    if (f == NULL)
-        throw IOException("Unable to open new segment");
+void LbsObject::write(TarSegmentStore *store)
+{
+    assert(data != NULL);
+    assert(!written);
 
-    return new SegmentWriter(new FileOutputStream(f), id);
+    ref = store->write_object(data, data_len, group, checksum, age);
+    written = true;
 }