53481d5729f4909bf9d98b106f9d9374f61ecd0a
[cumulus.git] / python / cumulus / rebuild_database.py
1 #!/usr/bin/python
2 #
3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20
21 """Code for rebuilding the local database.
22
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database.  This can be used to recover from a local database loss,
26 given data from a previous backup.
27 """
28
29 import base64
30 import hashlib
31 import itertools
32 import os
33 import struct
34 import subprocess
35 import sys
36
37 import cumulus
38
39 CHECKSUM_ALGORITHM = "sha224"
40
41 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
42
43 class Chunker(object):
44     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
45
46     This duplicates the chunking algorithm in third_party/chunk.cc.
47     """
48     # Chunking parameters.  These should match those in third_party/chunk.cc.
49     MODULUS = 0xbfe6b8a5bf378d83
50     WINDOW_SIZE = 48
51     BREAKMARK_VALUE = 0x78
52     MIN_CHUNK_SIZE = 2048
53     MAX_CHUNK_SIZE = 65535
54     TARGET_CHUNK_SIZE = 4096
55     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
56
57     def __init__(self):
58         degree = self.MODULUS.bit_length() - 1
59         self.degree = degree
60
61         # Lookup table for polynomial reduction when shifting a new byte in,
62         # based on the high-order bits.
63         self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
64                   for i in range(256)]
65
66         # Values to remove a byte from the signature when it falls out of the
67         # window.
68         self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
69                                 self.MODULUS) for i in range(256)]
70
71         self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
72         self.hash_size = self.hash_algorithm().digestsize
73
74     def polymult(self, x, y, n):
75         # Polynomial multiplication: result = x * y
76         result = 0
77         for i in range(x.bit_length()):
78             if (x >> i) & 1:
79                 result ^= y << i
80         # Reduction modulo n
81         size = n.bit_length()
82         while result.bit_length() >= size:
83             result ^= n << (result.bit_length() - size)
84         return result
85
86     def window_init(self):
87         # Sliding signature state is:
88         #   [signature value, history buffer index, history buffer contents]
89         return [0, 0, [0] * self.WINDOW_SIZE]
90
91     def window_update(self, signature, byte):
92         poly = signature[0]
93         offset = signature[1]
94         undo = self.U[signature[2][offset]]
95         poly = ((poly ^ undo) << 8) + byte
96         poly ^= self.T[poly >> self.degree]
97
98         signature[0] = poly
99         signature[1] = (offset + 1) % self.WINDOW_SIZE
100         signature[2][offset] = byte
101
102     def compute_breaks(self, buf):
103         breaks = [0]
104         signature = self.window_init()
105         for i in xrange(len(buf)):
106             self.window_update(signature, ord(buf[i]))
107             block_len = i - breaks[-1] + 1
108             if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
109                         and block_len >= self.MIN_CHUNK_SIZE)
110                     or block_len >= self.MAX_CHUNK_SIZE):
111                 breaks.append(i + 1)
112         if breaks[-1] < len(buf):
113             breaks.append(len(buf))
114         return breaks
115
116     def compute_signatures(self, buf, buf_offset=0):
117         """Break a buffer into chunks and compute chunk signatures.
118
119         Args:
120             buf: The data buffer.
121             buf_offset: The offset of the data buffer within the original
122                 block, to handle cases where only a portion of the block is
123                 available.
124
125         Returns:
126             A dictionary containing signature data.  Keys are chunk offsets
127             (from the beginning of the block), and values are tuples (size, raw
128             hash value).
129         """
130         breaks = self.compute_breaks(buf)
131         signatures = {}
132         for i in range(1, len(breaks)):
133             chunk = buf[breaks[i-1]:breaks[i]]
134             hasher = self.hash_algorithm()
135             hasher.update(chunk)
136             signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
137                                                     hasher.digest())
138         return signatures
139
140     def dump_signatures(self, signatures):
141         """Convert signatures to the binary format stored in the database."""
142         records = []
143
144         # Emit records indicating that no signatures are available for the next
145         # n bytes.  Since the size is a 16-bit value, to skip larger distances
146         # multiple records must be emitted.  An all-zero signature indicates
147         # the lack of data.
148         def skip(n):
149             while n > 0:
150                 i = min(n, self.MAX_CHUNK_SIZE)
151                 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
152                 n -= i
153
154         position = 0
155         for next_start, (size, digest) in sorted(signatures.iteritems()):
156             if next_start < position:
157                 print "Warning: overlapping signatures, ignoring"
158                 continue
159             skip(next_start - position)
160             records.append(struct.pack(">H", size) + digest)
161             position = next_start + size
162
163         return "".join(records)
164
165     def load_signatures(self, signatures):
166         """Loads signatures from the binary format stored in the database."""
167         entry_size = 2 + self.hash_size
168         if len(signatures) % entry_size != 0:
169             print "Warning: Invalid signatures to load"
170             return {}
171
172         null_digest = "\x00" * self.hash_size
173         position = 0
174         result = {}
175         for i in range(len(signatures) // entry_size):
176             sig = signatures[i*entry_size:(i+1)*entry_size]
177             size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
178             if digest != null_digest:
179                 result[position] = (size, digest)
180             position += size
181         return result
182
183
184 class ChunkerExternal(Chunker):
185     """A Chunker which uses an external program to compute Rabin fingerprints.
186
187     This can run much more quickly than the Python code, but should otherwise
188     give identical results.
189     """
190
191     def __init__(self):
192         super(ChunkerExternal, self).__init__()
193         self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
194                                         stdin=subprocess.PIPE,
195                                         stdout=subprocess.PIPE)
196
197     def compute_breaks(self, buf):
198         if len(buf) == 0:
199             return [0]
200         self.subproc.stdin.write(struct.pack(">i", len(buf)))
201         self.subproc.stdin.write(buf)
202         self.subproc.stdin.flush()
203         breaks = self.subproc.stdout.readline()
204         return [0] + [int(x) + 1 for x in breaks.split()]
205
206
207 class DatabaseRebuilder(object):
208     def __init__(self, database):
209         self.database = database
210         self.cursor = database.cursor()
211         self.segment_ids = {}
212         self.chunker = ChunkerExternal()
213         #self.chunker = Chunker()
214
215     def segment_to_id(self, segment):
216         if segment in self.segment_ids: return self.segment_ids[segment]
217
218         self.cursor.execute("""insert or ignore into segments(segment)
219                                values (?)""", (segment,))
220         self.cursor.execute("""select segmentid from segments
221                                where segment = ?""", (segment,))
222         id = self.cursor.fetchone()[0]
223         self.segment_ids[segment] = id
224         return id
225
226     def rebuild(self, metadata, reference_path):
227         """Iterate through old metadata and use it to rebuild the database.
228
229         Args:
230             metadata: An iterable containing lines of the metadata log.
231             reference_path: Path to the root of a file system tree which may be
232                 similar to data in the metadata log (used to recompute block
233                 signatures).
234         """
235         for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
236             metadata = cumulus.MetadataItem(fields, None)
237             # Only process regular files; skip over other types (directories,
238             # symlinks, etc.)
239             if metadata.items.type not in ("-", "f"): continue
240             try:
241                 path = os.path.join(reference_path, metadata.items.name)
242                 print "Path:", path
243                 # TODO: Check file size for early abort if different
244                 self.rebuild_file(open(path), metadata)
245             except IOError as e:
246                 print e
247                 pass  # Ignore the file
248
249         self.database.commit()
250
251     def rebuild_file(self, fp, metadata):
252         """Compare"""
253         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
254         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
255         checksums = {}
256         subblock = {}
257         for segment, object, checksum, slice in blocks:
258             # Given a reference to a block of unknown size we don't know how to
259             # match up the data, so we have to give up on rebuilding for this
260             # file.
261             if slice is None: return
262
263             start, length, exact = slice
264             buf = fp.read(length)
265             verifier.update(buf)
266
267             if exact:
268                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
269                 csum.update(buf)
270                 checksums[(segment, object)] = (length, csum.compute())
271
272             signatures = self.chunker.compute_signatures(buf, start)
273             subblock.setdefault((segment, object), {}).update(signatures)
274
275         if verifier.valid():
276             print "Checksum matches, computed:", checksums
277             for k in subblock:
278                 subblock[k] = self.chunker.dump_signatures(subblock[k])
279             print "Subblock signatures:"
280             for k, v in subblock.iteritems():
281                 print k, base64.b16encode(v)
282             self.store_checksums(checksums, subblock)
283         else:
284             print "Checksum mismatch"
285
286     def store_checksums(self, block_checksums, subblock_signatures):
287         for (segment, object), (size, checksum) in block_checksums.iteritems():
288             segmentid = self.segment_to_id(segment)
289             self.cursor.execute("""select blockid from block_index
290                                    where segmentid = ? and object = ?""",
291                                 (segmentid, object))
292             blockid = self.cursor.fetchall()
293             if blockid:
294                 blockid = blockid[0][0]
295             else:
296                 blockid = None
297
298             if blockid is not None:
299                 self.cursor.execute("""update block_index
300                                        set checksum = ?, size = ?
301                                        where blockid = ?""",
302                                     (checksum, size, blockid))
303             else:
304                 self.cursor.execute(
305                     """insert into block_index(
306                            segmentid, object, checksum, size, timestamp)
307                        values (?, ?, ?, ?, julianday('now'))""",
308                     (segmentid, object, checksum, size))
309                 blockid = self.cursor.lastrowid
310
311             # Store subblock signatures, if available.
312             sigs = subblock_signatures.get((segment, object))
313             if sigs:
314                 self.cursor.execute(
315                     """insert or replace into subblock_signatures(
316                            blockid, algorithm, signatures)
317                        values (?, ?, ?)""",
318                     (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
319
320
321 if __name__ == "__main__":
322     # Read metadata from stdin; filter out lines starting with "@@" so the
323     # statcache file can be parsed as well.
324     metadata = (x for x in sys.stdin if not x.startswith("@@"))
325
326     rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
327     rebuilder.rebuild(metadata, "/")