from pysqlite2 import dbapi2 as sqlite3
# The largest snapshot format version that this module can understand.
-FORMAT_VERSION = (0, 2) # LBS Snapshot v0.2
+FORMAT_VERSION = (0, 6) # LBS Snapshot v0.6
# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3
+# All segments which have been accessed this session.
+accessed_segments = set()
+
class Struct:
"""A class which merely acts as a data container.
@staticmethod
def parse_ref(refstr):
- m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
+ m = re.match(r"^zero\[(\d+)\]$", refstr)
+ if m:
+ return ("zero", None, None, (0, int(m.group(1))))
+
+ m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[((\d+)\+)?(\d+)\])?$", refstr)
if not m: return
segment = m.group(1)
object = m.group(2)
checksum = m.group(3)
slice = m.group(4)
if checksum is not None:
checksum = checksum.lstrip("(").rstrip(")")
if slice is not None:
- slice = (int(m.group(5)), int(m.group(6)))
+ if m.group(5) is None:
+ # Abbreviated slice: only a length is given; the offset defaults to 0.
+ slice = (0, int(m.group(7)))
+ else:
+ slice = (int(m.group(6)), int(m.group(7)))
return (segment, object, checksum, slice)
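+
+ # Illustrative results for the reference forms handled above (the
+ # segment and object names here are hypothetical):
+ #   parse_ref("zero[1024]")
+ #     => ("zero", None, None, (0, 1024))
+ #   parse_ref("a1b2-c3d4/000000ff[512]")      (abbreviated slice)
+ #     => ("a1b2-c3d4", "000000ff", None, (0, 512))
+ #   parse_ref("a1b2-c3d4/000000ff[16+512]")
+ #     => ("a1b2-c3d4", "000000ff", None, (16, 512))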
def get_segment(self, segment):
+ accessed_segments.add(segment)
raw = self.store.lowlevel_open(segment + ".tar.gpg")
(input, output) = os.popen2("lbs-filter-gpg --decrypt")
f.close()
def load_object(self, segment, object):
+ accessed_segments.add(segment)
path = os.path.join(self.get_cachedir(), segment, object)
if not os.access(path, os.R_OK):
self.extract_segment(segment)
(segment, object, checksum, slice) = self.parse_ref(refstr)
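+ # References to the special "zero" segment are synthesized directly:
+ # e.g. "zero[4096]" produces 4096 NUL bytes with no segment access.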
+ if segment == "zero":
+ return "\0" * slice[1]
+
data = self.load_object(segment, object)
if checksum is not None:
last_key = None
continue
- m = re.match(r"^(\w+):\s*(.*)$", l)
+ m = re.match(r"^([-\w]+):\s*(.*)$", l)
if m:
dict[m.group(1)] = m.group(2)
last_key = m.group(1)
'device': MetadataItem.decode_device,
'user': MetadataItem.decode_user,
'group': MetadataItem.decode_user,
+ 'ctime': MetadataItem.decode_int,
'mtime': MetadataItem.decode_int,
'links': MetadataItem.decode_int,
'inode': MetadataItem.raw_str,
'checksum': MetadataItem.decode_str,
'size': MetadataItem.decode_int,
'contents': MetadataItem.decode_str,
+ 'target': MetadataItem.decode_str,
}
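+
+# For reference, a metadata entry of the kind these decoders consume might
+# look like the following (all field values are illustrative, not taken
+# from a real snapshot):
+#
+#     name: home/user/notes.txt
+#     user: 1000 (alice)
+#     group: 1000 (alice)
+#     ctime: 1200000000
+#     mtime: 1200000000
+#     links: 1
+#     size: 4096
+#     checksum: sha1=2fd4e1c67a2d28fced849ee1bb76e7391b93eb12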
def iterate_metadata(object_store, root):
"Return a DB-API cursor for directly accessing the local database."
return self.db_connection.cursor()
- def garbage_collect(self):
- """Delete entries from old snapshots from the database."""
+ def list_schemes(self):
+ """Return the list of snapshots found in the local database.
+
+ The returned value is a list of tuples (id, scheme, name, time, intent).
+ """
cur = self.cursor()
+ cur.execute("select distinct scheme from snapshots")
+ schemes = [row[0] for row in cur.fetchall()]
+ schemes.sort()
+ return schemes
+
+ def garbage_collect(self, scheme, intent=1.0):
+ """Delete entries from old snapshots from the database.
+
+ Only snapshots with the specified scheme name will be deleted. If
+ intent is given, it gives the intended next snapshot type, to determine
+ how aggressively to clean (for example, intent=7 could be used if the
+ next snapshot will be a weekly snapshot).
+ """
- # Delete old snapshots.
- cur.execute("""delete from snapshots
- where snapshotid < (select max(snapshotid)
- from snapshots)""")
+ cur = self.cursor()
- # Delete entries in the snapshot_contents table which are for
- # non-existent snapshots.
- cur.execute("""delete from snapshot_contents
+ # Find the id of the last snapshot to be created. This serves as a
+ # logical clock: we record this value in each segment we expire on
+ # this run, and on a future run we can tell whether any backups have
+ # been made in between.
+ cur.execute("select max(snapshotid) from snapshots")
+ last_snapshotid = cur.fetchone()[0]
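+ # For example (with hypothetical ids): if a segment was marked with
+ # expire_time = 41 on an earlier run and max(snapshotid) is now 45,
+ # at least one backup has been made since, so cleaning of that
+ # segment can be assumed to have taken place.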
+
+ # Get the list of snapshots for this scheme, newest first, and delete
+ # the old ones. Rules for what to keep (a worked example follows the
+ # query below):
+ # - Always keep the most recent snapshot.
+ # - If snapshot X is younger than Y, and X has higher intent, then Y
+ # can be deleted.
+ cur.execute("""select snapshotid, name, intent,
+ julianday('now') - timestamp as age
+ from snapshots where scheme = ?
+ order by age""", (scheme,))
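+
+ # Worked example (hypothetical data): given snapshots ordered newest
+ # first with intents [1, 7, 1, 1] and a requested intent of 1, the
+ # loop below keeps the newest snapshot and the intent-7 snapshot, but
+ # deletes the two oldest intent-1 snapshots, since each is older than
+ # a snapshot of equal or higher intent.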
+
+ first = True
+ max_intent = intent
+ for (id, name, snap_intent, snap_age) in cur.fetchall():
+ can_delete = False
+ if snap_intent < max_intent:
+ # Delete a lower-intent snapshot when a more recent snapshot
+ # with higher intent exists.
+ can_delete = True
+ elif snap_intent == intent:
+ # Delete previous snapshots with the specified intent level.
+ can_delete = True
+
+ if can_delete and not first:
+ print "Delete snapshot %d (%s)" % (id, name)
+ cur.execute("delete from snapshots where snapshotid = ?",
+ (id,))
+ first = False
+ max_intent = max(max_intent, snap_intent)
+
+ # Delete entries in the segments_used table which are for non-existent
+ # snapshots.
+ cur.execute("""delete from segments_used
where snapshotid not in
(select snapshotid from snapshots)""")
# Delete segments which are no longer referenced by any current
# snapshot from the segments table.
cur.execute("""delete from segments where segmentid not in
- (select distinct segmentid from snapshot_contents
- natural join block_index)""")
+ (select segmentid from segments_used)""")
- # Finally, delete objects contained in non-existent segments. We can't
- # simply delete unused objects, since we use the set of unused objects
- # to determine the used/free ratio of segments.
+ # Delete unused objects in the block_index table. By "unused", we mean
+ # any object which was stored in a segment which has been deleted, and
+ # any object in a segment which was marked for cleaning and has had
+ # cleaning performed already (the expired time is less than the current
+ # largest snapshot id).
cur.execute("""delete from block_index
- where segmentid not in
- (select segmentid from segments)""")
+ where segmentid not in (select segmentid from segments)
+ or segmentid in (select segmentid from segments
+ where expire_time < ?)""",
+ (last_snapshotid,))
+
+ # Remove sub-block signatures for deleted objects.
+ cur.execute("""delete from subblock_signatures
+ where blockid not in
+ (select blockid from block_index)""")
# Segment cleaning.
class SegmentInfo(Struct): pass
cur = self.cursor()
segments = []
cur.execute("""select segmentid, used, size, mtime,
- julianday('now') - mtime as age from segment_info""")
+ julianday('now') - mtime as age from segment_info
+ where expire_time is null""")
for row in cur:
info = self.SegmentInfo()
info.id = row[0]
info.mtime = row[3]
info.age_days = row[4]
+ # If age is not available for whatever reason, treat it as 0.0.
+ if info.age_days is None:
+ info.age_days = 0.0
+
# Benefit calculation: u is the estimated fraction of each segment
# which is utilized (bytes belonging to objects still in use
# divided by total size; this doesn't take compression or storage
# overhead into account, but should give a reasonable estimate).
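+ # A minimal sketch of one plausible scoring rule (an assumption, not
+ # necessarily the formula used here): with u = used_bytes / size_bytes,
+ #     benefit = (1 - u) * (0.1 + age_days)
+ # so that emptier and older segments are preferred for cleaning.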
raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
cur = self.cursor()
- cur.execute("update block_index set expired = 1 where segmentid = ?",
+ cur.execute("select max(snapshotid) from snapshots")
+ last_snapshotid = cur.fetchone()[0]
+ cur.execute("update segments set expire_time = ? where segmentid = ?",
+ (last_snapshotid, id))
+ cur.execute("update block_index set expired = 0 where segmentid = ?",
(id,))
def balance_expired_objects(self):
cur = self.cursor()
- # First step: Mark all unused-and-expired objects with expired = -1,
- # which will cause us to mostly ignore these objects when rebalancing.
- # At the end, we will set these objects to be in group expired = 0.
- # Mark expired objects which still seem to be in use with expired = 0;
- # these objects will later have values set to indicate groupings of
- # objects when repacking.
- cur.execute("""update block_index set expired = -1
- where expired is not null""")
-
+ # Mark all expired objects with expired = 0; these objects will later
+ # have values set to indicate groupings of objects when repacking.
cur.execute("""update block_index set expired = 0
- where expired is not null and blockid in
- (select blockid from snapshot_contents)""")
+ where expired is not null""")
# We will want to aim for at least one full segment for each bucket
# that we eventually create, but don't know how many bytes that should
# segments, but for now don't worry too much about that.) If we can't
# compute an average, it's probably because there are no expired
# segments, so we have no more work to do.
- cur.execute("""select avg(size) from segment_info
+ cur.execute("""select avg(size) from segments
where segmentid in
(select distinct segmentid from block_index
where expired is not null)""")
cutoffs.reverse()
for i in range(len(cutoffs)):
cur.execute("""update block_index set expired = ?
- where round(? - timestamp) > ? and expired >= 0""",
+ where round(? - timestamp) > ?
+ and expired is not null""",
(i, now, cutoffs[i]))
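+ # For instance, assuming timestamps are julian day numbers (so the
+ # differences above are ages in days) and ascending cutoffs of
+ # [1, 7, 30]: an object 10 days old ends up with expired = 1, and one
+ # 45 days old with expired = 2, so objects of similar age are grouped
+ # together for repacking.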
- cur.execute("update block_index set expired = 0 where expired = -1")