X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=python%2Fcumulus%2Frebuild_database.py;h=a4af6403fe3581bd96978608e8a9a5b35e15b6ad;hb=a5f66616b1ec0c38328ad5131bf1c889ccc43659;hp=53481d5729f4909bf9d98b106f9d9374f61ecd0a;hpb=8f5e4e22660dba64b733acdaa9e4ed94731bdb72;p=cumulus.git

diff --git a/python/cumulus/rebuild_database.py b/python/cumulus/rebuild_database.py
index 53481d5..a4af640 100755
--- a/python/cumulus/rebuild_database.py
+++ b/python/cumulus/rebuild_database.py
@@ -30,16 +30,21 @@ import base64
 import hashlib
 import itertools
 import os
+import re
 import struct
 import subprocess
 import sys
+import tarfile
+import time
 
 import cumulus
 
 CHECKSUM_ALGORITHM = "sha224"
-
 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
 
+# TODO: Move to somewhere common
+SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
+
 class Chunker(object):
     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
 
@@ -54,6 +59,12 @@ class Chunker(object):
     TARGET_CHUNK_SIZE = 4096
     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
 
+    # Minimum size of a block before we should bother storing subfile
+    # signatures (only applies when full blocks are used to store a file;
+    # subfile signatures are always used when subfile incrementals are
+    # present).
+    MINIMUM_OBJECT_SIZE = 16384
+
     def __init__(self):
         degree = self.MODULUS.bit_length() - 1
         self.degree = degree
@@ -102,7 +113,7 @@ class Chunker(object):
     def compute_breaks(self, buf):
         breaks = [0]
         signature = self.window_init()
-        for i in xrange(len(buf)):
+        for i in range(len(buf)):
             self.window_update(signature, ord(buf[i]))
             block_len = i - breaks[-1] + 1
             if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
@@ -152,9 +163,9 @@ class Chunker(object):
                 n -= i
 
         position = 0
-        for next_start, (size, digest) in sorted(signatures.iteritems()):
+        for next_start, (size, digest) in sorted(signatures.items()):
             if next_start < position:
-                print "Warning: overlapping signatures, ignoring"
+                print("Warning: overlapping signatures, ignoring")
                 continue
             skip(next_start - position)
             records.append(struct.pack(">H", size) + digest)
@@ -166,7 +177,7 @@ class Chunker(object):
         """Loads signatures from the binary format stored in the database."""
         entry_size = 2 + self.hash_size
         if len(signatures) % entry_size != 0:
-            print "Warning: Invalid signatures to load"
+            print("Warning: Invalid signatures to load")
             return {}
 
         null_digest = "\x00" * self.hash_size
@@ -239,17 +250,43 @@ class DatabaseRebuilder(object):
             if metadata.items.type not in ("-", "f"): continue
             try:
                 path = os.path.join(reference_path, metadata.items.name)
-                print "Path:", path
+                print("Path:", path)
                 # TODO: Check file size for early abort if different
                 self.rebuild_file(open(path), metadata)
             except IOError as e:
-                print e
+                print(e)
                 pass  # Ignore the file
 
         self.database.commit()
 
+    def reload_segment_metadata(self, segment_metadata):
+        """Read a segment metadata (.meta) file into the local database.
+
+        Updates the segments table in the local database with information from
+        a segment metadata backup file. Old data is not overwritten, so
+        loading a .meta file with partial information is fine.
+        """
+        for info in cumulus.parse(segment_metadata,
+                                  terminate=lambda l: len(l) == 0):
+            segment = info.pop("segment")
+            self.insert_segment_info(segment, info)
+
+        self.database.commit()
+
+    def insert_segment_info(self, segment, info):
+        id = self.segment_to_id(segment)
+        for k, v in list(info.items()):
+            self.cursor.execute("update segments set " + k + " = ? "
+                                "where segmentid = ?",
+                                (v, id))
+
     def rebuild_file(self, fp, metadata):
-        """Compare"""
+        """Recompute database signatures if a file is unchanged.
+
+        If the current file contents match those from the old metadata (the
+        full-file hash matches), then recompute block- and chunk-level
+        signatures for the objects referenced by the file.
+        """
         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
         checksums = {}
@@ -264,51 +301,62 @@ class DatabaseRebuilder(object):
             buf = fp.read(length)
             verifier.update(buf)
 
+            # Zero blocks get no checksums, so skip further processing on them.
+            if object is None: continue
+
             if exact:
                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
                 csum.update(buf)
                 checksums[(segment, object)] = (length, csum.compute())
+            else:
+                # Compute a lower bound on the object size.
+                oldlength, csum = checksums.get((segment, object), (0, None))
+                checksums[(segment, object)] = (max(oldlength, start + length),
+                                                csum)
 
-            signatures = self.chunker.compute_signatures(buf, start)
-            subblock.setdefault((segment, object), {}).update(signatures)
+            if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
+                signatures = self.chunker.compute_signatures(buf, start)
+                subblock.setdefault((segment, object), {}).update(signatures)
 
         if verifier.valid():
-            print "Checksum matches, computed:", checksums
             for k in subblock:
                 subblock[k] = self.chunker.dump_signatures(subblock[k])
-            print "Subblock signatures:"
-            for k, v in subblock.iteritems():
-                print k, base64.b16encode(v)
             self.store_checksums(checksums, subblock)
         else:
