Add a new object-oriented wrapper for building object references.
[cumulus.git] / store.cc
index 0a833ab..0533630 100644 (file)
--- a/store.cc
+++ b/store.cc
 /* LBS: An LFS-inspired filesystem backup system
- * Copyright (C) 2006  Michael Vrable
+ * Copyright (C) 2007  Michael Vrable
  *
  * Backup data is stored in a collection of objects, which are grouped together
- * into segments for storage purposes.  This file provides interfaces for
- * reading and writing objects and segments. */
+ * into segments for storage purposes.  This implementation of the object store
+ * is built on top of libtar, and represents segments as TAR files and objects
+ * as files within them. */
 
 #include <assert.h>
-#include <uuid/uuid.h>
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <time.h>
+
+#include <list>
+#include <set>
+#include <string>
+#include <iostream>
 
 #include "store.h"
+#include "ref.h"
 
+using std::list;
+using std::set;
 using std::string;
 
-OutputStream::OutputStream()
-    : bytes_written(0)
-{
-}
-
-void OutputStream::write(const void *data, size_t len)
-{
-    write_internal(data, len);
-    bytes_written += len;
-}
+list<string> TarSegmentStore::norefs;
 
-void OutputStream::write_u8(uint8_t val)
+Tarfile::Tarfile(const string &path, const string &segment)
+    : size(0),
+      segment_name(segment)
 {
-    write(&val, 1);
+    if (tar_open(&t, (char *)path.c_str(), NULL, O_WRONLY | O_CREAT, 0600,
+                 TAR_VERBOSE | TAR_GNU) == -1)
+        throw IOException("Error opening Tarfile");
 }
 
-void OutputStream::write_u16(uint16_t val)
+Tarfile::~Tarfile()
 {
-    unsigned char buf[2];
+    string checksum_list = checksums.str();
+    internal_write_object(segment_name + "/checksums",
+                          checksum_list.data(), checksum_list.size());
+    tar_append_eof(t);
 
-    buf[0] = val & 0xff;
-    buf[1] = (val >> 8) & 0xff;
-    write(buf, 2);
+    if (tar_close(t) != 0)
+        throw IOException("Error closing Tarfile");
 }
 
-void OutputStream::write_u32(uint32_t val)
+void Tarfile::write_object(int id, const char *data, size_t len)
 {
-    unsigned char buf[4];
+    char buf[64];
+    sprintf(buf, "%08x", id);
+    string path = segment_name + "/" + buf;
 
-    buf[0] = val & 0xff;
-    buf[1] = (val >> 8) & 0xff;
-    buf[2] = (val >> 16) & 0xff;
-    buf[3] = (val >> 24) & 0xff;
-    write(buf, 4);
-}
+    internal_write_object(path, data, len);
 
-void OutputStream::write_u64(uint64_t val)
-{
-    unsigned char buf[8];
-
-    buf[0] = val & 0xff;
-    buf[1] = (val >> 8) & 0xff;
-    buf[2] = (val >> 16) & 0xff;
-    buf[3] = (val >> 24) & 0xff;
-    buf[4] = (val >> 32) & 0xff;
-    buf[5] = (val >> 40) & 0xff;
-    buf[6] = (val >> 48) & 0xff;
-    buf[7] = (val >> 56) & 0xff;
-    write(buf, 8);
+    // Compute a checksum for the data block, which will be stored at the end
+    // of the TAR file.
+    SHA1Checksum hash;
+    hash.process(data, len);
+    sprintf(buf, "%08x", id);
+    checksums << buf << " " << hash.checksum_str() << "\n";
 }
 
-/* Writes an integer to an output stream using a variable-sized representation:
- * seven bits are written at a time (little-endian), and the eigth bit of each
- * byte is set if more data follows. */
-void OutputStream::write_varint(uint64_t val)
+void Tarfile::internal_write_object(const string &path,
+                                    const char *data, size_t len)
 {
-    do {
-        uint8_t remainder = (val & 0x7f);
-        val >>= 7;
-        if (val)
-            remainder |= 0x80;
-        write_u8(remainder);
-    } while (val);
-}
+    memset(&t->th_buf, 0, sizeof(struct tar_header));
 
-void OutputStream::write_uuid(const struct uuid &u)
-{
-    write(u.bytes, 16);
-}
-
-/* Write an arbitrary string by first writing out the length, followed by the
- * data itself. */
-void OutputStream::write_string(const string &s)
-{
-    size_t len = s.length();
-    write_varint(len);
-    write(s.data(), len);
-}
+    th_set_type(t, S_IFREG | 0600);
+    th_set_user(t, 0);
+    th_set_group(t, 0);
+    th_set_mode(t, 0600);
+    th_set_size(t, len);
+    th_set_mtime(t, time(NULL));
+    th_set_path(t, const_cast<char *>(path.c_str()));
+    th_finish(t);
 
