Use a scoped_ptr to avoid FileFilter leaks.
[cumulus.git] / python / cumulus / rebuild_database.py
1 #!/usr/bin/python
2 #
3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20
21 """Code for rebuilding the local database.
22
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database.  This can be used to recover from a local database loss,
26 given data from a previous backup.
27 """
28
29 import base64
30 import hashlib
31 import itertools
32 import os
33 import re
34 import struct
35 import subprocess
36 import sys
37 import tarfile
38
39 import cumulus
40
41 CHECKSUM_ALGORITHM = "sha224"
42
43 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
44
45 class Chunker(object):
46     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
47
48     This duplicates the chunking algorithm in third_party/chunk.cc.
49     """
50     # Chunking parameters.  These should match those in third_party/chunk.cc.
51     MODULUS = 0xbfe6b8a5bf378d83
52     WINDOW_SIZE = 48
53     BREAKMARK_VALUE = 0x78
54     MIN_CHUNK_SIZE = 2048
55     MAX_CHUNK_SIZE = 65535
56     TARGET_CHUNK_SIZE = 4096
57     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
58
59     # Minimum size of a block before we should bother storing subfile
60     # signatures (only applies when full blocks are used to store a file;
61     # subfile signatures are always used when subfile incrementals are
62     # present).
63     MINIMUM_OBJECT_SIZE = 16384
64
65     def __init__(self):
66         degree = self.MODULUS.bit_length() - 1
67         self.degree = degree
68
69         # Lookup table for polynomial reduction when shifting a new byte in,
70         # based on the high-order bits.
71         self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
72                   for i in range(256)]
73
74         # Values to remove a byte from the signature when it falls out of the
75         # window.
76         self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
77                                 self.MODULUS) for i in range(256)]
78
79         self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
80         self.hash_size = self.hash_algorithm().digestsize
81
82     def polymult(self, x, y, n):
83         # Polynomial multiplication: result = x * y
84         result = 0
85         for i in range(x.bit_length()):
86             if (x >> i) & 1:
87                 result ^= y << i
88         # Reduction modulo n
89         size = n.bit_length()
90         while result.bit_length() >= size:
91             result ^= n << (result.bit_length() - size)
92         return result
93
94     def window_init(self):
95         # Sliding signature state is:
96         #   [signature value, history buffer index, history buffer contents]
97         return [0, 0, [0] * self.WINDOW_SIZE]
98
99     def window_update(self, signature, byte):
100         poly = signature[0]
101         offset = signature[1]
102         undo = self.U[signature[2][offset]]
103         poly = ((poly ^ undo) << 8) + byte
104         poly ^= self.T[poly >> self.degree]
105
106         signature[0] = poly
107         signature[1] = (offset + 1) % self.WINDOW_SIZE
108         signature[2][offset] = byte
109
110     def compute_breaks(self, buf):
111         breaks = [0]
112         signature = self.window_init()
113         for i in xrange(len(buf)):
114             self.window_update(signature, ord(buf[i]))
115             block_len = i - breaks[-1] + 1
116             if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
117                         and block_len >= self.MIN_CHUNK_SIZE)
118                     or block_len >= self.MAX_CHUNK_SIZE):
119                 breaks.append(i + 1)
120         if breaks[-1] < len(buf):
121             breaks.append(len(buf))
122         return breaks
123
124     def compute_signatures(self, buf, buf_offset=0):
125         """Break a buffer into chunks and compute chunk signatures.
126
127         Args:
128             buf: The data buffer.
129             buf_offset: The offset of the data buffer within the original
130                 block, to handle cases where only a portion of the block is
131                 available.
132
133         Returns:
134             A dictionary containing signature data.  Keys are chunk offsets
135             (from the beginning of the block), and values are tuples (size, raw
136             hash value).
137         """
138         breaks = self.compute_breaks(buf)
139         signatures = {}
140         for i in range(1, len(breaks)):
141             chunk = buf[breaks[i-1]:breaks[i]]
142             hasher = self.hash_algorithm()
143             hasher.update(chunk)
144             signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
145                                                     hasher.digest())
146         return signatures
147
148     def dump_signatures(self, signatures):
149         """Convert signatures to the binary format stored in the database."""
150         records = []
151
152         # Emit records indicating that no signatures are available for the next
153         # n bytes.  Since the size is a 16-bit value, to skip larger distances
154         # multiple records must be emitted.  An all-zero signature indicates
155         # the lack of data.
156         def skip(n):
157             while n > 0:
158                 i = min(n, self.MAX_CHUNK_SIZE)
159                 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
160                 n -= i
161
162         position = 0
163         for next_start, (size, digest) in sorted(signatures.iteritems()):
164             if next_start < position:
165                 print "Warning: overlapping signatures, ignoring"
166                 continue
167             skip(next_start - position)
168             records.append(struct.pack(">H", size) + digest)
169             position = next_start + size
170
171         return "".join(records)
172
173     def load_signatures(self, signatures):
174         """Loads signatures from the binary format stored in the database."""
175         entry_size = 2 + self.hash_size
176         if len(signatures) % entry_size != 0:
177             print "Warning: Invalid signatures to load"
178             return {}
179
180         null_digest = "\x00" * self.hash_size
181         position = 0
182         result = {}
183         for i in range(len(signatures) // entry_size):
184             sig = signatures[i*entry_size:(i+1)*entry_size]
185             size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
186             if digest != null_digest:
187                 result[position] = (size, digest)
188             position += size
189         return result
190
191
192 class ChunkerExternal(Chunker):
193     """A Chunker which uses an external program to compute Rabin fingerprints.
194
195     This can run much more quickly than the Python code, but should otherwise
196     give identical results.
197     """
198
199     def __init__(self):
200         super(ChunkerExternal, self).__init__()
201         self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
202                                         stdin=subprocess.PIPE,
203                                         stdout=subprocess.PIPE)
204
205     def compute_breaks(self, buf):
206         if len(buf) == 0:
207             return [0]
208         self.subproc.stdin.write(struct.pack(">i", len(buf)))
209         self.subproc.stdin.write(buf)
210         self.subproc.stdin.flush()
211         breaks = self.subproc.stdout.readline()
212         return [0] + [int(x) + 1 for x in breaks.split()]
213
214
215 class DatabaseRebuilder(object):
216     def __init__(self, database):
217         self.database = database
218         self.cursor = database.cursor()
219         self.segment_ids = {}
220         self.chunker = ChunkerExternal()
221         #self.chunker = Chunker()
222
223     def segment_to_id(self, segment):
224         if segment in self.segment_ids: return self.segment_ids[segment]
225
226         self.cursor.execute("""insert or ignore into segments(segment)
227                                values (?)""", (segment,))
228         self.cursor.execute("""select segmentid from segments
229                                where segment = ?""", (segment,))
230         id = self.cursor.fetchone()[0]
231         self.segment_ids[segment] = id
232         return id
233
234     def rebuild(self, metadata, reference_path):
235         """Iterate through old metadata and use it to rebuild the database.
236
237         Args:
238             metadata: An iterable containing lines of the metadata log.
239             reference_path: Path to the root of a file system tree which may be
240                 similar to data in the metadata log (used to recompute block
241                 signatures).
242         """
243         for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
244             metadata = cumulus.MetadataItem(fields, None)
245             # Only process regular files; skip over other types (directories,
246             # symlinks, etc.)
247             if metadata.items.type not in ("-", "f"): continue
248             try:
249                 path = os.path.join(reference_path, metadata.items.name)
250                 print "Path:", path
251                 # TODO: Check file size for early abort if different
252                 self.rebuild_file(open(path), metadata)
253             except IOError as e:
254                 print e
255                 pass  # Ignore the file
256
257         self.database.commit()
258
259     def rebuild_file(self, fp, metadata):
260         """Recompute database signatures if a file is unchanged.
261
262         If the current file contents match that from the old metadata (the
263         full-file hash matches), then recompute block- and chunk-level
264         signatures for the objects referenced by the file.
265         """
266         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
267         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
268         checksums = {}
269         subblock = {}
270         for segment, object, checksum, slice in blocks:
271             # Given a reference to a block of unknown size we don't know how to
272             # match up the data, so we have to give up on rebuilding for this
273             # file.
274             if slice is None: return
275
276             start, length, exact = slice
277             buf = fp.read(length)
278             verifier.update(buf)
279
280             if exact:
281                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
282                 csum.update(buf)
283                 checksums[(segment, object)] = (length, csum.compute())
284             else:
285                 # Compute a lower bound on the object size.
286                 oldlength, csum = checksums.get((segment, object), (0, None))
287                 checksums[(segment, object)] = (max(oldlength, start + length),
288                                                 csum)
289
290             if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
291                 signatures = self.chunker.compute_signatures(buf, start)
292                 subblock.setdefault((segment, object), {}).update(signatures)
293
294         if verifier.valid():
295             for k in subblock:
296                 subblock[k] = self.chunker.dump_signatures(subblock[k])
297             self.store_checksums(checksums, subblock)
298         else:
299             print "Checksum mismatch"
300
301     def store_checksums(self, block_checksums, subblock_signatures):
302         for (segment, object), (size, checksum) in block_checksums.iteritems():
303             segmentid = self.segment_to_id(segment)
304             self.cursor.execute(
305                 """insert or ignore into block_index(segmentid, object)
306                    values (?, ?)""",
307                 (segmentid, object))
308             self.cursor.execute("""select blockid from block_index
309                                    where segmentid = ? and object = ?""",
310                                 (segmentid, object))
311             blockid = self.cursor.fetchall()[0][0]
312
313             # Store checksum only if it is available; we don't want to
314             # overwrite an existing checksum in the database with NULL.
315             if checksum is not None:
316                 self.cursor.execute("""update block_index
317                                        set checksum = ?
318                                        where blockid = ?""",
319                                     (checksum, blockid))
320
321             # Update the object size.  Our size may be an estimate, based on
322             # slices that we have seen.  The size in the database must not be
323             # larger than the true size, but update it to the largest value
324             # possible.
325             self.cursor.execute("""update block_index
326                                    set size = max(?, coalesce(size, 0))
327                                    where blockid = ?""",
328                                 (size, blockid))
329
330             # Store subblock signatures, if available.
331             # TODO: Even better would be to merge signature data, to handle the
332             # case where different files see different chunks of a block.
333             sigs = subblock_signatures.get((segment, object))
334             if sigs:
335                 self.cursor.execute(
336                     """insert or replace into subblock_signatures(
337                            blockid, algorithm, signatures)
338                        values (?, ?, ?)""",
339                     (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
340
341
342 class SegmentStateRebuilder(object):
343     """Reconstructs segment metadata files from raw segment data."""
344
345     def __init__(self):
346         self.filters = dict(cumulus.SEGMENT_FILTERS)
347         self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
348
349     def compute_metadata(self, path, relative_path):
350         """Recompute metadata of a single segment.
351
352         Args:
353             path: Path to the segment file in the file system.
354             relative_path: Path relative to the root for the storage backend.
355         """
356         # Does the filename match that of a segment?  If so, extract the
357         # extension to determine the filter to apply to decompress.
358         filename = os.path.basename(relative_path)
359         m = self.segment_pattern.match(filename)
360         if not m: return
361         segment_name = m.group(1)
362         extension = m.group(2)
363         if extension not in self.filters: return
364         filter_cmd = self.filters[extension]
365
366         # Compute attributes of the compressed segment data.
367         BLOCK_SIZE = 4096
368         with open(path) as segment:
369             disk_size = 0
370             checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
371             while True:
372                 buf = segment.read(BLOCK_SIZE)
373                 if len(buf) == 0: break
374                 disk_size += len(buf)
375                 checksummer.update(buf)
376         checksum = checksummer.compute()
377
378         # Compute attributes of the objects within the segment.
379         data_size = 0
380         object_count = 0
381         with open(path) as segment:
382             decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
383             objects = tarfile.open(mode='r|', fileobj=decompressed)
384             for tarinfo in objects:
385                 data_size += tarinfo.size
386                 object_count += 1
387
388         return {"segment": segment_name,
389                 "path": relative_path,
390                 "checksum": checksum,
391                 "data_size": data_size,
392                 "disk_size": disk_size}
393
394 if __name__ == "__main__":
395     if False:
396         segment_rebuilder = SegmentStateRebuilder()
397         topdir = sys.argv[1]
398         files = []
399         for dirpath, dirnames, filenames in os.walk(topdir):
400             for f in filenames:
401                 files.append(os.path.join(dirpath, f))
402         files.sort()
403         for f in files:
404             metadata = segment_rebuilder.compute_metadata(
405                 f,
406                 os.path.relpath(f, topdir))
407             if metadata:
408                 for (k, v) in sorted(metadata.items()):
409                     print "%s: %s" % (k, cumulus.uri_encode(str(v)))
410                 print
411         sys.exit(0)
412
413     # Read metadata from stdin; filter out lines starting with "@@" so the
414     # statcache file can be parsed as well.
415     metadata = (x for x in sys.stdin if not x.startswith("@@"))
416
417     rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
418     rebuilder.rebuild(metadata, "/")