3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 """Code for rebuilding the local database.
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database. This can be used to recover from a local database loss,
26 given data from a previous backup.
# Hash algorithm used for block- and chunk-level checksums; must be a key of
# cumulus.CHECKSUM_ALGORITHMS (it is looked up there by Chunker.__init__).
CHECKSUM_ALGORITHM = "sha224"

# External helper program used by ChunkerExternal to compute Rabin
# fingerprints much faster than the pure-Python Chunker.
CHUNKER_PROGRAM = "cumulus-chunker-standalone"
43 class Chunker(object):
44 """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
46 This duplicates the chunking algorithm in third_party/chunk.cc.
48 # Chunking parameters. These should match those in third_party/chunk.cc.
49 MODULUS = 0xbfe6b8a5bf378d83
51 BREAKMARK_VALUE = 0x78
53 MAX_CHUNK_SIZE = 65535
54 TARGET_CHUNK_SIZE = 4096
55 ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
58 degree = self.MODULUS.bit_length() - 1
61 # Lookup table for polynomial reduction when shifting a new byte in,
62 # based on the high-order bits.
63 self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
66 # Values to remove a byte from the signature when it falls out of the
68 self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
69 self.MODULUS) for i in range(256)]
71 self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
72 self.hash_size = self.hash_algorithm().digestsize
74 def polymult(self, x, y, n):
75 # Polynomial multiplication: result = x * y
77 for i in range(x.bit_length()):
82 while result.bit_length() >= size:
83 result ^= n << (result.bit_length() - size)
86 def window_init(self):
87 # Sliding signature state is:
88 # [signature value, history buffer index, history buffer contents]
89 return [0, 0, [0] * self.WINDOW_SIZE]
91 def window_update(self, signature, byte):
94 undo = self.U[signature[2][offset]]
95 poly = ((poly ^ undo) << 8) + byte
96 poly ^= self.T[poly >> self.degree]
99 signature[1] = (offset + 1) % self.WINDOW_SIZE
100 signature[2][offset] = byte
102 def compute_breaks(self, buf):
104 signature = self.window_init()
105 for i in xrange(len(buf)):
106 self.window_update(signature, ord(buf[i]))
107 block_len = i - breaks[-1] + 1
108 if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
109 and block_len >= self.MIN_CHUNK_SIZE)
110 or block_len >= self.MAX_CHUNK_SIZE):
112 if breaks[-1] < len(buf):
113 breaks.append(len(buf))
116 def compute_signatures(self, buf, buf_offset=0):
117 """Break a buffer into chunks and compute chunk signatures.
120 buf: The data buffer.
121 buf_offset: The offset of the data buffer within the original
122 block, to handle cases where only a portion of the block is
126 A dictionary containing signature data. Keys are chunk offsets
127 (from the beginning of the block), and values are tuples (size, raw
130 breaks = self.compute_breaks(buf)
132 for i in range(1, len(breaks)):
133 chunk = buf[breaks[i-1]:breaks[i]]
134 hasher = self.hash_algorithm()
136 signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
140 def dump_signatures(self, signatures):
141 """Convert signatures to the binary format stored in the database."""
144 # Emit records indicating that no signatures are available for the next
145 # n bytes. Since the size is a 16-bit value, to skip larger distances
146 # multiple records must be emitted. An all-zero signature indicates
150 i = min(n, self.MAX_CHUNK_SIZE)
151 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
155 for next_start, (size, digest) in sorted(signatures.iteritems()):
156 if next_start < position:
157 print "Warning: overlapping signatures, ignoring"
159 skip(next_start - position)
160 records.append(struct.pack(">H", size) + digest)
161 position = next_start + size
163 return "".join(records)
165 def load_signatures(self, signatures):
166 """Loads signatures from the binary format stored in the database."""
167 entry_size = 2 + self.hash_size
168 if len(signatures) % entry_size != 0:
169 print "Warning: Invalid signatures to load"
172 null_digest = "\x00" * self.hash_size
175 for i in range(len(signatures) // entry_size):
176 sig = signatures[i*entry_size:(i+1)*entry_size]
177 size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
178 if digest != null_digest:
179 result[position] = (size, digest)
class ChunkerExternal(Chunker):
    """A Chunker which uses an external program to compute Rabin fingerprints.

    This can run much more quickly than the Python code, but should otherwise
    give identical results.
    """

    def __init__(self):
        super(ChunkerExternal, self).__init__()
        # Long-lived helper process: each request writes one length-prefixed
        # buffer and reads back one line of break offsets.
        self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE)

    def compute_breaks(self, buf):
        # Empty input has a single boundary at 0, matching the base class.
        if len(buf) == 0: return [0]
        self.subproc.stdin.write(struct.pack(">i", len(buf)))
        self.subproc.stdin.write(buf)
        self.subproc.stdin.flush()
        breaks = self.subproc.stdout.readline()
        # NOTE(review): assumes the helper's output, after the +1 adjustment,
        # yields the same boundary list (starting at 0) as
        # Chunker.compute_breaks -- verify against cumulus-chunker-standalone.
        return [int(x) + 1 for x in breaks.split()]
207 class DatabaseRebuilder(object):
208 def __init__(self, database):
209 self.database = database
210 self.cursor = database.cursor()
211 self.segment_ids = {}
213 self.chunker = ChunkerExternal()
215 # self.chunker = Chunker()
217 def segment_to_id(self, segment):
218 if segment in self.segment_ids: return self.segment_ids[segment]
220 self.cursor.execute("""insert or ignore into segments(segment)
221 values (?)""", (segment,))
222 self.cursor.execute("""select segmentid from segments
223 where segment = ?""", (segment,))
224 id = self.cursor.fetchone()[0]
225 self.segment_ids[segment] = id
228 def rebuild(self, metadata, reference_path):
229 """Iterate through old metadata and use it to rebuild the database.
232 metadata: An iterable containing lines of the metadata log.
233 reference_path: Path to the root of a file system tree which may be
234 similar to data in the metadata log (used to recompute block
237 for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
238 metadata = cumulus.MetadataItem(fields, None)
239 # Only process regular files; skip over other types (directories,
241 if metadata.items.type not in ("-", "f"): continue
243 path = os.path.join(reference_path, metadata.items.name)
245 # TODO: Check file size for early abort if different
246 self.rebuild_file(open(path), metadata)
249 pass # Ignore the file
251 self.database.commit()
253 def rebuild_file(self, fp, metadata):
255 blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
256 verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
259 for segment, object, checksum, slice in blocks:
260 # Given a reference to a block of unknown size we don't know how to
261 # match up the data, so we have to give up on rebuilding for this
263 if slice is None: return
265 start, length, exact = slice
266 buf = fp.read(length)
270 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
272 checksums[(segment, object)] = (length, csum.compute())
274 signatures = self.chunker.compute_signatures(buf, start)
275 subblock.setdefault((segment, object), {}).update(signatures)
278 print "Checksum matches, computed:", checksums
280 subblock[k] = self.chunker.dump_signatures(subblock[k])
281 print "Subblock signatures:"
282 for k, v in subblock.iteritems():
283 print k, base64.b16encode(v)
284 self.store_checksums(checksums, subblock)
286 print "Checksum mismatch"
288 def store_checksums(self, block_checksums, subblock_signatures):
289 for (segment, object), (size, checksum) in block_checksums.iteritems():
290 segmentid = self.segment_to_id(segment)
291 self.cursor.execute("""select blockid from block_index
292 where segmentid = ? and object = ?""",
294 blockid = self.cursor.fetchall()
296 blockid = blockid[0][0]
300 if blockid is not None:
301 self.cursor.execute("""update block_index
302 set checksum = ?, size = ?
303 where blockid = ?""",
304 (checksum, size, blockid))
307 """insert into block_index(
308 segmentid, object, checksum, size, timestamp)
309 values (?, ?, ?, ?, julianday('now'))""",
310 (segmentid, object, checksum, size))
311 blockid = self.cursor.lastrowid
313 # Store subblock signatures, if available.
314 print "blockid:", blockid
315 if (segment, object) in subblock_signatures:
317 """insert or replace into subblock_signatures(
318 blockid, algorithm, signatures)
320 (blockid, self.chunker.ALGORITHM_NAME,
321 buffer(subblock_signatures[(segment, object)])))
if __name__ == "__main__":
    # Metadata is read from stdin.  Lines beginning with "@@" are dropped so
    # that a statcache file can be fed in as well as a plain metadata log.
    def metadata_lines():
        for line in sys.stdin:
            if not line.startswith("@@"):
                yield line

    rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
    rebuilder.rebuild(metadata_lines(), "/")