Rework uri_encode/uri_decode to more cleanly work with bytes/strings.
[cumulus.git] / python / cumulus / rebuild_database.py
1 #!/usr/bin/python
2 #
3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20
21 """Code for rebuilding the local database.
22
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database.  This can be used to recover from a local database loss,
26 given data from a previous backup.
27 """
28
29 from __future__ import division, print_function, unicode_literals
30
31 import base64
32 import hashlib
33 import itertools
34 import os
35 import re
36 import struct
37 import subprocess
38 import sys
39 import tarfile
40 import time
41
42 import cumulus
43 from cumulus import util
44
45 CHECKSUM_ALGORITHM = "sha224"
46 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
47
48 # TODO: Move to somewhere common
49 SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
50
51 class Chunker(object):
52     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
53
54     This duplicates the chunking algorithm in third_party/chunk.cc.
55     """
56     # Chunking parameters.  These should match those in third_party/chunk.cc.
57     MODULUS = 0xbfe6b8a5bf378d83
58     WINDOW_SIZE = 48
59     BREAKMARK_VALUE = 0x78
60     MIN_CHUNK_SIZE = 2048
61     MAX_CHUNK_SIZE = 65535
62     TARGET_CHUNK_SIZE = 4096
63     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
64
65     # Minimum size of a block before we should bother storing subfile
66     # signatures (only applies when full blocks are used to store a file;
67     # subfile signatures are always used when subfile incrementals are
68     # present).
69     MINIMUM_OBJECT_SIZE = 16384
70
71     def __init__(self):
72         degree = self.MODULUS.bit_length() - 1
73         self.degree = degree
74
75         # Lookup table for polynomial reduction when shifting a new byte in,
76         # based on the high-order bits.
77         self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
78                   for i in range(256)]
79
80         # Values to remove a byte from the signature when it falls out of the
81         # window.
82         self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
83                                 self.MODULUS) for i in range(256)]
84
85         self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
86         self.hash_size = self.hash_algorithm().digestsize
87
88     def polymult(self, x, y, n):
89         # Polynomial multiplication: result = x * y
90         result = 0
91         for i in range(x.bit_length()):
92             if (x >> i) & 1:
93                 result ^= y << i
94         # Reduction modulo n
95         size = n.bit_length()
96         while result.bit_length() >= size:
97             result ^= n << (result.bit_length() - size)
98         return result
99
100     def window_init(self):
101         # Sliding signature state is:
102         #   [signature value, history buffer index, history buffer contents]
103         return [0, 0, [0] * self.WINDOW_SIZE]
104
105     def window_update(self, signature, byte):
106         poly = signature[0]
107         offset = signature[1]
108         undo = self.U[signature[2][offset]]
109         poly = ((poly ^ undo) << 8) + byte
110         poly ^= self.T[poly >> self.degree]
111
112         signature[0] = poly
113         signature[1] = (offset + 1) % self.WINDOW_SIZE
114         signature[2][offset] = byte
115
116     def compute_breaks(self, buf):
117         breaks = [0]
118         signature = self.window_init()
119         for i in range(len(buf)):
120             self.window_update(signature, ord(buf[i]))
121             block_len = i - breaks[-1] + 1
122             if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
123                         and block_len >= self.MIN_CHUNK_SIZE)
124                     or block_len >= self.MAX_CHUNK_SIZE):
125                 breaks.append(i + 1)
126         if breaks[-1] < len(buf):
127             breaks.append(len(buf))
128         return breaks
129
130     def compute_signatures(self, buf, buf_offset=0):
131         """Break a buffer into chunks and compute chunk signatures.
132
133         Args:
134             buf: The data buffer.
135             buf_offset: The offset of the data buffer within the original
136                 block, to handle cases where only a portion of the block is
137                 available.
138
139         Returns:
140             A dictionary containing signature data.  Keys are chunk offsets
141             (from the beginning of the block), and values are tuples (size, raw
142             hash value).
143         """
144         breaks = self.compute_breaks(buf)
145         signatures = {}
146         for i in range(1, len(breaks)):
147             chunk = buf[breaks[i-1]:breaks[i]]
148             hasher = self.hash_algorithm()
149             hasher.update(chunk)
150             signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
151                                                     hasher.digest())
152         return signatures
153
154     def dump_signatures(self, signatures):
155         """Convert signatures to the binary format stored in the database."""
156         records = []
157
158         # Emit records indicating that no signatures are available for the next
159         # n bytes.  Since the size is a 16-bit value, to skip larger distances
160         # multiple records must be emitted.  An all-zero signature indicates
161         # the lack of data.
162         def skip(n):
163             while n > 0:
164                 i = min(n, self.MAX_CHUNK_SIZE)
165                 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
166                 n -= i
167
168         position = 0
169         for next_start, (size, digest) in sorted(signatures.items()):
170             if next_start < position:
171                 print("Warning: overlapping signatures, ignoring")
172                 continue
173             skip(next_start - position)
174             records.append(struct.pack(">H", size) + digest)
175             position = next_start + size
176
177         return "".join(records)
178
179     def load_signatures(self, signatures):
180         """Loads signatures from the binary format stored in the database."""
181         entry_size = 2 + self.hash_size
182         if len(signatures) % entry_size != 0:
183             print("Warning: Invalid signatures to load")
184             return {}
185
186         null_digest = "\x00" * self.hash_size
187         position = 0
188         result = {}
189         for i in range(len(signatures) // entry_size):
190             sig = signatures[i*entry_size:(i+1)*entry_size]
191             size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
192             if digest != null_digest:
193                 result[position] = (size, digest)
194             position += size
195         return result
196
197
198 class ChunkerExternal(Chunker):
199     """A Chunker which uses an external program to compute Rabin fingerprints.
200
201     This can run much more quickly than the Python code, but should otherwise
202     give identical results.
203     """
204
205     def __init__(self):
206         super(ChunkerExternal, self).__init__()
207         self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
208                                         stdin=subprocess.PIPE,
209                                         stdout=subprocess.PIPE)
210
211     def compute_breaks(self, buf):
212         if len(buf) == 0:
213             return [0]
214         self.subproc.stdin.write(struct.pack(">i", len(buf)))
215         self.subproc.stdin.write(buf)
216         self.subproc.stdin.flush()
217         breaks = self.subproc.stdout.readline()
218         return [0] + [int(x) + 1 for x in breaks.split()]
219
220
221 class DatabaseRebuilder(object):
222     def __init__(self, database):
223         self.database = database
224         self.cursor = database.cursor()
225         self.segment_ids = {}
226         self.chunker = ChunkerExternal()
227         #self.chunker = Chunker()
228
229     def segment_to_id(self, segment):
230         if segment in self.segment_ids: return self.segment_ids[segment]
231
232         self.cursor.execute("""insert or ignore into segments(segment)
233                                values (?)""", (segment,))
234         self.cursor.execute("""select segmentid from segments
235                                where segment = ?""", (segment,))
236         id = self.cursor.fetchone()[0]
237         self.segment_ids[segment] = id
238         return id
239
240     def rebuild(self, metadata, reference_path):
241         """Iterate through old metadata and use it to rebuild the database.
242
243         Args:
244             metadata: An iterable containing lines of the metadata log.
245             reference_path: Path to the root of a file system tree which may be
246                 similar to data in the metadata log (used to recompute block
247                 signatures).
248         """
249         for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
250             metadata = cumulus.MetadataItem(fields, None)
251             # Only process regular files; skip over other types (directories,
252             # symlinks, etc.)
253             if metadata.items.type not in ("-", "f"): continue
254             try:
255                 path = os.path.join(reference_path, metadata.items.name)
256                 print("Path:", path)
257                 # TODO: Check file size for early abort if different
258                 self.rebuild_file(open(path), metadata)
259             except IOError as e:
260                 print(e)
261                 pass  # Ignore the file
262
263         self.database.commit()
264
265     def reload_segment_metadata(self, segment_metadata):
266         """Read a segment metadata (.meta) file into the local database.
267
268         Updates the segments table in the local database with information from
269         a a segment metadata backup file.  Old data is not overwritten, so
270         loading a .meta file with partial information is fine.
271         """
272         for info in cumulus.parse(segment_metadata,
273                                      terminate=lambda l: len(l) == 0):
274             segment = info.pop("segment")
275             self.insert_segment_info(segment, info)
276
277         self.database.commit()
278
279     def insert_segment_info(self, segment, info):
280         id = self.segment_to_id(segment)
281         for k, v in info.items():
282             self.cursor.execute("update segments set " + k + " = ? "
283                                 "where segmentid = ?",
284                                 (v, id))
285
286     def rebuild_file(self, fp, metadata):
287         """Recompute database signatures if a file is unchanged.
288
289         If the current file contents match that from the old metadata (the
290         full-file hash matches), then recompute block- and chunk-level
291         signatures for the objects referenced by the file.
292         """
293         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
294         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
295         checksums = {}
296         subblock = {}
297         for segment, object, checksum, slice in blocks:
298             # Given a reference to a block of unknown size we don't know how to
299             # match up the data, so we have to give up on rebuilding for this
300             # file.
301             if slice is None: return
302
303             start, length, exact = slice
304             buf = fp.read(length)
305             verifier.update(buf)
306
307             # Zero blocks get no checksums, so skip further processing on them.
308             if object is None: continue
309
310             if exact:
311                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
312                 csum.update(buf)
313                 checksums[(segment, object)] = (length, csum.compute())
314             else:
315                 # Compute a lower bound on the object size.
316                 oldlength, csum = checksums.get((segment, object), (0, None))
317                 checksums[(segment, object)] = (max(oldlength, start + length),
318                                                 csum)
319
320             if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
321                 signatures = self.chunker.compute_signatures(buf, start)
322                 subblock.setdefault((segment, object), {}).update(signatures)
323
324         if verifier.valid():
325             for k in subblock:
326                 subblock[k] = self.chunker.dump_signatures(subblock[k])
327             self.store_checksums(checksums, subblock)
328         else:
329             print("Checksum mismatch")
330
331     def store_checksums(self, block_checksums, subblock_signatures):
332         for (segment, object), (size, checksum) in block_checksums.items():
333             segmentid = self.segment_to_id(segment)
334             self.cursor.execute(
335                 """insert or ignore into block_index(segmentid, object)
336                    values (?, ?)""",
337                 (segmentid, object))
338             self.cursor.execute("""select blockid from block_index
339                                    where segmentid = ? and object = ?""",
340                                 (segmentid, object))
341             blockid = self.cursor.fetchall()[0][0]
342
343             # Store checksum only if it is available; we don't want to
344             # overwrite an existing checksum in the database with NULL.
345             if checksum is not None:
346                 self.cursor.execute("""update block_index
347                                        set checksum = ?
348                                        where blockid = ?""",
349                                     (checksum, blockid))
350
351             # Update the object size.  Our size may be an estimate, based on
352             # slices that we have seen.  The size in the database must not be
353             # larger than the true size, but update it to the largest value
354             # possible.
355             self.cursor.execute("""update block_index
356                                    set size = max(?, coalesce(size, 0))
357                                    where blockid = ?""",
358                                 (size, blockid))
359
360             # Store subblock signatures, if available.
361             # TODO: Even better would be to merge signature data, to handle the
362             # case where different files see different chunks of a block.
363             sigs = subblock_signatures.get((segment, object))
364             if sigs:
365                 self.cursor.execute(
366                     """insert or replace into subblock_signatures(
367                            blockid, algorithm, signatures)
368                        values (?, ?, ?)""",
369                     (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
370
371
372 class SegmentStateRebuilder(object):
373     """Reconstructs segment metadata files from raw segment data."""
374
375     def __init__(self):
376         self.filters = dict(cumulus.SEGMENT_FILTERS)
377         self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
378
379     def compute_metadata(self, path, relative_path):
380         """Recompute metadata of a single segment.
381
382         Args:
383             path: Path to the segment file in the file system.
384             relative_path: Path relative to the root for the storage backend.
385         """
386         # Does the filename match that of a segment?  If so, extract the
387         # extension to determine the filter to apply to decompress.
388         filename = os.path.basename(relative_path)
389         m = self.segment_pattern.match(filename)
390         if not m: return
391         segment_name = m.group(1)
392         extension = m.group(2)
393         if extension not in self.filters: return
394         filter_cmd = self.filters[extension]
395
396         # File attributes.
397         st_buf = os.stat(path)
398         timestamp = time.strftime(SQLITE_TIMESTAMP,
399                                   time.gmtime(st_buf.st_mtime))
400
401         # Compute attributes of the compressed segment data.
402         BLOCK_SIZE = 4096
403         with open(path) as segment:
404             disk_size = 0
405             checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
406             while True:
407                 buf = segment.read(BLOCK_SIZE)
408                 if len(buf) == 0: break
409                 disk_size += len(buf)
410                 checksummer.update(buf)
411         checksum = checksummer.compute()
412
413         # Compute attributes of the objects within the segment.
414         data_size = 0
415         object_count = 0
416         with open(path) as segment:
417             decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
418             objects = tarfile.open(mode='r|', fileobj=decompressed)
419             for tarinfo in objects:
420                 data_size += tarinfo.size
421                 object_count += 1
422
423         return {"segment": util.uri_encode_pathname(segment_name),
424                 "path": util.uri_encode_pathname(relative_path),
425                 "checksum": checksum,
426                 "data_size": data_size,
427                 "disk_size": disk_size,
428                 "timestamp": timestamp}
429
430 if __name__ == "__main__":
431     # Sample code to reconstruct segment metadata--ought to be relocated.
432     if False:
433         segment_rebuilder = SegmentStateRebuilder()
434         topdir = sys.argv[1]
435         files = []
436         for dirpath, dirnames, filenames in os.walk(topdir):
437             for f in filenames:
438                 files.append(os.path.join(dirpath, f))
439         files.sort()
440         for f in files:
441             metadata = segment_rebuilder.compute_metadata(
442                 f,
443                 os.path.relpath(f, topdir))
444             if metadata:
445                 for (k, v) in sorted(metadata.items()):
446                     print("%s: %s" % (k, v))
447                 print()
448         sys.exit(0)
449
450     # Sample code to rebuild the segments table from metadata--needs to be
451     # merged with the code below.
452     if False:
453         rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
454         rebuilder.reload_segment_metadata(open(sys.argv[2]))
455         sys.exit(0)
456
457     # Read metadata from stdin; filter out lines starting with "@@" so the
458     # statcache file can be parsed as well.
459     metadata = (x for x in sys.stdin if not x.startswith("@@"))
460
461     rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
462     rebuilder.rebuild(metadata, "/")