diff --git a/lbs.py b/lbs.py
index 7eab075..712b072 100644
--- a/lbs.py
+++ b/lbs.py
@@ -12,6 +12,9 @@ from __future__ import division
 import os, re, sha, tarfile, tempfile, thread
 from pysqlite2 import dbapi2 as sqlite3
 
+# The highest snapshot format version that this module can understand.
+FORMAT_VERSION = (0, 6)         # LBS Snapshot v0.6
+
 # Maximum number of nested indirect references allowed in a snapshot.
 MAX_RECURSION_DEPTH = 3
 
@@ -133,6 +136,10 @@ class ObjectStore:
 
     @staticmethod
    def parse_ref(refstr):
+        m = re.match(r"^zero\[(\d+)\+(\d+)\]$", refstr)
+        if m:
+            return ("zero", None, None, (int(m.group(1)), int(m.group(2))))
+
         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
         if not m: return
 
@@ -187,7 +194,6 @@ class ObjectStore:
     def load_object(self, segment, object):
         path = os.path.join(self.get_cachedir(), segment, object)
         if not os.access(path, os.R_OK):
-            print "Extracting", segment
             self.extract_segment(segment)
         if segment in self.lru_list: self.lru_list.remove(segment)
         self.lru_list.append(segment)
@@ -204,6 +210,9 @@
 
         (segment, object, checksum, slice) = self.parse_ref(refstr)
 
+        if segment == "zero":
+            return "\0" * slice[1]
+
         data = self.load_object(segment, object)
 
         if checksum is not None:
@@ -262,6 +271,15 @@ def parse_full(lines):
     except StopIteration:
         return {}
 
+def parse_metadata_version(s):
+    """Convert a snapshot format version string to a tuple of integers."""
+
+    m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
+    if m is None:
+        return ()
+    else:
+        return tuple([int(d) for d in m.group(1).split(".")])
+
 def read_metadata(object_store, root):
     """Iterate through all lines in the metadata log, following references."""
 
@@ -391,12 +409,14 @@ MetadataItem.field_types = {
     'device': MetadataItem.decode_device,
     'user': MetadataItem.decode_user,
     'group': MetadataItem.decode_user,
+    'ctime': MetadataItem.decode_int,
     'mtime': MetadataItem.decode_int,
     'links': MetadataItem.decode_int,
     'inode': MetadataItem.raw_str,
     'checksum': MetadataItem.decode_str,
     'size': MetadataItem.decode_int,
     'contents': MetadataItem.decode_str,
+    'target': MetadataItem.decode_str,
 }
 
 def iterate_metadata(object_store, root):
@@ -441,17 +461,16 @@ class LocalDatabase:
                        where snapshotid < (select max(snapshotid)
                                            from snapshots)""")
 
-        # Delete entries in the snapshot_contents table which are for
-        # non-existent snapshots.
-        cur.execute("""delete from snapshot_contents
+        # Delete entries in the segments_used table which are for non-existent
+        # snapshots.
+        cur.execute("""delete from segments_used
                        where snapshotid not in
                            (select snapshotid from snapshots)""")
 
         # Find segments which contain no objects used by any current snapshots,
         # and delete them from the segment table.
         cur.execute("""delete from segments where segmentid not in
-                           (select distinct segmentid from snapshot_contents
-                            natural join block_index)""")
+                           (select segmentid from segments_used)""")
 
         # Finally, delete objects contained in non-existent segments. We can't
         # simply delete unused objects, since we use the set of unused objects
@@ -563,18 +582,10 @@ class LocalDatabase:
 
         cur = self.cursor()
 
-        # First step: Mark all unused-and-expired objects with expired = -1,
-        # which will cause us to mostly ignore these objects when rebalancing.
-        # At the end, we will set these objects to be in group expired = 0.
-        # Mark expired objects which still seem to be in use with expired = 0;
-        # these objects will later have values set to indicate groupings of
-        # objects when repacking.
-        cur.execute("""update block_index set expired = -1
-                     where expired is not null""")
-
+        # Mark all expired objects with expired = 0; these objects will later
+        # have values set to indicate groupings of objects when repacking.
         cur.execute("""update block_index set expired = 0
-                     where expired is not null and blockid in
-                       (select blockid from snapshot_contents)""")
+                     where expired is not null""")
 
         # We will want to aim for at least one full segment for each bucket
         # that we eventually create, but don't know how many bytes that should
@@ -584,7 +595,7 @@
         # segments, but for now don't worry too much about that.) If we can't
         # compute an average, it's probably because there are no expired
         # segments, so we have no more work to do.
-        cur.execute("""select avg(size) from segment_info
+        cur.execute("""select avg(size) from segments
                        where segmentid in
                            (select distinct segmentid from block_index
                             where expired is not null)""")
@@ -667,6 +678,5 @@ class LocalDatabase:
         cutoffs.reverse()
         for i in range(len(cutoffs)):
             cur.execute("""update block_index set expired = ?
-                           where round(? - timestamp) > ? and expired >= 0""",
+                           where round(? - timestamp) > ?""",
                            (i, now, cutoffs[i]))
-        cur.execute("update block_index set expired = 0 where expired = -1")
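
The new zero[start+length] reference form above gives snapshots a compact way
to name runs of NUL bytes without storing any object data: parse_ref() returns
the segment name "zero", and extract() synthesizes the bytes directly instead
of touching the store. A minimal sketch of that round trip, using a standalone
copy of the regex (the helper name parse_zero_ref is hypothetical, not part of
lbs.py):

    import re

    def parse_zero_ref(refstr):
        # Same pattern parse_ref() now tries first: "zero[start+length]".
        # Returns parse_ref's 4-tuple shape: (segment, object, checksum, slice).
        m = re.match(r"^zero\[(\d+)\+(\d+)\]$", refstr)
        if m:
            return ("zero", None, None, (int(m.group(1)), int(m.group(2))))
        return None

    ref = parse_zero_ref("zero[16+4096]")
    assert ref == ("zero", None, None, (16, 4096))

    # As in extract(): a "zero" segment requires no segment fetch at all;
    # the slice length (slice[1]) is simply expanded into NUL bytes.
    data = "\0" * ref[3][1]
    assert len(data) == 4096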
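parse_metadata_version() pairs naturally with the new FORMAT_VERSION constant:
version strings become integer tuples, and Python's tuple comparison then
orders them correctly (so v0.10 sorts after v0.6). A sketch of a compatibility
check a caller might perform; the guard itself is an assumption, not part of
this patch:

    import re

    FORMAT_VERSION = (0, 6)     # LBS Snapshot v0.6

    def parse_metadata_version(s):
        # As in the patch: "LBS Snapshot v0.6" -> (0, 6); () if unparsable.
        m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
        if m is None:
            return ()
        return tuple([int(d) for d in m.group(1).split(".")])

    version = parse_metadata_version("LBS Snapshot v0.6")
    assert version == (0, 6)
    assert parse_metadata_version("not a version header") == ()

    # Tuple comparison handles multi-digit components, so v0.10 > v0.6:
    assert parse_metadata_version("LBS Snapshot v0.10") > FORMAT_VERSION

    # Hypothetical caller-side guard before trusting a snapshot's contents:
    if not version or version > FORMAT_VERSION:
        raise RuntimeError("unsupported snapshot format: %s" % (version,))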
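The simplified balance_expired_objects() logic marks every expired object with
expired = 0 and then walks the cutoff list, with each UPDATE overwriting the
group of every object older than the current cutoff; the last matching index
wins. A pure-Python sketch of that assignment with made-up ages and cutoffs
(assign_group is hypothetical, and the ascending order after cutoffs.reverse()
is an assumption, since the cutoff construction is outside this hunk):

    def assign_group(age, cutoffs):
        # Mimics the SQL loop: each pass overwrites `expired` for every
        # object whose age exceeds cutoffs[i], so an object ends up in the
        # group of the largest cutoff its age exceeds.
        # Assumes cutoffs are ascending after the cutoffs.reverse() call.
        group = None
        for i in range(len(cutoffs)):
            if age > cutoffs[i]:
                group = i
        return group

    day = 86400  # seconds
    cutoffs = [1 * day, 7 * day, 30 * day]   # made-up ascending cutoffs
    assert assign_group(0.5 * day, cutoffs) is None  # newer than every cutoff
    assert assign_group(3 * day, cutoffs) == 0
    assert assign_group(90 * day, cutoffs) == 2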