3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 """Code for rebuilding the local database.
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database. This can be used to recover from a local database loss,
26 given data from a previous backup.
# Name of the checksum algorithm used for block and sub-block signatures;
# the corresponding hash constructor is looked up in
# cumulus.CHECKSUM_ALGORITHMS (see Chunker.__init__ below).
CHECKSUM_ALGORITHM = "sha224"

# External helper binary used by ChunkerExternal to compute Rabin
# fingerprint chunk boundaries much faster than the pure-Python Chunker.
CHUNKER_PROGRAM = "cumulus-chunker-standalone"
class Chunker(object):
    """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.

    This duplicates the chunking algorithm in third_party/chunk.cc.
    """
    # NOTE(review): this excerpt has gaps -- several statements (variable
    # initializations, loop bodies, returns, and some `def` headers) are
    # elided relative to the full source.  Comments below describe only the
    # code that is visible; gaps are flagged inline.

    # Chunking parameters. These should match those in third_party/chunk.cc.
    # NOTE(review): WINDOW_SIZE and MIN_CHUNK_SIZE are referenced below but
    # their definitions are not visible in this excerpt -- confirm they are
    # defined alongside these constants in the full file.
    MODULUS = 0xbfe6b8a5bf378d83
    BREAKMARK_VALUE = 0x78
    MAX_CHUNK_SIZE = 65535
    TARGET_CHUNK_SIZE = 4096
    ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)

    # NOTE(review): the `def __init__(self):` header for the following
    # constructor body is not visible in this excerpt.
        # Degree of the fingerprint polynomial (bit length of MODULUS minus
        # the leading term).
        degree = self.MODULUS.bit_length() - 1

        # Lookup table for polynomial reduction when shifting a new byte in,
        # based on the high-order bits.
        # NOTE(review): the closing `for i in range(256)]` of this list
        # comprehension is not visible in this excerpt.
        self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)

        # Values to remove a byte from the signature when it falls out of the
        # sliding window, precomputed for each of the 256 byte values.
        self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
                                self.MODULUS) for i in range(256)]

        # Hash constructor and digest length for the configured algorithm.
        self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
        self.hash_size = self.hash_algorithm().digestsize

    def polymult(self, x, y, n):
        # Polynomial multiplication: result = x * y
        # (polynomials over GF(2), i.e. carry-less multiplication, followed
        # by reduction modulo the polynomial n).
        for i in range(x.bit_length()):
        # NOTE(review): the multiplication loop body and the initialization
        # of `result` and `size` are not visible in this excerpt.
        # Reduce the product modulo n by cancelling the leading term while
        # the result is still at least as wide as n.
        while result.bit_length() >= size:
            result ^= n << (result.bit_length() - size)

    def window_init(self):
        """Return fresh sliding-window state for use with window_update."""
        # Sliding signature state is:
        # [signature value, history buffer index, history buffer contents]
        return [0, 0, [0] * self.WINDOW_SIZE]

    def window_update(self, signature, byte):
        """Slide the fingerprint window forward by one byte (in place)."""
        # NOTE(review): the statements binding `poly` and `offset` from
        # `signature` are not visible in this excerpt.
        # Cancel the contribution of the oldest byte leaving the window...
        undo = self.U[signature[2][offset]]
        # ...shift the new byte in, then reduce the high-order bits using
        # the precomputed table T.
        poly = ((poly ^ undo) << 8) + byte
        poly ^= self.T[poly >> self.degree]
        # Advance the circular history buffer and record the new byte.
        signature[1] = (offset + 1) % self.WINDOW_SIZE
        signature[2][offset] = byte

    def compute_breaks(self, buf):
        """Compute the chunk break offsets for buf.

        NOTE(review): the initialization of `breaks`, the append at a break
        point, and the return statement are not visible in this excerpt.
        """
        signature = self.window_init()
        for i in xrange(len(buf)):
            self.window_update(signature, ord(buf[i]))
            block_len = i - breaks[-1] + 1
            # Break when the fingerprint hits the breakmark value (once the
            # chunk is long enough), or unconditionally once the chunk
            # reaches the maximum allowed size.
            if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
                 and block_len >= self.MIN_CHUNK_SIZE)
                or block_len >= self.MAX_CHUNK_SIZE):
        # Ensure the final (possibly short) chunk terminates at buffer end.
        if breaks[-1] < len(buf):
            breaks.append(len(buf))

    def compute_signatures(self, buf, buf_offset=0):
        """Break a buffer into chunks and compute chunk signatures.

        Args:
            buf: The data buffer.
            buf_offset: The offset of the data buffer within the original
                block, to handle cases where only a portion of the block is
                available.

        Returns:
            A dictionary containing signature data.  Keys are chunk offsets
            (from the beginning of the block), and values are tuples (size, raw
            digest) -- presumably the hash digest of the chunk; confirm
            against the full source.

        NOTE(review): the initialization of `signatures`, the hasher update /
        digest computation, and the return statement are not visible here.
        """
        breaks = self.compute_breaks(buf)
        for i in range(1, len(breaks)):
            chunk = buf[breaks[i-1]:breaks[i]]
            hasher = self.hash_algorithm()
            signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],

    def dump_signatures(self, signatures):
        """Convert signatures to the binary format stored in the database."""
        # Emit records indicating that no signatures are available for the next
        # n bytes. Since the size is a 16-bit value, to skip larger distances
        # multiple records must be emitted. An all-zero signature indicates
        # such a skip record.
        # NOTE(review): the `records`/`position` initialization and the
        # enclosing `def skip(n):` / `while` lines are not visible here.
            i = min(n, self.MAX_CHUNK_SIZE)
            records.append(struct.pack(">H", i) + "\x00" * self.hash_size)

        # Emit records in offset order, inserting skip records for ranges
        # with no signature data.
        for next_start, (size, digest) in sorted(signatures.iteritems()):
            if next_start < position:
                print "Warning: overlapping signatures, ignoring"
            skip(next_start - position)
            records.append(struct.pack(">H", size) + digest)
            position = next_start + size

        return "".join(records)

    def load_signatures(self, signatures):
        """Loads signatures from the binary format stored in the database."""
        # Each record is a 16-bit big-endian size followed by a raw digest.
        entry_size = 2 + self.hash_size
        if len(signatures) % entry_size != 0:
            print "Warning: Invalid signatures to load"
        # An all-zero digest marks a skip record (see dump_signatures) and
        # carries no signature data.
        null_digest = "\x00" * self.hash_size
        # NOTE(review): the initialization of `result`/`position`, the
        # position update, and the return statement are not visible here.
        for i in range(len(signatures) // entry_size):
            sig = signatures[i*entry_size:(i+1)*entry_size]
            size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
            if digest != null_digest:
                result[position] = (size, digest)
class ChunkerExternal(Chunker):
    """A Chunker which uses an external program to compute Rabin fingerprints.

    This can run much more quickly than the Python code, but should otherwise
    give identical results.
    """

    # NOTE(review): the `def __init__(self):` header for the following
    # constructor body is not visible in this excerpt.
        super(ChunkerExternal, self).__init__()
        # Start the helper once and keep it as a long-lived coprocess;
        # requests are written to its stdin, results read from its stdout.
        self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE)

    def compute_breaks(self, buf):
        """Ask the external chunker process for break positions in buf."""
        # NOTE(review): one or two lines appear to be elided at the top of
        # this method in the excerpt -- confirm against the full source.
        # Request format: 4-byte big-endian length prefix, then raw data.
        self.subproc.stdin.write(struct.pack(">i", len(buf)))
        self.subproc.stdin.write(buf)
        self.subproc.stdin.flush()
        # Reply is a single line of whitespace-separated integers.  The +1
        # converts the helper's positions to the start-offset convention of
        # Chunker.compute_breaks -- presumably end-of-chunk indices; verify
        # against the helper program's output format.
        breaks = self.subproc.stdout.readline()
        return [0] + [int(x) + 1 for x in breaks.split()]
class DatabaseRebuilder(object):
    """Rebuilds local-database contents (block checksums and sub-block
    signatures) from an old metadata log plus a reference file tree."""
    # NOTE(review): this excerpt has gaps -- several statements (returns,
    # try/except headers, some `if`/`else` lines) are elided relative to
    # the full source.  Comments describe only the visible code.

    def __init__(self, database):
        self.database = database
        self.cursor = database.cursor()
        # Cache of segment name -> database segmentid (see segment_to_id).
        self.segment_ids = {}
        # External chunker is preferred for speed; the pure-Python fallback
        # is kept here commented out.
        self.chunker = ChunkerExternal()
        #self.chunker = Chunker()

    def segment_to_id(self, segment):
        """Return the database segmentid for segment, inserting a row into
        the segments table if one does not already exist.  Results cached."""
        if segment in self.segment_ids: return self.segment_ids[segment]

        # insert-or-ignore followed by select handles both the new and the
        # already-present cases with the same code path.
        self.cursor.execute("""insert or ignore into segments(segment)
                               values (?)""", (segment,))
        self.cursor.execute("""select segmentid from segments
                               where segment = ?""", (segment,))
        id = self.cursor.fetchone()[0]
        self.segment_ids[segment] = id
        # NOTE(review): the `return id` statement is not visible here.

    def rebuild(self, metadata, reference_path):
        """Iterate through old metadata and use it to rebuild the database.

        Args:
            metadata: An iterable containing lines of the metadata log.
            reference_path: Path to the root of a file system tree which may be
                similar to data in the metadata log (used to recompute block
                signatures).
        """
        for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
            metadata = cumulus.MetadataItem(fields, None)
            # Only process regular files; skip over other types (directories,
            # symlinks, and so on).
            if metadata.items.type not in ("-", "f"): continue
            # NOTE(review): the `try:` / `except` lines wrapping the file
            # access below are not visible in this excerpt.
                path = os.path.join(reference_path, metadata.items.name)
                # TODO: Check file size for early abort if different
                self.rebuild_file(open(path), metadata)
                pass # Ignore the file
        # Commit once after the whole metadata log has been processed.
        self.database.commit()

    def rebuild_file(self, fp, metadata):
        """Recompute checksums/sub-block signatures for one file's blocks,
        accepting them only if the whole-file checksum matches."""
        blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
        verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
        for segment, object, checksum, slice in blocks:
            # Given a reference to a block of unknown size we don't know how to
            # match up the data, so we have to give up on rebuilding for this
            # file.
            if slice is None: return

            start, length, exact = slice
            buf = fp.read(length)
            # NOTE(review): the `if exact:` header and related bookkeeping
            # around the next two lines are not visible in this excerpt.
                csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
                checksums[(segment, object)] = (length, csum.compute())

            signatures = self.chunker.compute_signatures(buf, start)
            subblock.setdefault((segment, object), {}).update(signatures)

        # NOTE(review): the `if verifier.valid():` / `else:` headers around
        # the success and failure branches below are not visible here.
            print "Checksum matches, computed:", checksums
                subblock[k] = self.chunker.dump_signatures(subblock[k])
            print "Subblock signatures:"
            for k, v in subblock.iteritems():
                print k, base64.b16encode(v)
            self.store_checksums(checksums, subblock)
            print "Checksum mismatch"

    def store_checksums(self, block_checksums, subblock_signatures):
        """Write recomputed block checksums (and any sub-block signatures)
        into the block_index / subblock_signatures tables."""
        for (segment, object), (size, checksum) in block_checksums.iteritems():
            segmentid = self.segment_to_id(segment)
            self.cursor.execute("""select blockid from block_index
                                   where segmentid = ? and object = ?""",
            blockid = self.cursor.fetchall()
            # NOTE(review): the `if blockid:` / `else:` lines normalizing
            # the fetchall result are not visible in this excerpt.
                blockid = blockid[0][0]

            # Update the existing row if the block is already indexed;
            # otherwise insert a fresh row.
            if blockid is not None:
                self.cursor.execute("""update block_index
                                       set checksum = ?, size = ?
                                       where blockid = ?""",
                                    (checksum, size, blockid))
                    """insert into block_index(
                           segmentid, object, checksum, size, timestamp)
                       values (?, ?, ?, ?, julianday('now'))""",
                    (segmentid, object, checksum, size))
                blockid = self.cursor.lastrowid

            # Store subblock signatures, if available.
            sigs = subblock_signatures.get((segment, object))
                """insert or replace into subblock_signatures(
                       blockid, algorithm, signatures)
                (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
321 if __name__ == "__main__":
322 # Read metadata from stdin; filter out lines starting with "@@" so the
323 # statcache file can be parsed as well.
324 metadata = (x for x in sys.stdin if not x.startswith("@@"))
326 rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
327 rebuilder.rebuild(metadata, "/")