Bugfix for database checksum reconstruction with zero blocks.
[cumulus.git] / python / cumulus / rebuild_database.py
1 #!/usr/bin/python
2 #
3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20
21 """Code for rebuilding the local database.
22
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database.  This can be used to recover from a local database loss,
26 given data from a previous backup.
27 """
28
29 import base64
30 import hashlib
31 import itertools
32 import os
33 import re
34 import struct
35 import subprocess
36 import sys
37 import tarfile
38 import time
39
40 import cumulus
41
42 CHECKSUM_ALGORITHM = "sha224"
43 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
44
45 # TODO: Move to somewhere common
46 SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
47
48 class Chunker(object):
49     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
50
51     This duplicates the chunking algorithm in third_party/chunk.cc.
52     """
53     # Chunking parameters.  These should match those in third_party/chunk.cc.
54     MODULUS = 0xbfe6b8a5bf378d83
55     WINDOW_SIZE = 48
56     BREAKMARK_VALUE = 0x78
57     MIN_CHUNK_SIZE = 2048
58     MAX_CHUNK_SIZE = 65535
59     TARGET_CHUNK_SIZE = 4096
60     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
61
62     # Minimum size of a block before we should bother storing subfile
63     # signatures (only applies when full blocks are used to store a file;
64     # subfile signatures are always used when subfile incrementals are
65     # present).
66     MINIMUM_OBJECT_SIZE = 16384
67
68     def __init__(self):
69         degree = self.MODULUS.bit_length() - 1
70         self.degree = degree
71
72         # Lookup table for polynomial reduction when shifting a new byte in,
73         # based on the high-order bits.
74         self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
75                   for i in range(256)]
76
77         # Values to remove a byte from the signature when it falls out of the
78         # window.
79         self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
80                                 self.MODULUS) for i in range(256)]
81
82         self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
83         self.hash_size = self.hash_algorithm().digestsize
84
85     def polymult(self, x, y, n):
86         # Polynomial multiplication: result = x * y
87         result = 0
88         for i in range(x.bit_length()):
89             if (x >> i) & 1:
90                 result ^= y << i
91         # Reduction modulo n
92         size = n.bit_length()
93         while result.bit_length() >= size:
94             result ^= n << (result.bit_length() - size)
95         return result
96
97     def window_init(self):
98         # Sliding signature state is:
99         #   [signature value, history buffer index, history buffer contents]
100         return [0, 0, [0] * self.WINDOW_SIZE]
101
102     def window_update(self, signature, byte):
103         poly = signature[0]
104         offset = signature[1]
105         undo = self.U[signature[2][offset]]
106         poly = ((poly ^ undo) << 8) + byte
107         poly ^= self.T[poly >> self.degree]
108
109         signature[0] = poly
110         signature[1] = (offset + 1) % self.WINDOW_SIZE
111         signature[2][offset] = byte
112
113     def compute_breaks(self, buf):
114         breaks = [0]
115         signature = self.window_init()
116         for i in xrange(len(buf)):
117             self.window_update(signature, ord(buf[i]))
118             block_len = i - breaks[-1] + 1
119             if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
120                         and block_len >= self.MIN_CHUNK_SIZE)
121                     or block_len >= self.MAX_CHUNK_SIZE):
122                 breaks.append(i + 1)
123         if breaks[-1] < len(buf):
124             breaks.append(len(buf))
125         return breaks
126
127     def compute_signatures(self, buf, buf_offset=0):
128         """Break a buffer into chunks and compute chunk signatures.
129
130         Args:
131             buf: The data buffer.
132             buf_offset: The offset of the data buffer within the original
133                 block, to handle cases where only a portion of the block is
134                 available.
135
136         Returns:
137             A dictionary containing signature data.  Keys are chunk offsets
138             (from the beginning of the block), and values are tuples (size, raw
139             hash value).
140         """
141         breaks = self.compute_breaks(buf)
142         signatures = {}
143         for i in range(1, len(breaks)):
144             chunk = buf[breaks[i-1]:breaks[i]]
145             hasher = self.hash_algorithm()
146             hasher.update(chunk)
147             signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
148                                                     hasher.digest())
149         return signatures
150
151     def dump_signatures(self, signatures):
152         """Convert signatures to the binary format stored in the database."""
153         records = []
154
155         # Emit records indicating that no signatures are available for the next
156         # n bytes.  Since the size is a 16-bit value, to skip larger distances
157         # multiple records must be emitted.  An all-zero signature indicates
158         # the lack of data.
159         def skip(n):
160             while n > 0:
161                 i = min(n, self.MAX_CHUNK_SIZE)
162                 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
163                 n -= i
164
165         position = 0
166         for next_start, (size, digest) in sorted(signatures.iteritems()):
167             if next_start < position:
168                 print "Warning: overlapping signatures, ignoring"
169                 continue
170             skip(next_start - position)
171             records.append(struct.pack(">H", size) + digest)
172             position = next_start + size
173
174         return "".join(records)
175
176     def load_signatures(self, signatures):
177         """Loads signatures from the binary format stored in the database."""
178         entry_size = 2 + self.hash_size
179         if len(signatures) % entry_size != 0:
180             print "Warning: Invalid signatures to load"
181             return {}
182
183         null_digest = "\x00" * self.hash_size
184         position = 0
185         result = {}
186         for i in range(len(signatures) // entry_size):
187             sig = signatures[i*entry_size:(i+1)*entry_size]
188             size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
189             if digest != null_digest:
190                 result[position] = (size, digest)
191             position += size
192         return result
193
194
195 class ChunkerExternal(Chunker):
196     """A Chunker which uses an external program to compute Rabin fingerprints.
197
198     This can run much more quickly than the Python code, but should otherwise
199     give identical results.
200     """
201
202     def __init__(self):
203         super(ChunkerExternal, self).__init__()
204         self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
205                                         stdin=subprocess.PIPE,
206                                         stdout=subprocess.PIPE)
207
208     def compute_breaks(self, buf):
209         if len(buf) == 0:
210             return [0]
211         self.subproc.stdin.write(struct.pack(">i", len(buf)))
212         self.subproc.stdin.write(buf)
213         self.subproc.stdin.flush()
214         breaks = self.subproc.stdout.readline()
215         return [0] + [int(x) + 1 for x in breaks.split()]
216
217
218 class DatabaseRebuilder(object):
219     def __init__(self, database):
220         self.database = database
221         self.cursor = database.cursor()
222         self.segment_ids = {}
223         self.chunker = ChunkerExternal()
224         #self.chunker = Chunker()
225
226     def segment_to_id(self, segment):
227         if segment in self.segment_ids: return self.segment_ids[segment]
228
229         self.cursor.execute("""insert or ignore into segments(segment)
230                                values (?)""", (segment,))
231         self.cursor.execute("""select segmentid from segments
232                                where segment = ?""", (segment,))
233         id = self.cursor.fetchone()[0]
234         self.segment_ids[segment] = id
235         return id
236
237     def rebuild(self, metadata, reference_path):
238         """Iterate through old metadata and use it to rebuild the database.
239
240         Args:
241             metadata: An iterable containing lines of the metadata log.
242             reference_path: Path to the root of a file system tree which may be
243                 similar to data in the metadata log (used to recompute block
244                 signatures).
245         """
246         for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
247             metadata = cumulus.MetadataItem(fields, None)
248             # Only process regular files; skip over other types (directories,
249             # symlinks, etc.)
250             if metadata.items.type not in ("-", "f"): continue
251             try:
252                 path = os.path.join(reference_path, metadata.items.name)
253                 print "Path:", path
254                 # TODO: Check file size for early abort if different
255                 self.rebuild_file(open(path), metadata)
256             except IOError as e:
257                 print e
258                 pass  # Ignore the file
259
260         self.database.commit()
261
262     def reload_segment_metadata(self, segment_metadata):
263         """Read a segment metadata (.meta) file into the local database.
264
265         Updates the segments table in the local database with information from
266         a a segment metadata backup file.  Old data is not overwritten, so
267         loading a .meta file with partial information is fine.
268         """
269         for info in cumulus.parse(segment_metadata,
270                                      terminate=lambda l: len(l) == 0):
271             segment = info.pop("segment")
272             self.insert_segment_info(segment, info)
273
274         self.database.commit()
275
276     def insert_segment_info(self, segment, info):
277         id = self.segment_to_id(segment)
278         for k, v in info.items():
279             self.cursor.execute("update segments set " + k + " = ? "
280                                 "where segmentid = ?",
281                                 (v, id))
282
283     def rebuild_file(self, fp, metadata):
284         """Recompute database signatures if a file is unchanged.
285
286         If the current file contents match that from the old metadata (the
287         full-file hash matches), then recompute block- and chunk-level
288         signatures for the objects referenced by the file.
289         """
290         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
291         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
292         checksums = {}
293         subblock = {}
294         for segment, object, checksum, slice in blocks:
295             # Given a reference to a block of unknown size we don't know how to
296             # match up the data, so we have to give up on rebuilding for this
297             # file.
298             if slice is None: return
299
300             start, length, exact = slice
301             buf = fp.read(length)
302             verifier.update(buf)
303
304             # Zero blocks get no checksums, so skip further processing on them.
305             if object is None: continue
306
307             if exact:
308                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
309                 csum.update(buf)
310                 checksums[(segment, object)] = (length, csum.compute())
311             else:
312                 # Compute a lower bound on the object size.
313                 oldlength, csum = checksums.get((segment, object), (0, None))
314                 checksums[(segment, object)] = (max(oldlength, start + length),
315                                                 csum)
316
317             if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
318                 signatures = self.chunker.compute_signatures(buf, start)
319                 subblock.setdefault((segment, object), {}).update(signatures)
320
321         if verifier.valid():
322             for k in subblock:
323                 subblock[k] = self.chunker.dump_signatures(subblock[k])
324             self.store_checksums(checksums, subblock)
325         else:
326             print "Checksum mismatch"
327
328     def store_checksums(self, block_checksums, subblock_signatures):
329         for (segment, object), (size, checksum) in block_checksums.iteritems():
330             segmentid = self.segment_to_id(segment)
331             self.cursor.execute(
332                 """insert or ignore into block_index(segmentid, object)
333                    values (?, ?)""",
334                 (segmentid, object))
335             self.cursor.execute("""select blockid from block_index
336                                    where segmentid = ? and object = ?""",
337                                 (segmentid, object))
338             blockid = self.cursor.fetchall()[0][0]
339
340             # Store checksum only if it is available; we don't want to
341             # overwrite an existing checksum in the database with NULL.
342             if checksum is not None:
343                 self.cursor.execute("""update block_index
344                                        set checksum = ?
345                                        where blockid = ?""",
346                                     (checksum, blockid))
347
348             # Update the object size.  Our size may be an estimate, based on
349             # slices that we have seen.  The size in the database must not be
350             # larger than the true size, but update it to the largest value
351             # possible.
352             self.cursor.execute("""update block_index
353                                    set size = max(?, coalesce(size, 0))
354                                    where blockid = ?""",
355                                 (size, blockid))
356
357             # Store subblock signatures, if available.
358             # TODO: Even better would be to merge signature data, to handle the
359             # case where different files see different chunks of a block.
360             sigs = subblock_signatures.get((segment, object))
361             if sigs:
362                 self.cursor.execute(
363                     """insert or replace into subblock_signatures(
364                            blockid, algorithm, signatures)
365                        values (?, ?, ?)""",
366                     (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
367
368
369 class SegmentStateRebuilder(object):
370     """Reconstructs segment metadata files from raw segment data."""
371
372     def __init__(self):
373         self.filters = dict(cumulus.SEGMENT_FILTERS)
374         self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
375
376     def compute_metadata(self, path, relative_path):
377         """Recompute metadata of a single segment.
378
379         Args:
380             path: Path to the segment file in the file system.
381             relative_path: Path relative to the root for the storage backend.
382         """
383         # Does the filename match that of a segment?  If so, extract the
384         # extension to determine the filter to apply to decompress.
385         filename = os.path.basename(relative_path)
386         m = self.segment_pattern.match(filename)
387         if not m: return
388         segment_name = m.group(1)
389         extension = m.group(2)
390         if extension not in self.filters: return
391         filter_cmd = self.filters[extension]
392
393         # File attributes.
394         st_buf = os.stat(path)
395         timestamp = time.strftime(SQLITE_TIMESTAMP,
396                                   time.gmtime(st_buf.st_mtime))
397
398         # Compute attributes of the compressed segment data.
399         BLOCK_SIZE = 4096
400         with open(path) as segment:
401             disk_size = 0
402             checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
403             while True:
404                 buf = segment.read(BLOCK_SIZE)
405                 if len(buf) == 0: break
406                 disk_size += len(buf)
407                 checksummer.update(buf)
408         checksum = checksummer.compute()
409
410         # Compute attributes of the objects within the segment.
411         data_size = 0
412         object_count = 0
413         with open(path) as segment:
414             decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
415             objects = tarfile.open(mode='r|', fileobj=decompressed)
416             for tarinfo in objects:
417                 data_size += tarinfo.size
418                 object_count += 1
419
420         return {"segment": cumulus.uri_encode(segment_name),
421                 "path": cumulus.uri_encode(relative_path),
422                 "checksum": checksum,
423                 "data_size": data_size,
424                 "disk_size": disk_size,
425                 "timestamp": timestamp}
426
427 if __name__ == "__main__":
428     # Sample code to reconstruct segment metadata--ought to be relocated.
429     if False:
430         segment_rebuilder = SegmentStateRebuilder()
431         topdir = sys.argv[1]
432         files = []
433         for dirpath, dirnames, filenames in os.walk(topdir):
434             for f in filenames:
435                 files.append(os.path.join(dirpath, f))
436         files.sort()
437         for f in files:
438             metadata = segment_rebuilder.compute_metadata(
439                 f,
440                 os.path.relpath(f, topdir))
441             if metadata:
442                 for (k, v) in sorted(metadata.items()):
443                     print "%s: %s" % (k, v)
444                 print
445         sys.exit(0)
446
447     # Sample code to rebuild the segments table from metadata--needs to be
448     # merged with the code below.
449     if False:
450         rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
451         rebuilder.reload_segment_metadata(open(sys.argv[2]))
452         sys.exit(0)
453
454     # Read metadata from stdin; filter out lines starting with "@@" so the
455     # statcache file can be parsed as well.
456     metadata = (x for x in sys.stdin if not x.startswith("@@"))
457
458     rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
459     rebuilder.rebuild(metadata, "/")