X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=store.cc;h=0a833ab322cf32764fbef1171ab6037a48e55b92;hb=15f7cef45e82e54a039e5486f08134c4f9a05471;hp=168679819d5b9982e4a3aaeefc6fa34792d230d7;hpb=4a6945983fa171fa843c6f8955ba601f733f3ca5;p=cumulus.git

diff --git a/store.cc b/store.cc
index 1686798..0a833ab 100644
--- a/store.cc
+++ b/store.cc
@@ -77,6 +77,11 @@ void OutputStream::write_varint(uint64_t val)
     } while (val);
 }
 
+void OutputStream::write_uuid(const struct uuid &u)
+{
+    write(u.bytes, 16);
+}
+
 /* Write an arbitrary string by first writing out the length, followed by the
  * data itself. */
 void OutputStream::write_string(const string &s)
@@ -144,6 +149,23 @@ void WrapperOutputStream::write_internal(const void *data, size_t len)
     real.write(data, len);
 }
 
+/* Provide checksumming of a data stream. */
+ChecksumOutputStream::ChecksumOutputStream(OutputStream &o)
+    : real(o)
+{
+}
+
+void ChecksumOutputStream::write_internal(const void *data, size_t len)
+{
+    real.write(data, len);
+    csum.process(data, len);
+}
+
+const uint8_t *ChecksumOutputStream::finish_and_checksum()
+{
+    return csum.checksum();
+}
+
 /* Utility functions, for encoding data types to strings. */
 string encode_u16(uint16_t val)
 {
@@ -166,15 +188,27 @@ string encode_u64(uint64_t val)
     return s.contents();
 }
 
+string encode_objref(const struct uuid &segment, uint32_t object)
+{
+    StringOutputStream s;
+    s.write_uuid(segment);
+    s.write_u32(object);
+    return s.contents();
+}
+
 SegmentWriter::SegmentWriter(OutputStream *output, struct uuid u)
-    : out(output),
+    : raw_out(output),
       id(u),
       object_stream(NULL)
 {
+    /* All output data will be checksummed except the very last few bytes,
+     * which are the checksum itself. */
+    out = new ChecksumOutputStream(*raw_out);
+
     /* Write out the segment header first. */
     static const char signature[] = "LBSSEG0\n";
     out->write(signature, strlen(signature));
-    out->write(id.bytes, sizeof(struct uuid));
+    out->write_uuid(id);
 }
 
 SegmentWriter::~SegmentWriter()
@@ -189,8 +223,9 @@ SegmentWriter::~SegmentWriter()
 
     for (object_table::const_iterator i = objects.begin();
          i != objects.end(); ++i) {
-        out->write_s64(i->first);
-        out->write_s64(i->second);
+        out->write_s64(i->offset);
+        out->write_s64(i->size);
+        out->write(i->type, sizeof(i->type));
     }
 
     static const char signature2[] = "LBSEND";
@@ -198,19 +233,32 @@ SegmentWriter::~SegmentWriter()
     out->write_s64(index_offset);
     out->write_u32(objects.size());
 
+    /* Finally, append a checksum to the end of the file, so that its integrity
+     * (against accidental, not malicious, corruption) can be verified. */
+    const uint8_t *csum = out->finish_and_checksum();
+    raw_out->write(csum, out->checksum_size());
+
     /* The SegmentWriter takes ownership of the OutputStream it is writing to,
      * and destroys it automatically when done with the segment. */
     delete out;
+    delete raw_out;
 }
 
-OutputStream *SegmentWriter::new_object()
+OutputStream *SegmentWriter::new_object(int *id, const char *type)
 {
     if (object_stream)
         finish_object();
 
-    object_start_offset = out->get_pos();
-    object_stream = new WrapperOutputStream(*out);
+    if (id != NULL)
+        *id = objects.size();
 
+    struct index_info info;
+    info.offset = out->get_pos();
+    info.size = -1;  // Will be filled in when object is finished
+    strncpy(info.type, type, sizeof(info.type));
+    objects.push_back(info);
+
+    object_stream = new WrapperOutputStream(*out);
     return object_stream;
 }
 
@@ -218,9 +266,8 @@ void SegmentWriter::finish_object()
 {
     assert(object_stream != NULL);
 
-    // store (start, length) information for locating this object
-    objects.push_back(std::make_pair(object_start_offset,
-                                     object_stream->get_pos()));
+    // Fill in object size, which could not be stored at start
+    objects.back().size = object_stream->get_pos();
 
     delete object_stream;
     object_stream = NULL;
@@ -261,3 +308,35 @@ SegmentWriter *SegmentStore::new_segment()
 
     return new SegmentWriter(new FileOutputStream(f), id);
 }
+
+SegmentPartitioner::SegmentPartitioner(SegmentStore *s)
+    : store(s),
+      segment(NULL),
+      object(NULL)
+{
+    // Default target size is around 1 MB
+    target_size = 1024 * 1024;
+}
+
+SegmentPartitioner::~SegmentPartitioner()
+{
+    if (segment)
+        delete segment;
+}
+
+OutputStream *SegmentPartitioner::new_object(struct uuid *uuid, int *id,
+                                             const char *type)
+{
+    if (segment != NULL && segment->get_size() > target_size) {
+        delete segment;
+        segment = NULL;
+    }
+
+    if (segment == NULL)
+        segment = store->new_segment();
+
+    if (uuid != NULL)
+        *uuid = segment->get_uuid();
+
+    return segment->new_object(id, type);
+}