# Cumulus: Efficient Filesystem Backup to the Cloud
# Copyright (C) 2013 The Cumulus Developers
# See the AUTHORS file for a list of contributors.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

21 """Code for rebuilding the local database.
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database. This can be used to recover from a local database loss,
26 given data from a previous backup.
from __future__ import division, print_function, unicode_literals

import os
import struct
import subprocess
import sys
import tarfile
import time

import cumulus

CHECKSUM_ALGORITHM = "sha224"
CHUNKER_PROGRAM = "cumulus-chunker-standalone"

# TODO: Move to somewhere common
SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"

class Chunker(object):
    """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.

    This duplicates the chunking algorithm in third_party/chunk.cc.
    """
    # Chunking parameters.  These should match those in third_party/chunk.cc.
    MODULUS = 0xbfe6b8a5bf378d83
    WINDOW_SIZE = 48
    BREAKMARK_VALUE = 0x78
    MIN_CHUNK_SIZE = 2048
    MAX_CHUNK_SIZE = 65535
    TARGET_CHUNK_SIZE = 4096
    ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)

    # Minimum size of a block before we should bother storing subfile
    # signatures (only applies when full blocks are used to store a file;
    # subfile signatures are always used when subfile incrementals are
    # present).
    MINIMUM_OBJECT_SIZE = 16384

    def __init__(self):
        degree = self.MODULUS.bit_length() - 1
        self.degree = degree

        # Lookup table for polynomial reduction when shifting a new byte in,
        # based on the high-order bits.
        self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
                  for i in range(256)]

        # Values to remove a byte from the signature when it falls out of the
        # window.
        self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
                                self.MODULUS) for i in range(256)]

        self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
        self.hash_size = self.hash_algorithm().digestsize

    def polymult(self, x, y, n):
        # Polynomial multiplication over GF(2): result = x * y mod n
        result = 0
        for i in range(x.bit_length()):
            if (x >> i) & 1: result ^= y << i
        size = n.bit_length()
        while result.bit_length() >= size:
            result ^= n << (result.bit_length() - size)
        return result

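    # A worked example (illustrative; not part of the original source): in
    # GF(2), (x + 1) * (x + 1) = x^2 + 2x + 1 = x^2 + 1, since coefficients
    # are reduced mod 2.  Encoded as bit strings:
    #
    #     Chunker().polymult(0b11, 0b11, Chunker.MODULUS)  # == 0b101
    #
    # No modular reduction occurs here because the product is far smaller
    # than MODULUS.
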
    def window_init(self):
        # Sliding signature state is:
        #   [signature value, history buffer index, history buffer contents]
        return [0, 0, [0] * self.WINDOW_SIZE]

    def window_update(self, signature, byte):
        poly = signature[0]
        offset = signature[1]
        undo = self.U[signature[2][offset]]
        poly = ((poly ^ undo) << 8) + byte
        poly ^= self.T[poly >> self.degree]

        signature[0] = poly
        signature[1] = (offset + 1) % self.WINDOW_SIZE
        signature[2][offset] = byte

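    # Taken together, window_init/window_update maintain a rolling Rabin
    # fingerprint: after each call, signature[0] is the polynomial formed by
    # the last WINDOW_SIZE bytes, reduced modulo MODULUS.  U[b] cancels the
    # contribution of the byte b sliding out of the window, and T folds the
    # high-order bits back into the low-order bits, keeping the value below
    # the degree of the modulus.
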
    def compute_breaks(self, buf):
        breaks = [0]
        signature = self.window_init()
        for i in range(len(buf)):
            self.window_update(signature, ord(buf[i]))
            block_len = i - breaks[-1] + 1
            if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
                        and block_len >= self.MIN_CHUNK_SIZE)
                    or block_len >= self.MAX_CHUNK_SIZE):
                breaks.append(i + 1)
        if breaks[-1] < len(buf):
            breaks.append(len(buf))
        return breaks

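    # Usage sketch (illustrative): a breakmark fires with probability roughly
    # 1/TARGET_CHUNK_SIZE per byte, so chunks average about TARGET_CHUNK_SIZE
    # bytes.  The break list always starts at 0 and ends at len(buf), so
    # slicing between consecutive breaks reassembles the input:
    #
    #     breaks = chunker.compute_breaks(data)
    #     chunks = [data[breaks[i-1]:breaks[i]]
    #               for i in range(1, len(breaks))]
    #     assert "".join(chunks) == data
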
    def compute_signatures(self, buf, buf_offset=0):
        """Break a buffer into chunks and compute chunk signatures.

        Args:
            buf: The data buffer.
            buf_offset: The offset of the data buffer within the original
                block, to handle cases where only a portion of the block is
                available.

        Returns:
            A dictionary containing signature data.  Keys are chunk offsets
            (from the beginning of the block), and values are tuples (size,
            raw hash value).
        """
        breaks = self.compute_breaks(buf)
        signatures = {}
        for i in range(1, len(breaks)):
            chunk = buf[breaks[i-1]:breaks[i]]
            hasher = self.hash_algorithm()
            hasher.update(chunk)
            signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
                                                    hasher.digest())
        return signatures

    def dump_signatures(self, signatures):
        """Convert signatures to the binary format stored in the database."""
        records = []

        # Emit records indicating that no signatures are available for the next
        # n bytes.  Since the size is a 16-bit value, to skip larger distances
        # multiple records must be emitted.  An all-zero signature indicates
        # the lack of signature data.
        def skip(n):
            while n > 0:
                i = min(n, self.MAX_CHUNK_SIZE)
                records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
                n -= i

        position = 0
        for next_start, (size, digest) in sorted(signatures.items()):
            if next_start < position:
                print("Warning: overlapping signatures, ignoring")
                continue
            skip(next_start - position)
            records.append(struct.pack(">H", size) + digest)
            position = next_start + size

        return "".join(records)

    def load_signatures(self, signatures):
        """Loads signatures from the binary format stored in the database."""
        entry_size = 2 + self.hash_size
        if len(signatures) % entry_size != 0:
            print("Warning: Invalid signatures to load")
            return {}

        null_digest = "\x00" * self.hash_size
        position = 0
        result = {}
        for i in range(len(signatures) // entry_size):
            sig = signatures[i*entry_size:(i+1)*entry_size]
            size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
            if digest != null_digest:
                result[position] = (size, digest)
            position += size
        return result

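    # Round-trip sketch (illustrative): for non-overlapping signatures,
    # load_signatures inverts dump_signatures; gaps encoded as all-zero skip
    # records come back as skipped positions rather than entries.
    #
    #     sigs = chunker.compute_signatures(data)
    #     assert chunker.load_signatures(chunker.dump_signatures(sigs)) == sigs
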
class ChunkerExternal(Chunker):
    """A Chunker which uses an external program to compute Rabin fingerprints.

    This can run much more quickly than the Python code, but should otherwise
    give identical results.
    """

    def __init__(self):
        super(ChunkerExternal, self).__init__()
        self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE)

    def compute_breaks(self, buf):
        if len(buf) == 0: return [0]
        self.subproc.stdin.write(struct.pack(">i", len(buf)))
        self.subproc.stdin.write(buf)
        self.subproc.stdin.flush()
        breaks = self.subproc.stdout.readline()
        return [0] + [int(x) + 1 for x in breaks.split()]

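    # Wire protocol, as implemented above: send a 4-byte big-endian length
    # followed by the raw data; the helper replies with one line of
    # space-separated break positions, which the "+ 1" converts into
    # exclusive chunk boundaries.
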
class DatabaseRebuilder(object):
    def __init__(self, database):
        self.database = database
        self.cursor = database.cursor()
        self.segment_ids = {}
        self.chunker = ChunkerExternal()
        #self.chunker = Chunker()

    def segment_to_id(self, segment):
        if segment in self.segment_ids: return self.segment_ids[segment]

        self.cursor.execute("""insert or ignore into segments(segment)
                               values (?)""", (segment,))
        self.cursor.execute("""select segmentid from segments
                               where segment = ?""", (segment,))
        id = self.cursor.fetchone()[0]
        self.segment_ids[segment] = id
        return id

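    # The insert-or-ignore/select pair above is a get-or-create idiom: the
    # insert is a no-op when the segment row already exists, and the select
    # then fetches its id either way; results are memoized in segment_ids.
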
    def rebuild(self, metadata, reference_path):
        """Iterate through old metadata and use it to rebuild the database.

        Args:
            metadata: An iterable containing lines of the metadata log.
            reference_path: Path to the root of a file system tree which may
                be similar to data in the metadata log (used to recompute
                block signatures).
        """
        for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
            metadata = cumulus.MetadataItem(fields, None)
            # Only process regular files; skip over other types (directories,
            # symlinks, etc.).
            if metadata.items.type not in ("-", "f"): continue
            try:
                path = os.path.join(reference_path, metadata.items.name)
                # TODO: Check file size for early abort if different
                self.rebuild_file(open(path), metadata)
            except IOError as e:
                print(e)
                pass  # Ignore the file

        self.database.commit()

    def reload_segment_metadata(self, segment_metadata):
        """Read a segment metadata (.meta) file into the local database.

        Updates the segments table in the local database with information
        from a segment metadata backup file.  Old data is not overwritten,
        so loading a .meta file with partial information is fine.
        """
        for info in cumulus.parse(segment_metadata,
                                  terminate=lambda l: len(l) == 0):
            segment = info.pop("segment")
            self.insert_segment_info(segment, info)

        self.database.commit()

    def insert_segment_info(self, segment, info):
        id = self.segment_to_id(segment)
        for k, v in info.items():
            self.cursor.execute("update segments set " + k + " = ? "
                                "where segmentid = ?",
                                (v, id))

    def rebuild_file(self, fp, metadata):
        """Recompute database signatures if a file is unchanged.

        If the current file contents match that from the old metadata (the
        full-file hash matches), then recompute block- and chunk-level
        signatures for the objects referenced by the file.
        """
        blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
        verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
        checksums = {}
        subblock = {}
        for segment, object, checksum, slice in blocks:
            # Given a reference to a block of unknown size we don't know how
            # to match up the data, so we have to give up on rebuilding for
            # this file.
            if slice is None: return

            start, length, exact = slice
            buf = fp.read(length)
            verifier.update(buf)

            # Zero blocks get no checksums, so skip further processing on them.
            if object is None: continue

            if exact:
                csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
                csum.update(buf)
                checksums[(segment, object)] = (length, csum.compute())
            else:
                # Compute a lower bound on the object size.
                oldlength, csum = checksums.get((segment, object), (0, None))
                checksums[(segment, object)] = (max(oldlength, start + length),
                                                csum)

            if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
                signatures = self.chunker.compute_signatures(buf, start)
                subblock.setdefault((segment, object), {}).update(signatures)

        if verifier.valid():
            for k in subblock:
                subblock[k] = self.chunker.dump_signatures(subblock[k])
            self.store_checksums(checksums, subblock)
        else:
            print("Checksum mismatch")

    def store_checksums(self, block_checksums, subblock_signatures):
        for (segment, object), (size, checksum) in block_checksums.items():
            segmentid = self.segment_to_id(segment)
            self.cursor.execute(
                """insert or ignore into block_index(segmentid, object)
                   values (?, ?)""",
                (segmentid, object))
            self.cursor.execute("""select blockid from block_index
                                   where segmentid = ? and object = ?""",
                                (segmentid, object))
            blockid = self.cursor.fetchall()[0][0]

            # Store checksum only if it is available; we don't want to
            # overwrite an existing checksum in the database with NULL.
            if checksum is not None:
                self.cursor.execute("""update block_index
                                       set checksum = ?
                                       where blockid = ?""",
                                    (checksum, blockid))

            # Update the object size.  Our size may be an estimate, based on
            # slices that we have seen.  The size in the database must not be
            # larger than the true size, but update it to the largest value
            # possible.
            self.cursor.execute("""update block_index
                                   set size = max(?, coalesce(size, 0))
                                   where blockid = ?""",
                                (size, blockid))

            # Store subblock signatures, if available.
            # TODO: Even better would be to merge signature data, to handle the
            # case where different files see different chunks of a block.
            sigs = subblock_signatures.get((segment, object))
            if sigs:
                self.cursor.execute(
                    """insert or replace into subblock_signatures(
                           blockid, algorithm, signatures)
                       values (?, ?, ?)""",
                    (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))

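    # Local database schema touched above, as inferred from the queries:
    # segments(segmentid, segment, ...), block_index(blockid, segmentid,
    # object, checksum, size), and subblock_signatures(blockid, algorithm,
    # signatures).
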
class SegmentStateRebuilder(object):
    """Reconstructs segment metadata files from raw segment data."""

    def __init__(self):
        self.filters = dict(cumulus.SEGMENT_FILTERS)
        self.segment_pattern = cumulus.SEARCH_PATHS["segments"]

    def compute_metadata(self, path, relative_path):
        """Recompute metadata of a single segment.

        Args:
            path: Path to the segment file in the file system.
            relative_path: Path relative to the root for the storage backend.
        """
        # Does the filename match that of a segment?  If so, extract the
        # extension to determine the filter to apply to decompress.
        filename = os.path.basename(relative_path)
        m = self.segment_pattern.match(filename)
        if not m: return
        segment_name = m.group(1)
        extension = m.group(2)
        if extension not in self.filters: return
        filter_cmd = self.filters[extension]

        # Take the timestamp from the file modification time.
        st_buf = os.stat(path)
        timestamp = time.strftime(SQLITE_TIMESTAMP,
                                  time.gmtime(st_buf.st_mtime))

        # Compute attributes of the compressed segment data.
        BLOCK_SIZE = 4096
        with open(path) as segment:
            disk_size = 0
            checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
            while True:
                buf = segment.read(BLOCK_SIZE)
                if len(buf) == 0: break
                disk_size += len(buf)
                checksummer.update(buf)
        checksum = checksummer.compute()

        # Compute attributes of the objects within the segment.
        data_size = 0
        with open(path) as segment:
            decompressed = cumulus.CumulusStore.filter_data(segment,
                                                            filter_cmd)
            objects = tarfile.open(mode='r|', fileobj=decompressed)
            for tarinfo in objects:
                data_size += tarinfo.size

        return {"segment": cumulus.uri_encode(segment_name),
                "path": cumulus.uri_encode(relative_path),
                "checksum": checksum,
                "data_size": data_size,
                "disk_size": disk_size,
                "timestamp": timestamp}

if __name__ == "__main__":
    # Sample code to reconstruct segment metadata--ought to be relocated.
    if False:
        segment_rebuilder = SegmentStateRebuilder()
        topdir = sys.argv[1]
        files = []
        for dirpath, dirnames, filenames in os.walk(topdir):
            for f in filenames:
                files.append(os.path.join(dirpath, f))
        files.sort()
        for f in files:
            metadata = segment_rebuilder.compute_metadata(
                f,
                os.path.relpath(f, topdir))
            if metadata:
                for (k, v) in sorted(metadata.items()):
                    print("%s: %s" % (k, v))
                print()
        sys.exit(0)

    # Sample code to rebuild the segments table from metadata--needs to be
    # merged with the code below.
    if False:
        rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
        rebuilder.reload_segment_metadata(open(sys.argv[2]))
        sys.exit(0)

    # Read metadata from stdin; filter out lines starting with "@@" so the
    # statcache file can be parsed as well.
    metadata = (x for x in sys.stdin if not x.startswith("@@"))

    rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
    rebuilder.rebuild(metadata, "/")
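
# Example invocation (a sketch; the script name and paths are hypothetical):
# pass the local database directory as argv[1] and pipe the saved metadata
# log (optionally with the statcache appended) to stdin.  The reference tree
# is currently hardcoded to "/".
#
#     python rebuild_database.py ~/.cumulus-db < snapshot-metadata.log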