X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=lbs.py;h=507b9e13bcef1fb5ec078637ed1f95d588ca9245;hb=048815121720c9cfb0bc8df48efd9aa0c846e3d0;hp=3f647c46a7eea2e7d98f0f54c75a80a93daf7cac;hpb=0ccc8bda8094a3a3ad9cec58916c0a740dd33628;p=cumulus.git diff --git a/lbs.py b/lbs.py index 3f647c4..507b9e1 100644 --- a/lbs.py +++ b/lbs.py @@ -13,11 +13,14 @@ import os, re, sha, tarfile, tempfile, thread from pysqlite2 import dbapi2 as sqlite3 # The largest supported snapshot format that can be understood. -FORMAT_VERSION = (0, 2) # LBS Snapshot v0.2 +FORMAT_VERSION = (0, 6) # LBS Snapshot v0.6 # Maximum number of nested indirect references allowed in a snapshot. MAX_RECURSION_DEPTH = 3 +# All segments which have been accessed this session. +accessed_segments = set() + class Struct: """A class which merely acts as a data container. @@ -136,7 +139,11 @@ class ObjectStore: @staticmethod def parse_ref(refstr): - m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr) + m = re.match(r"^zero\[(\d+)\]$", refstr) + if m: + return ("zero", None, None, (0, int(m.group(1)))) + + m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[((\d+)\+)?(\d+)\])?$", refstr) if not m: return segment = m.group(1) @@ -148,11 +155,16 @@ class ObjectStore: checksum = checksum.lstrip("(").rstrip(")") if slice is not None: - slice = (int(m.group(5)), int(m.group(6))) + if m.group(5) is None: + # Abbreviated slice + slice = (0, int(m.group(7))) + else: + slice = (int(m.group(6)), int(m.group(7))) return (segment, object, checksum, slice) def get_segment(self, segment): + accessed_segments.add(segment) raw = self.store.lowlevel_open(segment + ".tar.gpg") (input, output) = os.popen2("lbs-filter-gpg --decrypt") @@ -188,6 +200,7 @@ class ObjectStore: f.close() def load_object(self, segment, object): + accessed_segments.add(segment) path = os.path.join(self.get_cachedir(), segment, object) if not os.access(path, os.R_OK): self.extract_segment(segment) @@ -206,6 +219,9 @@ class ObjectStore: (segment, object, checksum, slice) = self.parse_ref(refstr) + if segment == "zero": + return "\0" * slice[1] + data = self.load_object(segment, object) if checksum is not None: @@ -247,7 +263,7 @@ def parse(lines, terminate=None): last_key = None continue - m = re.match(r"^(\w+):\s*(.*)$", l) + m = re.match(r"^([-\w]+):\s*(.*)$", l) if m: dict[m.group(1)] = m.group(2) last_key = m.group(1) @@ -402,12 +418,14 @@ MetadataItem.field_types = { 'device': MetadataItem.decode_device, 'user': MetadataItem.decode_user, 'group': MetadataItem.decode_user, + 'ctime': MetadataItem.decode_int, 'mtime': MetadataItem.decode_int, 'links': MetadataItem.decode_int, 'inode': MetadataItem.raw_str, 'checksum': MetadataItem.decode_str, 'size': MetadataItem.decode_int, 'contents': MetadataItem.decode_str, + 'target': MetadataItem.decode_str, } def iterate_metadata(object_store, root): @@ -442,34 +460,91 @@ class LocalDatabase: "Return a DB-API cursor for directly accessing the local database." return self.db_connection.cursor() - def garbage_collect(self): - """Delete entries from old snapshots from the database.""" + def list_schemes(self): + """Return the list of snapshots found in the local database. + + The returned value is a list of tuples (id, scheme, name, time, intent). + """ cur = self.cursor() + cur.execute("select distinct scheme from snapshots") + schemes = [row[0] for row in cur.fetchall()] + schemes.sort() + return schemes + + def garbage_collect(self, scheme, intent=1.0): + """Delete entries from old snapshots from the database. + + Only snapshots with the specified scheme name will be deleted. If + intent is given, it gives the intended next snapshot type, to determine + how aggressively to clean (for example, intent=7 could be used if the + next snapshot will be a weekly snapshot). + """ - # Delete old snapshots. - cur.execute("""delete from snapshots - where snapshotid < (select max(snapshotid) - from snapshots)""") + cur = self.cursor() - # Delete entries in the snapshot_contents table which are for - # non-existent snapshots. - cur.execute("""delete from snapshot_contents + # Find the id of the last snapshot to be created. This is used for + # measuring time in a way: we record this value in each segment we + # expire on this run, and then on a future run can tell if there have + # been intervening backups made. + cur.execute("select max(snapshotid) from snapshots") + last_snapshotid = cur.fetchone()[0] + + # Get the list of old snapshots for this scheme. Delete all the old + # ones. Rules for what to keep: + # - Always keep the most recent snapshot. + # - If snapshot X is younger than Y, and X has higher intent, then Y + # can be deleted. + cur.execute("""select snapshotid, name, intent, + julianday('now') - timestamp as age + from snapshots where scheme = ? + order by age""", (scheme,)) + + first = True + max_intent = intent + for (id, name, snap_intent, snap_age) in cur.fetchall(): + can_delete = False + if snap_intent < max_intent: + # Delete small-intent snapshots if there is a more recent + # large-intent snapshot. + can_delete = True + elif snap_intent == intent: + # Delete previous snapshots with the specified intent level. + can_delete = True + + if can_delete and not first: + print "Delete snapshot %d (%s)" % (id, name) + cur.execute("delete from snapshots where snapshotid = ?", + (id,)) + first = False + max_intent = max(max_intent, snap_intent) + + # Delete entries in the segments_used table which are for non-existent + # snapshots. + cur.execute("""delete from segments_used where snapshotid not in (select snapshotid from snapshots)""") # Find segments which contain no objects used by any current snapshots, # and delete them from the segment table. cur.execute("""delete from segments where segmentid not in - (select distinct segmentid from snapshot_contents - natural join block_index)""") + (select segmentid from segments_used)""") - # Finally, delete objects contained in non-existent segments. We can't - # simply delete unused objects, since we use the set of unused objects - # to determine the used/free ratio of segments. + # Delete unused objects in the block_index table. By "unused", we mean + # any object which was stored in a segment which has been deleted, and + # any object in a segment which was marked for cleaning and has had + # cleaning performed already (the expired time is less than the current + # largest snapshot id). cur.execute("""delete from block_index - where segmentid not in - (select segmentid from segments)""") + where segmentid not in (select segmentid from segments) + or segmentid in (select segmentid from segments + where expire_time < ?)""", + (last_snapshotid,)) + + # Remove sub-block signatures for deleted objects. + cur.execute("""delete from subblock_signatures + where blockid not in + (select blockid from block_index)""") # Segment cleaning. class SegmentInfo(Struct): pass @@ -494,7 +569,8 @@ class LocalDatabase: cur = self.cursor() segments = [] cur.execute("""select segmentid, used, size, mtime, - julianday('now') - mtime as age from segment_info""") + julianday('now') - mtime as age from segment_info + where expire_time is null""") for row in cur: info = self.SegmentInfo() info.id = row[0] @@ -503,6 +579,10 @@ class LocalDatabase: info.mtime = row[3] info.age_days = row[4] + # If age is not available for whatever reason, treat it as 0.0. + if info.age_days is None: + info.age_days = 0.0 + # Benefit calculation: u is the estimated fraction of each segment # which is utilized (bytes belonging to objects still in use # divided by total size; this doesn't take compression or storage @@ -541,7 +621,11 @@ class LocalDatabase: raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment))) cur = self.cursor() - cur.execute("update block_index set expired = 1 where segmentid = ?", + cur.execute("select max(snapshotid) from snapshots") + last_snapshotid = cur.fetchone()[0] + cur.execute("update segments set expire_time = ? where segmentid = ?", + (last_snapshotid, id)) + cur.execute("update block_index set expired = 0 where segmentid = ?", (id,)) def balance_expired_objects(self): @@ -574,18 +658,10 @@ class LocalDatabase: cur = self.cursor() - # First step: Mark all unused-and-expired objects with expired = -1, - # which will cause us to mostly ignore these objects when rebalancing. - # At the end, we will set these objects to be in group expired = 0. - # Mark expired objects which still seem to be in use with expired = 0; - # these objects will later have values set to indicate groupings of - # objects when repacking. - cur.execute("""update block_index set expired = -1 - where expired is not null""") - + # Mark all expired objects with expired = 0; these objects will later + # have values set to indicate groupings of objects when repacking. cur.execute("""update block_index set expired = 0 - where expired is not null and blockid in - (select blockid from snapshot_contents)""") + where expired is not null""") # We will want to aim for at least one full segment for each bucket # that we eventually create, but don't know how many bytes that should @@ -595,7 +671,7 @@ class LocalDatabase: # segments, but for now don't worry too much about that.) If we can't # compute an average, it's probably because there are no expired # segments, so we have no more work to do. - cur.execute("""select avg(size) from segment_info + cur.execute("""select avg(size) from segments where segmentid in (select distinct segmentid from block_index where expired is not null)""") @@ -678,6 +754,6 @@ class LocalDatabase: cutoffs.reverse() for i in range(len(cutoffs)): cur.execute("""update block_index set expired = ? - where round(? - timestamp) > ? and expired >= 0""", + where round(? - timestamp) > ? + and expired is not null""", (i, now, cutoffs[i])) - cur.execute("update block_index set expired = 0 where expired = -1")