X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=store.cc;h=52304c2d8f484b3b99ad9fe9a55299170ea5482a;hb=25b6639fb1783e0061affa177e6d6d2131c457f5;hp=2dc6f84e01eb089f6dd3fcea5ba15cd9f0029907;hpb=08721c894385b530ed501498a02d824b8eba7228;p=cumulus.git diff --git a/store.cc b/store.cc index 2dc6f84..52304c2 100644 --- a/store.cc +++ b/store.cc @@ -144,6 +144,23 @@ void WrapperOutputStream::write_internal(const void *data, size_t len) real.write(data, len); } +/* Provide checksumming of a data stream. */ +ChecksumOutputStream::ChecksumOutputStream(OutputStream &o) + : real(o) +{ +} + +void ChecksumOutputStream::write_internal(const void *data, size_t len) +{ + real.write(data, len); + csum.process(data, len); +} + +const uint8_t *ChecksumOutputStream::finish_and_checksum() +{ + return csum.checksum(); +} + /* Utility functions, for encoding data types to strings. */ string encode_u16(uint16_t val) { @@ -166,15 +183,19 @@ string encode_u64(uint64_t val) return s.contents(); } -SegmentWriter::SegmentWriter(OutputStream &output, struct uuid u) - : out(output), +SegmentWriter::SegmentWriter(OutputStream *output, struct uuid u) + : raw_out(output), id(u), object_stream(NULL) { + /* All output data will be checksummed except the very last few bytes, + * which are the checksum itself. */ + out = new ChecksumOutputStream(*raw_out); + /* Write out the segment header first. */ static const char signature[] = "LBSSEG0\n"; - out.write(signature, strlen(signature)); - out.write(id.bytes, sizeof(struct uuid)); + out->write(signature, strlen(signature)); + out->write(id.bytes, sizeof(struct uuid)); } SegmentWriter::~SegmentWriter() @@ -185,18 +206,28 @@ SegmentWriter::~SegmentWriter() // Write out the object table which gives the sizes and locations of all // objects, and then add the trailing signature, which indicates the end of // the segment and gives the offset of the object table. - int64_t index_offset = out.get_pos(); + int64_t index_offset = out->get_pos(); for (object_table::const_iterator i = objects.begin(); i != objects.end(); ++i) { - out.write_s64(i->first); - out.write_s64(i->second); + out->write_s64(i->first); + out->write_s64(i->second); } static const char signature2[] = "LBSEND"; - out.write(signature2, strlen(signature2)); - out.write_s64(index_offset); - out.write_u32(objects.size()); + out->write(signature2, strlen(signature2)); + out->write_s64(index_offset); + out->write_u32(objects.size()); + + /* Finally, append a checksum to the end of the file, so that its integrity + * (against accidental, not malicious, corruption) can be verified. */ + const uint8_t *csum = out->finish_and_checksum(); + raw_out->write(csum, out->checksum_size()); + + /* The SegmentWriter takes ownership of the OutputStream it is writing to, + * and destroys it automatically when done with the segment. */ + delete out; + delete raw_out; } OutputStream *SegmentWriter::new_object() @@ -204,8 +235,8 @@ OutputStream *SegmentWriter::new_object() if (object_stream) finish_object(); - object_start_offset = out.get_pos(); - object_stream = new WrapperOutputStream(out); + object_start_offset = out->get_pos(); + object_stream = new WrapperOutputStream(*out); return object_stream; } @@ -240,3 +271,20 @@ string SegmentWriter::format_uuid(const struct uuid u) return string(buf); } + +SegmentStore::SegmentStore(const string &path) + : directory(path) +{ +} + +SegmentWriter *SegmentStore::new_segment() +{ + struct uuid id = SegmentWriter::generate_uuid(); + string filename = directory + "/" + SegmentWriter::format_uuid(id); + + FILE *f = fopen(filename.c_str(), "wb"); + if (f == NULL) + throw IOException("Unable to open new segment"); + + return new SegmentWriter(new FileOutputStream(f), id); +}