From: Michael Vrable Date: Thu, 25 Apr 2013 03:52:15 +0000 (-0700) Subject: Add code to rebuild_database.py to recompute segment metadata. X-Git-Url: http://git.vrable.net/?p=cumulus.git;a=commitdiff_plain;h=5949214bc01b2c762adfb724d1e63b7e130c91f4 Add code to rebuild_database.py to recompute segment metadata. This requires that the segments be available. --- diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py index 8d57c21..ef35325 100644 --- a/python/cumulus/__init__.py +++ b/python/cumulus/__init__.py @@ -55,6 +55,7 @@ SEGMENT_FILTERS = [ (".gpg", "cumulus-filter-gpg --decrypt"), (".gz", "gzip -dc"), (".bz2", "bzip2 -dc"), + ("", None), ] def uri_decode(s): @@ -204,13 +205,16 @@ class SearchPath(object): continue raise cumulus.store.NotFoundError(basename) + def match(self, filename): + return self._regex.match(filename) + def list(self, backend): success = False for d in self.directories(): try: for f in backend.list(d): success = True - m = self._regex.match(f) + m = self.match(f) if m: yield (os.path.join(d, f), m) except cumulus.store.NotFoundError: pass @@ -229,7 +233,7 @@ SEARCH_PATHS = { SearchPathEntry("", ".sha1sums")]), "segments": SearchPath( (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})" - r"(\.\S+)?$"), + r"\.tar(\.\S+)?$"), itertools.chain( _build_segments_searchpath("segments0"), _build_segments_searchpath("segments1"), @@ -345,12 +349,10 @@ class CumulusStore: snapshot_file = self.backend.open_snapshot(snapshot)[0] return snapshot_file.read().splitlines(True) - def get_segment(self, segment): - accessed_segments.add(segment) - - (segment_fp, path, filter_cmd) = self.backend.open_segment(segment) + @staticmethod + def filter_data(filehandle, filter_cmd): if filter_cmd is None: - return segment_fp + return filehandle (input, output) = os.popen2(filter_cmd) def copy_thread(src, dst): BLOCK_SIZE = 4096 @@ -360,9 +362,15 @@ class CumulusStore: dst.write(block) src.close() dst.close() - thread.start_new_thread(copy_thread, (segment_fp, input)) + thread.start_new_thread(copy_thread, (filehandle, input)) return output + def get_segment(self, segment): + accessed_segments.add(segment) + + (segment_fp, path, filter_cmd) = self.backend.open_segment(segment) + return self.filter_data(segment_fp, filter_cmd) + def load_segment(self, segment): seg = tarfile.open(segment, 'r|', self.get_segment(segment)) for item in seg: diff --git a/python/cumulus/rebuild_database.py b/python/cumulus/rebuild_database.py index 53481d5..a02bdd8 100755 --- a/python/cumulus/rebuild_database.py +++ b/python/cumulus/rebuild_database.py @@ -30,9 +30,11 @@ import base64 import hashlib import itertools import os +import re import struct import subprocess import sys +import tarfile import cumulus @@ -318,7 +320,76 @@ class DatabaseRebuilder(object): (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs))) +class SegmentStateRebuilder(object): + """Reconstructs segment metadata files from raw segment data.""" + + def __init__(self): + self.filters = dict(cumulus.SEGMENT_FILTERS) + self.segment_pattern = cumulus.SEARCH_PATHS["segments"] + + def compute_metadata(self, path, relative_path): + """Recompute metadata of a single segment. + + Args: + path: Path to the segment file in the file system. + relative_path: Path relative to the root for the storage backend. + """ + # Does the filename match that of a segment? If so, extract the + # extension to determine the filter to apply to decompress. + filename = os.path.basename(relative_path) + m = self.segment_pattern.match(filename) + if not m: return + segment_name = m.group(1) + extension = m.group(2) + if extension not in self.filters: return + filter_cmd = self.filters[extension] + + # Compute attributes of the compressed segment data. + BLOCK_SIZE = 4096 + with open(path) as segment: + disk_size = 0 + checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM) + while True: + buf = segment.read(BLOCK_SIZE) + if len(buf) == 0: break + disk_size += len(buf) + checksummer.update(buf) + checksum = checksummer.compute() + + # Compute attributes of the objects within the segment. + data_size = 0 + object_count = 0 + with open(path) as segment: + decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd) + objects = tarfile.open(mode='r|', fileobj=decompressed) + for tarinfo in objects: + data_size += tarinfo.size + object_count += 1 + + return {"segment": segment_name, + "path": relative_path, + "checksum": checksum, + "data_size": data_size, + "disk_size": disk_size} + if __name__ == "__main__": + segment_rebuilder = SegmentStateRebuilder() + topdir = sys.argv[1] + files = [] + for dirpath, dirnames, filenames in os.walk(topdir): + for f in filenames: + files.append(os.path.join(dirpath, f)) + files.sort() + for f in files: + metadata = segment_rebuilder.compute_metadata( + f, + os.path.relpath(f, topdir)) + if metadata: + for (k, v) in sorted(metadata.items()): + print "%s: %s" % (k, cumulus.uri_encode(str(v))) + print + sys.exit(0) + # Read metadata from stdin; filter out lines starting with "@@" so the # statcache file can be parsed as well. metadata = (x for x in sys.stdin if not x.startswith("@@"))