3 # Cumulus: Efficient Filesystem Backup to the Cloud
4 # Copyright (C) 2013 The Cumulus Developers
5 # See the AUTHORS file for a list of contributors.
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 """Code for rebuilding the local database.
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database. This can be used to recover from a local database loss,
26 given data from a previous backup.
# Hash algorithm name used for all block/segment digests computed below.
42 CHECKSUM_ALGORITHM = "sha224"
# External helper binary used by ChunkerExternal to compute chunk breaks.
43 CHUNKER_PROGRAM = "cumulus-chunker-standalone"
45 # TODO: Move to somewhere common
# strftime format matching SQLite's default text datetime representation.
46 SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
# NOTE(review): this listing has elided lines (the embedded numbering jumps);
# several definitions referenced below (WINDOW_SIZE, MIN_CHUNK_SIZE, the
# __init__ header, polymult's result initialization/return, compute_breaks'
# `breaks` initialization, the `poly = signature[0]` read) are on lines not
# shown here.  Comments below only describe what the visible lines establish.
48 class Chunker(object):
49 """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.
51 This duplicates the chunking algorithm in third_party/chunk.cc.
53 # Chunking parameters. These should match those in third_party/chunk.cc.
# Irreducible polynomial (as a bit mask) defining the Rabin fingerprint field.
54 MODULUS = 0xbfe6b8a5bf378d83
# A chunk boundary is declared when signature % TARGET_CHUNK_SIZE == this value.
56 BREAKMARK_VALUE = 0x78
# Hard upper bound on chunk length; also the largest value a 16-bit size
# field in dump_signatures can represent.
58 MAX_CHUNK_SIZE = 65535
59 TARGET_CHUNK_SIZE = 4096
# Algorithm identifier stored with signatures, e.g. "lbfs-4096/sha224".
60 ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)
62 # Minimum size of a block before we should bother storing subfile
63 # signatures (only applies when full blocks are used to store a file;
64 # subfile signatures are always used when subfile incrementals are
66 MINIMUM_OBJECT_SIZE = 16384
# Degree of the fingerprint polynomial, derived from MODULUS.
69 degree = self.MODULUS.bit_length() - 1
72 # Lookup table for polynomial reduction when shifting a new byte in,
73 # based on the high-order bits.
74 self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
77 # Values to remove a byte from the signature when it falls out of the
# sliding window; U[b] is b shifted to the window's oldest byte position,
# reduced mod MODULUS.  WINDOW_SIZE is defined on an elided line.
79 self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
80 self.MODULUS) for i in range(256)]
# Hash constructor and digest length for chunk signatures.
82 self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
83 self.hash_size = self.hash_algorithm().digestsize
85 def polymult(self, x, y, n):
86 # Polynomial multiplication: result = x * y
# (initialization of `result` and the shift-and-xor accumulation are on
# elided lines; only the reduction loop is visible below)
88 for i in range(x.bit_length()):
# Reduce result modulo n (polynomial division over GF(2)).
93 while result.bit_length() >= size:
94 result ^= n << (result.bit_length() - size)
97 def window_init(self):
98 # Sliding signature state is:
99 # [signature value, history buffer index, history buffer contents]
100 return [0, 0, [0] * self.WINDOW_SIZE]
102 def window_update(self, signature, byte):
# Shift one byte into the rolling fingerprint and drop the oldest byte.
# NOTE(review): the line initializing `poly` (presumably from signature[0])
# is elided from this listing — confirm against the full source.
104 offset = signature[1]
105 undo = self.U[signature[2][offset]]
106 poly = ((poly ^ undo) << 8) + byte
107 poly ^= self.T[poly >> self.degree]
# Advance the circular history buffer and record the new byte.
110 signature[1] = (offset + 1) % self.WINDOW_SIZE
111 signature[2][offset] = byte
113 def compute_breaks(self, buf):
# Returns chunk boundary offsets; `breaks` initialization is on an
# elided line (it is indexed with breaks[-1] below, so it starts non-empty).
115 signature = self.window_init()
116 for i in xrange(len(buf)):
117 self.window_update(signature, ord(buf[i]))
118 block_len = i - breaks[-1] + 1
# Break when the fingerprint hits the breakmark (and the chunk is at
# least MIN_CHUNK_SIZE, defined on an elided line), or at the hard cap.
119 if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
120 and block_len >= self.MIN_CHUNK_SIZE)
121 or block_len >= self.MAX_CHUNK_SIZE):
# Ensure the final partial chunk is terminated at end-of-buffer.
123 if breaks[-1] < len(buf):
124 breaks.append(len(buf))
127 def compute_signatures(self, buf, buf_offset=0):
128 """Break a buffer into chunks and compute chunk signatures.
131 buf: The data buffer.
132 buf_offset: The offset of the data buffer within the original
133 block, to handle cases where only a portion of the block is
137 A dictionary containing signature data. Keys are chunk offsets
138 (from the beginning of the block), and values are tuples (size, raw
141 breaks = self.compute_breaks(buf)
# Hash each chunk between consecutive break points.
143 for i in range(1, len(breaks)):
144 chunk = buf[breaks[i-1]:breaks[i]]
145 hasher = self.hash_algorithm()
147 signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
151 def dump_signatures(self, signatures):
152 """Convert signatures to the binary format stored in the database."""
155 # Emit records indicating that no signatures are available for the next
156 # n bytes. Since the size is a 16-bit value, to skip larger distances
157 # multiple records must be emitted. An all-zero signature indicates
# Each skip record: 16-bit big-endian length + all-zero digest.
161 i = min(n, self.MAX_CHUNK_SIZE)
162 records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
# Walk signatures in offset order, emitting skip records for gaps.
166 for next_start, (size, digest) in sorted(signatures.iteritems()):
167 if next_start < position:
168 print "Warning: overlapping signatures, ignoring"
170 skip(next_start - position)
171 records.append(struct.pack(">H", size) + digest)
172 position = next_start + size
174 return "".join(records)
176 def load_signatures(self, signatures):
177 """Loads signatures from the binary format stored in the database."""
# Inverse of dump_signatures: fixed-size records of 2-byte length + digest.
178 entry_size = 2 + self.hash_size
179 if len(signatures) % entry_size != 0:
180 print "Warning: Invalid signatures to load"
# All-zero digest marks a "no signature available" skip record.
183 null_digest = "\x00" * self.hash_size
186 for i in range(len(signatures) // entry_size):
187 sig = signatures[i*entry_size:(i+1)*entry_size]
188 size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
189 if digest != null_digest:
190 result[position] = (size, digest)
195 class ChunkerExternal(Chunker):
196 """A Chunker which uses an external program to compute Rabin fingerprints.
198 This can run much more quickly than the Python code, but should otherwise
199 give identical results.
# (the __init__ header line is elided from this listing)
# Launch the helper once and keep it running; requests are pipelined
# over its stdin/stdout.
203 super(ChunkerExternal, self).__init__()
204 self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
205 stdin=subprocess.PIPE,
206 stdout=subprocess.PIPE)
208 def compute_breaks(self, buf):
# Protocol: send a 32-bit big-endian length followed by the raw buffer;
# the helper replies with one line of space-separated break positions.
211 self.subproc.stdin.write(struct.pack(">i", len(buf)))
212 self.subproc.stdin.write(buf)
213 self.subproc.stdin.flush()
214 breaks = self.subproc.stdout.readline()
# Helper reports the last index of each chunk; +1 converts to the
# exclusive end-offset convention used by the Python Chunker.
215 return [0] + [int(x) + 1 for x in breaks.split()]
# NOTE(review): elided lines in this listing include try/except frames in
# rebuild()/rebuild_file(), the return of segment_to_id, and parts of the
# SQL statements in store_checksums.  Comments describe only visible code.
218 class DatabaseRebuilder(object):
219 def __init__(self, database):
# database: an open local-database connection (DB-API style; .cursor()
# and .commit() are used below).
220 self.database = database
221 self.cursor = database.cursor()
# Cache of segment name -> segmentid to avoid repeated lookups.
222 self.segment_ids = {}
223 self.chunker = ChunkerExternal()
224 #self.chunker = Chunker()
226 def segment_to_id(self, segment):
# Map a segment name to its database id, inserting a row on first use.
227 if segment in self.segment_ids: return self.segment_ids[segment]
229 self.cursor.execute("""insert or ignore into segments(segment)
230 values (?)""", (segment,))
231 self.cursor.execute("""select segmentid from segments
232 where segment = ?""", (segment,))
233 id = self.cursor.fetchone()[0]
234 self.segment_ids[segment] = id
237 def rebuild(self, metadata, reference_path):
238 """Iterate through old metadata and use it to rebuild the database.
241 metadata: An iterable containing lines of the metadata log.
242 reference_path: Path to the root of a file system tree which may be
243 similar to data in the metadata log (used to recompute block
246 for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
247 metadata = cumulus.MetadataItem(fields, None)
248 # Only process regular files; skip over other types (directories,
250 if metadata.items.type not in ("-", "f"): continue
252 path = os.path.join(reference_path, metadata.items.name)
254 # TODO: Check file size for early abort if different
255 self.rebuild_file(open(path), metadata)
# (the matching try/except lines are elided from this listing)
258 pass # Ignore the file
260 self.database.commit()
262 def rebuild_file(self, fp, metadata):
263 """Recompute database signatures if a file is unchanged.
265 If the current file contents match that from the old metadata (the
266 full-file hash matches), then recompute block- and chunk-level
267 signatures for the objects referenced by the file.
269 blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
270 verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
273 for segment, object, checksum, slice in blocks:
274 # Given a reference to a block of unknown size we don't know how to
275 # match up the data, so we have to give up on rebuilding for this
277 if slice is None: return
279 start, length, exact = slice
280 buf = fp.read(length)
# Exact slice: hash the block data directly.
284 csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
286 checksums[(segment, object)] = (length, csum.compute())
288 # Compute a lower bound on the object size.
289 oldlength, csum = checksums.get((segment, object), (0, None))
290 checksums[(segment, object)] = (max(oldlength, start + length),
# Large or inexact objects also get sub-block (chunk) signatures.
293 if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
294 signatures = self.chunker.compute_signatures(buf, start)
295 subblock.setdefault((segment, object), {}).update(signatures)
# Serialize accumulated chunk signatures before storing.
299 subblock[k] = self.chunker.dump_signatures(subblock[k])
300 self.store_checksums(checksums, subblock)
302 print "Checksum mismatch"
304 def store_checksums(self, block_checksums, subblock_signatures):
305 for (segment, object), (size, checksum) in block_checksums.iteritems():
306 segmentid = self.segment_to_id(segment)
308 """insert or ignore into block_index(segmentid, object)
311 self.cursor.execute("""select blockid from block_index
312 where segmentid = ? and object = ?""",
314 blockid = self.cursor.fetchall()[0][0]
316 # Store checksum only if it is available; we don't want to
317 # overwrite an existing checksum in the database with NULL.
318 if checksum is not None:
319 self.cursor.execute("""update block_index
321 where blockid = ?""",
324 # Update the object size. Our size may be an estimate, based on
325 # slices that we have seen. The size in the database must not be
326 # larger than the true size, but update it to the largest value
328 self.cursor.execute("""update block_index
329 set size = max(?, coalesce(size, 0))
330 where blockid = ?""",
333 # Store subblock signatures, if available.
334 # TODO: Even better would be to merge signature data, to handle the
335 # case where different files see different chunks of a block.
336 sigs = subblock_signatures.get((segment, object))
339 """insert or replace into subblock_signatures(
340 blockid, algorithm, signatures)
342 (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
345 class SegmentStateRebuilder(object):
346 """Reconstructs segment metadata files from raw segment data."""
# (the __init__ header line is elided from this listing)
# Known decompression filters by file extension, and the regex that
# recognizes segment filenames.
349 self.filters = dict(cumulus.SEGMENT_FILTERS)
350 self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
352 def compute_metadata(self, path, relative_path):
353 """Recompute metadata of a single segment.
356 path: Path to the segment file in the file system.
357 relative_path: Path relative to the root for the storage backend.
359 # Does the filename match that of a segment? If so, extract the
360 # extension to determine the filter to apply to decompress.
361 filename = os.path.basename(relative_path)
362 m = self.segment_pattern.match(filename)
364 segment_name = m.group(1)
365 extension = m.group(2)
# Unknown extension: no way to decompress, so give up on this file.
366 if extension not in self.filters: return
367 filter_cmd = self.filters[extension]
# Use the file's mtime (UTC) as the segment timestamp.
370 st_buf = os.stat(path)
371 timestamp = time.strftime(SQLITE_TIMESTAMP,
372 time.gmtime(st_buf.st_mtime))
374 # Compute attributes of the compressed segment data.
# (disk_size/BLOCK_SIZE initialization and the read loop header are on
# elided lines; the loop below checksums the raw file in chunks)
376 with open(path) as segment:
378 checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
380 buf = segment.read(BLOCK_SIZE)
381 if len(buf) == 0: break
382 disk_size += len(buf)
383 checksummer.update(buf)
384 checksum = checksummer.compute()
386 # Compute attributes of the objects within the segment.
# Decompress and walk the contained tar stream to total member sizes.
389 with open(path) as segment:
390 decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
391 objects = tarfile.open(mode='r|', fileobj=decompressed)
392 for tarinfo in objects:
393 data_size += tarinfo.size
396 return {"segment": cumulus.uri_encode(segment_name),
397 "path": cumulus.uri_encode(relative_path),
398 "checksum": checksum,
399 "data_size": data_size,
400 "disk_size": disk_size,
401 "timestamp": timestamp}
403 if __name__ == "__main__":
# NOTE(review): the command-line argument handling and several loop headers
# in this block are on lines elided from this listing.  Visible behavior:
# one mode walks a directory tree printing recomputed segment metadata; the
# other reads a metadata log from stdin and rebuilds the local database.
405 segment_rebuilder = SegmentStateRebuilder()
408 for dirpath, dirnames, filenames in os.walk(topdir):
410 files.append(os.path.join(dirpath, f))
413 metadata = segment_rebuilder.compute_metadata(
415 os.path.relpath(f, topdir))
417 for (k, v) in sorted(metadata.items()):
418 print "%s: %s" % (k, v)
422 # Read metadata from stdin; filter out lines starting with "@@" so the
423 # statcache file can be parsed as well.
424 metadata = (x for x in sys.stdin if not x.startswith("@@"))
426 rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
427 rebuilder.rebuild(metadata, "/")