39bf7c11941479d2ced24c0a97281faf7e32beef
[cumulus.git] / python / cumulus / rebuild_database.py
1 #!/usr/bin/python
2 #
3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20
21 """Code for rebuilding the local database.
22
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database.  This can be used to recover from a local database loss,
26 given data from a previous backup.
27 """
28
29 import base64
30 import hashlib
31 import itertools
32 import os
33 import re
34 import struct
35 import subprocess
36 import sys
37 import tarfile
38 import time
39
40 import cumulus
41
42 CHECKSUM_ALGORITHM = "sha224"
43 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
44
45 # TODO: Move to somewhere common
46 SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
47
48 class Chunker(object):
49     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
50
51     This duplicates the chunking algorithm in third_party/chunk.cc.
52     """
53     # Chunking parameters.  These should match those in third_party/chunk.cc.
54     MODULUS = 0xbfe6b8a5bf378d83
55     WINDOW_SIZE = 48
56     BREAKMARK_VALUE = 0x78
57     MIN_CHUNK_SIZE = 2048
58     MAX_CHUNK_SIZE = 65535
59     TARGET_CHUNK_SIZE = 4096
60     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
61
62     # Minimum size of a block before we should bother storing subfile
63     # signatures (only applies when full blocks are used to store a file;
64     # subfile signatures are always used when subfile incrementals are
65     # present).
66     MINIMUM_OBJECT_SIZE = 16384
67
68     def __init__(self):
69         degree = self.MODULUS.bit_length() - 1
70         self.degree = degree
71
72         # Lookup table for polynomial reduction when shifting a new byte in,
73         # based on the high-order bits.
74         self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
75                   for i in range(256)]
76
77         # Values to remove a byte from the signature when it falls out of the
78         # window.
79         self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
80                                 self.MODULUS) for i in range(256)]
81
82         self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
83         self.hash_size = self.hash_algorithm().digestsize
84
85     def polymult(self, x, y, n):
86         # Polynomial multiplication: result = x * y
87         result = 0
88         for i in range(x.bit_length()):
89             if (x >> i) & 1:
90                 result ^= y << i
91         # Reduction modulo n
92         size = n.bit_length()
93         while result.bit_length() >= size:
94             result ^= n << (result.bit_length() - size)
95         return result
96
97     def window_init(self):
98         # Sliding signature state is:
99         #   [signature value, history buffer index, history buffer contents]
100         return [0, 0, [0] * self.WINDOW_SIZE]
101
102     def window_update(self, signature, byte):
103         poly = signature[0]
104         offset = signature[1]
105         undo = self.U[signature[2][offset]]
106         poly = ((poly ^ undo) << 8) + byte
107         poly ^= self.T[poly >> self.degree]
108
109         signature[0] = poly
110         signature[1] = (offset + 1) % self.WINDOW_SIZE
111         signature[2][offset] = byte
112
113     def compute_breaks(self, buf):
114         breaks = [0]
115         signature = self.window_init()
116         for i in xrange(len(buf)):
117             self.window_update(signature, ord(buf[i]))
118             block_len = i - breaks[-1] + 1
119             if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
120                         and block_len >= self.MIN_CHUNK_SIZE)
121                     or block_len >= self.MAX_CHUNK_SIZE):
122                 breaks.append(i + 1)
123         if breaks[-1] < len(buf):
124             breaks.append(len(buf))
125         return breaks
126
127     def compute_signatures(self, buf, buf_offset=0):
128         """Break a buffer into chunks and compute chunk signatures.
129
130         Args:
131             buf: The data buffer.
132             buf_offset: The offset of the data buffer within the original
133                 block, to handle cases where only a portion of the block is
134                 available.
135
136         Returns:
137             A dictionary containing signature data.  Keys are chunk offsets
138             (from the beginning of the block), and values are tuples (size, raw
139             hash value).
140         """
141         breaks = self.compute_breaks(buf)
142         signatures = {}
143         for i in range(1, len(breaks)):
144             chunk = buf[breaks[i-1]:breaks[i]]
145             hasher = self.hash_algorithm()
146             hasher.update(chunk)
147             signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
148                                                     hasher.digest())
149         return signatures
150
151     def dump_signatures(self, signatures):
152         """Convert signatures to the binary format stored in the database."""
153         records = []
154
155         # Emit records indicating that no signatures are available for the next
156         # n bytes.  Since the size is a 16-bit value, to skip larger distances
157         # multiple records must be emitted.  An all-zero signature indicates
158         # the lack of data.
159         def skip(n):
160             while n > 0:
161                 i = min(n, self.MAX_CHUNK_SIZE)
162                 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
163                 n -= i
164
165         position = 0
166         for next_start, (size, digest) in sorted(signatures.iteritems()):
167             if next_start < position:
168                 print "Warning: overlapping signatures, ignoring"
169                 continue
170             skip(next_start - position)
171             records.append(struct.pack(">H", size) + digest)
172             position = next_start + size
173
174         return "".join(records)
175
176     def load_signatures(self, signatures):
177         """Loads signatures from the binary format stored in the database."""
178         entry_size = 2 + self.hash_size
179         if len(signatures) % entry_size != 0:
180             print "Warning: Invalid signatures to load"
181             return {}
182
183         null_digest = "\x00" * self.hash_size
184         position = 0
185         result = {}
186         for i in range(len(signatures) // entry_size):
187             sig = signatures[i*entry_size:(i+1)*entry_size]
188             size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
189             if digest != null_digest:
190                 result[position] = (size, digest)
191             position += size
192         return result
193
194
195 class ChunkerExternal(Chunker):
196     """A Chunker which uses an external program to compute Rabin fingerprints.
197
198     This can run much more quickly than the Python code, but should otherwise
199     give identical results.
200     """
201
202     def __init__(self):
203         super(ChunkerExternal, self).__init__()
204         self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
205                                         stdin=subprocess.PIPE,
206                                         stdout=subprocess.PIPE)
207
208     def compute_breaks(self, buf):
209         if len(buf) == 0:
210             return [0]
211         self.subproc.stdin.write(struct.pack(">i", len(buf)))
212         self.subproc.stdin.write(buf)
213         self.subproc.stdin.flush()
214         breaks = self.subproc.stdout.readline()
215         return [0] + [int(x) + 1 for x in breaks.split()]
216
217
218 class DatabaseRebuilder(object):
219     def __init__(self, database):
220         self.database = database
221         self.cursor = database.cursor()
222         self.segment_ids = {}
223         self.chunker = ChunkerExternal()
224         #self.chunker = Chunker()
225
226     def segment_to_id(self, segment):
227         if segment in self.segment_ids: return self.segment_ids[segment]
228
229         self.cursor.execute("""insert or ignore into segments(segment)
230                                values (?)""", (segment,))
231         self.cursor.execute("""select segmentid from segments
232                                where segment = ?""", (segment,))
233         id = self.cursor.fetchone()[0]
234         self.segment_ids[segment] = id
235         return id
236
237     def rebuild(self, metadata, reference_path):
238         """Iterate through old metadata and use it to rebuild the database.
239
240         Args:
241             metadata: An iterable containing lines of the metadata log.
242             reference_path: Path to the root of a file system tree which may be
243                 similar to data in the metadata log (used to recompute block
244                 signatures).
245         """
246         for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
247             metadata = cumulus.MetadataItem(fields, None)
248             # Only process regular files; skip over other types (directories,
249             # symlinks, etc.)
250             if metadata.items.type not in ("-", "f"): continue
251             try:
252                 path = os.path.join(reference_path, metadata.items.name)
253                 print "Path:", path
254                 # TODO: Check file size for early abort if different
255                 self.rebuild_file(open(path), metadata)
256             except IOError as e:
257                 print e
258                 pass  # Ignore the file
259
260         self.database.commit()
261
262     def reload_segment_metadata(self, segment_metadata):
263         """Read a segment metadata (.meta) file into the local database.
264
265         Updates the segments table in the local database with information from
266         a a segment metadata backup file.  Old data is not overwritten, so
267         loading a .meta file with partial information is fine.
268         """
269         for info in cumulus.parse(segment_metadata,
270                                      terminate=lambda l: len(l) == 0):
271             segment = info.pop("segment")
272             self.insert_segment_info(segment, info)
273
274         self.database.commit()
275
276     def insert_segment_info(self, segment, info):
277         id = self.segment_to_id(segment)
278         for k, v in info.items():
279             self.cursor.execute("update segments set " + k + " = ? "
280                                 "where segmentid = ?",
281                                 (v, id))
282
283     def rebuild_file(self, fp, metadata):
284         """Recompute database signatures if a file is unchanged.
285
286         If the current file contents match that from the old metadata (the
287         full-file hash matches), then recompute block- and chunk-level
288         signatures for the objects referenced by the file.
289         """
290         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
291         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
292         checksums = {}
293         subblock = {}
294         for segment, object, checksum, slice in blocks:
295             # Given a reference to a block of unknown size we don't know how to
296             # match up the data, so we have to give up on rebuilding for this
297             # file.
298             if slice is None: return
299
300             start, length, exact = slice
301             buf = fp.read(length)
302             verifier.update(buf)
303
304             if exact:
305                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
306                 csum.update(buf)
307                 checksums[(segment, object)] = (length, csum.compute())
308             else:
309                 # Compute a lower bound on the object size.
310                 oldlength, csum = checksums.get((segment, object), (0, None))
311                 checksums[(segment, object)] = (max(oldlength, start + length),
312                                                 csum)
313
314             if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
315                 signatures = self.chunker.compute_signatures(buf, start)
316                 subblock.setdefault((segment, object), {}).update(signatures)
317
318         if verifier.valid():
319             for k in subblock:
320                 subblock[k] = self.chunker.dump_signatures(subblock[k])
321             self.store_checksums(checksums, subblock)
322         else:
323             print "Checksum mismatch"
324
325     def store_checksums(self, block_checksums, subblock_signatures):
326         for (segment, object), (size, checksum) in block_checksums.iteritems():
327             segmentid = self.segment_to_id(segment)
328             self.cursor.execute(
329                 """insert or ignore into block_index(segmentid, object)
330                    values (?, ?)""",
331                 (segmentid, object))
332             self.cursor.execute("""select blockid from block_index
333                                    where segmentid = ? and object = ?""",
334                                 (segmentid, object))
335             blockid = self.cursor.fetchall()[0][0]
336
337             # Store checksum only if it is available; we don't want to
338             # overwrite an existing checksum in the database with NULL.
339             if checksum is not None:
340                 self.cursor.execute("""update block_index
341                                        set checksum = ?
342                                        where blockid = ?""",
343                                     (checksum, blockid))
344
345             # Update the object size.  Our size may be an estimate, based on
346             # slices that we have seen.  The size in the database must not be
347             # larger than the true size, but update it to the largest value
348             # possible.
349             self.cursor.execute("""update block_index
350                                    set size = max(?, coalesce(size, 0))
351                                    where blockid = ?""",
352                                 (size, blockid))
353
354             # Store subblock signatures, if available.
355             # TODO: Even better would be to merge signature data, to handle the
356             # case where different files see different chunks of a block.
357             sigs = subblock_signatures.get((segment, object))
358             if sigs:
359                 self.cursor.execute(
360                     """insert or replace into subblock_signatures(
361                            blockid, algorithm, signatures)
362                        values (?, ?, ?)""",
363                     (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
364
365
366 class SegmentStateRebuilder(object):
367     """Reconstructs segment metadata files from raw segment data."""
368
369     def __init__(self):
370         self.filters = dict(cumulus.SEGMENT_FILTERS)
371         self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
372
373     def compute_metadata(self, path, relative_path):
374         """Recompute metadata of a single segment.
375
376         Args:
377             path: Path to the segment file in the file system.
378             relative_path: Path relative to the root for the storage backend.
379         """
380         # Does the filename match that of a segment?  If so, extract the
381         # extension to determine the filter to apply to decompress.
382         filename = os.path.basename(relative_path)
383         m = self.segment_pattern.match(filename)
384         if not m: return
385         segment_name = m.group(1)
386         extension = m.group(2)
387         if extension not in self.filters: return
388         filter_cmd = self.filters[extension]
389
390         # File attributes.
391         st_buf = os.stat(path)
392         timestamp = time.strftime(SQLITE_TIMESTAMP,
393                                   time.gmtime(st_buf.st_mtime))
394
395         # Compute attributes of the compressed segment data.
396         BLOCK_SIZE = 4096
397         with open(path) as segment:
398             disk_size = 0
399             checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
400             while True:
401                 buf = segment.read(BLOCK_SIZE)
402                 if len(buf) == 0: break
403                 disk_size += len(buf)
404                 checksummer.update(buf)
405         checksum = checksummer.compute()
406
407         # Compute attributes of the objects within the segment.
408         data_size = 0
409         object_count = 0
410         with open(path) as segment:
411             decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
412             objects = tarfile.open(mode='r|', fileobj=decompressed)
413             for tarinfo in objects:
414                 data_size += tarinfo.size
415                 object_count += 1
416
417         return {"segment": cumulus.uri_encode(segment_name),
418                 "path": cumulus.uri_encode(relative_path),
419                 "checksum": checksum,
420                 "data_size": data_size,
421                 "disk_size": disk_size,
422                 "timestamp": timestamp}
423
424 if __name__ == "__main__":
425     # Sample code to reconstruct segment metadata--ought to be relocated.
426     if False:
427         segment_rebuilder = SegmentStateRebuilder()
428         topdir = sys.argv[1]
429         files = []
430         for dirpath, dirnames, filenames in os.walk(topdir):
431             for f in filenames:
432                 files.append(os.path.join(dirpath, f))
433         files.sort()
434         for f in files:
435             metadata = segment_rebuilder.compute_metadata(
436                 f,
437                 os.path.relpath(f, topdir))
438             if metadata:
439                 for (k, v) in sorted(metadata.items()):
440                     print "%s: %s" % (k, v)
441                 print
442         sys.exit(0)
443
444     # Sample code to rebuild the segments table from metadata--needs to be
445     # merged with the code below.
446     if False:
447         rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
448         rebuilder.reload_segment_metadata(open(sys.argv[2]))
449         sys.exit(0)
450
451     # Read metadata from stdin; filter out lines starting with "@@" so the
452     # statcache file can be parsed as well.
453     metadata = (x for x in sys.stdin if not x.startswith("@@"))
454
455     rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
456     rebuilder.rebuild(metadata, "/")