-void OutputStream::write_dictionary(const dictionary &d)
-{
-    size_t size = d.size();
-    size_t written = 0;
+    if (th_write(t) != 0)
+        throw IOException("Error writing tar header");
 
-    write_varint(size);
+    size += T_BLOCKSIZE;
 
-    for (dictionary::const_iterator i = d.begin(); i != d.end(); ++i) {
-        write_string(i->first);
-        write_string(i->second);
-        written++;
-    }
+    if (len == 0)
+        return;
 
-    assert(written == size);
-}
+    size_t blocks = (len + T_BLOCKSIZE - 1) / T_BLOCKSIZE;
+    size_t padding = blocks * T_BLOCKSIZE - len;
 
-StringOutputStream::StringOutputStream()
-    : buf(std::ios_base::out)
-{
-}
-
-void StringOutputStream::write_internal(const void *data, size_t len)
-{
-    buf.write((const char *)data, len);
-    if (!buf.good())
-        throw IOException("error writing to StringOutputStream");
-}
-
-FileOutputStream::FileOutputStream(FILE *file)
-{
-    f = file;
-}
-
-FileOutputStream::~FileOutputStream()
-{
-    fclose(f);
-}
-
-void FileOutputStream::write_internal(const void *data, size_t len)
-{
-    size_t res;
-
-    res = fwrite(data, 1, len, f);
-    if (res != len) {
-        throw IOException("write error");
+    for (size_t i = 0; i < blocks - 1; i++) {
+        if (tar_block_write(t, &data[i * T_BLOCKSIZE]) == -1)
+            throw IOException("Error writing tar block");
     }
-}
-
-WrapperOutputStream::WrapperOutputStream(OutputStream &o)
-    : real(o)
-{
-}
 
-void WrapperOutputStream::write_internal(const void *data, size_t len)
-{
-    real.write(data, len);
-}
+    char block[T_BLOCKSIZE];
+    memset(block, 0, sizeof(block));
+    memcpy(block, &data[T_BLOCKSIZE * (blocks - 1)], T_BLOCKSIZE - padding);
+    if (tar_block_write(t, block) == -1)
+        throw IOException("Error writing final tar block");
 
-/* Provide checksumming of a data stream. */
-ChecksumOutputStream::ChecksumOutputStream(OutputStream &o)
-    : real(o)
-{
+    size += blocks * T_BLOCKSIZE;
 }
 
-void ChecksumOutputStream::write_internal(const void *data, size_t len)
-{
-    real.write(data, len);
-    csum.process(data, len);
-}
+static const size_t SEGMENT_SIZE = 4 * 1024 * 1024;
 
-const uint8_t *ChecksumOutputStream::finish_and_checksum()
+string TarSegmentStore::write_object(const char *data, size_t len, const
+                                     std::string &group,
+                                     const std::list<std::string> &refs)
 {
-    return csum.checksum();
-}
+    struct segment_info *segment;
 
-/* Utility functions, for encoding data types to strings. */
-string encode_u16(uint16_t val)
-{
-    StringOutputStream s;
-    s.write_u16(val);
-    return s.contents();
-}
+    // Find the segment into which the object should be written, looking up by
+    // group.  If no segment exists yet, create one.
+    if (segments.find(group) == segments.end()) {
+        segment = new segment_info;
 
-string encode_u32(uint32_t val)
-{
-    StringOutputStream s;
-    s.write_u32(val);
-    return s.contents();
-}
+        segment->name = generate_uuid();
 
-string encode_u64(uint64_t val)
-{
-    StringOutputStream s;
-    s.write_u64(val);
-    return s.contents();
-}
+        string filename = path + "/" + segment->name + ".tar";
+        segment->file = new Tarfile(filename, segment->name);
 
-string encode_objref(const struct uuid &segment, uint32_t object)
-{
-    StringOutputStream s;
-    s.write_uuid(segment);
-    s.write_u32(object);
-    return s.contents();
-}
+        segment->count = 0;
 
-SegmentWriter::SegmentWriter(OutputStream *output, struct uuid u)
-    : raw_out(output),
-      id(u),
-      object_stream(NULL)
-{
-    /* All output data will be checksummed except the very last few bytes,
-     * which are the checksum itself. */
-    out = new ChecksumOutputStream(*raw_out);
-
-    /* Write out the segment header first. */
-    static const char signature[] = "LBSSEG0\n";
-    out->write(signature, strlen(signature));
-    out->write_uuid(id);
-}
-
-SegmentWriter::~SegmentWriter()
-{
-    if (object_stream)
-        finish_object();
-
-    // Write out the object table which gives the sizes and locations of all
-    // objects, and then add the trailing signature, which indicates the end of
-    // the segment and gives the offset of the object table.
-    int64_t index_offset = out->get_pos();
-
-    for (object_table::const_iterator i = objects.begin();
-         i != objects.end(); ++i) {
-        out->write_s64(i->offset);
-        out->write_s64(i->size);
-        out->write(i->type, sizeof(i->type));
+        segments[group] = segment;
+    } else {
+        segment = segments[group];
     }
 
-    static const char signature2[] = "LBSEND";
-    out->write(signature2, strlen(signature2));
-    out->write_s64(index_offset);
-    out->write_u32(objects.size());
+    int id = segment->count;
+    char id_buf[64];
+    sprintf(id_buf, "%08x", id);
 
-    /* Finally, append a checksum to the end of the file, so that its integrity
-     * (against accidental, not malicious, corruption) can be verified. */
-    const uint8_t *csum = out->finish_and_checksum();
-    raw_out->write(csum, out->checksum_size());
-
-    /* The SegmentWriter takes ownership of the OutputStream it is writing to,
-     * and destroys it automatically when done with the segment. */
-    delete out;
-    delete raw_out;
-}
+    segment->file->write_object(id, data, len);
+    segment->count++;
 
-OutputStream *SegmentWriter::new_object(int *id, const char *type)
-{
-    if (object_stream)
-        finish_object();
+    string full_name = segment->name + "/" + id_buf;
 
-    if (id != NULL)
-        *id = objects.size();
+    // Store any dependencies this object has on other segments, so they can be
+    // written when the segment is closed.
+    for (list<string>::const_iterator i = refs.begin(); i != refs.end(); ++i) {
+        segment->refs.insert(*i);
+    }
 
-    struct index_info info;
-    info.offset = out->get_pos();
-    info.size = -1;             // Will be filled in when object is finished
-    strncpy(info.type, type, sizeof(info.type));
-    objects.push_back(info);
+    // If this segment meets or exceeds the size target, close it so that
+    // future objects will go into a new segment.
+    if (segment->file->size_estimate() >= SEGMENT_SIZE)
+        close_segment(group);
 
-    object_stream = new WrapperOutputStream(*out);
-    return object_stream;
+    return full_name;
 }
 
-void SegmentWriter::finish_object()
+void TarSegmentStore::sync()
 {
-    assert(object_stream != NULL);
-
-    // Fill in object size, which could not be stored at start
-    objects.back().size = object_stream->get_pos();
-
-    delete object_stream;
-    object_stream = NULL;
+    while (!segments.empty())
+        close_segment(segments.begin()->first);
 }
 
-struct uuid SegmentWriter::generate_uuid()
+void TarSegmentStore::close_segment(const string &group)
 {
-    struct uuid u;
+    struct segment_info *segment = segments[group];
+    fprintf(stderr, "Closing segment group %s (%s)\n",
+            group.c_str(), segment->name.c_str());
 
-    uuid_generate(u.bytes);
+    string reflist;
+    for (set<string>::iterator i = segment->refs.begin();
+         i != segment->refs.end(); ++i) {
+        reflist += *i + "\n";
+    }
+    segment->file->internal_write_object(segment->name + "/references",
+                                         reflist.data(), reflist.size());
 
-    return u;
+    delete segment->file;
+    segments.erase(segments.find(group));
+    delete segment;
 }
 
-string SegmentWriter::format_uuid(const struct uuid u)
+string TarSegmentStore::object_reference_to_segment(const string &object)
 {
-    // A UUID only takes 36 bytes, plus the trailing '\0', so this is safe.
-    char buf[40];
-
-    uuid_unparse_lower(u.bytes, buf);
-
-    return string(buf);
+    return object;
 }
 
-SegmentStore::SegmentStore(const string &path)
-    : directory(path)
+LbsObject::LbsObject()
+    : group(""), data(NULL), data_len(0), written(false)
 {
 }
 
-SegmentWriter *SegmentStore::new_segment()
+LbsObject::~LbsObject()
 {
-    struct uuid id = SegmentWriter::generate_uuid();
-    string filename = directory + "/" + SegmentWriter::format_uuid(id);
-
-    FILE *f = fopen(filename.c_str(), "wb");
-    if (f == NULL)
-        throw IOException("Unable to open new segment");
-
-    return new SegmentWriter(new FileOutputStream(f), id);
 }
 
-SegmentPartitioner::SegmentPartitioner(SegmentStore *s)
-    : store(s),
-      segment(NULL),
-      object(NULL)
+void LbsObject::add_reference(const LbsObject *o)
 {
-    // Default target size is around 1 MB
-    target_size = 1024 * 1024;
+    refs.insert(o->get_name());
 }
 
-SegmentPartitioner::~SegmentPartitioner()
+void LbsObject::write(TarSegmentStore *store)
 {
-    if (segment)
-        delete segment;
-}
+    assert(data != NULL);
+    assert(!written);
 
-OutputStream *SegmentPartitioner::new_object(struct uuid *uuid, int *id,
-                                             const char *type)
-{
-    if (segment != NULL && segment->get_size() > target_size) {
-        delete segment;
-        segment = NULL;
+    list<string> reflist;
+    for (set<string>::iterator i = refs.begin(); i != refs.end(); ++i) {
+        reflist.push_back(*i);
     }
 
-    if (segment == NULL)
-        segment = store->new_segment();
-
-    if (uuid != NULL)
-        *uuid = segment->get_uuid();
+    name = store->write_object(data, data_len, group, reflist);
 
-    return segment->new_object(id, type);
+    written = true;
+    data = NULL;
 }