import hashlib
import itertools
import os
+import re
import struct
import subprocess
import sys
+import tarfile
+import time
import cumulus
CHECKSUM_ALGORITHM = "sha224"
-
CHUNKER_PROGRAM = "cumulus-chunker-standalone"
+# TODO: Move to somewhere common
+SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
+
class Chunker(object):
"""Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
TARGET_CHUNK_SIZE = 4096
ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
+ # Minimum size of a block before we should bother storing subfile
+ # signatures (only applies when full blocks are used to store a file;
+ # subfile signatures are always used when subfile incrementals are
+ # present).
+ MINIMUM_OBJECT_SIZE = 16384
+
def __init__(self):
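+ # MODULUS encodes the fingerprint polynomial; a degree-d polynomial
+ # needs d+1 bits, so the degree is one less than the bit length.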
degree = self.MODULUS.bit_length() - 1
self.degree = degree
self.database.commit()
+ def reload_segment_metadata(self, segment_metadata):
+ """Read a segment metadata (.meta) file into the local database.
+
+ Updates the segments table in the local database with information from
+ a segment metadata backup file. Old data is not overwritten, so
+ loading a .meta file with partial information is fine.
+ """
+ for info in cumulus.parse(segment_metadata,
+ terminate=lambda l: len(l) == 0):
+ segment = info.pop("segment")
+ self.insert_segment_info(segment, info)
+
+ self.database.commit()
+
+ def insert_segment_info(self, segment, info):
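+ """Set column values for a segment's row in the segments table."""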
+ id = self.segment_to_id(segment)
+ for k, v in info.items():
+ self.cursor.execute("update segments set " + k + " = ? "
+ "where segmentid = ?",
+ (v, id))
+
def rebuild_file(self, fp, metadata):
- """Compare"""
+ """Recompute database signatures if a file is unchanged.
+
+ If the current file contents match those from the old metadata (the
+ full-file hash matches), then recompute block- and chunk-level
+ signatures for the objects referenced by the file.
+ """
blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
checksums = {}
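+ # Maps (segment, object) -> (size, checksum) for blocks referenced
+ # by this file.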
buf = fp.read(length)
verifier.update(buf)
+ # Zero blocks get no checksums, so skip further processing on them.
+ if object is None: continue
+
if exact:
csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
csum.update(buf)
checksums[(segment, object)] = (length, csum.compute())
+ else:
+ # Compute a lower bound on the object size.
+ oldlength, csum = checksums.get((segment, object), (0, None))
+ checksums[(segment, object)] = (max(oldlength, start + length),
+ csum)
- signatures = self.chunker.compute_signatures(buf, start)
- subblock.setdefault((segment, object), {}).update(signatures)
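+ # Store chunk-level signatures for sufficiently large objects, and
+ # always when only part of the object was seen.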
+ if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
+ signatures = self.chunker.compute_signatures(buf, start)
+ subblock.setdefault((segment, object), {}).update(signatures)
if verifier.valid():
- print "Checksum matches, computed:", checksums
for k in subblock:
subblock[k] = self.chunker.dump_signatures(subblock[k])
- print "Subblock signatures:"
- for k, v in subblock.iteritems():
- print k, base64.b16encode(v)
self.store_checksums(checksums, subblock)
else:
print "Checksum mismatch"
def store_checksums(self, block_checksums, subblock_signatures):
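+ """Record block checksums and chunk signatures in the local database.
+
+ block_checksums maps (segment, object) to (size, checksum); sizes may
+ be lower-bound estimates and checksums may be None when not known.
+ """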
for (segment, object), (size, checksum) in block_checksums.iteritems():
segmentid = self.segment_to_id(segment)
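+ # Make sure a block_index row exists for this object; the insert is
+ # a no-op if one is already present.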
+ self.cursor.execute(
+ """insert or ignore into block_index(segmentid, object)
+ values (?, ?)""",
+ (segmentid, object))
self.cursor.execute("""select blockid from block_index
where segmentid = ? and object = ?""",
(segmentid, object))
- blockid = self.cursor.fetchall()
- if blockid:
- blockid = blockid[0][0]
- else:
- blockid = None
+ blockid = self.cursor.fetchall()[0][0]
- if blockid is not None:
+ # Store checksum only if it is available; we don't want to
+ # overwrite an existing checksum in the database with NULL.
+ if checksum is not None:
self.cursor.execute("""update block_index
- set checksum = ?, size = ?
+ set checksum = ?
where blockid = ?""",
- (checksum, size, blockid))
- else:
- self.cursor.execute(
- """insert into block_index(
- segmentid, object, checksum, size, timestamp)
- values (?, ?, ?, ?, julianday('now'))""",
- (segmentid, object, checksum, size))
- blockid = self.cursor.lastrowid
+ (checksum, blockid))
+
+ # Update the object size. Our size may be an estimate, based on
+ # slices that we have seen. The size in the database must not be
+ # larger than the true size, but update it to the largest value
+ # possible.
+ self.cursor.execute("""update block_index
+ set size = max(?, coalesce(size, 0))
+ where blockid = ?""",
+ (size, blockid))
# Store subblock signatures, if available.
+ # TODO: Even better would be to merge signature data, to handle the
+ # case where different files see different chunks of a block.
sigs = subblock_signatures.get((segment, object))
if sigs:
self.cursor.execute(
(blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
+class SegmentStateRebuilder(object):
+ """Reconstructs segment metadata files from raw segment data."""
+
+ def __init__(self):
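+ # Decompression filters keyed by segment filename extension, and the
+ # pattern used to recognize segment file names.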
+ self.filters = dict(cumulus.SEGMENT_FILTERS)
+ self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
+
+ def compute_metadata(self, path, relative_path):
+ """Recompute metadata of a single segment.
+
+ Args:
+ path: Path to the segment file in the file system.
+ relative_path: Path relative to the root for the storage backend.
+ """
+ # Does the filename match that of a segment? If so, extract the
+ # extension to determine the decompression filter to apply.
+ filename = os.path.basename(relative_path)
+ m = self.segment_pattern.match(filename)
+ if not m: return
+ segment_name = m.group(1)
+ extension = m.group(2)
+ if extension not in self.filters: return
+ filter_cmd = self.filters[extension]
+
+ # File attributes.
+ st_buf = os.stat(path)
+ timestamp = time.strftime(SQLITE_TIMESTAMP,
+ time.gmtime(st_buf.st_mtime))
+
+ # Compute attributes of the compressed segment data.
+ BLOCK_SIZE = 4096
+ with open(path) as segment:
+ disk_size = 0
+ checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
+ while True:
+ buf = segment.read(BLOCK_SIZE)
+ if len(buf) == 0: break
+ disk_size += len(buf)
+ checksummer.update(buf)
+ checksum = checksummer.compute()
+
+ # Compute attributes of the objects within the segment.
+ data_size = 0
+ object_count = 0
+ with open(path) as segment:
+ decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
+ objects = tarfile.open(mode='r|', fileobj=decompressed)
+ for tarinfo in objects:
+ data_size += tarinfo.size
+ object_count += 1
+
+ return {"segment": cumulus.uri_encode(segment_name),
+ "path": cumulus.uri_encode(relative_path),
+ "checksum": checksum,
+ "data_size": data_size,
+ "disk_size": disk_size,
+ "timestamp": timestamp}
+
if __name__ == "__main__":
+ # Sample code to reconstruct segment metadata--ought to be relocated.
+ if False:
+ segment_rebuilder = SegmentStateRebuilder()
+ topdir = sys.argv[1]
+ files = []
+ for dirpath, dirnames, filenames in os.walk(topdir):
+ for f in filenames:
+ files.append(os.path.join(dirpath, f))
+ files.sort()
+ for f in files:
+ metadata = segment_rebuilder.compute_metadata(
+ f,
+ os.path.relpath(f, topdir))
+ if metadata:
+ for (k, v) in sorted(metadata.items()):
+ print "%s: %s" % (k, v)
+ print
+ sys.exit(0)
+
+ # Sample code to rebuild the segments table from metadata--needs to be
+ # merged with the code below.
+ if False:
+ rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
+ rebuilder.reload_segment_metadata(open(sys.argv[2]))
+ sys.exit(0)
+
# Read metadata from stdin; filter out lines starting with "@@" so the
# statcache file can be parsed as well.
metadata = (x for x in sys.stdin if not x.startswith("@@"))