* reading and writing objects and segments. */
#include <assert.h>
+#include <uuid/uuid.h>
#include "store.h"
using std::string;
+OutputStream::OutputStream()
+ : bytes_written(0)
+{
+}
+
+void OutputStream::write(const void *data, size_t len)
+{
+ write_internal(data, len);
+ bytes_written += len;
+}
+
void OutputStream::write_u8(uint8_t val)
{
write(&val, 1);
} while (val);
}
+void OutputStream::write_uuid(const struct uuid &u)
+{
+ write(u.bytes, 16);
+}
+
/* Write an arbitrary string by first writing out the length, followed by the
* data itself. */
void OutputStream::write_string(const string &s)
{
}
-void StringOutputStream::write(const void *data, size_t len)
+void StringOutputStream::write_internal(const void *data, size_t len)
{
buf.write((const char *)data, len);
if (!buf.good())
fclose(f);
}
-void FileOutputStream::write(const void *data, size_t len)
+void FileOutputStream::write_internal(const void *data, size_t len)
{
size_t res;
}
}
+WrapperOutputStream::WrapperOutputStream(OutputStream &o)
+ : real(o)
+{
+}
+
+void WrapperOutputStream::write_internal(const void *data, size_t len)
+{
+ real.write(data, len);
+}
+
+/* Provide checksumming of a data stream. */
+ChecksumOutputStream::ChecksumOutputStream(OutputStream &o)
+ : real(o)
+{
+}
+
+void ChecksumOutputStream::write_internal(const void *data, size_t len)
+{
+ real.write(data, len);
+ csum.process(data, len);
+}
+
+const uint8_t *ChecksumOutputStream::finish_and_checksum()
+{
+ return csum.checksum();
+}
+
/* Utility functions, for encoding data types to strings. */
string encode_u16(uint16_t val)
{
s.write_u64(val);
return s.contents();
}
+
+string encode_objref(const struct uuid &segment, uint32_t object)
+{
+ StringOutputStream s;
+ s.write_uuid(segment);
+ s.write_u32(object);
+ return s.contents();
+}
+
+SegmentWriter::SegmentWriter(OutputStream *output, struct uuid u)
+ : raw_out(output),
+ id(u),
+ object_stream(NULL)
+{
+ /* All output data will be checksummed except the very last few bytes,
+ * which are the checksum itself. */
+ out = new ChecksumOutputStream(*raw_out);
+
+ /* Write out the segment header first. */
+ static const char signature[] = "LBSSEG0\n";
+ out->write(signature, strlen(signature));
+ out->write_uuid(id);
+}
+
+SegmentWriter::~SegmentWriter()
+{
+ if (object_stream)
+ finish_object();
+
+ // Write out the object table which gives the sizes and locations of all
+ // objects, and then add the trailing signature, which indicates the end of
+ // the segment and gives the offset of the object table.
+ int64_t index_offset = out->get_pos();
+
+ for (object_table::const_iterator i = objects.begin();
+ i != objects.end(); ++i) {
+ out->write_s64(i->offset);
+ out->write_s64(i->size);
+ out->write(i->type, sizeof(i->type));
+ }
+
+ static const char signature2[] = "LBSEND";
+ out->write(signature2, strlen(signature2));
+ out->write_s64(index_offset);
+ out->write_u32(objects.size());
+
+ /* Finally, append a checksum to the end of the file, so that its integrity
+ * (against accidental, not malicious, corruption) can be verified. */
+ const uint8_t *csum = out->finish_and_checksum();
+ raw_out->write(csum, out->checksum_size());
+
+ /* The SegmentWriter takes ownership of the OutputStream it is writing to,
+ * and destroys it automatically when done with the segment. */
+ delete out;
+ delete raw_out;
+}
+
+OutputStream *SegmentWriter::new_object(int *id, const char *type)
+{
+ if (object_stream)
+ finish_object();
+
+ if (id != NULL)
+ *id = objects.size();
+
+ struct index_info info;
+ info.offset = out->get_pos();
+ info.size = -1; // Will be filled in when object is finished
+ strncpy(info.type, type, sizeof(info.type));
+ objects.push_back(info);
+
+ object_stream = new WrapperOutputStream(*out);
+ return object_stream;
+}
+
+void SegmentWriter::finish_object()
+{
+ assert(object_stream != NULL);
+
+ // Fill in object size, which could not be stored at start
+ objects.back().size = object_stream->get_pos();
+
+ delete object_stream;
+ object_stream = NULL;
+}
+
+struct uuid SegmentWriter::generate_uuid()
+{
+ struct uuid u;
+
+ uuid_generate(u.bytes);
+
+ return u;
+}
+
+string SegmentWriter::format_uuid(const struct uuid u)
+{
+ // A UUID only takes 36 bytes, plus the trailing '\0', so this is safe.
+ char buf[40];
+
+ uuid_unparse_lower(u.bytes, buf);
+
+ return string(buf);
+}
+
+SegmentStore::SegmentStore(const string &path)
+ : directory(path)
+{
+}
+
+SegmentWriter *SegmentStore::new_segment()
+{
+ struct uuid id = SegmentWriter::generate_uuid();
+ string filename = directory + "/" + SegmentWriter::format_uuid(id);
+
+ FILE *f = fopen(filename.c_str(), "wb");
+ if (f == NULL)
+ throw IOException("Unable to open new segment");
+
+ return new SegmentWriter(new FileOutputStream(f), id);
+}
+
+SegmentPartitioner::SegmentPartitioner(SegmentStore *s)
+ : store(s),
+ segment(NULL),
+ object(NULL)
+{
+ // Default target size is around 1 MB
+ target_size = 1024 * 1024;
+}
+
+SegmentPartitioner::~SegmentPartitioner()
+{
+ if (segment)
+ delete segment;
+}
+
+OutputStream *SegmentPartitioner::new_object(struct uuid *uuid, int *id,
+ const char *type)
+{
+ if (segment != NULL && segment->get_size() > target_size) {
+ delete segment;
+ segment = NULL;
+ }
+
+ if (segment == NULL)
+ segment = store->new_segment();
+
+ if (uuid != NULL)
+ *uuid = segment->get_uuid();
+
+ return segment->new_object(id, type);
+}