Make use of size assertions in references where possible.
[cumulus.git] / subfile.cc
1 /* Cumulus: Smart Filesystem Backup to Dumb Servers
2  *
3  * Copyright (C) 2008  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19  */
20
21 /* Allow for sub-file incremental backups: if only a portion of a file changes,
22  * allow the new data to be written out, and the old data to simply be
23  * referenced from the new metadata log. */
24
25 #include <stdlib.h>
26 #include <string.h>
27 #include <assert.h>
28 #include <arpa/inet.h>
29
30 #include "subfile.h"
31 #include "chunk.h"
32 #include "sha1.h"
33
34 using std::list;
35 using std::map;
36 using std::set;
37 using std::string;
38 using std::vector;
39 using std::pair;
40 using std::make_pair;
41
42 Subfile::Subfile(LocalDb *localdb)
43     : db(localdb), checksums_loaded(false), new_block_summary_valid(false)
44 {
45 }
46
47 Subfile::~Subfile()
48 {
49     for (size_t i = 0; i < block_list.size(); i++) {
50         delete[] block_list[i].chunks;
51     }
52
53     free_analysis();
54 }
55
56 void Subfile::free_analysis()
57 {
58     if (new_block_summary_valid)
59         delete[] new_block_summary.chunks;
60
61     new_block_summary_valid = false;
62 }
63
64 void Subfile::load_old_blocks(const list<ObjectReference> &blocks)
65 {
66     for (list<ObjectReference>::const_iterator i = blocks.begin();
67          i != blocks.end(); ++i) {
68         if (!i->is_normal())
69             continue;
70
71         ObjectReference base = i->base();
72         if (old_blocks.find(base) == old_blocks.end()) {
73             old_blocks.insert(base);
74             if (checksums_loaded)
75                 index_chunks(base);
76         }
77     }
78 }
79
80 /* Actually load chunk signatures from the database, and index them in memory.
81  * This should only be called once per segment. */
82 void Subfile::index_chunks(ObjectReference ref)
83 {
84     string refstr = ref.to_string();
85
86     if (!db->IsAvailable(ref))
87         return;
88
89     /* Look for checksums for this block in the database.  They may not exist,
90      * in which case simply return without error. */
91     char *packed_sigs;
92     size_t len;
93     string algorithm;
94     if (!db->LoadChunkSignatures(ref, (void **)&packed_sigs, &len, &algorithm))
95         return;
96     if (algorithm != get_algorithm()) {
97         free(packed_sigs);
98         return;
99     }
100
101     int block_id = block_list.size();
102
103     block_summary summary;
104     summary.ref = ref.base();
105     summary.num_chunks = len / (2 + HASH_SIZE);
106     summary.chunks = new chunk_info[summary.num_chunks];
107
108     int block_start = 0;
109     for (int i = 0; i < summary.num_chunks; i++) {
110         char *packed_info = &packed_sigs[i * (2 + HASH_SIZE)];
111         memcpy(summary.chunks[i].hash, &packed_info[2], HASH_SIZE);
112
113         uint16_t chunk_len;
114         memcpy(&chunk_len, &packed_info[0], 2);
115         summary.chunks[i].len = ntohs(chunk_len);
116         summary.chunks[i].offset = block_start;
117         block_start += summary.chunks[i].len;
118
119         chunk_index[string(summary.chunks[i].hash, HASH_SIZE)]
120             = make_pair(block_id, i);
121     }
122
123     block_list.push_back(summary);
124     free(packed_sigs);
125 }
126
127 /* Signatures can be loaded lazily; this method should be called before any
128  * actual access to the chunk signatures is required, to ensure the data has
129  * actually been loaded. */
130 void Subfile::ensure_signatures_loaded()
131 {
132     if (checksums_loaded)
133         return;
134
135     for (set<ObjectReference>::iterator i = old_blocks.begin();
136          i != old_blocks.end(); ++i) {
137         index_chunks(*i);
138     }
139
140     checksums_loaded = true;
141 }
142
143 void Subfile::analyze_new_block(const char *buf, size_t len)
144 {
145     analyzed_buf = buf;
146     analyzed_len = len;
147     int max_chunks = chunk_compute_max_num_breaks(len);
148
149     free_analysis();
150
151     size_t *breakpoints = new size_t[max_chunks];
152     int num_breakpoints = chunk_compute_breaks(buf, len, breakpoints);
153
154     if (num_breakpoints == 0) {
155         delete[] breakpoints;
156         return;
157     }
158
159     new_block_summary.num_chunks = num_breakpoints;
160     new_block_summary.chunks = new chunk_info[num_breakpoints];
161
162     int block_start = 0;
163     for (int i = 0; i < num_breakpoints; i++) {
164         new_block_summary.chunks[i].offset = block_start;
165         new_block_summary.chunks[i].len = breakpoints[i] - block_start + 1;
166         block_start = breakpoints[i] + 1;
167
168         SHA1Checksum hash;
169         hash.process(&buf[new_block_summary.chunks[i].offset],
170                      new_block_summary.chunks[i].len);
171         assert(hash.checksum_size() == (size_t)HASH_SIZE);
172         memcpy(new_block_summary.chunks[i].hash, hash.checksum(), HASH_SIZE);
173     }
174
175     new_block_summary_valid = true;
176     delete[] breakpoints;
177 }
178
179 void Subfile::store_block_signatures(ObjectReference ref, block_summary summary)
180 {
181     int n = summary.num_chunks;
182     char *packed = (char *)malloc(n * (2 + HASH_SIZE));
183
184     for (int i = 0; i < n; i++) {
185         assert(summary.chunks[i].len >= 0 && summary.chunks[i].len <= 0xffff);
186         uint16_t len = htons(summary.chunks[i].len);
187         char *packed_info = &packed[i * (2 + HASH_SIZE)];
188         memcpy(&packed_info[0], &len, 2);
189         memcpy(&packed_info[2], summary.chunks[i].hash, HASH_SIZE);
190     }
191
192     db->StoreChunkSignatures(ref, packed, n * (2 + HASH_SIZE),
193                              get_algorithm());
194
195     free(packed);
196 }
197
198 /* Compute an incremental representation of the most recent block analyzed. */
199 enum subfile_item_type { SUBFILE_COPY, SUBFILE_NEW };
200
201 struct subfile_item {
202     subfile_item_type type;
203
204     // For type SUBFILE_COPY
205     ObjectReference ref;
206
207     // For type SUBFILE_NEW
208     int src_offset, dst_offset;
209     int len;
210     char hash[Subfile::HASH_SIZE];
211 };
212
213 /* Compute an incremental representation of the data last analyzed.  A list of
214  * references will be returned corresponding to the data.  If new data must be
215  * written out to the backup, it will be written out via the LbsObject
216  * provided, to the provided TarSegmentStore. */
217 list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
218                                                   LbsObject *o,
219                                                   double block_age)
220 {
221     assert(new_block_summary_valid);
222     bool matched_old = false;
223     size_t new_data = 0;
224
225     list<subfile_item> items;
226     list<ObjectReference> refs;
227
228     ensure_signatures_loaded();
229
230     assert(new_block_summary.num_chunks > 0);
231
232     for (int i = 0; i < new_block_summary.num_chunks; i++) {
233         map<string, pair<int, int> >::iterator m
234             = chunk_index.find(string(new_block_summary.chunks[i].hash,
235                                       HASH_SIZE));
236
237         struct subfile_item item;
238         if (m == chunk_index.end()) {
239             item.type = SUBFILE_NEW;
240             item.src_offset = new_block_summary.chunks[i].offset;
241             item.dst_offset = new_data;
242             item.len = new_block_summary.chunks[i].len;
243             memcpy(item.hash, new_block_summary.chunks[i].hash, HASH_SIZE);
244             new_data += item.len;
245         } else {
246             struct chunk_info &old_chunk
247                 = block_list[m->second.first].chunks[m->second.second];
248             item.type = SUBFILE_COPY;
249             item.ref = block_list[m->second.first].ref;
250             item.ref.set_range(old_chunk.offset, old_chunk.len);
251             matched_old = true;
252         }
253
254         items.push_back(item);
255     }
256
257     // No data was matched.  The entire block can be written out as is into a
258     // new object, and the new_block_summary used to save chunk signatures.
259     if (!matched_old) {
260         SHA1Checksum block_hash;
261         block_hash.process(analyzed_buf, analyzed_len);
262         string block_csum = block_hash.checksum_str();
263
264         o->set_data(analyzed_buf, analyzed_len);
265         o->write(tss);
266         ObjectReference ref = o->get_ref();
267         db->StoreObject(ref, block_csum, analyzed_len, block_age);
268         if (analyzed_len >= 16384)
269             store_block_signatures(ref, new_block_summary);
270         ref.set_range(0, analyzed_len, true);
271         refs.push_back(ref);
272         delete o;
273         return refs;
274     }
275
276     // Otherwise, construct a new block containing all literal data needed (if
277     // any exists), write it out, and construct the appropriate list of
278     // references.
279     list<subfile_item>::iterator i;
280     if (new_data > 0) {
281         char *literal_buf = new char[new_data];
282         for (i = items.begin(); i != items.end(); ++i) {
283             if (i->type == SUBFILE_NEW) {
284                 memcpy(&literal_buf[i->dst_offset],
285                        &analyzed_buf[i->src_offset], i->len);
286             }
287         }
288
289         SHA1Checksum block_hash;
290         block_hash.process(literal_buf, new_data);
291         string block_csum = block_hash.checksum_str();
292
293         o->set_group("data");
294         o->set_data(literal_buf, new_data);
295         o->write(tss);
296         ObjectReference ref = o->get_ref();
297         for (i = items.begin(); i != items.end(); ++i) {
298             if (i->type == SUBFILE_NEW) {
299                 i->ref = ref;
300                 i->ref.set_range(i->dst_offset, i->len);
301             }
302         }
303
304         db->StoreObject(ref, block_csum, new_data, 0.0);
305
306         block_summary summary;
307         summary.ref = ref;
308         summary.num_chunks = 0;
309         summary.chunks = new chunk_info[items.size()];
310         for (i = items.begin(); i != items.end(); ++i) {
311             if (i->type == SUBFILE_NEW) {
312                 chunk_info &info = summary.chunks[summary.num_chunks];
313                 memcpy(info.hash, i->hash, HASH_SIZE);
314                 info.offset = i->dst_offset;
315                 info.len = i->len;
316                 summary.num_chunks++;
317             }
318         }
319
320         store_block_signatures(ref, summary);
321
322         delete[] summary.chunks;
323         delete[] literal_buf;
324     }
325
326     delete o;
327
328     ObjectReference ref;
329     for (i = items.begin(); i != items.end(); ++i) {
330         string refstr = i->ref.to_string();
331         if (!ref.merge(i->ref)) {
332             refs.push_back(ref);
333             ref = i->ref;
334         }
335     }
336     assert(!ref.is_null());
337     refs.push_back(ref);
338
339     return refs;
340 }