Improve database rebuilding.
authorMichael Vrable <vrable@cs.hmc.edu>
Fri, 26 Apr 2013 19:50:30 +0000 (12:50 -0700)
committerMichael Vrable <vrable@cs.hmc.edu>
Sun, 26 Jan 2014 20:26:33 +0000 (12:26 -0800)
  - Compute estimated sizes (lower bounds) for objects.
  - Do not rebuild subfile incrementals for very small objects.

python/cumulus/rebuild_database.py

index a02bdd8..10513ba 100755 (executable)
@@ -56,6 +56,12 @@ class Chunker(object):
     TARGET_CHUNK_SIZE = 4096
     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
 
+    # Minimum size of a block before we should bother storing subfile
+    # signatures (only applies when full blocks are used to store a file;
+    # subfile signatures are always used when subfile incrementals are
+    # present).
+    MINIMUM_OBJECT_SIZE = 16384
+
     def __init__(self):
         degree = self.MODULUS.bit_length() - 1
         self.degree = degree
@@ -251,7 +257,12 @@ class DatabaseRebuilder(object):
         self.database.commit()
 
     def rebuild_file(self, fp, metadata):
-        """Compare"""
+        """Recompute database signatures if a file is unchanged.
+
+        If the current file contents match that from the old metadata (the
+        full-file hash matches), then recompute block- and chunk-level
+        signatures for the objects referenced by the file.
+        """
         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
         checksums = {}
@@ -270,17 +281,19 @@ class DatabaseRebuilder(object):
                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
                 csum.update(buf)
                 checksums[(segment, object)] = (length, csum.compute())
+            else:
+                # Compute a lower bound on the object size.
+                oldlength, csum = checksums.get((segment, object), (0, None))
+                checksums[(segment, object)] = (max(oldlength, start + length),
+                                                csum)
 
-            signatures = self.chunker.compute_signatures(buf, start)
-            subblock.setdefault((segment, object), {}).update(signatures)
+            if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
+                signatures = self.chunker.compute_signatures(buf, start)
+                subblock.setdefault((segment, object), {}).update(signatures)
 
         if verifier.valid():
-            print "Checksum matches, computed:", checksums
             for k in subblock:
                 subblock[k] = self.chunker.dump_signatures(subblock[k])
-            print "Subblock signatures:"
-            for k, v in subblock.iteritems():
-                print k, base64.b16encode(v)
             self.store_checksums(checksums, subblock)
         else:
             print "Checksum mismatch"
@@ -288,29 +301,35 @@ class DatabaseRebuilder(object):
     def store_checksums(self, block_checksums, subblock_signatures):
         for (segment, object), (size, checksum) in block_checksums.iteritems():
             segmentid = self.segment_to_id(segment)
+            self.cursor.execute(
+                """insert or ignore into block_index(segmentid, object)
+                   values (?, ?)""",
+                (segmentid, object))
             self.cursor.execute("""select blockid from block_index
                                    where segmentid = ? and object = ?""",
                                 (segmentid, object))
-            blockid = self.cursor.fetchall()
-            if blockid:
-                blockid = blockid[0][0]
-            else:
-                blockid = None
+            blockid = self.cursor.fetchall()[0][0]
 
-            if blockid is not None:
+            # Store checksum only if it is available; we don't want to
+            # overwrite an existing checksum in the database with NULL.
+            if checksum is not None:
                 self.cursor.execute("""update block_index
-                                       set checksum = ?, size = ?
+                                       set checksum = ?
                                        where blockid = ?""",
-                                    (checksum, size, blockid))
-            else:
-                self.cursor.execute(
-                    """insert into block_index(
-                           segmentid, object, checksum, size, timestamp)
-                       values (?, ?, ?, ?, julianday('now'))""",
-                    (segmentid, object, checksum, size))
-                blockid = self.cursor.lastrowid
+                                    (checksum, blockid))
+
+            # Update the object size.  Our size may be an estimate, based on
+            # slices that we have seen.  The size in the database must not be
+            # larger than the true size, but update it to the largest value
+            # possible.
+            self.cursor.execute("""update block_index
+                                   set size = max(?, coalesce(size, 0))
+                                   where blockid = ?""",
+                                (size, blockid))
 
             # Store subblock signatures, if available.
+            # TODO: Even better would be to merge signature data, to handle the
+            # case where different files see different chunks of a block.
             sigs = subblock_signatures.get((segment, object))
             if sigs:
                 self.cursor.execute(
@@ -373,22 +392,23 @@ class SegmentStateRebuilder(object):
                 "disk_size": disk_size}
 
 if __name__ == "__main__":
-    segment_rebuilder = SegmentStateRebuilder()
-    topdir = sys.argv[1]
-    files = []
-    for dirpath, dirnames, filenames in os.walk(topdir):
-        for f in filenames:
-            files.append(os.path.join(dirpath, f))
-    files.sort()
-    for f in files:
-        metadata = segment_rebuilder.compute_metadata(
-            f,
-            os.path.relpath(f, topdir))
-        if metadata:
-            for (k, v) in sorted(metadata.items()):
-                print "%s: %s" % (k, cumulus.uri_encode(str(v)))
-            print
-    sys.exit(0)
+    if False:
+        segment_rebuilder = SegmentStateRebuilder()
+        topdir = sys.argv[1]
+        files = []
+        for dirpath, dirnames, filenames in os.walk(topdir):
+            for f in filenames:
+                files.append(os.path.join(dirpath, f))
+        files.sort()
+        for f in files:
+            metadata = segment_rebuilder.compute_metadata(
+                f,
+                os.path.relpath(f, topdir))
+            if metadata:
+                for (k, v) in sorted(metadata.items()):
+                    print "%s: %s" % (k, cumulus.uri_encode(str(v)))
+                print
+        sys.exit(0)
 
     # Read metadata from stdin; filter out lines starting with "@@" so the
     # statcache file can be parsed as well.