Start on a tool to rebuild the local database if it is lost.
author     Michael Vrable <vrable@cs.hmc.edu>
           Wed, 16 Jan 2013 20:15:22 +0000 (12:15 -0800)
committer  Michael Vrable <vrable@cs.hmc.edu>
           Wed, 22 Jan 2014 05:11:09 +0000 (21:11 -0800)
This can also be used to reconstruct the local database in a new format, or
to upgrade hash algorithms.
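
Sketch of the expected invocation (paths are illustrative; per the script's
__main__ block, the metadata log arrives on stdin and the first argument
names the local database directory):

    python python/cumulus/rebuild_database.py /path/to/localdb < metadata.log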

Makefile
chunker-standalone.cc [new file with mode: 0644]
python/cumulus/__init__.py
python/cumulus/rebuild_database.py [new file with mode: 0755]

diff --git a/Makefile b/Makefile
index a57062c..454f1c4 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -10,9 +10,14 @@ SRCS=exclude.cc hash.cc localdb.cc main.cc metadata.cc ref.cc remote.cc \
      store.cc subfile.cc util.cc $(addprefix third_party/,$(THIRD_PARTY_SRCS))
 OBJS=$(SRCS:.cc=.o)
 
+all : cumulus cumulus-chunker-standalone
+
 cumulus : $(OBJS)
        $(CXX) -o $@ $^ $(LDFLAGS)
 
+cumulus-chunker-standalone : chunker-standalone.o third_party/chunk.o
+       $(CXX) -o $@ $^ $(LDFLAGS)
+
 version : NEWS
        (git describe || (head -n1 NEWS | cut -d" " -f1)) >version 2>/dev/null
 $(OBJS) : version
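
With this change, "make all" builds both the main cumulus binary and the new
cumulus-chunker-standalone helper used by the database rebuilder; a quick
smoke test (illustrative):

    $ make all
    $ ./cumulus-chunker-standalone < /dev/null; echo $?   # no input: exits 1
    1
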
diff --git a/chunker-standalone.cc b/chunker-standalone.cc
new file mode 100644 (file)
index 0000000..9c64c5a
--- /dev/null
+++ b/chunker-standalone.cc
@@ -0,0 +1,68 @@
+/* Cumulus: Efficient Filesystem Backup to the Cloud
+ * Copyright (C) 2013 The Cumulus Developers
+ * See the AUTHORS file for a list of contributors.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+/* Small utility program for computing chunk breakpoints for subfile
+ * signatures.  This can be used by the Python database rebuilder: while the
+ * Python code can compute chunk breakpoints itself, the C++ version runs
+ * much more quickly.
+ *
+ * Protocol: The input is binary.  Each request is a 4-byte length field in
+ * network byte order giving the size of a data buffer, followed by the raw
+ * data itself.  The output is line-oriented: each line consists of
+ * whitespace-separated integers giving the computed breakpoints.  A request
+ * with a length of zero ends the computation. */
+
+#include <arpa/inet.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+
+#include "third_party/chunk.h"
+
+#define MAX_BUFSIZE (1 << 24)
+
+int main(int, char **)
+{
+    char *buf = new char[MAX_BUFSIZE];
+    size_t *breakpoints = new size_t[chunk_compute_max_num_breaks(MAX_BUFSIZE)];
+
+    while (true) {
+        int32_t blocklen;
+        if (fread(&blocklen, 4, 1, stdin) != 1) {
+            /* Unexpected end of input or other error. */
+            return 1;
+        }
+
+        blocklen = ntohl(blocklen);
+        if (blocklen == 0)
+            return 0;
+        if (blocklen < 0 || blocklen > MAX_BUFSIZE)
+            return 1;
+
+        if (fread(buf, 1, blocklen, stdin) != static_cast<size_t>(blocklen))
+            return 1;
+
+        int num_breakpoints = chunk_compute_breaks(buf, blocklen, breakpoints);
+        for (int i = 0; i < num_breakpoints; i++) {
+            printf("%zd%c", breakpoints[i],
+                   i == num_breakpoints - 1 ? '\n' : ' ');
+        }
+        fflush(stdout);
+    }
+}
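
For reference, a minimal Python client of the breakpoint protocol might look
like the following sketch (file name illustrative; the ChunkerExternal class
added below wraps this same exchange via subprocess):

    import struct, subprocess

    # Assumes cumulus-chunker-standalone can be found on $PATH.
    proc = subprocess.Popen(["cumulus-chunker-standalone"],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    data = open("somefile", "rb").read()            # any buffer up to 16 MiB
    proc.stdin.write(struct.pack(">i", len(data)))  # 4-byte big-endian length
    proc.stdin.write(data)                          # raw buffer contents
    proc.stdin.flush()
    print proc.stdout.readline().split()            # breakpoints (as strings)
    proc.stdin.write(struct.pack(">i", 0))          # zero length: clean shutdown
    proc.stdin.flush()
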
diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py
index a40c58d..d86d5ed 100644 (file)
--- a/python/cumulus/__init__.py
+++ b/python/cumulus/__init__.py
@@ -191,7 +191,7 @@ class ObjectStore:
     def parse_ref(refstr):
         m = re.match(r"^zero\[(\d+)\]$", refstr)
         if m:
-            return ("zero", None, None, (0, int(m.group(1))))
+            return ("zero", None, None, (0, int(m.group(1)), False))
 
         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
         if not m: return
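
With this change the zero-length reference form carries the same
three-element slice as other reference types, so callers such as the
rebuilder below can uniformly unpack it as (start, length, exact).  An
illustrative call:

    >>> cumulus.CumulusStore.parse_ref("zero[4096]")
    ('zero', None, None, (0, 4096, False))
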
diff --git a/python/cumulus/rebuild_database.py b/python/cumulus/rebuild_database.py
new file mode 100755 (executable)
index 0000000..a1ba9e8
--- /dev/null
+++ b/python/cumulus/rebuild_database.py
@@ -0,0 +1,330 @@
+#!/usr/bin/python
+#
+# Cumulus: Efficient Filesystem Backup to the Cloud
+# Copyright (C) 2013 The Cumulus Developers
+# See the AUTHORS file for a list of contributors.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Code for rebuilding the local database.
+
+Given a previous metadata dump and a filesystem tree (which may or may not
+exactly match), recompute block signatures to the extent possible to rebuild
+the local database.  This can be used to recover from a local database loss,
+given data from a previous backup.
+"""
+
+import base64
+import hashlib
+import itertools
+import os
+import struct
+import subprocess
+import sys
+
+import cumulus
+
+CHECKSUM_ALGORITHM = "sha224"
+
+CHUNKER_PROGRAM = "cumulus-chunker-standalone"
+
+class Chunker(object):
+    """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
+
+    This duplicates the chunking algorithm in third_party/chunk.cc.
+    """
+    # Chunking parameters.  These should match those in third_party/chunk.cc.
+    MODULUS = 0xbfe6b8a5bf378d83
+    WINDOW_SIZE = 48
+    BREAKMARK_VALUE = 0x78
+    MIN_CHUNK_SIZE = 2048
+    MAX_CHUNK_SIZE = 65535
+    TARGET_CHUNK_SIZE = 4096
+    ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
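+    # A chunk boundary is declared where the rolling hash, taken modulo
+    # TARGET_CHUNK_SIZE, equals BREAKMARK_VALUE, so chunk sizes average
+    # roughly TARGET_CHUNK_SIZE bytes, clamped to [MIN_CHUNK_SIZE,
+    # MAX_CHUNK_SIZE] (see compute_breaks below).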
+
+    def __init__(self):
+        degree = self.MODULUS.bit_length() - 1
+        self.degree = degree
+
+        # Lookup table for polynomial reduction when shifting a new byte in,
+        # based on the high-order bits.
+        self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
+                  for i in range(256)]
+
+        # Values to remove a byte from the signature when it falls out of the
+        # window.
+        self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
+                                self.MODULUS) for i in range(256)]
+
+        self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
+        self.hash_size = self.hash_algorithm().digestsize
+
+    def polymult(self, x, y, n):
+        # Polynomial multiplication: result = x * y
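+        # (carry-less multiplication over GF(2): for example,
+        # polymult(0b11, 0b101, 0b111) computes (x+1)(x^2+1) =
+        # x^3+x^2+x+1, which reduces modulo x^2+x+1 to 1)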
+        result = 0
+        for i in range(x.bit_length()):
+            if (x >> i) & 1:
+                result ^= y << i
+        # Reduction modulo n
+        size = n.bit_length()
+        while result.bit_length() >= size:
+            result ^= n << (result.bit_length() - size)
+        return result
+
+    def window_init(self):
+        # Sliding signature state is:
+        #   [signature value, history buffer index, history buffer contents]
+        return [0, 0, [0] * self.WINDOW_SIZE]
+
+    def window_update(self, signature, byte):
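+        # Slide the window one byte: cancel the oldest byte's contribution
+        # with the precomputed U table, shift in the new byte, then reduce
+        # the polynomial degree using the T table.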
+        poly = signature[0]
+        offset = signature[1]
+        undo = self.U[signature[2][offset]]
+        poly = ((poly ^ undo) << 8) + byte
+        poly ^= self.T[poly >> self.degree]
+
+        signature[0] = poly
+        signature[1] = (offset + 1) % self.WINDOW_SIZE
+        signature[2][offset] = byte
+
+    def compute_breaks(self, buf):
+        breaks = [0]
+        signature = self.window_init()
+        for i in xrange(len(buf)):
+            self.window_update(signature, ord(buf[i]))
+            block_len = i - breaks[-1] + 1
+            if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
+                        and block_len >= self.MIN_CHUNK_SIZE)
+                    or block_len >= self.MAX_CHUNK_SIZE):
+                breaks.append(i + 1)
+        if breaks[-1] < len(buf):
+            breaks.append(len(buf))
+        return breaks
+
+    def compute_signatures(self, buf, buf_offset=0):
+        """Break a buffer into chunks and compute chunk signatures.
+
+        Args:
+            buf: The data buffer.
+            buf_offset: The offset of the data buffer within the original
+                block, to handle cases where only a portion of the block is
+                available.
+
+        Returns:
+            A dictionary containing signature data.  Keys are chunk offsets
+            (from the beginning of the block), and values are tuples (size, raw
+            hash value).
+        """
+        breaks = self.compute_breaks(buf)
+        signatures = {}
+        for i in range(1, len(breaks)):
+            chunk = buf[breaks[i-1]:breaks[i]]
+            hasher = self.hash_algorithm()
+            hasher.update(chunk)
+            signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
+                                                    hasher.digest())
+        return signatures
+
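+    # Stored signature format: a sequence of fixed-size records, each a
+    # 2-byte big-endian chunk size followed by the raw digest; an all-zero
+    # digest marks a run of bytes with no signature data (see skip() below).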
+    def dump_signatures(self, signatures):
+        """Convert signatures to the binary format stored in the database."""
+        records = []
+
+        # Emit records indicating that no signatures are available for the next
+        # n bytes.  Since the size is a 16-bit value, to skip larger distances
+        # multiple records must be emitted.  An all-zero signature indicates
+        # the lack of data.
+        def skip(n):
+            while n > 0:
+                i = min(n, self.MAX_CHUNK_SIZE)
+                records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
+                n -= i
+
+        position = 0
+        for next_start, (size, digest) in sorted(signatures.iteritems()):
+            if next_start < position:
+                print "Warning: overlapping signatures, ignoring"
+                continue
+            skip(next_start - position)
+            records.append(struct.pack(">H", size) + digest)
+            position = next_start + size
+
+        return "".join(records)
+
+    def load_signatures(self, signatures):
+        """Loads signatures from the binary format stored in the database."""
+        entry_size = 2 + self.hash_size
+        if len(signatures) % entry_size != 0:
+            print "Warning: Invalid signatures to load"
+            return {}
+
+        null_digest = "\x00" * self.hash_size
+        position = 0
+        result = {}
+        for i in range(len(signatures) // entry_size):
+            sig = signatures[i*entry_size:(i+1)*entry_size]
+            size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
+            if digest != null_digest:
+                result[position] = (size, digest)
+            position += size
+        return result
+
+
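+# Either chunker can be exercised like this (illustrative round trip):
+#     chunker = Chunker()
+#     sigs = chunker.compute_signatures(data)
+#     assert chunker.load_signatures(chunker.dump_signatures(sigs)) == sigs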
+class ChunkerExternal(Chunker):
+    """A Chunker which uses an external program to compute Rabin fingerprints.
+
+    This can run much more quickly than the Python code, but should otherwise
+    give identical results.
+    """
+
+    def __init__(self):
+        super(ChunkerExternal, self).__init__()
+        self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
+                                        stdin=subprocess.PIPE,
+                                        stdout=subprocess.PIPE)
+
+    def compute_breaks(self, buf):
+        if len(buf) == 0:
+            return [0]
+        self.subproc.stdin.write(struct.pack(">i", len(buf)))
+        self.subproc.stdin.write(buf)
+        self.subproc.stdin.flush()
+        breaks = self.subproc.stdout.readline()
+        # chunk.cc reports the offset of the last byte of each chunk, so add
+        # one to get chunk-end offsets, and prepend the starting offset 0 to
+        # match the format returned by Chunker.compute_breaks.
+        return [0] + [int(x) + 1 for x in breaks.split()]
+
+
+class DatabaseRebuilder(object):
+    def __init__(self, database):
+        self.database = database
+        self.cursor = database.cursor()
+        self.segment_ids = {}
+        try:
+            self.chunker = ChunkerExternal()
+        except OSError:
+            # Fall back to the pure-Python chunker if the external helper
+            # program cannot be started.
+            self.chunker = Chunker()
+
+    def segment_to_id(self, segment):
+        if segment in self.segment_ids: return self.segment_ids[segment]
+
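+        # Get-or-create: "insert or ignore" adds the segment row only when
+        # it is missing; the select then fetches its id either way.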
+        self.cursor.execute("""insert or ignore into segments(segment)
+                               values (?)""", (segment,))
+        self.cursor.execute("""select segmentid from segments
+                               where segment = ?""", (segment,))
+        id = self.cursor.fetchone()[0]
+        self.segment_ids[segment] = id
+        return id
+
+    def rebuild(self, metadata, reference_path):
+        """Iterate through old metadata and use it to rebuild the database.
+
+        Args:
+            metadata: An iterable containing lines of the metadata log.
+            reference_path: Path to the root of a file system tree which may be
+                similar to data in the metadata log (used to recompute block
+                signatures).
+        """
+        for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
+            item = cumulus.MetadataItem(fields, None)
+            # Only process regular files; skip over other types (directories,
+            # symlinks, etc.)
+            if item.items.type not in ("-", "f"): continue
+            try:
+                path = os.path.join(reference_path, item.items.name)
+                print "Path:", path
+                # TODO: Check file size for early abort if different
+                self.rebuild_file(open(path), item)
+            except IOError as e:
+                # Skip files that cannot be read from the reference tree.
+                print e
+
+        self.database.commit()
+
+    def rebuild_file(self, fp, metadata):
+        """Compare"""
+        blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
+        verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
+        checksums = {}
+        subblock = {}
+        for segment, object, checksum, slice in blocks:
+            # Given a reference to a block of unknown size we don't know how to
+            # match up the data, so we have to give up on rebuilding for this
+            # file.
+            if slice is None: return
+
+            start, length, exact = slice
+            buf = fp.read(length)
+            verifier.update(buf)
+
+            if exact:
+                csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
+                csum.update(buf)
+                checksums[(segment, object)] = (length, csum.compute())
+
+            signatures = self.chunker.compute_signatures(buf, start)
+            subblock.setdefault((segment, object), {}).update(signatures)
+
+        if verifier.valid():
+            print "Checksum matches, computed:", checksums
+            for k in subblock:
+                subblock[k] = self.chunker.dump_signatures(subblock[k])
+            print "Subblock signatures:"
+            for k, v in subblock.iteritems():
+                print k, base64.b16encode(v)
+            self.store_checksums(checksums, subblock)
+        else:
+            print "Checksum mismatch"
+
+    def store_checksums(self, block_checksums, subblock_signatures):
+        for (segment, object), (size, checksum) in block_checksums.iteritems():
+            segmentid = self.segment_to_id(segment)
+            self.cursor.execute("""select blockid from block_index
+                                   where segmentid = ? and object = ?""",
+                                (segmentid, object))
+            row = self.cursor.fetchone()
+            blockid = row[0] if row is not None else None
+
+            if blockid is not None:
+                self.cursor.execute("""update block_index
+                                       set checksum = ?, size = ?
+                                       where blockid = ?""",
+                                    (checksum, size, blockid))
+            else:
+                self.cursor.execute(
+                    """insert into block_index(
+                           segmentid, object, checksum, size, timestamp)
+                       values (?, ?, ?, ?, julianday('now'))""",
+                    (segmentid, object, checksum, size))
+                blockid = self.cursor.lastrowid
+
+            # Store subblock signatures, if available.
+            print "blockid:", blockid
+            if (segment, object) in subblock_signatures:
+                self.cursor.execute(
+                    """insert or replace into subblock_signatures(
+                           blockid, algorithm, signatures)
+                       values (?, ?, ?)""",
+                    (blockid, self.chunker.ALGORITHM_NAME,
+                     buffer(subblock_signatures[(segment, object)])))
+
+
+if __name__ == "__main__":
+    # Read metadata from stdin; filter out lines starting with "@@" so the
+    # statcache file can be parsed as well.
+    metadata = (x for x in sys.stdin if not x.startswith("@@"))
+
+    rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
+    rebuilder.rebuild(metadata, "/")