-            print "Checksum mismatch"
+            print("Checksum mismatch")
 
     def store_checksums(self, block_checksums, subblock_signatures):
-        for (segment, object), (size, checksum) in block_checksums.iteritems():
+        for (segment, object), (size, checksum) in block_checksums.items():
             segmentid = self.segment_to_id(segment)
+            self.cursor.execute(
+                """insert or ignore into block_index(segmentid, object)
+                   values (?, ?)""",
+                (segmentid, object))
             self.cursor.execute("""select blockid from block_index
                                    where segmentid = ? and object = ?""",
                                 (segmentid, object))
-            blockid = self.cursor.fetchall()
-            if blockid:
-                blockid = blockid[0][0]
-            else:
-                blockid = None
+            blockid = self.cursor.fetchall()[0][0]
 
-            if blockid is not None:
+            # Store checksum only if it is available; we don't want to
+            # overwrite an existing checksum in the database with NULL.
+            if checksum is not None:
                 self.cursor.execute("""update block_index
-                                       set checksum = ?, size = ?
+                                       set checksum = ?
                                        where blockid = ?""",
-                                    (checksum, size, blockid))
-            else:
-                self.cursor.execute(
-                    """insert into block_index(
-                         segmentid, object, checksum, size, timestamp)
-                       values (?, ?, ?, ?, julianday('now'))""",
-                    (segmentid, object, checksum, size))
-                blockid = self.cursor.lastrowid
+                                    (checksum, blockid))
+
+            # Update the object size. Our size may be an estimate, based on
+            # slices that we have seen. The size in the database must not be
+            # larger than the true size, but update it to the largest value
+            # possible.
+            self.cursor.execute("""update block_index
+                                   set size = max(?, coalesce(size, 0))
+                                   where blockid = ?""",
+                                (size, blockid))
 
             # Store subblock signatures, if available.
+            # TODO: Even better would be to merge signature data, to handle the
+            # case where different files see different chunks of a block.
             sigs = subblock_signatures.get((segment, object))
             if sigs:
                 self.cursor.execute(
@@ -318,7 +366,91 @@ class DatabaseRebuilder(object):
                     (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
 
 
+class SegmentStateRebuilder(object):
+    """Reconstructs segment metadata files from raw segment data."""
+
+    def __init__(self):
+        self.filters = dict(cumulus.SEGMENT_FILTERS)
+        self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
+
+    def compute_metadata(self, path, relative_path):
+        """Recompute metadata of a single segment.
+
+        Args:
+            path: Path to the segment file in the file system.
+            relative_path: Path relative to the root for the storage backend.
+        """
+        # Does the filename match that of a segment? If so, extract the
+        # extension to determine the filter to apply to decompress.
+        filename = os.path.basename(relative_path)
+        m = self.segment_pattern.match(filename)
+        if not m: return
+        segment_name = m.group(1)
+        extension = m.group(2)
+        if extension not in self.filters: return
+        filter_cmd = self.filters[extension]
+
+        # File attributes.
+        st_buf = os.stat(path)
+        timestamp = time.strftime(SQLITE_TIMESTAMP,
+                                  time.gmtime(st_buf.st_mtime))
+
+        # Compute attributes of the compressed segment data.
+        BLOCK_SIZE = 4096
+        with open(path) as segment:
+            disk_size = 0
+            checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
+            while True:
+                buf = segment.read(BLOCK_SIZE)
+                if len(buf) == 0: break
+                disk_size += len(buf)
+                checksummer.update(buf)
+        checksum = checksummer.compute()
+
+        # Compute attributes of the objects within the segment.
+        data_size = 0
+        object_count = 0
+        with open(path) as segment:
+            decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
+            objects = tarfile.open(mode='r|', fileobj=decompressed)
+            for tarinfo in objects:
+                data_size += tarinfo.size
+                object_count += 1
+
+        return {"segment": cumulus.uri_encode(segment_name),
+                "path": cumulus.uri_encode(relative_path),
+                "checksum": checksum,
+                "data_size": data_size,
+                "disk_size": disk_size,
+                "timestamp": timestamp}
+
 if __name__ == "__main__":
+    # Sample code to reconstruct segment metadata--ought to be relocated.
+    if False:
+        segment_rebuilder = SegmentStateRebuilder()
+        topdir = sys.argv[1]
+        files = []
+        for dirpath, dirnames, filenames in os.walk(topdir):
+            for f in filenames:
+                files.append(os.path.join(dirpath, f))
+        files.sort()
+        for f in files:
+            metadata = segment_rebuilder.compute_metadata(
+                f,
+                os.path.relpath(f, topdir))
+            if metadata:
+                for (k, v) in sorted(metadata.items()):
+                    print("%s: %s" % (k, v))
+                print()
+        sys.exit(0)
+
+    # Sample code to rebuild the segments table from metadata--needs to be
+    # merged with the code below.
+    if False:
+        rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
+        rebuilder.reload_segment_metadata(open(sys.argv[2]))
+        sys.exit(0)
+
     # Read metadata from stdin; filter out lines starting with "@@" so the
     # statcache file can be parsed as well.
     metadata = (x for x in sys.stdin if not x.startswith("@@"))