Do not store subfile signatures for very short blocks.
[cumulus.git] / subfile.cc
1 /* Cumulus: Smart Filesystem Backup to Dumb Servers
2  *
3  * Copyright (C) 2008  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19  */
20
21 /* Allow for sub-file incremental backups: if only a portion of a file changes,
22  * allow the new data to be written out, and the old data to simply be
23  * referenced from the new metadata log. */
24
25 #include <stdlib.h>
26 #include <assert.h>
27 #include <arpa/inet.h>
28
29 #include "subfile.h"
30 #include "chunk.h"
31 #include "sha1.h"
32
33 using std::list;
34 using std::map;
35 using std::set;
36 using std::string;
37 using std::vector;
38 using std::pair;
39 using std::make_pair;
40
41 Subfile::Subfile(LocalDb *localdb)
42     : db(localdb), checksums_loaded(false), new_block_summary_valid(false)
43 {
44 }
45
46 Subfile::~Subfile()
47 {
48     for (size_t i = 0; i < block_list.size(); i++) {
49         delete[] block_list[i].chunks;
50     }
51
52     free_analysis();
53 }
54
55 void Subfile::free_analysis()
56 {
57     if (new_block_summary_valid)
58         delete[] new_block_summary.chunks;
59
60     new_block_summary_valid = false;
61 }
62
63 void Subfile::load_old_blocks(const list<ObjectReference> &blocks)
64 {
65     for (list<ObjectReference>::const_iterator i = blocks.begin();
66          i != blocks.end(); ++i) {
67         if (!i->is_normal())
68             continue;
69
70         ObjectReference base = i->base();
71         if (old_blocks.find(base) == old_blocks.end()) {
72             old_blocks.insert(base);
73             if (checksums_loaded)
74                 index_chunks(base);
75         }
76     }
77 }
78
79 /* Actually load chunk signatures from the database, and index them in memory.
80  * This should only be called once per segment. */
81 void Subfile::index_chunks(ObjectReference ref)
82 {
83     string refstr = ref.to_string();
84
85     if (!db->IsAvailable(ref))
86         return;
87
88     /* Look for checksums for this block in the database.  They may not exist,
89      * in which case simply return without error. */
90     char *packed_sigs;
91     size_t len;
92     string algorithm;
93     if (!db->LoadChunkSignatures(ref, (void **)&packed_sigs, &len, &algorithm))
94         return;
95     if (algorithm != get_algorithm()) {
96         free(packed_sigs);
97         return;
98     }
99
100     int block_id = block_list.size();
101
102     block_summary summary;
103     summary.ref = ref.base();
104     summary.num_chunks = len / (2 + HASH_SIZE);
105     summary.chunks = new chunk_info[summary.num_chunks];
106
107     int block_start = 0;
108     for (int i = 0; i < summary.num_chunks; i++) {
109         char *packed_info = &packed_sigs[i * (2 + HASH_SIZE)];
110         memcpy(summary.chunks[i].hash, &packed_info[2], HASH_SIZE);
111
112         uint16_t chunk_len;
113         memcpy(&chunk_len, &packed_info[0], 2);
114         summary.chunks[i].len = ntohs(chunk_len);
115         summary.chunks[i].offset = block_start;
116         block_start += summary.chunks[i].len;
117
118         chunk_index[string(summary.chunks[i].hash, HASH_SIZE)]
119             = make_pair(block_id, i);
120     }
121
122     block_list.push_back(summary);
123     free(packed_sigs);
124 }
125
126 /* Signatures can be loaded lazily; this method should be called before any
127  * actual access to the chunk signatures is required, to ensure the data has
128  * actually been loaded. */
129 void Subfile::ensure_signatures_loaded()
130 {
131     if (checksums_loaded)
132         return;
133
134     for (set<ObjectReference>::iterator i = old_blocks.begin();
135          i != old_blocks.end(); ++i) {
136         index_chunks(*i);
137     }
138
139     checksums_loaded = true;
140 }
141
142 void Subfile::analyze_new_block(const char *buf, size_t len)
143 {
144     analyzed_buf = buf;
145     analyzed_len = len;
146     int max_chunks = chunk_compute_max_num_breaks(len);
147
148     free_analysis();
149
150     size_t *breakpoints = new size_t[max_chunks];
151     int num_breakpoints = chunk_compute_breaks(buf, len, breakpoints);
152
153     if (num_breakpoints == 0) {
154         delete[] breakpoints;
155         return;
156     }
157
158     new_block_summary.num_chunks = num_breakpoints;
159     new_block_summary.chunks = new chunk_info[num_breakpoints];
160
161     int block_start = 0;
162     for (int i = 0; i < num_breakpoints; i++) {
163         new_block_summary.chunks[i].offset = block_start;
164         new_block_summary.chunks[i].len = breakpoints[i] - block_start + 1;
165         block_start = breakpoints[i] + 1;
166
167         SHA1Checksum hash;
168         hash.process(&buf[new_block_summary.chunks[i].offset],
169                      new_block_summary.chunks[i].len);
170         assert(hash.checksum_size() == (size_t)HASH_SIZE);
171         memcpy(new_block_summary.chunks[i].hash, hash.checksum(), HASH_SIZE);
172     }
173
174     new_block_summary_valid = true;
175     delete[] breakpoints;
176 }
177
178 void Subfile::store_block_signatures(ObjectReference ref, block_summary summary)
179 {
180     int n = summary.num_chunks;
181     char *packed = (char *)malloc(n * (2 + HASH_SIZE));
182
183     for (int i = 0; i < n; i++) {
184         assert(summary.chunks[i].len >= 0 && summary.chunks[i].len <= 0xffff);
185         uint16_t len = htons(summary.chunks[i].len);
186         char *packed_info = &packed[i * (2 + HASH_SIZE)];
187         memcpy(&packed_info[0], &len, 2);
188         memcpy(&packed_info[2], summary.chunks[i].hash, HASH_SIZE);
189     }
190
191     db->StoreChunkSignatures(ref, packed, n * (2 + HASH_SIZE),
192                              get_algorithm());
193
194     free(packed);
195 }
196
197 /* Compute an incremental representation of the most recent block analyzed. */
198 enum subfile_item_type { SUBFILE_COPY, SUBFILE_NEW };
199
200 struct subfile_item {
201     subfile_item_type type;
202
203     // For type SUBFILE_COPY
204     ObjectReference ref;
205
206     // For type SUBFILE_NEW
207     int src_offset, dst_offset;
208     int len;
209     char hash[Subfile::HASH_SIZE];
210 };
211
212 /* Compute an incremental representation of the data last analyzed.  A list of
213  * references will be returned corresponding to the data.  If new data must be
214  * written out to the backup, it will be written out via the LbsObject
215  * provided, to the provided TarSegmentStore. */
216 list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
217                                                   LbsObject *o,
218                                                   double block_age)
219 {
220     assert(new_block_summary_valid);
221     bool matched_old = false;
222     size_t new_data = 0;
223
224     list<subfile_item> items;
225     list<ObjectReference> refs;
226
227     ensure_signatures_loaded();
228
229     assert(new_block_summary.num_chunks > 0);
230
231     for (int i = 0; i < new_block_summary.num_chunks; i++) {
232         map<string, pair<int, int> >::iterator m
233             = chunk_index.find(string(new_block_summary.chunks[i].hash,
234                                       HASH_SIZE));
235
236         struct subfile_item item;
237         if (m == chunk_index.end()) {
238             item.type = SUBFILE_NEW;
239             item.src_offset = new_block_summary.chunks[i].offset;
240             item.dst_offset = new_data;
241             item.len = new_block_summary.chunks[i].len;
242             memcpy(item.hash, new_block_summary.chunks[i].hash, HASH_SIZE);
243             new_data += item.len;
244         } else {
245             struct chunk_info &old_chunk
246                 = block_list[m->second.first].chunks[m->second.second];
247             item.type = SUBFILE_COPY;
248             item.ref = block_list[m->second.first].ref;
249             item.ref.set_range(old_chunk.offset, old_chunk.len);
250             matched_old = true;
251         }
252
253         items.push_back(item);
254     }
255
256     // No data was matched.  The entire block can be written out as is into a
257     // new object, and the new_block_summary used to save chunk signatures.
258     if (!matched_old) {
259         SHA1Checksum block_hash;
260         block_hash.process(analyzed_buf, analyzed_len);
261         string block_csum = block_hash.checksum_str();
262
263         o->set_data(analyzed_buf, analyzed_len);
264         o->write(tss);
265         ObjectReference ref = o->get_ref();
266         db->StoreObject(ref, block_csum, analyzed_len, block_age);
267         if (analyzed_len >= 16384)
268             store_block_signatures(ref, new_block_summary);
269         refs.push_back(ref);
270         delete o;
271         return refs;
272     }
273
274     // Otherwise, construct a new block containing all literal data needed (if
275     // any exists), write it out, and construct the appropriate list of
276     // references.
277     list<subfile_item>::iterator i;
278     if (new_data > 0) {
279         char *literal_buf = new char[new_data];
280         for (i = items.begin(); i != items.end(); ++i) {
281             if (i->type == SUBFILE_NEW) {
282                 memcpy(&literal_buf[i->dst_offset],
283                        &analyzed_buf[i->src_offset], i->len);
284             }
285         }
286
287         SHA1Checksum block_hash;
288         block_hash.process(literal_buf, new_data);
289         string block_csum = block_hash.checksum_str();
290
291         o->set_group("data");
292         o->set_data(literal_buf, new_data);
293         o->write(tss);
294         ObjectReference ref = o->get_ref();
295         for (i = items.begin(); i != items.end(); ++i) {
296             if (i->type == SUBFILE_NEW) {
297                 i->ref = ref;
298                 i->ref.set_range(i->dst_offset, i->len);
299             }
300         }
301
302         db->StoreObject(ref, block_csum, new_data, 0.0);
303
304         block_summary summary;
305         summary.ref = ref;
306         summary.num_chunks = 0;
307         summary.chunks = new chunk_info[items.size()];
308         for (i = items.begin(); i != items.end(); ++i) {
309             if (i->type == SUBFILE_NEW) {
310                 chunk_info &info = summary.chunks[summary.num_chunks];
311                 memcpy(info.hash, i->hash, HASH_SIZE);
312                 info.offset = i->dst_offset;
313                 info.len = i->len;
314                 summary.num_chunks++;
315             }
316         }
317
318         store_block_signatures(ref, summary);
319
320         delete[] summary.chunks;
321         delete[] literal_buf;
322     }
323
324     delete o;
325
326     ObjectReference ref;
327     for (i = items.begin(); i != items.end(); ++i) {
328         string refstr = i->ref.to_string();
329         if (!ref.merge(i->ref)) {
330             refs.push_back(ref);
331             ref = i->ref;
332         }
333     }
334     assert(!ref.is_null());
335     refs.push_back(ref);
336
337     return refs;
338 }