Bugfix for database checksum reconstruction with zero blocks.
[cumulus.git] / python / cumulus / rebuild_database.py
index a1ba9e8..10a5f9a 100755 (executable)
@@ -30,16 +30,21 @@ import base64
 import hashlib
 import itertools
 import os
+import re
 import struct
 import subprocess
 import sys
+import tarfile
+import time
 
 import cumulus
 
 CHECKSUM_ALGORITHM = "sha224"
-
 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
 
+# TODO: Move to somewhere common
+SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
+
 class Chunker(object):
     """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
 
@@ -54,6 +59,12 @@ class Chunker(object):
     TARGET_CHUNK_SIZE = 4096
     ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
 
+    # Minimum size of a block before we should bother storing subfile
+    # signatures (only applies when full blocks are used to store a file;
+    # subfile signatures are always used when subfile incrementals are
+    # present).
+    MINIMUM_OBJECT_SIZE = 16384
+
     def __init__(self):
         degree = self.MODULUS.bit_length() - 1
         self.degree = degree
@@ -201,7 +212,7 @@ class ChunkerExternal(Chunker):
         self.subproc.stdin.write(buf)
         self.subproc.stdin.flush()
         breaks = self.subproc.stdout.readline()
-        return [int(x) + 1 for x in breaks.split()]
+        return [0] + [int(x) + 1 for x in breaks.split()]
 
 
 class DatabaseRebuilder(object):
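
A rough sketch of why the leading 0 matters in the ChunkerExternal change above: if the break list is read as chunk start offsets, consecutive entries delimit each chunk, and without an initial 0 the first chunk would never be covered. The helper below is illustrative only (it is not part of the patch) and assumes the final chunk runs to the end of the buffer.

    def breaks_to_extents(breaks, buflen):
        # Pair each start offset with the next one; the last chunk ends at
        # the end of the buffer.
        ends = breaks[1:] + [buflen]
        return [(start, end - start) for start, end in zip(breaks, ends)]

    # breaks_to_extents([0, 5000, 9000], 12000)
    #   => [(0, 5000), (5000, 4000), (9000, 3000)]
    # Without the prepended 0, the first 5000 bytes would be skipped.
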
@@ -209,10 +220,8 @@ class DatabaseRebuilder(object):
         self.database = database
         self.cursor = database.cursor()
         self.segment_ids = {}
-        #try:
         self.chunker = ChunkerExternal()
-        #except:
-        #    self.chunker = Chunker()
+        #self.chunker = Chunker()
 
     def segment_to_id(self, segment):
         if segment in self.segment_ids: return self.segment_ids[segment]
@@ -250,8 +259,34 @@ class DatabaseRebuilder(object):
 
         self.database.commit()
 
+    def reload_segment_metadata(self, segment_metadata):
+        """Read a segment metadata (.meta) file into the local database.
+
+        Updates the segments table in the local database with information from
+        a segment metadata backup file.  Old data is not overwritten, so
+        loading a .meta file with partial information is fine.
+        """
+        for info in cumulus.parse(segment_metadata,
+                                     terminate=lambda l: len(l) == 0):
+            segment = info.pop("segment")
+            self.insert_segment_info(segment, info)
+
+        self.database.commit()
+
+    def insert_segment_info(self, segment, info):
+        id = self.segment_to_id(segment)
+        for k, v in info.items():
+            self.cursor.execute("update segments set " + k + " = ? "
+                                "where segmentid = ?",
+                                (v, id))
+
     def rebuild_file(self, fp, metadata):
-        """Compare"""
+        """Recompute database signatures if a file is unchanged.
+
+        If the current file contents match those from the old metadata (the
+        full-file hash matches), then recompute block- and chunk-level
+        signatures for the objects referenced by the file.
+        """
         blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
         verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
         checksums = {}
@@ -266,21 +301,26 @@ class DatabaseRebuilder(object):
             buf = fp.read(length)
             verifier.update(buf)
 
+            # Zero blocks get no checksums, so skip further processing on them.
+            if object is None: continue
+
             if exact:
                 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
                 csum.update(buf)
                 checksums[(segment, object)] = (length, csum.compute())
+            else:
+                # Compute a lower bound on the object size.
+                oldlength, csum = checksums.get((segment, object), (0, None))
+                checksums[(segment, object)] = (max(oldlength, start + length),
+                                                csum)
 
-            signatures = self.chunker.compute_signatures(buf, start)
-            subblock.setdefault((segment, object), {}).update(signatures)
+            if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
+                signatures = self.chunker.compute_signatures(buf, start)
+                subblock.setdefault((segment, object), {}).update(signatures)
 
         if verifier.valid():
-            print "Checksum matches, computed:", checksums
             for k in subblock:
                 subblock[k] = self.chunker.dump_signatures(subblock[k])
-            print "Subblock signatures:"
-            for k, v in subblock.iteritems():
-                print k, base64.b16encode(v)
             self.store_checksums(checksums, subblock)
         else:
             print "Checksum mismatch"
@@ -288,40 +328,129 @@ class DatabaseRebuilder(object):
     def store_checksums(self, block_checksums, subblock_signatures):
         for (segment, object), (size, checksum) in block_checksums.iteritems():
             segmentid = self.segment_to_id(segment)
+            self.cursor.execute(
+                """insert or ignore into block_index(segmentid, object)
+                   values (?, ?)""",
+                (segmentid, object))
             self.cursor.execute("""select blockid from block_index
                                    where segmentid = ? and object = ?""",
                                 (segmentid, object))
-            blockid = self.cursor.fetchall()
-            if blockid:
-                blockid = blockid[0][0]
-            else:
-                blockid = None
+            blockid = self.cursor.fetchall()[0][0]
 
-            if blockid is not None:
+            # Store checksum only if it is available; we don't want to
+            # overwrite an existing checksum in the database with NULL.
+            if checksum is not None:
                 self.cursor.execute("""update block_index
-                                       set checksum = ?, size = ?
+                                       set checksum = ?
                                        where blockid = ?""",
-                                    (checksum, size, blockid))
-            else:
-                self.cursor.execute(
-                    """insert into block_index(
-                           segmentid, object, checksum, size, timestamp)
-                       values (?, ?, ?, ?, julianday('now'))""",
-                    (segmentid, object, checksum, size))
-                blockid = self.cursor.lastrowid
+                                    (checksum, blockid))
+
+            # Update the object size.  Our size may be an estimate, based on
+            # slices that we have seen.  The size in the database must not be
+            # larger than the true size, but update it to the largest value
+            # possible.
+            self.cursor.execute("""update block_index
+                                   set size = max(?, coalesce(size, 0))
+                                   where blockid = ?""",
+                                (size, blockid))
 
             # Store subblock signatures, if available.
-            print "blockid:", blockid
-            if (segment, object) in subblock_signatures:
+            # TODO: Even better would be to merge signature data, to handle the
+            # case where different files see different chunks of a block.
+            sigs = subblock_signatures.get((segment, object))
+            if sigs:
                 self.cursor.execute(
                     """insert or replace into subblock_signatures(
                            blockid, algorithm, signatures)
                        values (?, ?, ?)""",
-                    (blockid, self.chunker.ALGORITHM_NAME,
-                     buffer(subblock_signatures[(segment, object)])))
+                    (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
+
+
+class SegmentStateRebuilder(object):
+    """Reconstructs segment metadata files from raw segment data."""
 
+    def __init__(self):
+        self.filters = dict(cumulus.SEGMENT_FILTERS)
+        self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
+
+    def compute_metadata(self, path, relative_path):
+        """Recompute metadata of a single segment.
+
+        Args:
+            path: Path to the segment file in the file system.
+            relative_path: Path relative to the root for the storage backend.
+        """
+        # Does the filename match that of a segment?  If so, extract the
+        # extension to determine the filter to apply to decompress.
+        filename = os.path.basename(relative_path)
+        m = self.segment_pattern.match(filename)
+        if not m: return
+        segment_name = m.group(1)
+        extension = m.group(2)
+        if extension not in self.filters: return
+        filter_cmd = self.filters[extension]
+
+        # File attributes.
+        st_buf = os.stat(path)
+        timestamp = time.strftime(SQLITE_TIMESTAMP,
+                                  time.gmtime(st_buf.st_mtime))
+
+        # Compute attributes of the compressed segment data.
+        BLOCK_SIZE = 4096
+        with open(path) as segment:
+            disk_size = 0
+            checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
+            while True:
+                buf = segment.read(BLOCK_SIZE)
+                if len(buf) == 0: break
+                disk_size += len(buf)
+                checksummer.update(buf)
+        checksum = checksummer.compute()
+
+        # Compute attributes of the objects within the segment.
+        data_size = 0
+        object_count = 0
+        with open(path) as segment:
+            decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
+            objects = tarfile.open(mode='r|', fileobj=decompressed)
+            for tarinfo in objects:
+                data_size += tarinfo.size
+                object_count += 1
+
+        return {"segment": cumulus.uri_encode(segment_name),
+                "path": cumulus.uri_encode(relative_path),
+                "checksum": checksum,
+                "data_size": data_size,
+                "disk_size": disk_size,
+                "timestamp": timestamp}
 
 if __name__ == "__main__":
+    # Sample code to reconstruct segment metadata--ought to be relocated.
+    if False:
+        segment_rebuilder = SegmentStateRebuilder()
+        topdir = sys.argv[1]
+        files = []
+        for dirpath, dirnames, filenames in os.walk(topdir):
+            for f in filenames:
+                files.append(os.path.join(dirpath, f))
+        files.sort()
+        for f in files:
+            metadata = segment_rebuilder.compute_metadata(
+                f,
+                os.path.relpath(f, topdir))
+            if metadata:
+                for (k, v) in sorted(metadata.items()):
+                    print "%s: %s" % (k, v)
+                print
+        sys.exit(0)
+
+    # Sample code to rebuild the segments table from metadata--needs to be
+    # merged with the code below.
+    if False:
+        rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
+        rebuilder.reload_segment_metadata(open(sys.argv[2]))
+        sys.exit(0)
+
     # Read metadata from stdin; filter out lines starting with "@@" so the
     # statcache file can be parsed as well.
     metadata = (x for x in sys.stdin if not x.startswith("@@"))