Ensure metadata file is written prior to calculating checksum.
[cumulus.git] / subfile.cc
1 /* Cumulus: Efficient Filesystem Backup to the Cloud
2  * Copyright (C) 2008 The Cumulus Developers
3  * See the AUTHORS file for a list of contributors.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19
20 /* Allow for sub-file incremental backups: if only a portion of a file changes,
21  * allow the new data to be written out, and the old data to simply be
22  * referenced from the new metadata log. */
23
24 #include <stdlib.h>
25 #include <string.h>
26 #include <assert.h>
27 #include <arpa/inet.h>
28
29 #include "hash.h"
30 #include "subfile.h"
31 #include "third_party/chunk.h"
32
33 using std::list;
34 using std::map;
35 using std::set;
36 using std::string;
37 using std::vector;
38 using std::pair;
39 using std::make_pair;
40
41 Subfile::Subfile(LocalDb *localdb)
42     : db(localdb), checksums_loaded(false), new_block_summary_valid(false)
43 {
44     Hash *hasher = Hash::New();
45     hasher->digest();
46     algorithm_name = chunk_algorithm_name() + "/" + hasher->name();
47     hash_size = hasher->digest_size();
48     delete hasher;
49 }
50
51 Subfile::~Subfile()
52 {
53     for (size_t i = 0; i < block_list.size(); i++) {
54         delete[] block_list[i].chunks;
55     }
56
57     free_analysis();
58 }
59
60 void Subfile::free_analysis()
61 {
62     if (new_block_summary_valid)
63         delete[] new_block_summary.chunks;
64
65     new_block_summary_valid = false;
66 }
67
68 void Subfile::load_old_blocks(const list<ObjectReference> &blocks)
69 {
70     for (list<ObjectReference>::const_iterator i = blocks.begin();
71          i != blocks.end(); ++i) {
72         if (!i->is_normal())
73             continue;
74
75         ObjectReference base = i->base();
76         if (old_blocks.find(base) == old_blocks.end()) {
77             old_blocks.insert(base);
78             if (checksums_loaded)
79                 index_chunks(base);
80         }
81     }
82 }
83
84 /* Actually load chunk signatures from the database, and index them in memory.
85  * This should only be called once per segment. */
86 void Subfile::index_chunks(ObjectReference ref)
87 {
88     string refstr = ref.to_string();
89
90     if (!db->IsAvailable(ref))
91         return;
92
93     /* Look for checksums for this block in the database.  They may not exist,
94      * in which case simply return without error. */
95     char *packed_sigs;
96     size_t len;
97     string algorithm;
98     if (!db->LoadChunkSignatures(ref, (void **)&packed_sigs, &len, &algorithm))
99         return;
100     if (algorithm != algorithm_name) {
101         free(packed_sigs);
102         return;
103     }
104
105     int block_id = block_list.size();
106
107     block_summary summary;
108     summary.ref = ref.base();
109     summary.num_chunks = len / (2 + hash_size);
110     summary.chunks = new chunk_info[summary.num_chunks];
111
112     int block_start = 0;
113     for (int i = 0; i < summary.num_chunks; i++) {
114         char *packed_info = &packed_sigs[i * (2 + hash_size)];
115         summary.chunks[i].hash = string(&packed_info[2], hash_size);
116
117         uint16_t chunk_len;
118         memcpy(&chunk_len, &packed_info[0], 2);
119         summary.chunks[i].len = ntohs(chunk_len);
120         summary.chunks[i].offset = block_start;
121         block_start += summary.chunks[i].len;
122
123         chunk_index[summary.chunks[i].hash] = make_pair(block_id, i);
124     }
125
126     block_list.push_back(summary);
127     free(packed_sigs);
128 }
129
130 /* Signatures can be loaded lazily; this method should be called before any
131  * actual access to the chunk signatures is required, to ensure the data has
132  * actually been loaded. */
133 void Subfile::ensure_signatures_loaded()
134 {
135     if (checksums_loaded)
136         return;
137
138     for (set<ObjectReference>::iterator i = old_blocks.begin();
139          i != old_blocks.end(); ++i) {
140         index_chunks(*i);
141     }
142
143     checksums_loaded = true;
144 }
145
146 void Subfile::analyze_new_block(const char *buf, size_t len)
147 {
148     analyzed_buf = buf;
149     analyzed_len = len;
150     int max_chunks = chunk_compute_max_num_breaks(len);
151
152     free_analysis();
153
154     size_t *breakpoints = new size_t[max_chunks];
155     int num_breakpoints = chunk_compute_breaks(buf, len, breakpoints);
156
157     if (num_breakpoints == 0) {
158         delete[] breakpoints;
159         return;
160     }
161
162     new_block_summary.num_chunks = num_breakpoints;
163     new_block_summary.chunks = new chunk_info[num_breakpoints];
164
165     int block_start = 0;
166     for (int i = 0; i < num_breakpoints; i++) {
167         new_block_summary.chunks[i].offset = block_start;
168         new_block_summary.chunks[i].len = breakpoints[i] - block_start + 1;
169         block_start = breakpoints[i] + 1;
170
171         Hash *hasher = Hash::New();
172         hasher->update(&buf[new_block_summary.chunks[i].offset],
173                        new_block_summary.chunks[i].len);
174         new_block_summary.chunks[i].hash
175             = string(reinterpret_cast<const char *>(hasher->digest()),
176                      hasher->digest_size());
177         delete hasher;
178     }
179
180     new_block_summary_valid = true;
181     delete[] breakpoints;
182 }
183
184 void Subfile::store_block_signatures(ObjectReference ref, block_summary summary)
185 {
186     int n = summary.num_chunks;
187     char *packed = (char *)malloc(n * (2 + hash_size));
188
189     for (int i = 0; i < n; i++) {
190         assert(summary.chunks[i].len >= 0 && summary.chunks[i].len <= 0xffff);
191         uint16_t len = htons(summary.chunks[i].len);
192         char *packed_info = &packed[i * (2 + hash_size)];
193         memcpy(&packed_info[0], &len, 2);
194         memcpy(&packed_info[2], summary.chunks[i].hash.data(), hash_size);
195     }
196
197     db->StoreChunkSignatures(ref, packed, n * (2 + hash_size), algorithm_name);
198
199     free(packed);
200 }
201
202 void Subfile::store_analyzed_signatures(ObjectReference ref)
203 {
204     if (analyzed_len >= 16384)
205         store_block_signatures(ref, new_block_summary);
206 }
207
208 /* Compute an incremental representation of the most recent block analyzed. */
209 enum subfile_item_type { SUBFILE_COPY, SUBFILE_NEW };
210
211 struct subfile_item {
212     subfile_item_type type;
213
214     // For type SUBFILE_COPY
215     ObjectReference ref;
216
217     // For type SUBFILE_NEW
218     int src_offset, dst_offset;
219     int len;
220     string hash;
221 };
222
223 /* Compute an incremental representation of the data last analyzed.  A list of
224  * references will be returned corresponding to the data.  If new data must be
225  * written out to the backup, it will be written out via the LbsObject
226  * provided, to the provided TarSegmentStore. */
227 list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
228                                                   LbsObject *o,
229                                                   double block_age)
230 {
231     assert(new_block_summary_valid);
232     bool matched_old = false;
233     size_t new_data = 0;
234
235     list<subfile_item> items;
236     list<ObjectReference> refs;
237
238     ensure_signatures_loaded();
239
240     assert(new_block_summary.num_chunks > 0);
241
242     for (int i = 0; i < new_block_summary.num_chunks; i++) {
243         map<string, pair<int, int> >::iterator m
244             = chunk_index.find(new_block_summary.chunks[i].hash);
245
246         struct subfile_item item;
247         if (m == chunk_index.end()) {
248             item.type = SUBFILE_NEW;
249             item.src_offset = new_block_summary.chunks[i].offset;
250             item.dst_offset = new_data;
251             item.len = new_block_summary.chunks[i].len;
252             item.hash = new_block_summary.chunks[i].hash;
253             new_data += item.len;
254         } else {
255             struct chunk_info &old_chunk
256                 = block_list[m->second.first].chunks[m->second.second];
257             item.type = SUBFILE_COPY;
258             item.ref = block_list[m->second.first].ref;
259             item.ref.set_range(old_chunk.offset, old_chunk.len);
260             matched_old = true;
261         }
262
263         items.push_back(item);
264     }
265
266     // No data was matched.  The entire block can be written out as is into a
267     // new object, and the new_block_summary used to save chunk signatures.
268     if (!matched_old) {
269         o->set_age(block_age);
270         o->set_data(analyzed_buf, analyzed_len, NULL);
271         o->write(tss);
272         ObjectReference ref = o->get_ref();
273         store_analyzed_signatures(ref);
274         refs.push_back(ref);
275         delete o;
276         return refs;
277     }
278
279     // Otherwise, construct a new block containing all literal data needed (if
280     // any exists), write it out, and construct the appropriate list of
281     // references.
282     list<subfile_item>::iterator i;
283     if (new_data > 0) {
284         char *literal_buf = new char[new_data];
285         for (i = items.begin(); i != items.end(); ++i) {
286             if (i->type == SUBFILE_NEW) {
287                 memcpy(&literal_buf[i->dst_offset],
288                        &analyzed_buf[i->src_offset], i->len);
289             }
290         }
291
292         Hash *hasher = Hash::New();
293         hasher->update(literal_buf, new_data);
294         string block_csum = hasher->digest_str();
295         delete hasher;
296
297         o->set_group("data");
298         o->set_data(literal_buf, new_data, NULL);
299         o->write(tss);
300         ObjectReference ref = o->get_ref();
301         for (i = items.begin(); i != items.end(); ++i) {
302             if (i->type == SUBFILE_NEW) {
303                 i->ref = ref;
304                 i->ref.set_range(i->dst_offset, i->len);
305             }
306         }
307
308         //db->StoreObject(ref, 0.0);
309
310         block_summary summary;
311         summary.ref = ref;
312         summary.num_chunks = 0;
313         summary.chunks = new chunk_info[items.size()];
314         for (i = items.begin(); i != items.end(); ++i) {
315             if (i->type == SUBFILE_NEW) {
316                 chunk_info &info = summary.chunks[summary.num_chunks];
317                 info.hash = i->hash;
318                 info.offset = i->dst_offset;
319                 info.len = i->len;
320                 summary.num_chunks++;
321             }
322         }
323
324         store_block_signatures(ref, summary);
325
326         delete[] summary.chunks;
327         delete[] literal_buf;
328     }
329
330     delete o;
331
332     ObjectReference ref;
333     for (i = items.begin(); i != items.end(); ++i) {
334         string refstr = i->ref.to_string();
335         if (!ref.merge(i->ref)) {
336             refs.push_back(ref);
337             ref = i->ref;
338         }
339     }
340     assert(!ref.is_null());
341     refs.push_back(ref);
342
343     return refs;
344 }