Support for spreading objects across segments.
[cumulus.git] / store.cc
index 1686798..06e9453 100644 (file)
--- a/store.cc
+++ b/store.cc
@@ -144,6 +144,23 @@ void WrapperOutputStream::write_internal(const void *data, size_t len)
     real.write(data, len);
 }
 
+/* ChecksumOutputStream wraps the stream `o`: write_internal() forwards each
+ * write to it and feeds the same bytes to the checksum object `csum`. */
+ChecksumOutputStream::ChecksumOutputStream(OutputStream &o) : real(o)
+{
+}
+
+void ChecksumOutputStream::write_internal(const void *data, size_t len)
+{
+    real.write(data, len);      /* forward to the wrapped stream first */
+    csum.process(data, len);    /* then add the same bytes to csum */
+}
+
+const uint8_t *ChecksumOutputStream::finish_and_checksum()
+{
+    return csum.checksum();     /* reads csum only; `real` is not touched */
+}
+
 /* Utility functions, for encoding data types to strings. */
 string encode_u16(uint16_t val)
 {
@@ -167,10 +184,14 @@ string encode_u64(uint64_t val)
 }
 
 SegmentWriter::SegmentWriter(OutputStream *output, struct uuid u)
-    : out(output),
+    : raw_out(output),
       id(u),
       object_stream(NULL)
 {
+    /* All output data will be checksummed except the very last few bytes,
+     * which are the checksum itself. */
+    out = new ChecksumOutputStream(*raw_out);
+
     /* Write out the segment header first. */
     static const char signature[] = "LBSSEG0\n";
     out->write(signature, strlen(signature));
@@ -198,9 +219,15 @@ SegmentWriter::~SegmentWriter()
     out->write_s64(index_offset);
     out->write_u32(objects.size());
 
+    /* Finally, append a checksum to the end of the file, so that its integrity
+     * (against accidental, not malicious, corruption) can be verified. */
+    const uint8_t *csum = out->finish_and_checksum();
+    raw_out->write(csum, out->checksum_size());
+
     /* The SegmentWriter takes ownership of the OutputStream it is writing to,
      * and destroys it automatically when done with the segment. */
     delete out;
+    delete raw_out;
 }
 
 OutputStream *SegmentWriter::new_object()
@@ -261,3 +288,31 @@ SegmentWriter *SegmentStore::new_segment()
 
     return new SegmentWriter(new FileOutputStream(f), id);
 }
+
+SegmentPartitioner::SegmentPartitioner(SegmentStore *s)
+    : store(s), segment(NULL), object(NULL)
+{
+    /* Nothing is opened here: `segment` stays NULL until new_object() asks
+     * the store for one.  Target size defaults to 1 MiB (1024 * 1024),
+     * presumably in bytes -- the unit is whatever get_size() reports. */
+    target_size = 1 << 20;
+}
+
+SegmentPartitioner::~SegmentPartitioner()
+{
+    /* Deleting a null pointer is a no-op, so no guard is required here. */
+    delete segment;
+}
+
+OutputStream *SegmentPartitioner::new_object()
+{
+    /* Close out the current segment once it has grown past target_size. */
+    if (segment && segment->get_size() > target_size) {
+        delete segment;
+        segment = NULL;
+    }
+    /* No open segment (first call, or just closed): get one from the store. */
+    if (!segment)
+        segment = store->new_segment();
+    return segment->new_object();
+}