# Cumulus: Efficient Filesystem Backup to the Cloud
# Copyright (C) 2013 The Cumulus Developers
# See the AUTHORS file for a list of contributors.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 """Code for rebuilding the local database.
23 Given a previous metadata dump and a filesystem tree (which may or may not
24 exactly match), recompute block signatures to the extent possible to rebuild
25 the local database. This can be used to recover from a local database loss,
26 given data from a previous backup.
CHECKSUM_ALGORITHM = "sha224"

CHUNKER_PROGRAM = "cumulus-chunker-standalone"
class Chunker(object):
    """Compute sub-file chunk boundaries using a sliding Rabin fingerprint.

    This duplicates the chunking algorithm in third_party/chunk.cc.
    """

    # Chunking parameters.  These should match those in third_party/chunk.cc.
    MODULUS = 0xbfe6b8a5bf378d83
    WINDOW_SIZE = 48
    BREAKMARK_VALUE = 0x78
    MIN_CHUNK_SIZE = 2048
    MAX_CHUNK_SIZE = 65535
    TARGET_CHUNK_SIZE = 4096
    ALGORITHM_NAME = "lbfs-%d/%s" % (TARGET_CHUNK_SIZE, CHECKSUM_ALGORITHM)

    # Minimum size of a block before we should bother storing subfile
    # signatures (only applies when full blocks are used to store a file;
    # subfile signatures are always used when subfile incrementals are
    # present).
    MINIMUM_OBJECT_SIZE = 16384
    def __init__(self):
        degree = self.MODULUS.bit_length() - 1
        self.degree = degree

        # Lookup table for polynomial reduction when shifting a new byte in,
        # based on the high-order bits.
        self.T = [self.polymult(1, i << degree, self.MODULUS) ^ (i << degree)
                  for i in range(256)]

        # Values to remove a byte from the signature when it falls out of the
        # window.
        self.U = [self.polymult(i, 1 << 8*(self.WINDOW_SIZE - 1),
                                self.MODULUS) for i in range(256)]

        self.hash_algorithm = cumulus.CHECKSUM_ALGORITHMS[CHECKSUM_ALGORITHM]
        self.hash_size = self.hash_algorithm().digestsize
    def polymult(self, x, y, n):
        # Polynomial multiplication: result = x * y
        result = 0
        for i in range(x.bit_length()):
            if (x >> i) & 1:
                result ^= y << i
        # Reduction: result = result mod n
        size = n.bit_length()
        while result.bit_length() >= size:
            result ^= n << (result.bit_length() - size)
        return result
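    # A worked example of polymult (illustrative only): multiplication here is
    # carry-less, so with x = 0b11 and y = 0b110 the result is the XOR of
    # y << 0 and y << 1, i.e. 0b0110 ^ 0b1100 = 0b1010 = 10 (not 18), and no
    # reduction happens because the product is far shorter than MODULUS:
    #
    #   >>> Chunker().polymult(0b11, 0b110, Chunker.MODULUS)
    #   10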
    def window_init(self):
        # Sliding signature state is:
        #   [signature value, history buffer index, history buffer contents]
        return [0, 0, [0] * self.WINDOW_SIZE]

    def window_update(self, signature, byte):
        poly = signature[0]
        offset = signature[1]
        undo = self.U[signature[2][offset]]
        poly = ((poly ^ undo) << 8) + byte
        poly ^= self.T[poly >> self.degree]

        signature[0] = poly
        signature[1] = (offset + 1) % self.WINDOW_SIZE
        signature[2][offset] = byte
    def compute_breaks(self, buf):
        breaks = [0]
        signature = self.window_init()
        for i in xrange(len(buf)):
            self.window_update(signature, ord(buf[i]))
            block_len = i - breaks[-1] + 1
            if ((signature[0] % self.TARGET_CHUNK_SIZE == self.BREAKMARK_VALUE
                        and block_len >= self.MIN_CHUNK_SIZE)
                    or block_len >= self.MAX_CHUNK_SIZE):
                breaks.append(i + 1)
        if breaks[-1] < len(buf):
            breaks.append(len(buf))
        return breaks
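    # The return value is a list of break offsets (the numbers below are
    # purely illustrative): a 10 KB buffer might yield [0, 4431, 8872, 10240].
    # Each adjacent pair of offsets delimits one chunk, and the final entry is
    # always len(buf).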
    def compute_signatures(self, buf, buf_offset=0):
        """Break a buffer into chunks and compute chunk signatures.

        Args:
            buf: The data buffer.
            buf_offset: The offset of the data buffer within the original
                block, to handle cases where only a portion of the block is
                available.

        Returns:
            A dictionary containing signature data.  Keys are chunk offsets
            (from the beginning of the block), and values are tuples (size, raw
            hash value).
        """
        breaks = self.compute_breaks(buf)
        signatures = {}
        for i in range(1, len(breaks)):
            chunk = buf[breaks[i-1]:breaks[i]]
            hasher = self.hash_algorithm()
            hasher.update(chunk)
            signatures[breaks[i-1] + buf_offset] = (breaks[i] - breaks[i-1],
                                                    hasher.digest())
        return signatures
    def dump_signatures(self, signatures):
        """Convert signatures to the binary format stored in the database."""
        records = []

        # Emit records indicating that no signatures are available for the next
        # n bytes.  Since the size is a 16-bit value, to skip larger distances
        # multiple records must be emitted.  An all-zero signature indicates
        # the lack of data.
        def skip(n):
            while n > 0:
                i = min(n, self.MAX_CHUNK_SIZE)
                records.append(struct.pack(">H", i) + "\x00" * self.hash_size)
                n -= i

        position = 0
        for next_start, (size, digest) in sorted(signatures.iteritems()):
            if next_start < position:
                print "Warning: overlapping signatures, ignoring"
                continue
            skip(next_start - position)
            records.append(struct.pack(">H", size) + digest)
            position = next_start + size

        return "".join(records)
    def load_signatures(self, signatures):
        """Loads signatures from the binary format stored in the database."""
        entry_size = 2 + self.hash_size
        if len(signatures) % entry_size != 0:
            print "Warning: Invalid signatures to load"
            return {}

        null_digest = "\x00" * self.hash_size
        position = 0
        result = {}
        for i in range(len(signatures) // entry_size):
            sig = signatures[i*entry_size:(i+1)*entry_size]
            size, digest = struct.unpack(">H", sig[:2])[0], sig[2:]
            if digest != null_digest:
                result[position] = (size, digest)
            position += size
        return result
class ChunkerExternal(Chunker):
    """A Chunker which uses an external program to compute Rabin fingerprints.

    This can run much more quickly than the Python code, but should otherwise
    give identical results.
    """

    def __init__(self):
        super(ChunkerExternal, self).__init__()
        self.subproc = subprocess.Popen([CHUNKER_PROGRAM],
                                        stdin=subprocess.PIPE,
                                        stdout=subprocess.PIPE)

    def compute_breaks(self, buf):
        if len(buf) == 0: return [0]
        self.subproc.stdin.write(struct.pack(">i", len(buf)))
        self.subproc.stdin.write(buf)
        self.subproc.stdin.flush()
        breaks = self.subproc.stdout.readline()
        return [0] + [int(x) + 1 for x in breaks.split()]
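    # Illustrative exchange with the helper process (offsets hypothetical).
    # The request is a 4-byte big-endian length header followed by the raw
    # data; the reply is a single text line of whitespace-separated offsets
    # which, after the "+ 1" fixup above, become the exclusive break positions
    # used by Chunker.compute_breaks():
    #
    #   send:    struct.pack(">i", 10240) + buf
    #   receive: "4430 8871 10239\n"
    #   result:  [0, 4431, 8872, 10240]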
class DatabaseRebuilder(object):
    def __init__(self, database):
        self.database = database
        self.cursor = database.cursor()
        self.segment_ids = {}
        self.chunker = ChunkerExternal()
        #self.chunker = Chunker()

    def segment_to_id(self, segment):
        if segment in self.segment_ids: return self.segment_ids[segment]

        self.cursor.execute("""insert or ignore into segments(segment)
                               values (?)""", (segment,))
        self.cursor.execute("""select segmentid from segments
                               where segment = ?""", (segment,))
        id = self.cursor.fetchone()[0]
        self.segment_ids[segment] = id
        return id
    def rebuild(self, metadata, reference_path):
        """Iterate through old metadata and use it to rebuild the database.

        Args:
            metadata: An iterable containing lines of the metadata log.
            reference_path: Path to the root of a file system tree which may be
                similar to data in the metadata log (used to recompute block
                signatures).
        """
        for fields in cumulus.parse(metadata, lambda l: len(l) == 0):
            metadata = cumulus.MetadataItem(fields, None)
            # Only process regular files; skip over other types (directories,
            # symlinks, etc.).
            if metadata.items.type not in ("-", "f"): continue
            try:
                path = os.path.join(reference_path, metadata.items.name)
                # TODO: Check file size for early abort if different
                self.rebuild_file(open(path), metadata)
            except IOError as e:
                print e
                pass            # Ignore the file

        self.database.commit()
    def rebuild_file(self, fp, metadata):
        """Recompute database signatures if a file is unchanged.

        If the current file contents match that from the old metadata (the
        full-file hash matches), then recompute block- and chunk-level
        signatures for the objects referenced by the file.
        """
        blocks = [cumulus.CumulusStore.parse_ref(b) for b in metadata.data()]
        verifier = cumulus.ChecksumVerifier(metadata.items.checksum)
        checksums = {}
        subblock = {}
        for segment, object, checksum, slice in blocks:
            # Given a reference to a block of unknown size we don't know how to
            # match up the data, so we have to give up on rebuilding for this
            # file.
            if slice is None: return

            start, length, exact = slice
            buf = fp.read(length)
            verifier.update(buf)

            if exact:
                csum = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
                csum.update(buf)
                checksums[(segment, object)] = (length, csum.compute())
            else:
                # Compute a lower bound on the object size.
                oldlength, csum = checksums.get((segment, object), (0, None))
                checksums[(segment, object)] = (max(oldlength, start + length),
                                                csum)

            if length >= self.chunker.MINIMUM_OBJECT_SIZE or not exact:
                signatures = self.chunker.compute_signatures(buf, start)
                subblock.setdefault((segment, object), {}).update(signatures)

        if verifier.valid():
            for k in subblock:
                subblock[k] = self.chunker.dump_signatures(subblock[k])
            self.store_checksums(checksums, subblock)
        else:
            print "Checksum mismatch"
    def store_checksums(self, block_checksums, subblock_signatures):
        for (segment, object), (size, checksum) in block_checksums.iteritems():
            segmentid = self.segment_to_id(segment)
            self.cursor.execute(
                """insert or ignore into block_index(segmentid, object)
                   values (?, ?)""",
                (segmentid, object))
            self.cursor.execute("""select blockid from block_index
                                   where segmentid = ? and object = ?""",
                                (segmentid, object))
            blockid = self.cursor.fetchall()[0][0]

            # Store checksum only if it is available; we don't want to
            # overwrite an existing checksum in the database with NULL.
            if checksum is not None:
                self.cursor.execute("""update block_index
                                       set checksum = ?
                                       where blockid = ?""",
                                    (checksum, blockid))

            # Update the object size.  Our size may be an estimate, based on
            # slices that we have seen.  The size in the database must not be
            # larger than the true size, but update it to the largest value
            # possible.
            self.cursor.execute("""update block_index
                                   set size = max(?, coalesce(size, 0))
                                   where blockid = ?""",
                                (size, blockid))

            # Store subblock signatures, if available.
            # TODO: Even better would be to merge signature data, to handle the
            # case where different files see different chunks of a block.
            sigs = subblock_signatures.get((segment, object))
            if sigs:
                self.cursor.execute(
                    """insert or replace into subblock_signatures(
                           blockid, algorithm, signatures)
                       values (?, ?, ?)""",
                    (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
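    # Local database tables touched above, as implied by the SQL (the full
    # schema is defined elsewhere in Cumulus): segments(segmentid, segment),
    # block_index(blockid, segmentid, object, checksum, size), and
    # subblock_signatures(blockid, algorithm, signatures).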
class SegmentStateRebuilder(object):
    """Reconstructs segment metadata files from raw segment data."""

    def __init__(self):
        self.filters = dict(cumulus.SEGMENT_FILTERS)
        self.segment_pattern = cumulus.SEARCH_PATHS["segments"]

    def compute_metadata(self, path, relative_path):
        """Recompute metadata of a single segment.

        Args:
            path: Path to the segment file in the file system.
            relative_path: Path relative to the root for the storage backend.
        """
        # Does the filename match that of a segment?  If so, extract the
        # extension to determine the filter to apply to decompress.
        filename = os.path.basename(relative_path)
        m = self.segment_pattern.match(filename)
        if not m: return
        segment_name = m.group(1)
        extension = m.group(2)
        if extension not in self.filters: return
        filter_cmd = self.filters[extension]

        # Compute attributes of the compressed segment data.
        BLOCK_SIZE = 4096
        disk_size = 0
        with open(path) as segment:
            checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
            while True:
                buf = segment.read(BLOCK_SIZE)
                if len(buf) == 0: break
                disk_size += len(buf)
                checksummer.update(buf)
        checksum = checksummer.compute()

        # Compute attributes of the objects within the segment.
        data_size = 0
        with open(path) as segment:
            decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
            objects = tarfile.open(mode='r|', fileobj=decompressed)
            for tarinfo in objects:
                data_size += tarinfo.size

        return {"segment": segment_name,
                "path": relative_path,
                "checksum": checksum,
                "data_size": data_size,
                "disk_size": disk_size}
if __name__ == "__main__":
    if False:   # Sample code: dump reconstructed segment metadata and exit.
        segment_rebuilder = SegmentStateRebuilder()
        topdir = sys.argv[1]
        files = []
        for dirpath, dirnames, filenames in os.walk(topdir):
            for f in filenames:
                files.append(os.path.join(dirpath, f))
        files.sort()
        for f in files:
            metadata = segment_rebuilder.compute_metadata(
                f,
                os.path.relpath(f, topdir))
            if metadata is None: continue
            for (k, v) in sorted(metadata.items()):
                print "%s: %s" % (k, cumulus.uri_encode(str(v)))
        sys.exit(0)

    # Read metadata from stdin; filter out lines starting with "@@" so the
    # statcache file can be parsed as well.
    metadata = (x for x in sys.stdin if not x.startswith("@@"))

    rebuilder = DatabaseRebuilder(cumulus.LocalDatabase(sys.argv[1]))
    rebuilder.rebuild(metadata, "/")
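# Illustrative invocation (file names and paths below are hypothetical): feed
# a previously saved metadata log on stdin and pass the local database
# directory to rebuild as the single argument, e.g.
#
#   $ cat snapshot.meta | python rebuild_database.py /path/to/local/database
#
# The reference filesystem tree used to recompute signatures is currently
# hard-coded to "/" in the call above.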