Start work on tests for Cumulus.
[cumulus.git] / subfile.cc
1 /* Cumulus: Efficient Filesystem Backup to the Cloud
2  * Copyright (C) 2008 The Cumulus Developers
3  * See the AUTHORS file for a list of contributors.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19
20 /* Allow for sub-file incremental backups: if only a portion of a file changes,
21  * allow the new data to be written out, and the old data to simply be
22  * referenced from the new metadata log. */
23
24 #include <stdlib.h>
25 #include <string.h>
26 #include <assert.h>
27 #include <arpa/inet.h>
28
29 #include "subfile.h"
30 #include "third_party/chunk.h"
31 #include "third_party/sha1.h"
32
33 using std::list;
34 using std::map;
35 using std::set;
36 using std::string;
37 using std::vector;
38 using std::pair;
39 using std::make_pair;
40
41 Subfile::Subfile(LocalDb *localdb)
42     : db(localdb), checksums_loaded(false), new_block_summary_valid(false)
43 {
44 }
45
46 Subfile::~Subfile()
47 {
48     for (size_t i = 0; i < block_list.size(); i++) {
49         delete[] block_list[i].chunks;
50     }
51
52     free_analysis();
53 }
54
55 void Subfile::free_analysis()
56 {
57     if (new_block_summary_valid)
58         delete[] new_block_summary.chunks;
59
60     new_block_summary_valid = false;
61 }
62
63 void Subfile::load_old_blocks(const list<ObjectReference> &blocks)
64 {
65     for (list<ObjectReference>::const_iterator i = blocks.begin();
66          i != blocks.end(); ++i) {
67         if (!i->is_normal())
68             continue;
69
70         ObjectReference base = i->base();
71         if (old_blocks.find(base) == old_blocks.end()) {
72             old_blocks.insert(base);
73             if (checksums_loaded)
74                 index_chunks(base);
75         }
76     }
77 }
78
79 /* Actually load chunk signatures from the database, and index them in memory.
80  * This should only be called once per segment. */
81 void Subfile::index_chunks(ObjectReference ref)
82 {
83     string refstr = ref.to_string();
84
85     if (!db->IsAvailable(ref))
86         return;
87
88     /* Look for checksums for this block in the database.  They may not exist,
89      * in which case simply return without error. */
90     char *packed_sigs;
91     size_t len;
92     string algorithm;
93     if (!db->LoadChunkSignatures(ref, (void **)&packed_sigs, &len, &algorithm))
94         return;
95     if (algorithm != get_algorithm()) {
96         free(packed_sigs);
97         return;
98     }
99
100     int block_id = block_list.size();
101
102     block_summary summary;
103     summary.ref = ref.base();
104     summary.num_chunks = len / (2 + HASH_SIZE);
105     summary.chunks = new chunk_info[summary.num_chunks];
106
107     int block_start = 0;
108     for (int i = 0; i < summary.num_chunks; i++) {
109         char *packed_info = &packed_sigs[i * (2 + HASH_SIZE)];
110         memcpy(summary.chunks[i].hash, &packed_info[2], HASH_SIZE);
111
112         uint16_t chunk_len;
113         memcpy(&chunk_len, &packed_info[0], 2);
114         summary.chunks[i].len = ntohs(chunk_len);
115         summary.chunks[i].offset = block_start;
116         block_start += summary.chunks[i].len;
117
118         chunk_index[string(summary.chunks[i].hash, HASH_SIZE)]
119             = make_pair(block_id, i);
120     }
121
122     block_list.push_back(summary);
123     free(packed_sigs);
124 }
125
126 /* Signatures can be loaded lazily; this method should be called before any
127  * actual access to the chunk signatures is required, to ensure the data has
128  * actually been loaded. */
129 void Subfile::ensure_signatures_loaded()
130 {
131     if (checksums_loaded)
132         return;
133
134     for (set<ObjectReference>::iterator i = old_blocks.begin();
135          i != old_blocks.end(); ++i) {
136         index_chunks(*i);
137     }
138
139     checksums_loaded = true;
140 }
141
142 void Subfile::analyze_new_block(const char *buf, size_t len)
143 {
144     analyzed_buf = buf;
145     analyzed_len = len;
146     int max_chunks = chunk_compute_max_num_breaks(len);
147
148     free_analysis();
149
150     size_t *breakpoints = new size_t[max_chunks];
151     int num_breakpoints = chunk_compute_breaks(buf, len, breakpoints);
152
153     if (num_breakpoints == 0) {
154         delete[] breakpoints;
155         return;
156     }
157
158     new_block_summary.num_chunks = num_breakpoints;
159     new_block_summary.chunks = new chunk_info[num_breakpoints];
160
161     int block_start = 0;
162     for (int i = 0; i < num_breakpoints; i++) {
163         new_block_summary.chunks[i].offset = block_start;
164         new_block_summary.chunks[i].len = breakpoints[i] - block_start + 1;
165         block_start = breakpoints[i] + 1;
166
167         SHA1Checksum hash;
168         hash.process(&buf[new_block_summary.chunks[i].offset],
169                      new_block_summary.chunks[i].len);
170         assert(hash.checksum_size() == (size_t)HASH_SIZE);
171         memcpy(new_block_summary.chunks[i].hash, hash.checksum(), HASH_SIZE);
172     }
173
174     new_block_summary_valid = true;
175     delete[] breakpoints;
176 }
177
178 void Subfile::store_block_signatures(ObjectReference ref, block_summary summary)
179 {
180     int n = summary.num_chunks;
181     char *packed = (char *)malloc(n * (2 + HASH_SIZE));
182
183     for (int i = 0; i < n; i++) {
184         assert(summary.chunks[i].len >= 0 && summary.chunks[i].len <= 0xffff);
185         uint16_t len = htons(summary.chunks[i].len);
186         char *packed_info = &packed[i * (2 + HASH_SIZE)];
187         memcpy(&packed_info[0], &len, 2);
188         memcpy(&packed_info[2], summary.chunks[i].hash, HASH_SIZE);
189     }
190
191     db->StoreChunkSignatures(ref, packed, n * (2 + HASH_SIZE),
192                              get_algorithm());
193
194     free(packed);
195 }
196
197 void Subfile::store_analyzed_signatures(ObjectReference ref)
198 {
199     if (analyzed_len >= 16384)
200         store_block_signatures(ref, new_block_summary);
201 }
202
203 /* Compute an incremental representation of the most recent block analyzed. */
204 enum subfile_item_type { SUBFILE_COPY, SUBFILE_NEW };
205
206 struct subfile_item {
207     subfile_item_type type;
208
209     // For type SUBFILE_COPY
210     ObjectReference ref;
211
212     // For type SUBFILE_NEW
213     int src_offset, dst_offset;
214     int len;
215     char hash[Subfile::HASH_SIZE];
216 };
217
218 /* Compute an incremental representation of the data last analyzed.  A list of
219  * references will be returned corresponding to the data.  If new data must be
220  * written out to the backup, it will be written out via the LbsObject
221  * provided, to the provided TarSegmentStore. */
222 list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
223                                                   LbsObject *o,
224                                                   double block_age)
225 {
226     assert(new_block_summary_valid);
227     bool matched_old = false;
228     size_t new_data = 0;
229
230     list<subfile_item> items;
231     list<ObjectReference> refs;
232
233     ensure_signatures_loaded();
234
235     assert(new_block_summary.num_chunks > 0);
236
237     for (int i = 0; i < new_block_summary.num_chunks; i++) {
238         map<string, pair<int, int> >::iterator m
239             = chunk_index.find(string(new_block_summary.chunks[i].hash,
240                                       HASH_SIZE));
241
242         struct subfile_item item;
243         if (m == chunk_index.end()) {
244             item.type = SUBFILE_NEW;
245             item.src_offset = new_block_summary.chunks[i].offset;
246             item.dst_offset = new_data;
247             item.len = new_block_summary.chunks[i].len;
248             memcpy(item.hash, new_block_summary.chunks[i].hash, HASH_SIZE);
249             new_data += item.len;
250         } else {
251             struct chunk_info &old_chunk
252                 = block_list[m->second.first].chunks[m->second.second];
253             item.type = SUBFILE_COPY;
254             item.ref = block_list[m->second.first].ref;
255             item.ref.set_range(old_chunk.offset, old_chunk.len);
256             matched_old = true;
257         }
258
259         items.push_back(item);
260     }
261
262     // No data was matched.  The entire block can be written out as is into a
263     // new object, and the new_block_summary used to save chunk signatures.
264     if (!matched_old) {
265         o->set_age(block_age);
266         o->set_data(analyzed_buf, analyzed_len, NULL);
267         o->write(tss);
268         ObjectReference ref = o->get_ref();
269         store_analyzed_signatures(ref);
270         refs.push_back(ref);
271         delete o;
272         return refs;
273     }
274
275     // Otherwise, construct a new block containing all literal data needed (if
276     // any exists), write it out, and construct the appropriate list of
277     // references.
278     list<subfile_item>::iterator i;
279     if (new_data > 0) {
280         char *literal_buf = new char[new_data];
281         for (i = items.begin(); i != items.end(); ++i) {
282             if (i->type == SUBFILE_NEW) {
283                 memcpy(&literal_buf[i->dst_offset],
284                        &analyzed_buf[i->src_offset], i->len);
285             }
286         }
287
288         SHA1Checksum block_hash;
289         block_hash.process(literal_buf, new_data);
290         string block_csum = block_hash.checksum_str();
291
292         o->set_group("data");
293         o->set_data(literal_buf, new_data, NULL);
294         o->write(tss);
295         ObjectReference ref = o->get_ref();
296         for (i = items.begin(); i != items.end(); ++i) {
297             if (i->type == SUBFILE_NEW) {
298                 i->ref = ref;
299                 i->ref.set_range(i->dst_offset, i->len);
300             }
301         }
302
303         //db->StoreObject(ref, 0.0);
304
305         block_summary summary;
306         summary.ref = ref;
307         summary.num_chunks = 0;
308         summary.chunks = new chunk_info[items.size()];
309         for (i = items.begin(); i != items.end(); ++i) {
310             if (i->type == SUBFILE_NEW) {
311                 chunk_info &info = summary.chunks[summary.num_chunks];
312                 memcpy(info.hash, i->hash, HASH_SIZE);
313                 info.offset = i->dst_offset;
314                 info.len = i->len;
315                 summary.num_chunks++;
316             }
317         }
318
319         store_block_signatures(ref, summary);
320
321         delete[] summary.chunks;
322         delete[] literal_buf;
323     }
324
325     delete o;
326
327     ObjectReference ref;
328     for (i = items.begin(); i != items.end(); ++i) {
329         string refstr = i->ref.to_string();
330         if (!ref.merge(i->ref)) {
331             refs.push_back(ref);
332             ref = i->ref;
333         }
334     }
335     assert(!ref.is_null());
336     refs.push_back(ref);
337
338     return refs;
339 }