# Cumulus: Efficient Filesystem Backup to the Cloud
# Copyright (C) 2013 The Cumulus Developers
# See the AUTHORS file for a list of contributors.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 """Code for rebuilding the local database.
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database. This can be used to recover from a local database loss,
26 given data from a previous backup.
from __future__ import division, print_function, unicode_literals

import os
import struct
import subprocess
import sys
import tarfile
import time

import cumulus
from cumulus import util
CHECKSUM_ALGORITHM = "sha224"
CHUNKER_PROGRAM = "cumulus-chunker-standalone"

# TODO: Move to somewhere common
SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"

class Chunker(object):
    """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.

    This duplicates the chunking algorithm in third_party/chunk.cc.
    """
    # Chunking parameters.  These should match those in third_party/chunk.cc.
    MODULUS = 0xbfe6b8a5bf378d83
    WINDOW_SIZE = 48
    BREAKMARK_VALUE = 0x78
    MIN_CHUNK_SIZE = 2048
    MAX_CHUNK_SIZE = 65535
    TARGET_CHUNK_SIZE = 4096
    ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)

    # Minimum size of a block before we should bother storing subfile
    # signatures (only applies when full blocks are used to store a file;
    # subfile signatures are always used when subfile incrementals are
    # present).
    MINIMUM_OBJECT_SIZE = 16384

    def __init__(self):
        degree = self.MODULUS.bit_length() - 1
        self.degree = degree

        # Lookup table for polynomial reduction when shifting a new byte in,
        # based on the high-order bits.
        self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
                  for i in range(256)]

        # Values to remove a byte from the signature when it falls out of the
        # window.
        self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
                                self.MODULUS) for i in range(256)]

        self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
        self.hash_size = self.hash_algorithm().digestsize

    def polymult(self, x, y, n):
        # Polynomial multiplication: result = x * y
        result = 0
        for i in range(x.bit_length()):
            if (x >> i) & 1:
                result ^= y << i
        # Reduce the product modulo the polynomial n
        size = n.bit_length()
        while result.bit_length() >= size:
            result ^= n << (result.bit_length() - size)
        return result
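
    # Worked example (added for illustration; not part of the original code):
    # with x = 0b11 and y = 0b11 (both the polynomial x+1 over GF(2)), the
    # product is x^2 + 1 = 0b101.  Reducing modulo n = 0b111 (x^2 + x + 1)
    # XORs n in once, leaving 0b010, so polymult(0b11, 0b11, 0b111) == 0b10.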

    def window_init(self):
        # Sliding signature state is:
        #   [signature value, history buffer index, history buffer contents]
        return [0, 0, [0] * self.WINDOW_SIZE]

    def window_update(self, signature, byte):
        poly = signature[0]
        offset = signature[1]
        undo = self.U[signature[2][offset]]
        poly = ((poly ^ undo) << 8) + byte
        poly ^= self.T[poly >> self.degree]

        signature[0] = poly
        signature[1] = (offset + 1) % self.WINDOW_SIZE
        signature[2][offset] = byte

    def compute_breaks(self, buf):
        breaks = [0]
        signature = self.window_init()
        for i in range(len(buf)):
            self.window_update(signature, ord(buf[i]))
            block_len = i - breaks[-1] + 1
            if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
                        and block_len >= self.MIN_CHUNK_SIZE)
                    or block_len >= self.MAX_CHUNK_SIZE):
                breaks.append(i + 1)
        if breaks[-1] < len(buf):
            breaks.append(len(buf))
        return breaks

    def compute_signatures(self, buf, buf_offset=0):
        """Break a buffer into chunks and compute chunk signatures.

        Args:
            buf: The data buffer.
            buf_offset: The offset of the data buffer within the original
                block, to handle cases where only a portion of the block is
                available.

        Returns:
            A dictionary containing signature data.  Keys are chunk offsets
            (from the beginning of the block), and values are tuples (size, raw
            hash value).
        """
        breaks = self.compute_breaks(buf)
        signatures = {}
        for i in range(1, len(breaks)):
            chunk = buf[breaks[i-1]:breaks[i]]
            hasher = self.hash_algorithm()
            hasher.update(chunk)
            signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
                                                    hasher.digest())
        return signatures

    def dump_signatures(self, signatures):
        """Convert signatures to the binary format stored in the database."""
        records = []

        # Emit records indicating that no signatures are available for the next
        # n bytes.  Since the size is a 16-bit value, to skip larger distances
        # multiple records must be emitted.  An all-zero signature indicates
        # the lack of data.
        def skip(n):
            while n > 0:
                i = min(n, self.MAX_CHUNK_SIZE)
                records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
                n -= i

        position = 0
        for next_start, (size, digest) in sorted(signatures.items()):
            if next_start < position:
                print("Warning: overlapping signatures, ignoring")
                continue
            skip(next_start - position)
            records.append(struct.pack(">H", size) + digest)
            position = next_start + size

        return "".join(records)

    def load_signatures(self, signatures):
        """Loads signatures from the binary format stored in the database."""
        entry_size = 2 + self.hash_size
        if len(signatures) % entry_size != 0:
            print("Warning: Invalid signatures to load")
            return {}

        null_digest = "\x00" * self.hash_size
        position = 0
        result = {}
        for i in range(len(signatures) // entry_size):
            sig = signatures[i*entry_size:(i+1)*entry_size]
            size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
            if digest != null_digest:
                result[position] = (size, digest)
            position += size
        return result
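

# Usage sketch (added for illustration; not part of the original module).
# Assumes `data` is a byte string read from a file, as elsewhere in this
# module; the function is only an example and is never called.
def _chunker_example(data):
    chunker = Chunker()
    # Chunk boundaries are offsets into the buffer, always beginning with 0.
    breaks = chunker.compute_breaks(data)
    # Signatures map each chunk's offset to (chunk size, raw hash digest).
    signatures = chunker.compute_signatures(data)
    # The database stores the packed form; loading it back recovers the
    # offset -> (size, digest) mapping for every chunk with a known digest.
    packed = chunker.dump_signatures(signatures)
    return breaks, chunker.load_signatures(packed)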


class ChunkerExternal(Chunker):
    """A Chunker which uses an external program to compute Rabin fingerprints.

    This can run much more quickly than the Python code, but should otherwise
    give identical results.
    """

    def __init__(self):
        super(ChunkerExternal, self).__init__()
        self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE)

    def compute_breaks(self, buf):
        self.subproc.stdin.write(struct.pack(">i", len(buf)))
        self.subproc.stdin.write(buf)
        self.subproc.stdin.flush()
        breaks = self.subproc.stdout.readline()
        return [0] + [int(x) + 1 for x in breaks.split()]
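
# Protocol note (added for clarity, inferred from the code above): each request
# to the external chunker is a 4-byte big-endian buffer length followed by the
# buffer itself; the reply is one line of whitespace-separated offsets of the
# last byte of each chunk, which compute_breaks() converts into the same
# break-point list the pure-Python Chunker produces.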


class DatabaseRebuilder(object):
    def __init__(self, database):
        self.database = database
        self.cursor = database.cursor()
        self.segment_ids = {}
        self.chunker = ChunkerExternal()
        #self.chunker = Chunker()

    def segment_to_id(self, segment):
        if segment in self.segment_ids: return self.segment_ids[segment]

        self.cursor.execute("""insert or ignore into segments(segment)
                               values (?)""", (segment,))
        self.cursor.execute("""select segmentid from segments
                               where segment = ?""", (segment,))
        id = self.cursor.fetchone()[0]
        self.segment_ids[segment] = id
        return id

    def rebuild(self, metadata, reference_path):
        """Iterate through old metadata and use it to rebuild the database.

        Args:
            metadata: An iterable containing lines of the metadata log.
            reference_path: Path to the root of a file system tree which may be
                similar to data in the metadata log (used to recompute block
                signatures).
        """
        for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
            metadata = cumulus.MetadataItem(fields, None)
            # Only process regular files; skip over other types (directories,
            # symlinks, etc.).
            if metadata.items.type not in ("-", "f"): continue
            try:
                path = os.path.join(reference_path, metadata.items.name)
                # TODO: Check file size for early abort if different
                self.rebuild_file(open(path), metadata)
            except IOError:
                pass  # Ignore the file

        self.database.commit()

    def reload_segment_metadata(self, segment_metadata):
        """Read a segment metadata (.meta) file into the local database.

        Updates the segments table in the local database with information from
        a segment metadata backup file.  Old data is not overwritten, so
        loading a .meta file with partial information is fine.
        """
        for info in cumulus.parse(segment_metadata,
                                  terminate=lambda l: len(l) == 0):
            segment = info.pop("segment")
            self.insert_segment_info(segment, info)

        self.database.commit()

    def insert_segment_info(self, segment, info):
        id = self.segment_to_id(segment)
        for k, v in info.items():
            self.cursor.execute("update segments set " + k + " = ? "
                                "where segmentid = ?",
                                (v, id))

    def rebuild_file(self, fp, metadata):
        """Recompute database signatures if a file is unchanged.

        If the current file contents match that from the old metadata (the
        full-file hash matches), then recompute block- and chunk-level
        signatures for the objects referenced by the file.
        """
        blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
        verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
        checksums = {}
        subblock = {}
        for segment, object, checksum, slice in blocks:
            # Given a reference to a block of unknown size we don't know how to
            # match up the data, so we have to give up on rebuilding for this
            # file.
            if slice is None: return

            start, length, exact = slice
            buf = fp.read(length)
            verifier.update(buf)

            # Zero blocks get no checksums, so skip further processing on them.
            if object is None: continue

            if exact:
                csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
                csum.update(buf)
                checksums[(segment, object)] = (length, csum.compute())
            else:
                # Compute a lower bound on the object size.
                oldlength, csum = checksums.get((segment, object), (0, None))
                checksums[(segment, object)] = (max(oldlength, start + length),
                                                csum)

            if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
                signatures = self.chunker.compute_signatures(buf, start)
                subblock.setdefault((segment, object), {}).update(signatures)

        if verifier.valid():
            for k in subblock:
                subblock[k] = self.chunker.dump_signatures(subblock[k])
            self.store_checksums(checksums, subblock)
        else:
            print("Checksum mismatch")

    def store_checksums(self, block_checksums, subblock_signatures):
        for (segment, object), (size, checksum) in block_checksums.items():
            segmentid = self.segment_to_id(segment)
            self.cursor.execute(
                """insert or ignore into block_index(segmentid, object)
                   values (?, ?)""",
                (segmentid, object))
            self.cursor.execute("""select blockid from block_index
                                   where segmentid = ? and object = ?""",
                                (segmentid, object))
            blockid = self.cursor.fetchall()[0][0]

            # Store checksum only if it is available; we don't want to
            # overwrite an existing checksum in the database with NULL.
            if checksum is not None:
                self.cursor.execute("""update block_index
                                       set checksum = ?
                                       where blockid = ?""",
                                    (checksum, blockid))

            # Update the object size.  Our size may be an estimate, based on
            # slices that we have seen.  The size in the database must not be
            # larger than the true size, but update it to the largest value
            # possible.
            self.cursor.execute("""update block_index
                                   set size = max(?, coalesce(size, 0))
                                   where blockid = ?""",
                                (size, blockid))

            # Store subblock signatures, if available.
            # TODO: Even better would be to merge signature data, to handle the
            # case where different files see different chunks of a block.
            sigs = subblock_signatures.get((segment, object))
            if sigs:
                self.cursor.execute(
                    """insert or replace into subblock_signatures(
                         blockid, algorithm, signatures)
                       values (?, ?, ?)""",
                    (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
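

# Usage sketch (added for illustration; not part of the original module).  It
# mirrors the driver code at the bottom of this file: the database directory,
# the metadata log lines, and the reference root are supplied by the caller.
def _rebuild_example(database_dir, metadata_lines, reference_root="/"):
    rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(database_dir))
    # Drop "@@" lines so a combined metadata/statcache dump can be fed in.
    metadata = (x for x in metadata_lines if not x.startswith("@@"))
    rebuilder.rebuild(metadata, reference_root)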


class SegmentStateRebuilder(object):
    """Reconstructs segment metadata files from raw segment data."""

    def __init__(self):
        self.filters = dict(cumulus.SEGMENT_FILTERS)
        self.segment_pattern = cumulus.SEARCH_PATHS["segments"]

    def compute_metadata(self, path, relative_path):
        """Recompute metadata of a single segment.

        Args:
            path: Path to the segment file in the file system.
            relative_path: Path relative to the root for the storage backend.
        """
        # Does the filename match that of a segment?  If so, extract the
        # extension to determine the filter to apply to decompress.
        filename = os.path.basename(relative_path)
        m = self.segment_pattern.match(filename)
        if not m: return
        segment_name = m.group(1)
        extension = m.group(2)
        if extension not in self.filters: return
        filter_cmd = self.filters[extension]

        # Take the segment timestamp from the file modification time.
        st_buf = os.stat(path)
        timestamp = time.strftime(SQLITE_TIMESTAMP,
                                  time.gmtime(st_buf.st_mtime))

        # Compute attributes of the compressed segment data.
        BLOCK_SIZE = 4096
        with open(path) as segment:
            disk_size = 0
            checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
            while True:
                buf = segment.read(BLOCK_SIZE)
                if len(buf) == 0: break
                disk_size += len(buf)
                checksummer.update(buf)
        checksum = checksummer.compute()

        # Compute attributes of the objects within the segment.
        data_size = 0
        with open(path) as segment:
            decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
            objects = tarfile.open(mode='r|', fileobj=decompressed)
            for tarinfo in objects:
                data_size += tarinfo.size

        return {"segment": util.uri_encode_pathname(segment_name),
                "path": util.uri_encode_pathname(relative_path),
                "checksum": checksum,
                "data_size": data_size,
                "disk_size": disk_size,
                "timestamp": timestamp}


if __name__ == "__main__":
    # Sample code to reconstruct segment metadata--ought to be relocated.
    if False:
        segment_rebuilder = SegmentStateRebuilder()
        topdir = sys.argv[1]
        files = []
        for dirpath, dirnames, filenames in os.walk(topdir):
            for f in filenames:
                files.append(os.path.join(dirpath, f))
        files.sort()
        for f in files:
            metadata = segment_rebuilder.compute_metadata(
                f,
                os.path.relpath(f, topdir))
            if metadata:
                for (k, v) in sorted(metadata.items()):
                    print("%s: %s" % (k, v))
                print()
        sys.exit(0)

    # Sample code to rebuild the segments table from metadata--needs to be
    # merged with the code below.
    if False:
        rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
        rebuilder.reload_segment_metadata(open(sys.argv[2]))
        sys.exit(0)

    # Read metadata from stdin; filter out lines starting with "@@" so the
    # statcache file can be parsed as well.
    metadata = (x for x in sys.stdin if not x.startswith("@@"))

    rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
    rebuilder.rebuild(metadata, "/")