Manual 2to3 fixups.
[cumulus.git] / python / cumulus / rebuild_database.py
1 #!/usr/bin/python
2 #
3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20
21 """Code for rebuilding the local database.
22
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database.  This can be used to recover from a local database loss,
26 given data from a previous backup.
27 """
28
29 from __future__ import division, print_function, unicode_literals
30
31 import base64
32 import hashlib
33 import itertools
34 import os
35 import re
36 import struct
37 import subprocess
38 import sys
39 import tarfile
40 import time
41
42 import cumulus
43
44 CHECKSUM_ALGORITHM = "sha224"
45 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
46
47 # TODO: Move to somewhere common
48 SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
49
50 class Chunker(object):
51     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
52
53     This duplicates the chunking algorithm in third_party/chunk.cc.
54     """
55     # Chunking parameters.  These should match those in third_party/chunk.cc.
56     MODULUS = 0xbfe6b8a5bf378d83
57     WINDOW_SIZE = 48
58     BREAKMARK_VALUE = 0x78
59     MIN_CHUNK_SIZE = 2048
60     MAX_CHUNK_SIZE = 65535
61     TARGET_CHUNK_SIZE = 4096
62     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
63
64     # Minimum size of a block before we should bother storing subfile
65     # signatures (only applies when full blocks are used to store a file;
66     # subfile signatures are always used when subfile incrementals are
67     # present).
68     MINIMUM_OBJECT_SIZE = 16384
69
70     def __init__(self):
71         degree = self.MODULUS.bit_length() - 1
72         self.degree = degree
73
74         # Lookup table for polynomial reduction when shifting a new byte in,
75         # based on the high-order bits.
76         self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
77                   for i in range(256)]
78
79         # Values to remove a byte from the signature when it falls out of the
80         # window.
81         self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
82                                 self.MODULUS) for i in range(256)]
83
84         self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
85         self.hash_size = self.hash_algorithm().digestsize
86
87     def polymult(self, x, y, n):
88         # Polynomial multiplication: result = x * y
89         result = 0
90         for i in range(x.bit_length()):
91             if (x >> i) & 1:
92                 result ^= y << i
93         # Reduction modulo n
94         size = n.bit_length()
95         while result.bit_length() >= size:
96             result ^= n << (result.bit_length() - size)
97         return result
98
99     def window_init(self):
100         # Sliding signature state is:
101         #   [signature value, history buffer index, history buffer contents]
102         return [0, 0, [0] * self.WINDOW_SIZE]
103
104     def window_update(self, signature, byte):
105         poly = signature[0]
106         offset = signature[1]
107         undo = self.U[signature[2][offset]]
108         poly = ((poly ^ undo) << 8) + byte
109         poly ^= self.T[poly >> self.degree]
110
111         signature[0] = poly
112         signature[1] = (offset + 1) % self.WINDOW_SIZE
113         signature[2][offset] = byte
114
115     def compute_breaks(self, buf):
116         breaks = [0]
117         signature = self.window_init()
118         for i in range(len(buf)):
119             self.window_update(signature, ord(buf[i]))
120             block_len = i - breaks[-1] + 1
121             if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
122                         and block_len >= self.MIN_CHUNK_SIZE)
123                     or block_len >= self.MAX_CHUNK_SIZE):
124                 breaks.append(i + 1)
125         if breaks[-1] < len(buf):
126             breaks.append(len(buf))
127         return breaks
128
129     def compute_signatures(self, buf, buf_offset=0):
130         """Break a buffer into chunks and compute chunk signatures.
131
132         Args:
133             buf: The data buffer.
134             buf_offset: The offset of the data buffer within the original
135                 block, to handle cases where only a portion of the block is
136                 available.
137
138         Returns:
139             A dictionary containing signature data.  Keys are chunk offsets
140             (from the beginning of the block), and values are tuples (size, raw
141             hash value).
142         """
143         breaks = self.compute_breaks(buf)
144         signatures = {}
145         for i in range(1, len(breaks)):
146             chunk = buf[breaks[i-1]:breaks[i]]
147             hasher = self.hash_algorithm()
148             hasher.update(chunk)
149             signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
150                                                     hasher.digest())
151         return signatures
152
153     def dump_signatures(self, signatures):
154         """Convert signatures to the binary format stored in the database."""
155         records = []
156
157         # Emit records indicating that no signatures are available for the next
158         # n bytes.  Since the size is a 16-bit value, to skip larger distances
159         # multiple records must be emitted.  An all-zero signature indicates
160         # the lack of data.
161         def skip(n):
162             while n > 0:
163                 i = min(n, self.MAX_CHUNK_SIZE)
164                 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
165                 n -= i
166
167         position = 0
168         for next_start, (size, digest) in sorted(signatures.items()):
169             if next_start < position:
170                 print("Warning: overlapping signatures, ignoring")
171                 continue
172             skip(next_start - position)
173             records.append(struct.pack(">H", size) + digest)
174             position = next_start + size
175
176         return "".join(records)
177
178     def load_signatures(self, signatures):
179         """Loads signatures from the binary format stored in the database."""
180         entry_size = 2 + self.hash_size
181         if len(signatures) % entry_size != 0:
182             print("Warning: Invalid signatures to load")
183             return {}
184
185         null_digest = "\x00" * self.hash_size
186         position = 0
187         result = {}
188         for i in range(len(signatures) // entry_size):
189             sig = signatures[i*entry_size:(i+1)*entry_size]
190             size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
191             if digest != null_digest:
192                 result[position] = (size, digest)
193             position += size
194         return result
195
196
197 class ChunkerExternal(Chunker):
198     """A Chunker which uses an external program to compute Rabin fingerprints.
199
200     This can run much more quickly than the Python code, but should otherwise
201     give identical results.
202     """
203
204     def __init__(self):
205         super(ChunkerExternal, self).__init__()
206         self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
207                                         stdin=subprocess.PIPE,
208                                         stdout=subprocess.PIPE)
209
210     def compute_breaks(self, buf):
211         if len(buf) == 0:
212             return [0]
213         self.subproc.stdin.write(struct.pack(">i", len(buf)))
214         self.subproc.stdin.write(buf)
215         self.subproc.stdin.flush()
216         breaks = self.subproc.stdout.readline()
217         return [0] + [int(x) + 1 for x in breaks.split()]
218
219
220 class DatabaseRebuilder(object):
221     def __init__(self, database):
222         self.database = database
223         self.cursor = database.cursor()
224         self.segment_ids = {}
225         self.chunker = ChunkerExternal()
226         #self.chunker = Chunker()
227
228     def segment_to_id(self, segment):
229         if segment in self.segment_ids: return self.segment_ids[segment]
230
231         self.cursor.execute("""insert or ignore into segments(segment)
232                                values (?)""", (segment,))
233         self.cursor.execute("""select segmentid from segments
234                                where segment = ?""", (segment,))
235         id = self.cursor.fetchone()[0]
236         self.segment_ids[segment] = id
237         return id
238
239     def rebuild(self, metadata, reference_path):
240         """Iterate through old metadata and use it to rebuild the database.
241
242         Args:
243             metadata: An iterable containing lines of the metadata log.
244             reference_path: Path to the root of a file system tree which may be
245                 similar to data in the metadata log (used to recompute block
246                 signatures).
247         """
248         for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
249             metadata = cumulus.MetadataItem(fields, None)
250             # Only process regular files; skip over other types (directories,
251             # symlinks, etc.)
252             if metadata.items.type not in ("-", "f"): continue
253             try:
254                 path = os.path.join(reference_path, metadata.items.name)
255                 print("Path:", path)
256                 # TODO: Check file size for early abort if different
257                 self.rebuild_file(open(path), metadata)
258             except IOError as e:
259                 print(e)
260                 pass  # Ignore the file
261
262         self.database.commit()
263
264     def reload_segment_metadata(self, segment_metadata):
265         """Read a segment metadata (.meta) file into the local database.
266
267         Updates the segments table in the local database with information from
268         a a segment metadata backup file.  Old data is not overwritten, so
269         loading a .meta file with partial information is fine.
270         """
271         for info in cumulus.parse(segment_metadata,
272                                      terminate=lambda l: len(l) == 0):
273             segment = info.pop("segment")
274             self.insert_segment_info(segment, info)
275
276         self.database.commit()
277
278     def insert_segment_info(self, segment, info):
279         id = self.segment_to_id(segment)
280         for k, v in info.items():
281             self.cursor.execute("update segments set " + k + " = ? "
282                                 "where segmentid = ?",
283                                 (v, id))
284
285     def rebuild_file(self, fp, metadata):
286         """Recompute database signatures if a file is unchanged.
287
288         If the current file contents match that from the old metadata (the
289         full-file hash matches), then recompute block- and chunk-level
290         signatures for the objects referenced by the file.
291         """
292         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
293         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
294         checksums = {}
295         subblock = {}
296         for segment, object, checksum, slice in blocks:
297             # Given a reference to a block of unknown size we don't know how to
298             # match up the data, so we have to give up on rebuilding for this
299             # file.
300             if slice is None: return
301
302             start, length, exact = slice
303             buf = fp.read(length)
304             verifier.update(buf)
305
306             # Zero blocks get no checksums, so skip further processing on them.
307             if object is None: continue
308
309             if exact:
310                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
311                 csum.update(buf)
312                 checksums[(segment, object)] = (length, csum.compute())
313             else:
314                 # Compute a lower bound on the object size.
315                 oldlength, csum = checksums.get((segment, object), (0, None))
316                 checksums[(segment, object)] = (max(oldlength, start + length),
317                                                 csum)
318
319             if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
320                 signatures = self.chunker.compute_signatures(buf, start)
321                 subblock.setdefault((segment, object), {}).update(signatures)
322
323         if verifier.valid():
324             for k in subblock:
325                 subblock[k] = self.chunker.dump_signatures(subblock[k])
326             self.store_checksums(checksums, subblock)
327         else:
328             print("Checksum mismatch")
329
330     def store_checksums(self, block_checksums, subblock_signatures):
331         for (segment, object), (size, checksum) in block_checksums.items():
332             segmentid = self.segment_to_id(segment)
333             self.cursor.execute(
334                 """insert or ignore into block_index(segmentid, object)
335                    values (?, ?)""",
336                 (segmentid, object))
337             self.cursor.execute("""select blockid from block_index
338                                    where segmentid = ? and object = ?""",
339                                 (segmentid, object))
340             blockid = self.cursor.fetchall()[0][0]
341
342             # Store checksum only if it is available; we don't want to
343             # overwrite an existing checksum in the database with NULL.
344             if checksum is not None:
345                 self.cursor.execute("""update block_index
346                                        set checksum = ?
347                                        where blockid = ?""",
348                                     (checksum, blockid))
349
350             # Update the object size.  Our size may be an estimate, based on
351             # slices that we have seen.  The size in the database must not be
352             # larger than the true size, but update it to the largest value
353             # possible.
354             self.cursor.execute("""update block_index
355                                    set size = max(?, coalesce(size, 0))
356                                    where blockid = ?""",
357                                 (size, blockid))
358
359             # Store subblock signatures, if available.
360             # TODO: Even better would be to merge signature data, to handle the
361             # case where different files see different chunks of a block.
362             sigs = subblock_signatures.get((segment, object))
363             if sigs:
364                 self.cursor.execute(
365                     """insert or replace into subblock_signatures(
366                            blockid, algorithm, signatures)
367                        values (?, ?, ?)""",
368                     (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
369
370
371 class SegmentStateRebuilder(object):
372     """Reconstructs segment metadata files from raw segment data."""
373
374     def __init__(self):
375         self.filters = dict(cumulus.SEGMENT_FILTERS)
376         self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
377
378     def compute_metadata(self, path, relative_path):
379         """Recompute metadata of a single segment.
380
381         Args:
382             path: Path to the segment file in the file system.
383             relative_path: Path relative to the root for the storage backend.
384         """
385         # Does the filename match that of a segment?  If so, extract the
386         # extension to determine the filter to apply to decompress.
387         filename = os.path.basename(relative_path)
388         m = self.segment_pattern.match(filename)
389         if not m: return
390         segment_name = m.group(1)
391         extension = m.group(2)
392         if extension not in self.filters: return
393         filter_cmd = self.filters[extension]
394
395         # File attributes.
396         st_buf = os.stat(path)
397         timestamp = time.strftime(SQLITE_TIMESTAMP,
398                                   time.gmtime(st_buf.st_mtime))
399
400         # Compute attributes of the compressed segment data.
401         BLOCK_SIZE = 4096
402         with open(path) as segment:
403             disk_size = 0
404             checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
405             while True:
406                 buf = segment.read(BLOCK_SIZE)
407                 if len(buf) == 0: break
408                 disk_size += len(buf)
409                 checksummer.update(buf)
410         checksum = checksummer.compute()
411
412         # Compute attributes of the objects within the segment.
413         data_size = 0
414         object_count = 0
415         with open(path) as segment:
416             decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
417             objects = tarfile.open(mode='r|', fileobj=decompressed)
418             for tarinfo in objects:
419                 data_size += tarinfo.size
420                 object_count += 1
421
422         return {"segment": cumulus.uri_encode(segment_name),
423                 "path": cumulus.uri_encode(relative_path),
424                 "checksum": checksum,
425                 "data_size": data_size,
426                 "disk_size": disk_size,
427                 "timestamp": timestamp}
428
429 if __name__ == "__main__":
430     # Sample code to reconstruct segment metadata--ought to be relocated.
431     if False:
432         segment_rebuilder = SegmentStateRebuilder()
433         topdir = sys.argv[1]
434         files = []
435         for dirpath, dirnames, filenames in os.walk(topdir):
436             for f in filenames:
437                 files.append(os.path.join(dirpath, f))
438         files.sort()
439         for f in files:
440             metadata = segment_rebuilder.compute_metadata(
441                 f,
442                 os.path.relpath(f, topdir))
443             if metadata:
444                 for (k, v) in sorted(metadata.items()):
445                     print("%s: %s" % (k, v))
446                 print()
447         sys.exit(0)
448
449     # Sample code to rebuild the segments table from metadata--needs to be
450     # merged with the code below.
451     if False:
452         rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
453         rebuilder.reload_segment_metadata(open(sys.argv[2]))
454         sys.exit(0)
455
456     # Read metadata from stdin; filter out lines starting with "@@" so the
457     # statcache file can be parsed as well.
458     metadata = (x for x in sys.stdin if not x.startswith("@@"))
459
460     rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
461     rebuilder.rebuild(metadata, "/")