Add code to rebuild_database.py to recompute segment metadata.
[cumulus.git] / python / cumulus / rebuild_database.py
1 #!/usr/bin/python
2 #
3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20
21 """Code for rebuilding the local database.
22
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database.  This can be used to recover from a local database loss,
26 given data from a previous backup.
27 """
28
29 import base64
30 import hashlib
31 import itertools
32 import os
33 import re
34 import struct
35 import subprocess
36 import sys
37 import tarfile
38
39 import cumulus
40
41 CHECKSUM_ALGORITHM = "sha224"
42
43 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
44
45 class Chunker(object):
46     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
47
48     This duplicates the chunking algorithm in third_party/chunk.cc.
49     """
50     # Chunking parameters.  These should match those in third_party/chunk.cc.
51     MODULUS = 0xbfe6b8a5bf378d83
52     WINDOW_SIZE = 48
53     BREAKMARK_VALUE = 0x78
54     MIN_CHUNK_SIZE = 2048
55     MAX_CHUNK_SIZE = 65535
56     TARGET_CHUNK_SIZE = 4096
57     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
58
59     def __init__(self):
60         degree = self.MODULUS.bit_length() - 1
61         self.degree = degree
62
63         # Lookup table for polynomial reduction when shifting a new byte in,
64         # based on the high-order bits.
65         self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
66                   for i in range(256)]
67
68         # Values to remove a byte from the signature when it falls out of the
69         # window.
70         self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
71                                 self.MODULUS) for i in range(256)]
72
73         self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
74         self.hash_size = self.hash_algorithm().digestsize
75
76     def polymult(self, x, y, n):
77         # Polynomial multiplication: result = x * y
78         result = 0
79         for i in range(x.bit_length()):
80             if (x >> i) & 1:
81                 result ^= y << i
82         # Reduction modulo n
83         size = n.bit_length()
84         while result.bit_length() >= size:
85             result ^= n << (result.bit_length() - size)
86         return result
87
88     def window_init(self):
89         # Sliding signature state is:
90         #   [signature value, history buffer index, history buffer contents]
91         return [0, 0, [0] * self.WINDOW_SIZE]
92
93     def window_update(self, signature, byte):
94         poly = signature[0]
95         offset = signature[1]
96         undo = self.U[signature[2][offset]]
97         poly = ((poly ^ undo) << 8) + byte
98         poly ^= self.T[poly >> self.degree]
99
100         signature[0] = poly
101         signature[1] = (offset + 1) % self.WINDOW_SIZE
102         signature[2][offset] = byte
103
104     def compute_breaks(self, buf):
105         breaks = [0]
106         signature = self.window_init()
107         for i in xrange(len(buf)):
108             self.window_update(signature, ord(buf[i]))
109             block_len = i - breaks[-1] + 1
110             if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
111                         and block_len >= self.MIN_CHUNK_SIZE)
112                     or block_len >= self.MAX_CHUNK_SIZE):
113                 breaks.append(i + 1)
114         if breaks[-1] < len(buf):
115             breaks.append(len(buf))
116         return breaks
117
118     def compute_signatures(self, buf, buf_offset=0):
119         """Break a buffer into chunks and compute chunk signatures.
120
121         Args:
122             buf: The data buffer.
123             buf_offset: The offset of the data buffer within the original
124                 block, to handle cases where only a portion of the block is
125                 available.
126
127         Returns:
128             A dictionary containing signature data.  Keys are chunk offsets
129             (from the beginning of the block), and values are tuples (size, raw
130             hash value).
131         """
132         breaks = self.compute_breaks(buf)
133         signatures = {}
134         for i in range(1, len(breaks)):
135             chunk = buf[breaks[i-1]:breaks[i]]
136             hasher = self.hash_algorithm()
137             hasher.update(chunk)
138             signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
139                                                     hasher.digest())
140         return signatures
141
142     def dump_signatures(self, signatures):
143         """Convert signatures to the binary format stored in the database."""
144         records = []
145
146         # Emit records indicating that no signatures are available for the next
147         # n bytes.  Since the size is a 16-bit value, to skip larger distances
148         # multiple records must be emitted.  An all-zero signature indicates
149         # the lack of data.
150         def skip(n):
151             while n > 0:
152                 i = min(n, self.MAX_CHUNK_SIZE)
153                 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
154                 n -= i
155
156         position = 0
157         for next_start, (size, digest) in sorted(signatures.iteritems()):
158             if next_start < position:
159                 print "Warning: overlapping signatures, ignoring"
160                 continue
161             skip(next_start - position)
162             records.append(struct.pack(">H", size) + digest)
163             position = next_start + size
164
165         return "".join(records)
166
167     def load_signatures(self, signatures):
168         """Loads signatures from the binary format stored in the database."""
169         entry_size = 2 + self.hash_size
170         if len(signatures) % entry_size != 0:
171             print "Warning: Invalid signatures to load"
172             return {}
173
174         null_digest = "\x00" * self.hash_size
175         position = 0
176         result = {}
177         for i in range(len(signatures) // entry_size):
178             sig = signatures[i*entry_size:(i+1)*entry_size]
179             size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
180             if digest != null_digest:
181                 result[position] = (size, digest)
182             position += size
183         return result
184
185
186 class ChunkerExternal(Chunker):
187     """A Chunker which uses an external program to compute Rabin fingerprints.
188
189     This can run much more quickly than the Python code, but should otherwise
190     give identical results.
191     """
192
193     def __init__(self):
194         super(ChunkerExternal, self).__init__()
195         self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
196                                         stdin=subprocess.PIPE,
197                                         stdout=subprocess.PIPE)
198
199     def compute_breaks(self, buf):
200         if len(buf) == 0:
201             return [0]
202         self.subproc.stdin.write(struct.pack(">i", len(buf)))
203         self.subproc.stdin.write(buf)
204         self.subproc.stdin.flush()
205         breaks = self.subproc.stdout.readline()
206         return [0] + [int(x) + 1 for x in breaks.split()]
207
208
209 class DatabaseRebuilder(object):
210     def __init__(self, database):
211         self.database = database
212         self.cursor = database.cursor()
213         self.segment_ids = {}
214         self.chunker = ChunkerExternal()
215         #self.chunker = Chunker()
216
217     def segment_to_id(self, segment):
218         if segment in self.segment_ids: return self.segment_ids[segment]
219
220         self.cursor.execute("""insert or ignore into segments(segment)
221                                values (?)""", (segment,))
222         self.cursor.execute("""select segmentid from segments
223                                where segment = ?""", (segment,))
224         id = self.cursor.fetchone()[0]
225         self.segment_ids[segment] = id
226         return id
227
228     def rebuild(self, metadata, reference_path):
229         """Iterate through old metadata and use it to rebuild the database.
230
231         Args:
232             metadata: An iterable containing lines of the metadata log.
233             reference_path: Path to the root of a file system tree which may be
234                 similar to data in the metadata log (used to recompute block
235                 signatures).
236         """
237         for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
238             metadata = cumulus.MetadataItem(fields, None)
239             # Only process regular files; skip over other types (directories,
240             # symlinks, etc.)
241             if metadata.items.type not in ("-", "f"): continue
242             try:
243                 path = os.path.join(reference_path, metadata.items.name)
244                 print "Path:", path
245                 # TODO: Check file size for early abort if different
246                 self.rebuild_file(open(path), metadata)
247             except IOError as e:
248                 print e
249                 pass  # Ignore the file
250
251         self.database.commit()
252
253     def rebuild_file(self, fp, metadata):
254         """Compare"""
255         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
256         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
257         checksums = {}
258         subblock = {}
259         for segment, object, checksum, slice in blocks:
260             # Given a reference to a block of unknown size we don't know how to
261             # match up the data, so we have to give up on rebuilding for this
262             # file.
263             if slice is None: return
264
265             start, length, exact = slice
266             buf = fp.read(length)
267             verifier.update(buf)
268
269             if exact:
270                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
271                 csum.update(buf)
272                 checksums[(segment, object)] = (length, csum.compute())
273
274             signatures = self.chunker.compute_signatures(buf, start)
275             subblock.setdefault((segment, object), {}).update(signatures)
276
277         if verifier.valid():
278             print "Checksum matches, computed:", checksums
279             for k in subblock:
280                 subblock[k] = self.chunker.dump_signatures(subblock[k])
281             print "Subblock signatures:"
282             for k, v in subblock.iteritems():
283                 print k, base64.b16encode(v)
284             self.store_checksums(checksums, subblock)
285         else:
286             print "Checksum mismatch"
287
288     def store_checksums(self, block_checksums, subblock_signatures):
289         for (segment, object), (size, checksum) in block_checksums.iteritems():
290             segmentid = self.segment_to_id(segment)
291             self.cursor.execute("""select blockid from block_index
292                                    where segmentid = ? and object = ?""",
293                                 (segmentid, object))
294             blockid = self.cursor.fetchall()
295             if blockid:
296                 blockid = blockid[0][0]
297             else:
298                 blockid = None
299
300             if blockid is not None:
301                 self.cursor.execute("""update block_index
302                                        set checksum = ?, size = ?
303                                        where blockid = ?""",
304                                     (checksum, size, blockid))
305             else:
306                 self.cursor.execute(
307                     """insert into block_index(
308                            segmentid, object, checksum, size, timestamp)
309                        values (?, ?, ?, ?, julianday('now'))""",
310                     (segmentid, object, checksum, size))
311                 blockid = self.cursor.lastrowid
312
313             # Store subblock signatures, if available.
314             sigs = subblock_signatures.get((segment, object))
315             if sigs:
316                 self.cursor.execute(
317                     """insert or replace into subblock_signatures(
318                            blockid, algorithm, signatures)
319                        values (?, ?, ?)""",
320                     (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
321
322
323 class SegmentStateRebuilder(object):
324     """Reconstructs segment metadata files from raw segment data."""
325
326     def __init__(self):
327         self.filters = dict(cumulus.SEGMENT_FILTERS)
328         self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
329
330     def compute_metadata(self, path, relative_path):
331         """Recompute metadata of a single segment.
332
333         Args:
334             path: Path to the segment file in the file system.
335             relative_path: Path relative to the root for the storage backend.
336         """
337         # Does the filename match that of a segment?  If so, extract the
338         # extension to determine the filter to apply to decompress.
339         filename = os.path.basename(relative_path)
340         m = self.segment_pattern.match(filename)
341         if not m: return
342         segment_name = m.group(1)
343         extension = m.group(2)
344         if extension not in self.filters: return
345         filter_cmd = self.filters[extension]
346
347         # Compute attributes of the compressed segment data.
348         BLOCK_SIZE = 4096
349         with open(path) as segment:
350             disk_size = 0
351             checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
352             while True:
353                 buf = segment.read(BLOCK_SIZE)
354                 if len(buf) == 0: break
355                 disk_size += len(buf)
356                 checksummer.update(buf)
357         checksum = checksummer.compute()
358
359         # Compute attributes of the objects within the segment.
360         data_size = 0
361         object_count = 0
362         with open(path) as segment:
363             decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
364             objects = tarfile.open(mode='r|', fileobj=decompressed)
365             for tarinfo in objects:
366                 data_size += tarinfo.size
367                 object_count += 1
368
369         return {"segment": segment_name,
370                 "path": relative_path,
371                 "checksum": checksum,
372                 "data_size": data_size,
373                 "disk_size": disk_size}
374
375 if __name__ == "__main__":
376     segment_rebuilder = SegmentStateRebuilder()
377     topdir = sys.argv[1]
378     files = []
379     for dirpath, dirnames, filenames in os.walk(topdir):
380         for f in filenames:
381             files.append(os.path.join(dirpath, f))
382     files.sort()
383     for f in files:
384         metadata = segment_rebuilder.compute_metadata(
385             f,
386             os.path.relpath(f, topdir))
387         if metadata:
388             for (k, v) in sorted(metadata.items()):
389                 print "%s: %s" % (k, cumulus.uri_encode(str(v)))
390             print
391     sys.exit(0)
392
393     # Read metadata from stdin; filter out lines starting with "@@" so the
394     # statcache file can be parsed as well.
395     metadata = (x for x in sys.stdin if not x.startswith("@@"))
396
397     rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
398     rebuilder.rebuild(metadata, "/")