import subprocess
import sys
import tarfile
+import time
import cumulus
CHECKSUM_ALGORITHM = "sha224"
-
CHUNKER_PROGRAM = "cumulus-chunker-standalone"
+# TODO: Move to somewhere common
+SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
+
class Chunker(object):
"""Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
TARGET_CHUNK_SIZE = 4096
ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
+ # Minimum size of a block before we should bother storing subfile
+ # signatures (only applies when full blocks are used to store a file;
+ # subfile signatures are always used when subfile incrementals are
+ # present).
+ MINIMUM_OBJECT_SIZE = 16384
+
def __init__(self):
degree = self.MODULUS.bit_length() - 1
self.degree = degree
self.database.commit()
+ def reload_segment_metadata(self, segment_metadata):
+ """Read a segment metadata (.meta) file into the local database.
+
+ Updates the segments table in the local database with information from
+ a segment metadata backup file. Old data is not overwritten, so
+ loading a .meta file with partial information is fine.
+ """
+ for info in cumulus.parse(segment_metadata,
+ terminate=lambda l: len(l) == 0):
+ segment = info.pop("segment")
+ self.insert_segment_info(segment, info)
+
+ self.database.commit()
+
+ def insert_segment_info(self, segment, info):
+ id = self.segment_to_id(segment)
+ for k, v in info.items():
+ self.cursor.execute("update segments set " + k + " = ? "
+ "where segmentid = ?",
+ (v, id))
+
def rebuild_file(self, fp, metadata):
- """Compare"""
+ """Recompute database signatures if a file is unchanged.
+
+ If the current file contents match that from the old metadata (the
+ full-file hash matches), then recompute block- and chunk-level
+ signatures for the objects referenced by the file.
+ """
blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
checksums = {}
buf = fp.read(length)
verifier.update(buf)
+ # Zero blocks get no checksums, so skip further processing on them.
+ if object is None: continue
+
if exact:
csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
csum.update(buf)
checksums[(segment, object)] = (length, csum.compute())
+ else:
+ # Compute a lower bound on the object size.
+ oldlength, csum = checksums.get((segment, object), (0, None))
+ checksums[(segment, object)] = (max(oldlength, start + length),
+ csum)
- signatures = self.chunker.compute_signatures(buf, start)
- subblock.setdefault((segment, object), {}).update(signatures)
+ if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
+ signatures = self.chunker.compute_signatures(buf, start)
+ subblock.setdefault((segment, object), {}).update(signatures)
if verifier.valid():
- print "Checksum matches, computed:", checksums
for k in subblock:
subblock[k] = self.chunker.dump_signatures(subblock[k])
- print "Subblock signatures:"
- for k, v in subblock.iteritems():
- print k, base64.b16encode(v)
self.store_checksums(checksums, subblock)
else:
print "Checksum mismatch"
def store_checksums(self, block_checksums, subblock_signatures):
for (segment, object), (size, checksum) in block_checksums.iteritems():
segmentid = self.segment_to_id(segment)
+ self.cursor.execute(
+ """insert or ignore into block_index(segmentid, object)
+ values (?, ?)""",
+ (segmentid, object))
self.cursor.execute("""select blockid from block_index
where segmentid = ? and object = ?""",
(segmentid, object))
- blockid = self.cursor.fetchall()
- if blockid:
- blockid = blockid[0][0]
- else:
- blockid = None
+ blockid = self.cursor.fetchall()[0][0]
- if blockid is not None:
+ # Store checksum only if it is available; we don't want to
+ # overwrite an existing checksum in the database with NULL.
+ if checksum is not None:
self.cursor.execute("""update block_index
- set checksum = ?, size = ?
+ set checksum = ?
where blockid = ?""",
- (checksum, size, blockid))
- else:
- self.cursor.execute(
- """insert into block_index(
- segmentid, object, checksum, size, timestamp)
- values (?, ?, ?, ?, julianday('now'))""",
- (segmentid, object, checksum, size))
- blockid = self.cursor.lastrowid
+ (checksum, blockid))
+
+ # Update the object size. Our size may be an estimate, based on
+ # slices that we have seen. The size in the database must not be
+ # larger than the true size, but update it to the largest value
+ # possible.
+ self.cursor.execute("""update block_index
+ set size = max(?, coalesce(size, 0))
+ where blockid = ?""",
+ (size, blockid))
# Store subblock signatures, if available.
+ # TODO: Even better would be to merge signature data, to handle the
+ # case where different files see different chunks of a block.
sigs = subblock_signatures.get((segment, object))
if sigs:
self.cursor.execute(
if extension not in self.filters: return
filter_cmd = self.filters[extension]
+ # File attributes.
+ st_buf = os.stat(path)
+ timestamp = time.strftime(SQLITE_TIMESTAMP,
+ time.gmtime(st_buf.st_mtime))
+
# Compute attributes of the compressed segment data.
BLOCK_SIZE = 4096
with open(path) as segment:
data_size += tarinfo.size
object_count += 1
- return {"segment": segment_name,
- "path": relative_path,
+ return {"segment": cumulus.uri_encode(segment_name),
+ "path": cumulus.uri_encode(relative_path),
"checksum": checksum,
"data_size": data_size,
- "disk_size": disk_size}
+ "disk_size": disk_size,
+ "timestamp": timestamp}
if __name__ == "__main__":
- segment_rebuilder = SegmentStateRebuilder()
- topdir = sys.argv[1]
- files = []
- for dirpath, dirnames, filenames in os.walk(topdir):
- for f in filenames:
- files.append(os.path.join(dirpath, f))
- files.sort()
- for f in files:
- metadata = segment_rebuilder.compute_metadata(
- f,
- os.path.relpath(f, topdir))
- if metadata:
- for (k, v) in sorted(metadata.items()):
- print "%s: %s" % (k, cumulus.uri_encode(str(v)))
- print
- sys.exit(0)
+ # Sample code to reconstruct segment metadata--ought to be relocated.
+ if False:
+ segment_rebuilder = SegmentStateRebuilder()
+ topdir = sys.argv[1]
+ files = []
+ for dirpath, dirnames, filenames in os.walk(topdir):
+ for f in filenames:
+ files.append(os.path.join(dirpath, f))
+ files.sort()
+ for f in files:
+ metadata = segment_rebuilder.compute_metadata(
+ f,
+ os.path.relpath(f, topdir))
+ if metadata:
+ for (k, v) in sorted(metadata.items()):
+ print "%s: %s" % (k, v)
+ print
+ sys.exit(0)
+
+ # Sample code to rebuild the segments table from metadata--needs to be
+ # merged with the code below.
+ if False:
+ rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
+ rebuilder.reload_segment_metadata(open(sys.argv[2]))
+ sys.exit(0)
# Read metadata from stdin; filter out lines starting with "@@" so the
# statcache file can be parsed as well.