# Cumulus: Efficient Filesystem Backup to the Cloud
# Copyright (C) 2013 The Cumulus Developers
# See the AUTHORS file for a list of contributors.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 """Code for rebuilding the local database.
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database. This can be used to recover from a local database loss,
26 given data from a previous backup.
41 CHECKSUM_ALGORITHM = "sha224"
43 CHUNKER_PROGRAM = "cumulus-chunker-standalone"

class Chunker(object):
    """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.

    This duplicates the chunking algorithm in third_party/chunk.cc.
    """
    # Chunking parameters. These should match those in third_party/chunk.cc.
    MODULUS = 0xbfe6b8a5bf378d83
    WINDOW_SIZE = 48
    BREAKMARK_VALUE = 0x78
    MIN_CHUNK_SIZE = 2048
    MAX_CHUNK_SIZE = 65535
    TARGET_CHUNK_SIZE = 4096
    ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
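
    # With these parameters a boundary is declared, once MIN_CHUNK_SIZE bytes
    # have accumulated, whenever the rolling fingerprint satisfies
    # signature % TARGET_CHUNK_SIZE == BREAKMARK_VALUE; this holds with
    # probability roughly 1/TARGET_CHUNK_SIZE per byte, so chunks average about
    # TARGET_CHUNK_SIZE bytes and are capped at MAX_CHUNK_SIZE.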

    def __init__(self):
        degree = self.MODULUS.bit_length() - 1
        self.degree = degree

        # Lookup table for polynomial reduction when shifting a new byte in,
        # based on the high-order bits.
        self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
                  for i in range(256)]

        # Values to remove a byte from the signature when it falls out of the
        # window.
        self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
                                self.MODULUS) for i in range(256)]

        self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
        self.hash_size = self.hash_algorithm().digest_size

    def polymult(self, x, y, n):
        # Polynomial multiplication: result = x * y, reduced modulo n
        result = 0
        for i in range(x.bit_length()):
            if (x >> i) & 1:
                result ^= y << i
        size = n.bit_length()
        while result.bit_length() >= size:
            result ^= n << (result.bit_length() - size)
        return result

    def window_init(self):
        # Sliding signature state is:
        #   [signature value, history buffer index, history buffer contents]
        return [0, 0, [0] * self.WINDOW_SIZE]

    def window_update(self, signature, byte):
        poly = signature[0]
        offset = signature[1]
        undo = self.U[signature[2][offset]]
        poly = ((poly ^ undo) << 8) + byte
        poly ^= self.T[poly >> self.degree]

        signature[0] = poly
        signature[1] = (offset + 1) % self.WINDOW_SIZE
        signature[2][offset] = byte

    def compute_breaks(self, buf):
        breaks = [0]
        signature = self.window_init()
        for i in xrange(len(buf)):
            self.window_update(signature, ord(buf[i]))
            block_len = i - breaks[-1] + 1
            if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
                 and block_len >= self.MIN_CHUNK_SIZE)
                    or block_len >= self.MAX_CHUNK_SIZE):
                breaks.append(i + 1)
        if breaks[-1] < len(buf):
            breaks.append(len(buf))
        return breaks
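
    # For example, compute_breaks(buf) on a multi-kilobyte buffer returns a
    # list of the form [0, b1, b2, ..., len(buf)]; consecutive entries bound
    # chunks of between MIN_CHUNK_SIZE and MAX_CHUNK_SIZE bytes (only the final
    # chunk may be shorter).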

    def compute_signatures(self, buf, buf_offset=0):
        """Break a buffer into chunks and compute chunk signatures.

        Args:
            buf: The data buffer.
            buf_offset: The offset of the data buffer within the original
                block, to handle cases where only a portion of the block is
                available.

        Returns:
            A dictionary containing signature data. Keys are chunk offsets
            (from the beginning of the block), and values are tuples (size, raw
            checksum).
        """
        breaks = self.compute_breaks(buf)
        signatures = {}
        for i in range(1, len(breaks)):
            chunk = buf[breaks[i-1]:breaks[i]]
            hasher = self.hash_algorithm()
            hasher.update(chunk)
            signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
                                                    hasher.digest())
        return signatures

    def dump_signatures(self, signatures):
        """Convert signatures to the binary format stored in the database."""
        records = []

        # Emit records indicating that no signatures are available for the next
        # n bytes. Since the size is a 16-bit value, to skip larger distances
        # multiple records must be emitted. An all-zero signature indicates
        # the lack of data.
        def skip(n):
            while n > 0:
                i = min(n, self.MAX_CHUNK_SIZE)
                records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
                n -= i

        position = 0
        for next_start, (size, digest) in sorted(signatures.iteritems()):
            if next_start < position:
                print "Warning: overlapping signatures, ignoring"
                continue
            skip(next_start - position)
            records.append(struct.pack(">H", size) + digest)
            position = next_start + size

        return "".join(records)

    def load_signatures(self, signatures):
        """Load signatures from the binary format stored in the database."""
        entry_size = 2 + self.hash_size
        if len(signatures) % entry_size != 0:
            print "Warning: Invalid signatures to load"
            return None

        null_digest = "\x00" * self.hash_size
        position = 0
        result = {}
        for i in range(len(signatures) // entry_size):
            sig = signatures[i*entry_size:(i+1)*entry_size]
            size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
            if digest != null_digest:
                result[position] = (size, digest)
            position += size
        return result
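
    # load_signatures() inverts dump_signatures(): all-zero digests advance the
    # position without producing an entry, so a dumped dictionary should
    # round-trip unchanged as long as no chunk exceeds MAX_CHUNK_SIZE bytes.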

class ChunkerExternal(Chunker):
    """A Chunker which uses an external program to compute Rabin fingerprints.

    This can run much more quickly than the Python code, but should otherwise
    give identical results.
    """

    def __init__(self):
        super(ChunkerExternal, self).__init__()
        self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE)

    def compute_breaks(self, buf):
        if len(buf) == 0:
            return [0]
        self.subproc.stdin.write(struct.pack(">i", len(buf)))
        self.subproc.stdin.write(buf)
        self.subproc.stdin.flush()
        breaks = self.subproc.stdout.readline()
        return [0] + [int(x) + 1 for x in breaks.split()]
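
    # Protocol assumed here: each request is a 4-byte big-endian length
    # followed by the raw buffer; the helper replies with one line of
    # whitespace-separated offsets of the last byte of each chunk, which are
    # converted to the same break-list format as Chunker.compute_breaks().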

class DatabaseRebuilder(object):
    def __init__(self, database):
        self.database = database
        self.cursor = database.cursor()
        self.segment_ids = {}
        self.chunker = ChunkerExternal()
        #self.chunker = Chunker()
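        # ChunkerExternal requires the cumulus-chunker-standalone helper to be
        # installed on $PATH; swap in the pure-Python Chunker above if the
        # helper is not available.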

    def segment_to_id(self, segment):
        if segment in self.segment_ids: return self.segment_ids[segment]

        self.cursor.execute("""insert or ignore into segments(segment)
                               values (?)""", (segment,))
        self.cursor.execute("""select segmentid from segments
                               where segment = ?""", (segment,))
        id = self.cursor.fetchone()[0]
        self.segment_ids[segment] = id
        return id

    def rebuild(self, metadata, reference_path):
        """Iterate through old metadata and use it to rebuild the database.

        Args:
            metadata: An iterable containing lines of the metadata log.
            reference_path: Path to the root of a file system tree which may be
                similar to data in the metadata log (used to recompute block
                signatures).
        """
        for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
            metadata = cumulus.MetadataItem(fields, None)
            # Only process regular files; skip over other types (directories,
            # symlinks, etc.).
            if metadata.items.type not in ("-", "f"): continue
            path = os.path.join(reference_path, metadata.items.name)
            try:
                # TODO: Check file size for early abort if different
                self.rebuild_file(open(path), metadata)
            except IOError:
                pass            # Ignore the file

        self.database.commit()

    def rebuild_file(self, fp, metadata):
        """Recompute signatures, storing them if the file checksum matches."""
        blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
        verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
        checksums = {}
        subblock = {}
        for segment, object, checksum, slice in blocks:
            # Given a reference to a block of unknown size we don't know how to
            # match up the data, so we have to give up on rebuilding for this
            # file.
            if slice is None: return

            start, length, exact = slice
            buf = fp.read(length)
            verifier.update(buf)

            if exact:
                csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
                csum.update(buf)
                checksums[(segment, object)] = (length, csum.compute())

            signatures = self.chunker.compute_signatures(buf, start)
            subblock.setdefault((segment, object), {}).update(signatures)

        if verifier.valid():
            print "Checksum matches, computed:", checksums
            for k in subblock:
                subblock[k] = self.chunker.dump_signatures(subblock[k])
            print "Subblock signatures:"
            for k, v in subblock.iteritems():
                print k, base64.b16encode(v)
            self.store_checksums(checksums, subblock)
        else:
            print "Checksum mismatch"
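
    # Note: chunk-level signatures are gathered even for inexact slices, but a
    # whole-object checksum is only recorded when the "exact" flag is set,
    # since only then does buf span the complete stored object.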

    def store_checksums(self, block_checksums, subblock_signatures):
        for (segment, object), (size, checksum) in block_checksums.iteritems():
            segmentid = self.segment_to_id(segment)
            self.cursor.execute("""select blockid from block_index
                                   where segmentid = ? and object = ?""",
                                (segmentid, object))
            blockid = self.cursor.fetchall()
            if blockid:
                blockid = blockid[0][0]
            else:
                blockid = None

            if blockid is not None:
                self.cursor.execute("""update block_index
                                       set checksum = ?, size = ?
                                       where blockid = ?""",
                                    (checksum, size, blockid))
            else:
                self.cursor.execute(
                    """insert into block_index(
                           segmentid, object, checksum, size, timestamp)
                       values (?, ?, ?, ?, julianday('now'))""",
                    (segmentid, object, checksum, size))
                blockid = self.cursor.lastrowid

            # Store subblock signatures, if available.
            sigs = subblock_signatures.get((segment, object))
            if sigs:
                self.cursor.execute(
                    """insert or replace into subblock_signatures(
                           blockid, algorithm, signatures)
                       values (?, ?, ?)""",
                    (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))

class SegmentStateRebuilder(object):
    """Reconstructs segment metadata files from raw segment data."""

    def __init__(self):
        self.filters = dict(cumulus.SEGMENT_FILTERS)
        self.segment_pattern = cumulus.SEARCH_PATHS["segments"]

    def compute_metadata(self, path, relative_path):
        """Recompute metadata of a single segment.

        Args:
            path: Path to the segment file in the file system.
            relative_path: Path relative to the root for the storage backend.
        """
        # Does the filename match that of a segment? If so, extract the
        # extension to determine the filter to apply to decompress.
        filename = os.path.basename(relative_path)
        m = self.segment_pattern.match(filename)
        if not m: return
        segment_name = m.group(1)
        extension = m.group(2)
        if extension not in self.filters: return
        filter_cmd = self.filters[extension]

        # Compute attributes of the compressed segment data.
        BLOCK_SIZE = 4096       # read size; any reasonable buffer size works
        with open(path) as segment:
            disk_size = 0
            checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
            while True:
                buf = segment.read(BLOCK_SIZE)
                if len(buf) == 0: break
                disk_size += len(buf)
                checksummer.update(buf)
        checksum = checksummer.compute()

        # Compute attributes of the objects within the segment.
        data_size = 0
        with open(path) as segment:
            decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
            objects = tarfile.open(mode='r|', fileobj=decompressed)
            for tarinfo in objects:
                data_size += tarinfo.size

        return {"segment": segment_name,
                "path": relative_path,
                "checksum": checksum,
                "data_size": data_size,
                "disk_size": disk_size}

if __name__ == "__main__":
    # Sample code to recompute segment metadata from a tree of raw segment
    # files; disabled by default in favor of the database rebuild below.
    if False:
        segment_rebuilder = SegmentStateRebuilder()
        topdir = sys.argv[1]
        files = []
        for dirpath, dirnames, filenames in os.walk(topdir):
            for f in filenames:
                files.append(os.path.join(dirpath, f))
        files.sort()
        for f in files:
            metadata = segment_rebuilder.compute_metadata(
                f, os.path.relpath(f, topdir))
            if metadata:
                for (k, v) in sorted(metadata.items()):
                    print "%s: %s" % (k, cumulus.uri_encode(str(v)))
                print
        sys.exit(0)

    # Read metadata from stdin; filter out lines starting with "@@" so the
    # statcache file can be parsed as well.
    metadata = (x for x in sys.stdin if not x.startswith("@@"))

    rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
    rebuilder.rebuild(metadata, "/")
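
# Example invocation (illustrative; the script filename and paths below are
# placeholders, not part of the tool):
#     cat metadata-log | python rebuild_database.py /path/to/local/database
# The old metadata log (optionally followed by a statcache file) is piped in on
# standard input, sys.argv[1] is handed to cumulus.LocalDatabase, and block and
# chunk signatures are recomputed by re-reading the matching files under "/".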