Create a third_party directory for files copied from other projects.
[cumulus.git] / subfile.cc
1 /* Cumulus: Smart Filesystem Backup to Dumb Servers
2  *
3  * Copyright (C) 2008  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19  */
20
21 /* Allow for sub-file incremental backups: if only a portion of a file changes,
22  * allow the new data to be written out, and the old data to simply be
23  * referenced from the new metadata log. */
24
25 #include <stdlib.h>
26 #include <string.h>
27 #include <assert.h>
28 #include <arpa/inet.h>
29
30 #include "subfile.h"
31 #include "third_party/chunk.h"
32 #include "third_party/sha1.h"
33
34 using std::list;
35 using std::map;
36 using std::set;
37 using std::string;
38 using std::vector;
39 using std::pair;
40 using std::make_pair;
41
42 Subfile::Subfile(LocalDb *localdb)
43     : db(localdb), checksums_loaded(false), new_block_summary_valid(false)
44 {
45 }
46
47 Subfile::~Subfile()
48 {
49     for (size_t i = 0; i < block_list.size(); i++) {
50         delete[] block_list[i].chunks;
51     }
52
53     free_analysis();
54 }
55
56 void Subfile::free_analysis()
57 {
58     if (new_block_summary_valid)
59         delete[] new_block_summary.chunks;
60
61     new_block_summary_valid = false;
62 }
63
64 void Subfile::load_old_blocks(const list<ObjectReference> &blocks)
65 {
66     for (list<ObjectReference>::const_iterator i = blocks.begin();
67          i != blocks.end(); ++i) {
68         if (!i->is_normal())
69             continue;
70
71         ObjectReference base = i->base();
72         if (old_blocks.find(base) == old_blocks.end()) {
73             old_blocks.insert(base);
74             if (checksums_loaded)
75                 index_chunks(base);
76         }
77     }
78 }
79
80 /* Actually load chunk signatures from the database, and index them in memory.
81  * This should only be called once per segment. */
82 void Subfile::index_chunks(ObjectReference ref)
83 {
84     string refstr = ref.to_string();
85
86     if (!db->IsAvailable(ref))
87         return;
88
89     /* Look for checksums for this block in the database.  They may not exist,
90      * in which case simply return without error. */
91     char *packed_sigs;
92     size_t len;
93     string algorithm;
94     if (!db->LoadChunkSignatures(ref, (void **)&packed_sigs, &len, &algorithm))
95         return;
96     if (algorithm != get_algorithm()) {
97         free(packed_sigs);
98         return;
99     }
100
101     int block_id = block_list.size();
102
103     block_summary summary;
104     summary.ref = ref.base();
105     summary.num_chunks = len / (2 + HASH_SIZE);
106     summary.chunks = new chunk_info[summary.num_chunks];
107
108     int block_start = 0;
109     for (int i = 0; i < summary.num_chunks; i++) {
110         char *packed_info = &packed_sigs[i * (2 + HASH_SIZE)];
111         memcpy(summary.chunks[i].hash, &packed_info[2], HASH_SIZE);
112
113         uint16_t chunk_len;
114         memcpy(&chunk_len, &packed_info[0], 2);
115         summary.chunks[i].len = ntohs(chunk_len);
116         summary.chunks[i].offset = block_start;
117         block_start += summary.chunks[i].len;
118
119         chunk_index[string(summary.chunks[i].hash, HASH_SIZE)]
120             = make_pair(block_id, i);
121     }
122
123     block_list.push_back(summary);
124     free(packed_sigs);
125 }
126
127 /* Signatures can be loaded lazily; this method should be called before any
128  * actual access to the chunk signatures is required, to ensure the data has
129  * actually been loaded. */
130 void Subfile::ensure_signatures_loaded()
131 {
132     if (checksums_loaded)
133         return;
134
135     for (set<ObjectReference>::iterator i = old_blocks.begin();
136          i != old_blocks.end(); ++i) {
137         index_chunks(*i);
138     }
139
140     checksums_loaded = true;
141 }
142
143 void Subfile::analyze_new_block(const char *buf, size_t len)
144 {
145     analyzed_buf = buf;
146     analyzed_len = len;
147     int max_chunks = chunk_compute_max_num_breaks(len);
148
149     free_analysis();
150
151     size_t *breakpoints = new size_t[max_chunks];
152     int num_breakpoints = chunk_compute_breaks(buf, len, breakpoints);
153
154     if (num_breakpoints == 0) {
155         delete[] breakpoints;
156         return;
157     }
158
159     new_block_summary.num_chunks = num_breakpoints;
160     new_block_summary.chunks = new chunk_info[num_breakpoints];
161
162     int block_start = 0;
163     for (int i = 0; i < num_breakpoints; i++) {
164         new_block_summary.chunks[i].offset = block_start;
165         new_block_summary.chunks[i].len = breakpoints[i] - block_start + 1;
166         block_start = breakpoints[i] + 1;
167
168         SHA1Checksum hash;
169         hash.process(&buf[new_block_summary.chunks[i].offset],
170                      new_block_summary.chunks[i].len);
171         assert(hash.checksum_size() == (size_t)HASH_SIZE);
172         memcpy(new_block_summary.chunks[i].hash, hash.checksum(), HASH_SIZE);
173     }
174
175     new_block_summary_valid = true;
176     delete[] breakpoints;
177 }
178
179 void Subfile::store_block_signatures(ObjectReference ref, block_summary summary)
180 {
181     int n = summary.num_chunks;
182     char *packed = (char *)malloc(n * (2 + HASH_SIZE));
183
184     for (int i = 0; i < n; i++) {
185         assert(summary.chunks[i].len >= 0 && summary.chunks[i].len <= 0xffff);
186         uint16_t len = htons(summary.chunks[i].len);
187         char *packed_info = &packed[i * (2 + HASH_SIZE)];
188         memcpy(&packed_info[0], &len, 2);
189         memcpy(&packed_info[2], summary.chunks[i].hash, HASH_SIZE);
190     }
191
192     db->StoreChunkSignatures(ref, packed, n * (2 + HASH_SIZE),
193                              get_algorithm());
194
195     free(packed);
196 }
197
198 void Subfile::store_analyzed_signatures(ObjectReference ref)
199 {
200     if (analyzed_len >= 16384)
201         store_block_signatures(ref, new_block_summary);
202 }
203
204 /* Compute an incremental representation of the most recent block analyzed. */
205 enum subfile_item_type { SUBFILE_COPY, SUBFILE_NEW };
206
207 struct subfile_item {
208     subfile_item_type type;
209
210     // For type SUBFILE_COPY
211     ObjectReference ref;
212
213     // For type SUBFILE_NEW
214     int src_offset, dst_offset;
215     int len;
216     char hash[Subfile::HASH_SIZE];
217 };
218
219 /* Compute an incremental representation of the data last analyzed.  A list of
220  * references will be returned corresponding to the data.  If new data must be
221  * written out to the backup, it will be written out via the LbsObject
222  * provided, to the provided TarSegmentStore. */
223 list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
224                                                   LbsObject *o,
225                                                   double block_age)
226 {
227     assert(new_block_summary_valid);
228     bool matched_old = false;
229     size_t new_data = 0;
230
231     list<subfile_item> items;
232     list<ObjectReference> refs;
233
234     ensure_signatures_loaded();
235
236     assert(new_block_summary.num_chunks > 0);
237
238     for (int i = 0; i < new_block_summary.num_chunks; i++) {
239         map<string, pair<int, int> >::iterator m
240             = chunk_index.find(string(new_block_summary.chunks[i].hash,
241                                       HASH_SIZE));
242
243         struct subfile_item item;
244         if (m == chunk_index.end()) {
245             item.type = SUBFILE_NEW;
246             item.src_offset = new_block_summary.chunks[i].offset;
247             item.dst_offset = new_data;
248             item.len = new_block_summary.chunks[i].len;
249             memcpy(item.hash, new_block_summary.chunks[i].hash, HASH_SIZE);
250             new_data += item.len;
251         } else {
252             struct chunk_info &old_chunk
253                 = block_list[m->second.first].chunks[m->second.second];
254             item.type = SUBFILE_COPY;
255             item.ref = block_list[m->second.first].ref;
256             item.ref.set_range(old_chunk.offset, old_chunk.len);
257             matched_old = true;
258         }
259
260         items.push_back(item);
261     }
262
263     // No data was matched.  The entire block can be written out as is into a
264     // new object, and the new_block_summary used to save chunk signatures.
265     if (!matched_old) {
266         SHA1Checksum block_hash;
267         block_hash.process(analyzed_buf, analyzed_len);
268         string block_csum = block_hash.checksum_str();
269
270         o->set_data(analyzed_buf, analyzed_len);
271         o->write(tss);
272         ObjectReference ref = o->get_ref();
273         db->StoreObject(ref, block_csum, analyzed_len, block_age);
274         store_analyzed_signatures(ref);
275         ref.set_range(0, analyzed_len, true);
276         refs.push_back(ref);
277         delete o;
278         return refs;
279     }
280
281     // Otherwise, construct a new block containing all literal data needed (if
282     // any exists), write it out, and construct the appropriate list of
283     // references.
284     list<subfile_item>::iterator i;
285     if (new_data > 0) {
286         char *literal_buf = new char[new_data];
287         for (i = items.begin(); i != items.end(); ++i) {
288             if (i->type == SUBFILE_NEW) {
289                 memcpy(&literal_buf[i->dst_offset],
290                        &analyzed_buf[i->src_offset], i->len);
291             }
292         }
293
294         SHA1Checksum block_hash;
295         block_hash.process(literal_buf, new_data);
296         string block_csum = block_hash.checksum_str();
297
298         o->set_group("data");
299         o->set_data(literal_buf, new_data);
300         o->write(tss);
301         ObjectReference ref = o->get_ref();
302         for (i = items.begin(); i != items.end(); ++i) {
303             if (i->type == SUBFILE_NEW) {
304                 i->ref = ref;
305                 i->ref.set_range(i->dst_offset, i->len);
306             }
307         }
308
309         db->StoreObject(ref, block_csum, new_data, 0.0);
310
311         block_summary summary;
312         summary.ref = ref;
313         summary.num_chunks = 0;
314         summary.chunks = new chunk_info[items.size()];
315         for (i = items.begin(); i != items.end(); ++i) {
316             if (i->type == SUBFILE_NEW) {
317                 chunk_info &info = summary.chunks[summary.num_chunks];
318                 memcpy(info.hash, i->hash, HASH_SIZE);
319                 info.offset = i->dst_offset;
320                 info.len = i->len;
321                 summary.num_chunks++;
322             }
323         }
324
325         store_block_signatures(ref, summary);
326
327         delete[] summary.chunks;
328         delete[] literal_buf;
329     }
330
331     delete o;
332
333     ObjectReference ref;
334     for (i = items.begin(); i != items.end(); ++i) {
335         string refstr = i->ref.to_string();
336         if (!ref.merge(i->ref)) {
337             refs.push_back(ref);
338             ref = i->ref;
339         }
340     }
341     assert(!ref.is_null());
342     refs.push_back(ref);
343
344     return refs;
345 }