# Cumulus: Efficient Filesystem Backup to the Cloud
# Copyright (C) 2013 The Cumulus Developers
# See the AUTHORS file for a list of contributors.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Code for rebuilding the local database.

Given a previous metadata dump and a filesystem tree (which may or may not
exactly match), recompute block signatures to the extent possible to rebuild
the local database.  This can be used to recover from a local database loss,
given data from a previous backup.
"""

import os
import struct
import subprocess
import sys
import tarfile
import time

import cumulus
CHECKSUM_ALGORITHM = "sha224"
CHUNKER_PROGRAM = "cumulus-chunker-standalone"

# TODO: Move to somewhere common
SQLITE_TIMESTAMP = "%Y-%m-%d %H:%M:%S"
class Chunker(object):
    """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.

    This duplicates the chunking algorithm in third_party/chunk.cc.
    """

    # Chunking parameters.  These should match those in third_party/chunk.cc.
    MODULUS = 0xbfe6b8a5bf378d83
    WINDOW_SIZE = 48
    BREAKMARK_VALUE = 0x78
    MIN_CHUNK_SIZE = 2048
    MAX_CHUNK_SIZE = 65535
    TARGET_CHUNK_SIZE = 4096
    ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)

    # Minimum size of a block before we should bother storing subfile
    # signatures (only applies when full blocks are used to store a file;
    # subfile signatures are always used when subfile incrementals are
    # present).
    MINIMUM_OBJECT_SIZE = 16384
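    # How the parameters combine (summary of compute_breaks below): a chunk
    # boundary is declared whenever the rolling fingerprint, taken modulo
    # TARGET_CHUNK_SIZE, equals BREAKMARK_VALUE, yielding chunks that average
    # roughly TARGET_CHUNK_SIZE bytes; boundaries are additionally clamped so
    # chunks never fall below MIN_CHUNK_SIZE or exceed MAX_CHUNK_SIZE.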
    def __init__(self):
        degree = self.MODULUS.bit_length() - 1
        self.degree = degree

        # Lookup table for polynomial reduction when shifting a new byte in,
        # based on the high-order bits.
        self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
                    for i in range(256)]

        # Values to remove a byte from the signature when it falls out of the
        # window.
        self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
                                self.MODULUS) for i in range(256)]

        self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
        self.hash_size = self.hash_algorithm().digestsize
    def polymult(self, x, y, n):
        # Polynomial multiplication: result = x * y
        result = 0
        for i in range(x.bit_length()):
            if (x >> i) & 1:
                result ^= y << i
        # Polynomial reduction: result = result % n
        size = n.bit_length()
        while result.bit_length() >= size:
            result ^= n << (result.bit_length() - size)
        return result
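    # Example, working over GF(2) with small illustrative operands:
    #     polymult(0b110, 0b11, 0b111) == 0b11
    # since (x^2 + x)(x + 1) = x^3 + x, and x^3 + x mod (x^2 + x + 1) = x + 1.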
    def window_init(self):
        # Sliding signature state is:
        #   [signature value, history buffer index, history buffer contents]
        return [0, 0, [0] * self.WINDOW_SIZE]

    def window_update(self, signature, byte):
        poly = signature[0]
        offset = signature[1]
        undo = self.U[signature[2][offset]]
        poly = ((poly ^ undo) << 8) + byte
        poly ^= self.T[poly >> self.degree]

        signature[0] = poly
        signature[1] = (offset + 1) % self.WINDOW_SIZE
        signature[2][offset] = byte
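    # Because the fingerprint depends only on the most recent WINDOW_SIZE
    # bytes, an edit to a file can only move chunk boundaries within a window
    # of the change; boundaries elsewhere stay put, which is what makes the
    # chunk signatures reusable across backups.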
    def compute_breaks(self, buf):
        breaks = [0]
        signature = self.window_init()
        for i in xrange(len(buf)):
            self.window_update(signature, ord(buf[i]))
            block_len = i - breaks[-1] + 1
            if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
                        and block_len >= self.MIN_CHUNK_SIZE)
                    or block_len >= self.MAX_CHUNK_SIZE):
                breaks.append(i + 1)
        if breaks[-1] < len(buf):
            breaks.append(len(buf))
        return breaks
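    # Example (offsets illustrative only): for a 10000-byte buffer this might
    # return [0, 4210, 8744, 10000]; each pair of adjacent entries delimits
    # one chunk.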
    def compute_signatures(self, buf, buf_offset=0):
        """Break a buffer into chunks and compute chunk signatures.

        Args:
            buf: The data buffer.
            buf_offset: The offset of the data buffer within the original
                block, to handle cases where only a portion of the block is
                available.

        Returns:
            A dictionary containing signature data.  Keys are chunk offsets
            (from the beginning of the block), and values are tuples (size, raw
            hash value).
        """
        breaks = self.compute_breaks(buf)
        signatures = {}
        for i in range(1, len(breaks)):
            chunk = buf[breaks[i-1]:breaks[i]]
            hasher = self.hash_algorithm()
            hasher.update(chunk)
            signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
                                                    hasher.digest())
        return signatures
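    # Usage sketch (variable names illustrative):
    #     sigs = chunker.compute_signatures(buf, start)
    #     blob = chunker.dump_signatures(sigs)
    #     assert chunker.load_signatures(blob) == sigs
    # The round trip should hold for any non-overlapping signature set whose
    # chunk sizes fit in 16 bits.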
    def dump_signatures(self, signatures):
        """Convert signatures to the binary format stored in the database."""
        records = []

        # Emit records indicating that no signatures are available for the next
        # n bytes.  Since the size is a 16-bit value, to skip larger distances
        # multiple records must be emitted.  An all-zero signature indicates
        # the lack of signature data.
        def skip(n):
            while n > 0:
                i = min(n, self.MAX_CHUNK_SIZE)
                records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
                n -= i

        position = 0
        for next_start, (size, digest) in sorted(signatures.iteritems()):
            if next_start < position:
                print "Warning: overlapping signatures, ignoring"
                continue
            skip(next_start - position)
            records.append(struct.pack(">H", size) + digest)
            position = next_start + size

        return "".join(records)
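    # Serialized layout, as written above: each record is a 2-byte big-endian
    # chunk size followed by the raw digest.  With sha224 (28-byte digests) a
    # record is 30 bytes; a single 4096-byte chunk at offset 0 serializes to
    # struct.pack(">H", 4096) followed by its digest.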
    def load_signatures(self, signatures):
        """Loads signatures from the binary format stored in the database."""
        entry_size = 2 + self.hash_size
        if len(signatures) % entry_size != 0:
            print "Warning: Invalid signatures to load"
            return {}

        null_digest = "\x00" * self.hash_size
        position = 0
        result = {}
        for i in range(len(signatures) // entry_size):
            sig = signatures[i*entry_size:(i+1)*entry_size]
            size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
            if digest != null_digest:
                result[position] = (size, digest)
            position += size
        return result

class ChunkerExternal(Chunker):
    """A Chunker which uses an external program to compute Rabin fingerprints.

    This can run much more quickly than the Python code, but should otherwise
    give identical results.
    """

    def __init__(self):
        super(ChunkerExternal, self).__init__()
        self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE)
    def compute_breaks(self, buf):
        if len(buf) == 0:
            return [0]
        self.subproc.stdin.write(struct.pack(">i", len(buf)))
        self.subproc.stdin.write(buf)
        self.subproc.stdin.flush()
        breaks = self.subproc.stdout.readline()
        return [0] + [int(x) + 1 for x in breaks.split()]

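    # Wire protocol, as used above: send a 4-byte big-endian length followed
    # by the raw buffer; the helper replies with one line of space-separated
    # indices of the final byte of each chunk (hence the +1 when converting
    # them to break offsets).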
class DatabaseRebuilder(object):
    def __init__(self, database):
        self.database = database
        self.cursor = database.cursor()
        self.segment_ids = {}
        self.chunker = ChunkerExternal()
        #self.chunker = Chunker()
    def segment_to_id(self, segment):
        if segment in self.segment_ids: return self.segment_ids[segment]

        self.cursor.execute("""insert or ignore into segments(segment)
                               values (?)""", (segment,))
        self.cursor.execute("""select segmentid from segments
                               where segment = ?""", (segment,))
        id = self.cursor.fetchone()[0]
        self.segment_ids[segment] = id
        return id
    def rebuild(self, metadata, reference_path):
        """Iterate through old metadata and use it to rebuild the database.

        Args:
            metadata: An iterable containing lines of the metadata log.
            reference_path: Path to the root of a file system tree which may be
                similar to data in the metadata log (used to recompute block
                signatures).
        """
        for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
            metadata = cumulus.MetadataItem(fields, None)
            # Only process regular files; skip over other types (directories,
            # symlinks, etc.)
            if metadata.items.type not in ("-", "f"): continue
            try:
                path = os.path.join(reference_path, metadata.items.name)
                # TODO: Check file size for early abort if different
                self.rebuild_file(open(path), metadata)
            except IOError:
                pass            # Ignore the file

        self.database.commit()
    def reload_segment_metadata(self, segment_metadata):
        """Read a segment metadata (.meta) file into the local database.

        Updates the segments table in the local database with information from
        a segment metadata backup file.  Old data is not overwritten, so
        loading a .meta file with partial information is fine.
        """
        for info in cumulus.parse(segment_metadata,
                                  terminate=lambda l: len(l) == 0):
            segment = info.pop("segment")
            self.insert_segment_info(segment, info)

        self.database.commit()
    def insert_segment_info(self, segment, info):
        id = self.segment_to_id(segment)
        for k, v in info.items():
            self.cursor.execute("update segments set " + k + " = ? "
                                "where segmentid = ?",
                                (v, id))
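    # Note: the column name k is spliced directly into the SQL text, so the
    # keys must come from a trusted .meta file; only the value is passed as a
    # bound parameter.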
    def rebuild_file(self, fp, metadata):
        """Recompute database signatures if a file is unchanged.

        If the current file contents match that from the old metadata (the
        full-file hash matches), then recompute block- and chunk-level
        signatures for the objects referenced by the file.
        """
        blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
        verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
        checksums = {}
        subblock = {}
        for segment, object, checksum, slice in blocks:
            # Given a reference to a block of unknown size we don't know how to
            # match up the data, so we have to give up on rebuilding for this
            # file.
            if slice is None: return

            start, length, exact = slice
            buf = fp.read(length)
            verifier.update(buf)

            if exact:
                csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
                csum.update(buf)
                checksums[(segment, object)] = (length, csum.compute())
            else:
                # Compute a lower bound on the object size.
                oldlength, csum = checksums.get((segment, object), (0, None))
                checksums[(segment, object)] = (max(oldlength, start + length),
                                                csum)

            if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
                signatures = self.chunker.compute_signatures(buf, start)
                subblock.setdefault((segment, object), {}).update(signatures)

        if verifier.valid():
            for k in subblock:
                subblock[k] = self.chunker.dump_signatures(subblock[k])
            self.store_checksums(checksums, subblock)
        else:
            print "Checksum mismatch"
    def store_checksums(self, block_checksums, subblock_signatures):
        for (segment, object), (size, checksum) in block_checksums.iteritems():
            segmentid = self.segment_to_id(segment)
            self.cursor.execute(
                """insert or ignore into block_index(segmentid, object)
                   values (?, ?)""",
                (segmentid, object))
            self.cursor.execute("""select blockid from block_index
                                   where segmentid = ? and object = ?""",
                                (segmentid, object))
            blockid = self.cursor.fetchall()[0][0]

            # Store checksum only if it is available; we don't want to
            # overwrite an existing checksum in the database with NULL.
            if checksum is not None:
                self.cursor.execute("""update block_index
                                       set checksum = ?
                                       where blockid = ?""",
                                    (checksum, blockid))

            # Update the object size.  Our size may be an estimate, based on
            # slices that we have seen.  The size in the database must not be
            # larger than the true size, but update it to the largest value
            # possible.
            self.cursor.execute("""update block_index
                                   set size = max(?, coalesce(size, 0))
                                   where blockid = ?""",
                                (size, blockid))

            # Store subblock signatures, if available.
            # TODO: Even better would be to merge signature data, to handle the
            # case where different files see different chunks of a block.
            sigs = subblock_signatures.get((segment, object))
            if sigs:
                self.cursor.execute(
                    """insert or replace into subblock_signatures(
                           blockid, algorithm, signatures)
                       values (?, ?, ?)""",
                    (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))

class SegmentStateRebuilder(object):
    """Reconstructs segment metadata files from raw segment data."""

    def __init__(self):
        self.filters = dict(cumulus.SEGMENT_FILTERS)
        self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
    def compute_metadata(self, path, relative_path):
        """Recompute metadata of a single segment.

        Args:
            path: Path to the segment file in the file system.
            relative_path: Path relative to the root for the storage backend.
        """
        # Does the filename match that of a segment?  If so, extract the
        # extension to determine the filter to apply to decompress.
        filename = os.path.basename(relative_path)
        m = self.segment_pattern.match(filename)
        if not m: return
        segment_name = m.group(1)
        extension = m.group(2)
        if extension not in self.filters: return
        filter_cmd = self.filters[extension]

        # File attributes.
        st_buf = os.stat(path)
        timestamp = time.strftime(SQLITE_TIMESTAMP,
                                  time.gmtime(st_buf.st_mtime))

        # Compute attributes of the compressed segment data.
        BLOCK_SIZE = 4096
        with open(path) as segment:
            disk_size = 0
            checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
            while True:
                buf = segment.read(BLOCK_SIZE)
                if len(buf) == 0: break
                disk_size += len(buf)
                checksummer.update(buf)
        checksum = checksummer.compute()

        # Compute attributes of the objects within the segment.
        data_size = 0
        with open(path) as segment:
            decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
            objects = tarfile.open(mode='r|', fileobj=decompressed)
            for tarinfo in objects:
                data_size += tarinfo.size

        return {"segment": cumulus.uri_encode(segment_name),
                "path": cumulus.uri_encode(relative_path),
                "checksum": checksum,
                "data_size": data_size,
                "disk_size": disk_size,
                "timestamp": timestamp}
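    # Shape of the returned dictionary (values are illustrative placeholders):
    #     {"segment": <uri-encoded segment name>,
    #      "path": <uri-encoded path relative to the storage root>,
    #      "checksum": "sha224=<hex digest of the stored segment file>",
    #      "data_size": <sum of uncompressed object sizes in the tar>,
    #      "disk_size": <compressed size on disk>,
    #      "timestamp": "YYYY-MM-DD HH:MM:SS"}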
if __name__ == "__main__":
    # Sample code to reconstruct segment metadata--ought to be relocated.
    if False:
        segment_rebuilder = SegmentStateRebuilder()
        topdir = sys.argv[1]
        files = []
        for dirpath, dirnames, filenames in os.walk(topdir):
            for f in filenames:
                files.append(os.path.join(dirpath, f))
        files.sort()
        for f in files:
            metadata = segment_rebuilder.compute_metadata(
                f,
                os.path.relpath(f, topdir))
            if metadata:
                for (k, v) in sorted(metadata.items()):
                    print "%s: %s" % (k, v)
                print
        sys.exit(0)

    # Sample code to rebuild the segments table from metadata--needs to be
    # merged with the code below.
    if False:
        rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
        rebuilder.reload_segment_metadata(open(sys.argv[2]))
        sys.exit(0)

    # Read metadata from stdin; filter out lines starting with "@@" so the
    # statcache file can be parsed as well.
    metadata = (x for x in sys.stdin if not x.startswith("@@"))

    rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
    rebuilder.rebuild(metadata, "/")
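
# Usage sketch (paths and file names are examples, not part of this module):
# rebuild block signatures from a previous metadata dump, using the live
# filesystem rooted at / as the reference tree:
#     python rebuild_database.py /path/to/local/database/dir < snapshot.metadata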