Initial support for efficient sub-file incrementals.
[cumulus.git] / subfile.cc
1 /* Cumulus: Smart Filesystem Backup to Dumb Servers
2  *
3  * Copyright (C) 2008  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19  */
20
21 /* Allow for sub-file incremental backups: if only a portion of a file changes,
22  * allow the new data to be written out, and the old data to simply be
23  * referenced from the new metadata log. */
24
25 #include <stdlib.h>
26 #include <assert.h>
27 #include <arpa/inet.h>
28
29 #include "subfile.h"
30 #include "chunk.h"
31 #include "sha1.h"
32
33 using std::list;
34 using std::map;
35 using std::set;
36 using std::string;
37 using std::vector;
38 using std::pair;
39 using std::make_pair;
40
41 Subfile::Subfile(LocalDb *localdb)
42     : db(localdb), checksums_loaded(false), new_block_summary_valid(false)
43 {
44 }
45
46 Subfile::~Subfile()
47 {
48     for (size_t i = 0; i < block_list.size(); i++) {
49         delete[] block_list[i].chunks;
50     }
51
52     free_analysis();
53 }
54
55 void Subfile::free_analysis()
56 {
57     if (new_block_summary_valid)
58         delete[] new_block_summary.chunks;
59
60     new_block_summary_valid = false;
61 }
62
63 void Subfile::load_old_blocks(const list<ObjectReference> &blocks)
64 {
65     for (list<ObjectReference>::const_iterator i = blocks.begin();
66          i != blocks.end(); ++i) {
67         if (!i->is_normal())
68             continue;
69
70         ObjectReference base = i->base();
71         if (old_blocks.find(base) == old_blocks.end()) {
72             old_blocks.insert(base);
73             if (checksums_loaded)
74                 index_chunks(base);
75         }
76     }
77 }
78
79 /* Actually load chunk signatures from the database, and index them in memory.
80  * This should only be called once per segment. */
81 void Subfile::index_chunks(ObjectReference ref)
82 {
83     string refstr = ref.to_string();
84
85     if (!db->IsAvailable(ref))
86         return;
87
88     /* Look for checksums for this block in the database.  They may not exist,
89      * in which case simply return without error. */
90     char *packed_sigs;
91     size_t len;
92     string algorithm;
93     if (!db->LoadChunkSignatures(ref, (void **)&packed_sigs, &len, &algorithm))
94         return;
95     if (algorithm != get_algorithm()) {
96         free(packed_sigs);
97         return;
98     }
99
100     int block_id = block_list.size();
101
102     block_summary summary;
103     summary.ref = ref.base();
104     summary.num_chunks = len / (2 + HASH_SIZE);
105     summary.chunks = new chunk_info[summary.num_chunks];
106
107     int block_start = 0;
108     for (int i = 0; i < summary.num_chunks; i++) {
109         char *packed_info = &packed_sigs[i * (2 + HASH_SIZE)];
110         memcpy(summary.chunks[i].hash, &packed_info[2], HASH_SIZE);
111
112         uint16_t chunk_len;
113         memcpy(&chunk_len, &packed_info[0], 2);
114         summary.chunks[i].len = ntohs(chunk_len);
115         summary.chunks[i].offset = block_start;
116         block_start += summary.chunks[i].len;
117
118         chunk_index[string(summary.chunks[i].hash, HASH_SIZE)]
119             = make_pair(block_id, i);
120     }
121
122     block_list.push_back(summary);
123     free(packed_sigs);
124 }
125
126 /* Signatures can be loaded lazily; this method should be called before any
127  * actual access to the chunk signatures is required, to ensure the data has
128  * actually been loaded. */
129 void Subfile::ensure_signatures_loaded()
130 {
131     if (checksums_loaded)
132         return;
133
134     for (set<ObjectReference>::iterator i = old_blocks.begin();
135          i != old_blocks.end(); ++i) {
136         index_chunks(*i);
137     }
138
139     checksums_loaded = true;
140 }
141
142 void Subfile::analyze_new_block(const char *buf, size_t len)
143 {
144     analyzed_buf = buf;
145     analyzed_len = len;
146     int max_chunks = chunk_compute_max_num_breaks(len);
147
148     free_analysis();
149
150     size_t *breakpoints = new size_t[max_chunks];
151     int num_breakpoints = chunk_compute_breaks(buf, len, breakpoints);
152
153     if (num_breakpoints == 0) {
154         delete[] breakpoints;
155         return;
156     }
157
158     new_block_summary.num_chunks = num_breakpoints;
159     new_block_summary.chunks = new chunk_info[num_breakpoints];
160
161     int block_start = 0;
162     for (int i = 0; i < num_breakpoints; i++) {
163         new_block_summary.chunks[i].offset = block_start;
164         new_block_summary.chunks[i].len = breakpoints[i] - block_start + 1;
165         block_start = breakpoints[i] + 1;
166
167         SHA1Checksum hash;
168         hash.process(&buf[new_block_summary.chunks[i].offset],
169                      new_block_summary.chunks[i].len);
170         assert(hash.checksum_size() == (size_t)HASH_SIZE);
171         memcpy(new_block_summary.chunks[i].hash, hash.checksum(), HASH_SIZE);
172     }
173
174     new_block_summary_valid = true;
175     delete[] breakpoints;
176 }
177
178 void Subfile::store_block_signatures(ObjectReference ref, block_summary summary)
179 {
180     int n = summary.num_chunks;
181     char *packed = (char *)malloc(n * (2 + HASH_SIZE));
182
183     for (int i = 0; i < n; i++) {
184         assert(summary.chunks[i].len >= 0 && summary.chunks[i].len <= 0xffff);
185         uint16_t len = htons(summary.chunks[i].len);
186         char *packed_info = &packed[i * (2 + HASH_SIZE)];
187         memcpy(&packed_info[0], &len, 2);
188         memcpy(&packed_info[2], summary.chunks[i].hash, HASH_SIZE);
189     }
190
191     db->StoreChunkSignatures(ref, packed, n * (2 + HASH_SIZE),
192                              get_algorithm());
193
194     free(packed);
195 }
196
197 /* Compute an incremental representation of the most recent block analyzed. */
198 enum subfile_item_type { SUBFILE_COPY, SUBFILE_NEW };
199
200 struct subfile_item {
201     subfile_item_type type;
202
203     // For type SUBFILE_COPY
204     ObjectReference ref;
205
206     // For type SUBFILE_NEW
207     int src_offset, dst_offset;
208     int len;
209     char hash[Subfile::HASH_SIZE];
210 };
211
212 /* Compute an incremental representation of the data last analyzed.  A list of
213  * references will be returned corresponding to the data.  If new data must be
214  * written out to the backup, it will be written out via the LbsObject
215  * provided, to the provided TarSegmentStore. */
216 list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
217                                                   LbsObject *o,
218                                                   double block_age)
219 {
220     assert(new_block_summary_valid);
221     bool matched_old = false;
222     size_t new_data = 0;
223
224     list<subfile_item> items;
225     list<ObjectReference> refs;
226
227     ensure_signatures_loaded();
228
229     assert(new_block_summary.num_chunks > 0);
230
231     for (int i = 0; i < new_block_summary.num_chunks; i++) {
232         map<string, pair<int, int> >::iterator m
233             = chunk_index.find(string(new_block_summary.chunks[i].hash,
234                                       HASH_SIZE));
235
236         struct subfile_item item;
237         if (m == chunk_index.end()) {
238             item.type = SUBFILE_NEW;
239             item.src_offset = new_block_summary.chunks[i].offset;
240             item.dst_offset = new_data;
241             item.len = new_block_summary.chunks[i].len;
242             memcpy(item.hash, new_block_summary.chunks[i].hash, HASH_SIZE);
243             new_data += item.len;
244         } else {
245             struct chunk_info &old_chunk
246                 = block_list[m->second.first].chunks[m->second.second];
247             item.type = SUBFILE_COPY;
248             item.ref = block_list[m->second.first].ref;
249             item.ref.set_range(old_chunk.offset, old_chunk.len);
250             matched_old = true;
251         }
252
253         items.push_back(item);
254     }
255
256     // No data was matched.  The entire block can be written out as is into a
257     // new object, and the new_block_summary used to save chunk signatures.
258     if (!matched_old) {
259         SHA1Checksum block_hash;
260         block_hash.process(analyzed_buf, analyzed_len);
261         string block_csum = block_hash.checksum_str();
262
263         o->set_data(analyzed_buf, analyzed_len);
264         o->write(tss);
265         ObjectReference ref = o->get_ref();
266         db->StoreObject(ref, block_csum, analyzed_len, block_age);
267         store_block_signatures(ref, new_block_summary);
268         refs.push_back(ref);
269         delete o;
270         return refs;
271     }
272
273     // Otherwise, construct a new block containing all literal data needed (if
274     // any exists), write it out, and construct the appropriate list of
275     // references.
276     list<subfile_item>::iterator i;
277     if (new_data > 0) {
278         char *literal_buf = new char[new_data];
279         for (i = items.begin(); i != items.end(); ++i) {
280             if (i->type == SUBFILE_NEW) {
281                 memcpy(&literal_buf[i->dst_offset],
282                        &analyzed_buf[i->src_offset], i->len);
283             }
284         }
285
286         SHA1Checksum block_hash;
287         block_hash.process(literal_buf, new_data);
288         string block_csum = block_hash.checksum_str();
289
290         o->set_group("data");
291         o->set_data(literal_buf, new_data);
292         o->write(tss);
293         ObjectReference ref = o->get_ref();
294         for (i = items.begin(); i != items.end(); ++i) {
295             if (i->type == SUBFILE_NEW) {
296                 i->ref = ref;
297                 i->ref.set_range(i->dst_offset, i->len);
298             }
299         }
300
301         db->StoreObject(ref, block_csum, new_data, 0.0);
302
303         block_summary summary;
304         summary.ref = ref;
305         summary.num_chunks = 0;
306         summary.chunks = new chunk_info[items.size()];
307         for (i = items.begin(); i != items.end(); ++i) {
308             if (i->type == SUBFILE_NEW) {
309                 chunk_info &info = summary.chunks[summary.num_chunks];
310                 memcpy(info.hash, i->hash, HASH_SIZE);
311                 info.offset = i->dst_offset;
312                 info.len = i->len;
313                 summary.num_chunks++;
314             }
315         }
316
317         store_block_signatures(ref, summary);
318
319         delete[] summary.chunks;
320         delete[] literal_buf;
321     }
322
323     delete o;
324
325     ObjectReference ref;
326     for (i = items.begin(); i != items.end(); ++i) {
327         string refstr = i->ref.to_string();
328         if (!ref.merge(i->ref)) {
329             refs.push_back(ref);
330             ref = i->ref;
331         }
332     }
333     assert(!ref.is_null());
334     refs.push_back(ref);
335
336     return refs;
337 }