Fix typo in restore.pl.
[cumulus.git] / lbs.py
diff --git a/lbs.py b/lbs.py
index 7eab075..b8de982 100644 (file)
--- a/lbs.py
+++ b/lbs.py
@@ -12,9 +12,15 @@ from __future__ import division
 import os, re, sha, tarfile, tempfile, thread
 from pysqlite2 import dbapi2 as sqlite3
 
+# The largest supported snapshot format that can be understood.
+FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8
+
 # Maximum number of nested indirect references allowed in a snapshot.
 MAX_RECURSION_DEPTH = 3
 
+# All segments which have been accessed this session.
+accessed_segments = set()
+
 class Struct:
     """A class which merely acts as a data container.
 
@@ -133,7 +139,11 @@ class ObjectStore:
 
     @staticmethod
     def parse_ref(refstr):
-        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
+        m = re.match(r"^zero\[(\d+)\]$", refstr)
+        if m:
+            return ("zero", None, None, (0, int(m.group(1))))
+
+        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
         if not m: return
 
         segment = m.group(1)
@@ -145,11 +155,19 @@ class ObjectStore:
             checksum = checksum.lstrip("(").rstrip(")")
 
         if slice is not None:
-            slice = (int(m.group(5)), int(m.group(6)))
+            if m.group(9) is not None:
+                # Size-assertion slice
+                slice = (0, int(m.group(9)), True)
+            elif m.group(6) is None:
+                # Abbreviated slice
+                slice = (0, int(m.group(8)), False)
+            else:
+                slice = (int(m.group(7)), int(m.group(8)), False)
 
         return (segment, object, checksum, slice)
 
     def get_segment(self, segment):
+        accessed_segments.add(segment)
         raw = self.store.lowlevel_open(segment + ".tar.gpg")
 
         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
@@ -185,9 +203,9 @@ class ObjectStore:
             f.close()
 
     def load_object(self, segment, object):
+        accessed_segments.add(segment)
         path = os.path.join(self.get_cachedir(), segment, object)
         if not os.access(path, os.R_OK):
-            print "Extracting", segment
             self.extract_segment(segment)
         if segment in self.lru_list: self.lru_list.remove(segment)
         self.lru_list.append(segment)
@@ -204,6 +222,9 @@ class ObjectStore:
 
         (segment, object, checksum, slice) = self.parse_ref(refstr)
 
+        if segment == "zero":
+            return "\0" * slice[1]
+
         data = self.load_object(segment, object)
 
         if checksum is not None:
@@ -213,7 +234,8 @@ class ObjectStore:
                 raise ValueError
 
         if slice is not None:
-            (start, length) = slice
+            (start, length, exact) = slice
+            if exact and len(data) != length: raise ValueError
             data = data[start:start+length]
             if len(data) != length: raise IndexError
 
@@ -245,7 +267,7 @@ def parse(lines, terminate=None):
             last_key = None
             continue
 
-        m = re.match(r"^(\w+):\s*(.*)$", l)
+        m = re.match(r"^([-\w]+):\s*(.*)$", l)
         if m:
             dict[m.group(1)] = m.group(2)
             last_key = m.group(1)
@@ -262,6 +284,15 @@ def parse_full(lines):
     except StopIteration:
         return {}
 
+def parse_metadata_version(s):
+    """Convert a string with the snapshot version format to a tuple."""
+
+    m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
+    if m is None:
+        return ()
+    else:
+        return tuple([int(d) for d in m.group(1).split(".")])
+
 def read_metadata(object_store, root):
     """Iterate through all lines in the metadata log, following references."""
 
@@ -391,12 +422,14 @@ MetadataItem.field_types = {
     'device': MetadataItem.decode_device,
     'user': MetadataItem.decode_user,
     'group': MetadataItem.decode_user,
+    'ctime': MetadataItem.decode_int,
     'mtime': MetadataItem.decode_int,
     'links': MetadataItem.decode_int,
     'inode': MetadataItem.raw_str,
     'checksum': MetadataItem.decode_str,
     'size': MetadataItem.decode_int,
     'contents': MetadataItem.decode_str,
+    'target': MetadataItem.decode_str,
 }
 
 def iterate_metadata(object_store, root):
@@ -431,34 +464,91 @@ class LocalDatabase:
         "Return a DB-API cursor for directly accessing the local database."
         return self.db_connection.cursor()
 
-    def garbage_collect(self):
-        """Delete entries from old snapshots from the database."""
+    def list_schemes(self):
+        """Return the list of snapshots found in the local database.
+
+        The returned value is a list of tuples (id, scheme, name, time, intent).
+        """
 
         cur = self.cursor()
+        cur.execute("select distinct scheme from snapshots")
+        schemes = [row[0] for row in cur.fetchall()]
+        schemes.sort()
+        return schemes
+
+    def garbage_collect(self, scheme, intent=1.0):
+        """Delete entries from old snapshots from the database.
+
+        Only snapshots with the specified scheme name will be deleted.  If
+        intent is given, it gives the intended next snapshot type, to determine
+        how aggressively to clean (for example, intent=7 could be used if the
+        next snapshot will be a weekly snapshot).
+        """
 
-        # Delete old snapshots.
-        cur.execute("""delete from snapshots
-                       where snapshotid < (select max(snapshotid)
-                                           from snapshots)""")
+        cur = self.cursor()
 
-        # Delete entries in the snapshot_contents table which are for
-        # non-existent snapshots.
-        cur.execute("""delete from snapshot_contents
+        # Find the id of the last snapshot to be created.  This is used for
+        # measuring time in a way: we record this value in each segment we
+        # expire on this run, and then on a future run can tell if there have
+        # been intervening backups made.
+        cur.execute("select max(snapshotid) from snapshots")
+        last_snapshotid = cur.fetchone()[0]
+
+        # Get the list of old snapshots for this scheme.  Delete all the old
+        # ones.  Rules for what to keep:
+        #   - Always keep the most recent snapshot.
+        #   - If snapshot X is younger than Y, and X has higher intent, then Y
+        #     can be deleted.
+        cur.execute("""select snapshotid, name, intent,
+                              julianday('now') - timestamp as age
+                       from snapshots where scheme = ?
+                       order by age""", (scheme,))
+
+        first = True
+        max_intent = intent
+        for (id, name, snap_intent, snap_age) in cur.fetchall():
+            can_delete = False
+            if snap_intent < max_intent:
+                # Delete small-intent snapshots if there is a more recent
+                # large-intent snapshot.
+                can_delete = True
+            elif snap_intent == intent:
+                # Delete previous snapshots with the specified intent level.
+                can_delete = True
+
+            if can_delete and not first:
+                print "Delete snapshot %d (%s)" % (id, name)
+                cur.execute("delete from snapshots where snapshotid = ?",
+                            (id,))
+            first = False
+            max_intent = max(max_intent, snap_intent)
+
+        # Delete entries in the segments_used table which are for non-existent
+        # snapshots.
+        cur.execute("""delete from segments_used
                        where snapshotid not in
                            (select snapshotid from snapshots)""")
 
         # Find segments which contain no objects used by any current snapshots,
         # and delete them from the segment table.
         cur.execute("""delete from segments where segmentid not in
-                           (select distinct segmentid from snapshot_contents
-                                natural join block_index)""")
+                           (select segmentid from segments_used)""")
 
-        # Finally, delete objects contained in non-existent segments.  We can't
-        # simply delete unused objects, since we use the set of unused objects
-        # to determine the used/free ratio of segments.
+        # Delete unused objects in the block_index table.  By "unused", we mean
+        # any object which was stored in a segment which has been deleted, and
+        # any object in a segment which was marked for cleaning and has had
+        # cleaning performed already (the expired time is less than the current
+        # largest snapshot id).
         cur.execute("""delete from block_index
