Support for spreading objects across segments.
[cumulus.git] / store.cc
1 /* LBS: An LFS-inspired filesystem backup system
2  * Copyright (C) 2006  Michael Vrable
3  *
4  * Backup data is stored in a collection of objects, which are grouped together
5  * into segments for storage purposes.  This file provides interfaces for
6  * reading and writing objects and segments. */
7
8 #include <assert.h>
9 #include <uuid/uuid.h>
10
11 #include "store.h"
12
13 using std::string;
14
15 OutputStream::OutputStream()
16     : bytes_written(0)
17 {
18 }
19
20 void OutputStream::write(const void *data, size_t len)
21 {
22     write_internal(data, len);
23     bytes_written += len;
24 }
25
26 void OutputStream::write_u8(uint8_t val)
27 {
28     write(&val, 1);
29 }
30
31 void OutputStream::write_u16(uint16_t val)
32 {
33     unsigned char buf[2];
34
35     buf[0] = val & 0xff;
36     buf[1] = (val >> 8) & 0xff;
37     write(buf, 2);
38 }
39
40 void OutputStream::write_u32(uint32_t val)
41 {
42     unsigned char buf[4];
43
44     buf[0] = val & 0xff;
45     buf[1] = (val >> 8) & 0xff;
46     buf[2] = (val >> 16) & 0xff;
47     buf[3] = (val >> 24) & 0xff;
48     write(buf, 4);
49 }
50
51 void OutputStream::write_u64(uint64_t val)
52 {
53     unsigned char buf[8];
54
55     buf[0] = val & 0xff;
56     buf[1] = (val >> 8) & 0xff;
57     buf[2] = (val >> 16) & 0xff;
58     buf[3] = (val >> 24) & 0xff;
59     buf[4] = (val >> 32) & 0xff;
60     buf[5] = (val >> 40) & 0xff;
61     buf[6] = (val >> 48) & 0xff;
62     buf[7] = (val >> 56) & 0xff;
63     write(buf, 8);
64 }
65
66 /* Writes an integer to an output stream using a variable-sized representation:
67  * seven bits are written at a time (little-endian), and the eigth bit of each
68  * byte is set if more data follows. */
69 void OutputStream::write_varint(uint64_t val)
70 {
71     do {
72         uint8_t remainder = (val & 0x7f);
73         val >>= 7;
74         if (val)
75             remainder |= 0x80;
76         write_u8(remainder);
77     } while (val);
78 }
79
80 /* Write an arbitrary string by first writing out the length, followed by the
81  * data itself. */
82 void OutputStream::write_string(const string &s)
83 {
84     size_t len = s.length();
85     write_varint(len);
86     write(s.data(), len);
87 }
88
89 void OutputStream::write_dictionary(const dictionary &d)
90 {
91     size_t size = d.size();
92     size_t written = 0;
93
94     write_varint(size);
95
96     for (dictionary::const_iterator i = d.begin(); i != d.end(); ++i) {
97         write_string(i->first);
98         write_string(i->second);
99         written++;
100     }
101
102     assert(written == size);
103 }
104
105 StringOutputStream::StringOutputStream()
106     : buf(std::ios_base::out)
107 {
108 }
109
110 void StringOutputStream::write_internal(const void *data, size_t len)
111 {
112     buf.write((const char *)data, len);
113     if (!buf.good())
114         throw IOException("error writing to StringOutputStream");
115 }
116
117 FileOutputStream::FileOutputStream(FILE *file)
118 {
119     f = file;
120 }
121
122 FileOutputStream::~FileOutputStream()
123 {
124     fclose(f);
125 }
126
127 void FileOutputStream::write_internal(const void *data, size_t len)
128 {
129     size_t res;
130
131     res = fwrite(data, 1, len, f);
132     if (res != len) {
133         throw IOException("write error");
134     }
135 }
136
137 WrapperOutputStream::WrapperOutputStream(OutputStream &o)
138     : real(o)
139 {
140 }
141
142 void WrapperOutputStream::write_internal(const void *data, size_t len)
143 {
144     real.write(data, len);
145 }
146
147 /* Provide checksumming of a data stream. */
148 ChecksumOutputStream::ChecksumOutputStream(OutputStream &o)
149     : real(o)
150 {
151 }
152
153 void ChecksumOutputStream::write_internal(const void *data, size_t len)
154 {
155     real.write(data, len);
156     csum.process(data, len);
157 }
158
159 const uint8_t *ChecksumOutputStream::finish_and_checksum()
160 {
161     return csum.checksum();
162 }
163
164 /* Utility functions, for encoding data types to strings. */
165 string encode_u16(uint16_t val)
166 {
167     StringOutputStream s;
168     s.write_u16(val);
169     return s.contents();
170 }
171
172 string encode_u32(uint32_t val)
173 {
174     StringOutputStream s;
175     s.write_u32(val);
176     return s.contents();
177 }
178
179 string encode_u64(uint64_t val)
180 {
181     StringOutputStream s;
182     s.write_u64(val);
183     return s.contents();
184 }
185
186 SegmentWriter::SegmentWriter(OutputStream *output, struct uuid u)
187     : raw_out(output),
188       id(u),
189       object_stream(NULL)
190 {
191     /* All output data will be checksummed except the very last few bytes,
192      * which are the checksum itself. */
193     out = new ChecksumOutputStream(*raw_out);
194
195     /* Write out the segment header first. */
196     static const char signature[] = "LBSSEG0\n";
197     out->write(signature, strlen(signature));
198     out->write(id.bytes, sizeof(struct uuid));
199 }
200
201 SegmentWriter::~SegmentWriter()
202 {
203     if (object_stream)
204         finish_object();
205
206     // Write out the object table which gives the sizes and locations of all
207     // objects, and then add the trailing signature, which indicates the end of
208     // the segment and gives the offset of the object table.
209     int64_t index_offset = out->get_pos();
210
211     for (object_table::const_iterator i = objects.begin();
212          i != objects.end(); ++i) {
213         out->write_s64(i->first);
214         out->write_s64(i->second);
215     }
216
217     static const char signature2[] = "LBSEND";
218     out->write(signature2, strlen(signature2));
219     out->write_s64(index_offset);
220     out->write_u32(objects.size());
221
222     /* Finally, append a checksum to the end of the file, so that its integrity
223      * (against accidental, not malicious, corruption) can be verified. */
224     const uint8_t *csum = out->finish_and_checksum();
225     raw_out->write(csum, out->checksum_size());
226
227     /* The SegmentWriter takes ownership of the OutputStream it is writing to,
228      * and destroys it automatically when done with the segment. */
229     delete out;
230     delete raw_out;
231 }
232
233 OutputStream *SegmentWriter::new_object()
234 {
235     if (object_stream)
236         finish_object();
237
238     object_start_offset = out->get_pos();
239     object_stream = new WrapperOutputStream(*out);
240
241     return object_stream;
242 }
243
244 void SegmentWriter::finish_object()
245 {
246     assert(object_stream != NULL);
247
248     // store (start, length) information for locating this object
249     objects.push_back(std::make_pair(object_start_offset,
250                                      object_stream->get_pos()));
251
252     delete object_stream;
253     object_stream = NULL;
254 }
255
256 struct uuid SegmentWriter::generate_uuid()
257 {
258     struct uuid u;
259
260     uuid_generate(u.bytes);
261
262     return u;
263 }
264
265 string SegmentWriter::format_uuid(const struct uuid u)
266 {
267     // A UUID only takes 36 bytes, plus the trailing '\0', so this is safe.
268     char buf[40];
269
270     uuid_unparse_lower(u.bytes, buf);
271
272     return string(buf);
273 }
274
275 SegmentStore::SegmentStore(const string &path)
276     : directory(path)
277 {
278 }
279
280 SegmentWriter *SegmentStore::new_segment()
281 {
282     struct uuid id = SegmentWriter::generate_uuid();
283     string filename = directory + "/" + SegmentWriter::format_uuid(id);
284
285     FILE *f = fopen(filename.c_str(), "wb");
286     if (f == NULL)
287         throw IOException("Unable to open new segment");
288
289     return new SegmentWriter(new FileOutputStream(f), id);
290 }
291
292 SegmentPartitioner::SegmentPartitioner(SegmentStore *s)
293     : store(s),
294       segment(NULL),
295       object(NULL)
296 {
297     // Default target size is around 1 MB
298     target_size = 1024 * 1024;
299 }
300
301 SegmentPartitioner::~SegmentPartitioner()
302 {
303     if (segment)
304         delete segment;
305 }
306
307 OutputStream *SegmentPartitioner::new_object()
308 {
309     if (segment != NULL && segment->get_size() > target_size) {
310         delete segment;
311         segment = NULL;
312     }
313
314     if (segment == NULL)
315         segment = store->new_segment();
316
317     return segment->new_object();
318 }