Start on a tool to rebuild the local database if it is lost.
[cumulus.git] / python / cumulus / rebuild_database.py
1 #!/usr/bin/python
2 #
3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20
21 """Code for rebuilding the local database.
22
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database.  This can be used to recover from a local database loss,
26 given data from a previous backup.
27 """
28
29 import base64
30 import hashlib
31 import itertools
32 import os
33 import struct
34 import subprocess
35 import sys
36
37 import cumulus
38
39 CHECKSUM_ALGORITHM = "sha224"
40
41 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
42
43 class Chunker(object):
44     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
45
46     This duplicates the chunking algorithm in third_party/chunk.cc.
47     """
48     # Chunking parameters.  These should match those in third_party/chunk.cc.
49     MODULUS = 0xbfe6b8a5bf378d83
50     WINDOW_SIZE = 48
51     BREAKMARK_VALUE = 0x78
52     MIN_CHUNK_SIZE = 2048
53     MAX_CHUNK_SIZE = 65535
54     TARGET_CHUNK_SIZE = 4096
55     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
56
57     def __init__(self):
58         degree = self.MODULUS.bit_length() - 1
59         self.degree = degree
60
61         # Lookup table for polynomial reduction when shifting a new byte in,
62         # based on the high-order bits.
63         self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
64                   for i in range(256)]
65
66         # Values to remove a byte from the signature when it falls out of the
67         # window.
68         self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
69                                 self.MODULUS) for i in range(256)]
70
71         self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
72         self.hash_size = self.hash_algorithm().digestsize
73
74     def polymult(self, x, y, n):
75         # Polynomial multiplication: result = x * y
76         result = 0
77         for i in range(x.bit_length()):
78             if (x >> i) & 1:
79                 result ^= y << i
80         # Reduction modulo n
81         size = n.bit_length()
82         while result.bit_length() >= size:
83             result ^= n << (result.bit_length() - size)
84         return result
85
86     def window_init(self):
87         # Sliding signature state is:
88         #   [signature value, history buffer index, history buffer contents]
89         return [0, 0, [0] * self.WINDOW_SIZE]
90
91     def window_update(self, signature, byte):
92         poly = signature[0]
93         offset = signature[1]
94         undo = self.U[signature[2][offset]]
95         poly = ((poly ^ undo) << 8) + byte
96         poly ^= self.T[poly >> self.degree]
97
98         signature[0] = poly
99         signature[1] = (offset + 1) % self.WINDOW_SIZE
100         signature[2][offset] = byte
101
102     def compute_breaks(self, buf):
103         breaks = [0]
104         signature = self.window_init()
105         for i in xrange(len(buf)):
106             self.window_update(signature, ord(buf[i]))
107             block_len = i - breaks[-1] + 1
108             if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
109                         and block_len >= self.MIN_CHUNK_SIZE)
110                     or block_len >= self.MAX_CHUNK_SIZE):
111                 breaks.append(i + 1)
112         if breaks[-1] < len(buf):
113             breaks.append(len(buf))
114         return breaks
115
116     def compute_signatures(self, buf, buf_offset=0):
117         """Break a buffer into chunks and compute chunk signatures.
118
119         Args:
120             buf: The data buffer.
121             buf_offset: The offset of the data buffer within the original
122                 block, to handle cases where only a portion of the block is
123                 available.
124
125         Returns:
126             A dictionary containing signature data.  Keys are chunk offsets
127             (from the beginning of the block), and values are tuples (size, raw
128             hash value).
129         """
130         breaks = self.compute_breaks(buf)
131         signatures = {}
132         for i in range(1, len(breaks)):
133             chunk = buf[breaks[i-1]:breaks[i]]
134             hasher = self.hash_algorithm()
135             hasher.update(chunk)
136             signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
137                                                     hasher.digest())
138         return signatures
139
140     def dump_signatures(self, signatures):
141         """Convert signatures to the binary format stored in the database."""
142         records = []
143
144         # Emit records indicating that no signatures are available for the next
145         # n bytes.  Since the size is a 16-bit value, to skip larger distances
146         # multiple records must be emitted.  An all-zero signature indicates
147         # the lack of data.
148         def skip(n):
149             while n > 0:
150                 i = min(n, self.MAX_CHUNK_SIZE)
151                 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
152                 n -= i
153
154         position = 0
155         for next_start, (size, digest) in sorted(signatures.iteritems()):
156             if next_start < position:
157                 print "Warning: overlapping signatures, ignoring"
158                 continue
159             skip(next_start - position)
160             records.append(struct.pack(">H", size) + digest)
161             position = next_start + size
162
163         return "".join(records)
164
165     def load_signatures(self, signatures):
166         """Loads signatures from the binary format stored in the database."""
167         entry_size = 2 + self.hash_size
168         if len(signatures) % entry_size != 0:
169             print "Warning: Invalid signatures to load"
170             return {}
171
172         null_digest = "\x00" * self.hash_size
173         position = 0
174         result = {}
175         for i in range(len(signatures) // entry_size):
176             sig = signatures[i*entry_size:(i+1)*entry_size]
177             size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
178             if digest != null_digest:
179                 result[position] = (size, digest)
180             position += size
181         return result
182
183
184 class ChunkerExternal(Chunker):
185     """A Chunker which uses an external program to compute Rabin fingerprints.
186
187     This can run much more quickly than the Python code, but should otherwise
188     give identical results.
189     """
190
191     def __init__(self):
192         super(ChunkerExternal, self).__init__()
193         self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
194                                         stdin=subprocess.PIPE,
195                                         stdout=subprocess.PIPE)
196
197     def compute_breaks(self, buf):
198         if len(buf) == 0:
199             return [0]
200         self.subproc.stdin.write(struct.pack(">i", len(buf)))
201         self.subproc.stdin.write(buf)
202         self.subproc.stdin.flush()
203         breaks = self.subproc.stdout.readline()
204         return [int(x) + 1 for x in breaks.split()]
205
206
207 class DatabaseRebuilder(object):
208     def __init__(self, database):
209         self.database = database
210         self.cursor = database.cursor()
211         self.segment_ids = {}
212         #try:
213         self.chunker = ChunkerExternal()
214         #except:
215         #    self.chunker = Chunker()
216
217     def segment_to_id(self, segment):
218         if segment in self.segment_ids: return self.segment_ids[segment]
219
220         self.cursor.execute("""insert or ignore into segments(segment)
221                                values (?)""", (segment,))
222         self.cursor.execute("""select segmentid from segments
223                                where segment = ?""", (segment,))
224         id = self.cursor.fetchone()[0]
225         self.segment_ids[segment] = id
226         return id
227
228     def rebuild(self, metadata, reference_path):
229         """Iterate through old metadata and use it to rebuild the database.
230
231         Args:
232             metadata: An iterable containing lines of the metadata log.
233             reference_path: Path to the root of a file system tree which may be
234                 similar to data in the metadata log (used to recompute block
235                 signatures).
236         """
237         for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
238             metadata = cumulus.MetadataItem(fields, None)
239             # Only process regular files; skip over other types (directories,
240             # symlinks, etc.)
241             if metadata.items.type not in ("-", "f"): continue
242             try:
243                 path = os.path.join(reference_path, metadata.items.name)
244                 print "Path:", path
245                 # TODO: Check file size for early abort if different
246                 self.rebuild_file(open(path), metadata)
247             except IOError as e:
248                 print e
249                 pass  # Ignore the file
250
251         self.database.commit()
252
253     def rebuild_file(self, fp, metadata):
254         """Compare"""
255         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
256         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
257         checksums = {}
258         subblock = {}
259         for segment, object, checksum, slice in blocks:
260             # Given a reference to a block of unknown size we don't know how to
261             # match up the data, so we have to give up on rebuilding for this
262             # file.
263             if slice is None: return
264
265             start, length, exact = slice
266             buf = fp.read(length)
267             verifier.update(buf)
268
269             if exact:
270                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
271                 csum.update(buf)
272                 checksums[(segment, object)] = (length, csum.compute())
273
274             signatures = self.chunker.compute_signatures(buf, start)
275             subblock.setdefault((segment, object), {}).update(signatures)
276
277         if verifier.valid():
278             print "Checksum matches, computed:", checksums
279             for k in subblock:
280                 subblock[k] = self.chunker.dump_signatures(subblock[k])
281             print "Subblock signatures:"
282             for k, v in subblock.iteritems():
283                 print k, base64.b16encode(v)
284             self.store_checksums(checksums, subblock)
285         else:
286             print "Checksum mismatch"
287
288     def store_checksums(self, block_checksums, subblock_signatures):
289         for (segment, object), (size, checksum) in block_checksums.iteritems():
290             segmentid = self.segment_to_id(segment)
291             self.cursor.execute("""select blockid from block_index
292                                    where segmentid = ? and object = ?""",
293                                 (segmentid, object))
294             blockid = self.cursor.fetchall()
295             if blockid:
296                 blockid = blockid[0][0]
297             else:
298                 blockid = None
299
300             if blockid is not None:
301                 self.cursor.execute("""update block_index
302                                        set checksum = ?, size = ?
303                                        where blockid = ?""",
304                                     (checksum, size, blockid))
305             else:
306                 self.cursor.execute(
307                     """insert into block_index(
308                            segmentid, object, checksum, size, timestamp)
309                        values (?, ?, ?, ?, julianday('now'))""",
310                     (segmentid, object, checksum, size))
311                 blockid = self.cursor.lastrowid
312
313             # Store subblock signatures, if available.
314             print "blockid:", blockid
315             if (segment, object) in subblock_signatures:
316                 self.cursor.execute(
317                     """insert or replace into subblock_signatures(
318                            blockid, algorithm, signatures)
319                        values (?, ?, ?)""",
320                     (blockid, self.chunker.ALGORITHM_NAME,
321                      buffer(subblock_signatures[(segment, object)])))
322
323
324 if __name__ == "__main__":
325     # Read metadata from stdin; filter out lines starting with "@@" so the
326     # statcache file can be parsed as well.
327     metadata = (x for x in sys.stdin if not x.startswith("@@"))
328
329     rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
330     rebuilder.rebuild(metadata, "/")