-                       where segmentid not in
-                           (select segmentid from segments)""")
+                       where segmentid not in (select segmentid from segments)
+                          or segmentid in (select segmentid from segments
+                                           where expire_time < ?)""",
+                    (last_snapshotid,))
+
+        # Remove sub-block signatures for deleted objects.
+        cur.execute("""delete from subblock_signatures
+                       where blockid not in
+                           (select blockid from block_index)""")
 
     # Segment cleaning.
     class SegmentInfo(Struct): pass
@@ -483,7 +573,8 @@ class LocalDatabase:
         cur = self.cursor()
         segments = []
         cur.execute("""select segmentid, used, size, mtime,
-                       julianday('now') - mtime as age from segment_info""")
+                       julianday('now') - mtime as age from segment_info
+                       where expire_time is null""")
         for row in cur:
             info = self.SegmentInfo()
             info.id = row[0]
@@ -492,6 +583,12 @@ class LocalDatabase:
             info.mtime = row[3]
             info.age_days = row[4]
 
+            # If data is not available for whatever reason, treat it as 0.0.
+            if info.age_days is None:
+                info.age_days = 0.0
+            if info.used_bytes is None:
+                info.used_bytes = 0.0
+
             # Benefit calculation: u is the estimated fraction of each segment
             # which is utilized (bytes belonging to objects still in use
             # divided by total size; this doesn't take compression or storage
@@ -530,7 +627,11 @@ class LocalDatabase:
             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
 
         cur = self.cursor()
-        cur.execute("update block_index set expired = 1 where segmentid = ?",
+        cur.execute("select max(snapshotid) from snapshots")
+        last_snapshotid = cur.fetchone()[0]
+        cur.execute("update segments set expire_time = ? where segmentid = ?",
+                    (last_snapshotid, id))
+        cur.execute("update block_index set expired = 0 where segmentid = ?",
                     (id,))
 
     def balance_expired_objects(self):
@@ -563,18 +664,10 @@ class LocalDatabase:
 
         cur = self.cursor()
 
-        # First step: Mark all unused-and-expired objects with expired = -1,
-        # which will cause us to mostly ignore these objects when rebalancing.
-        # At the end, we will set these objects to be in group expired = 0.
-        # Mark expired objects which still seem to be in use with expired = 0;
-        # these objects will later have values set to indicate groupings of
-        # objects when repacking.
-        cur.execute("""update block_index set expired = -1
-                       where expired is not null""")
-
+        # Mark all expired objects with expired = 0; these objects will later
+        # have values set to indicate groupings of objects when repacking.
         cur.execute("""update block_index set expired = 0
-                       where expired is not null and blockid in
-                           (select blockid from snapshot_contents)""")
+                       where expired is not null""")
 
         # We will want to aim for at least one full segment for each bucket
         # that we eventually create, but don't know how many bytes that should
@@ -584,7 +677,7 @@ class LocalDatabase:
         # segments, but for now don't worry too much about that.)  If we can't
         # compute an average, it's probably because there are no expired
         # segments, so we have no more work to do.
-        cur.execute("""select avg(size) from segment_info
+        cur.execute("""select avg(size) from segments
                        where segmentid in
                            (select distinct segmentid from block_index
                             where expired is not null)""")
@@ -667,6 +760,6 @@ class LocalDatabase:
         cutoffs.reverse()
         for i in range(len(cutoffs)):
             cur.execute("""update block_index set expired = ?
-                           where round(? - timestamp) > ? and expired >= 0""",
+                           where round(? - timestamp) > ?
+                             and expired is not null""",
                         (i, now, cutoffs[i]))
-        cur.execute("update block_index set expired = 0 where expired = -1")