Improve tracking of segments and segment utilization.
[cumulus.git] / python / cumulus / __init__.py
index 51d3ee8..a40c58d 100644 (file)
@@ -1,7 +1,25 @@
-"""High-level interface for working with LBS archives.
+# Cumulus: Efficient Filesystem Backup to the Cloud
+# Copyright (C) 2008-2009, 2012 The Cumulus Developers
+# See the AUTHORS file for a list of contributors.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""High-level interface for working with Cumulus archives.
 
 This module provides an easy interface for reading from and manipulating
-various parts of an LBS archive:
+various parts of a Cumulus archive:
   - listing the snapshots and segments present
   - reading segment contents
   - parsing snapshot descriptors and snapshot metadata logs
@@ -9,13 +27,13 @@ various parts of an LBS archive:
 """
 
 from __future__ import division
-import os, re, sha, tarfile, tempfile, thread
+import hashlib, os, re, tarfile, tempfile, thread
 from pysqlite2 import dbapi2 as sqlite3
 
 import cumulus.store, cumulus.store.file
 
 # The largest supported snapshot format that can be understood.
-FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8
+FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
 
 # Maximum number of nested indirect references allowed in a snapshot.
 MAX_RECURSION_DEPTH = 3
@@ -23,6 +41,28 @@ MAX_RECURSION_DEPTH = 3
 # All segments which have been accessed this session.
 accessed_segments = set()
 
+# Table of methods used to filter segments before storage, and corresponding
+# filename extensions.  These are listed in priority order (methods earlier in
+# the list are tried first).
+SEGMENT_FILTERS = [
+    (".gpg", "cumulus-filter-gpg --decrypt"),
+    (".gz", "gzip -dc"),
+    (".bz2", "bzip2 -dc"),
+]
+
+def uri_decode(s):
+    """Decode a URI-encoded (%xx escapes) string."""
+    def hex_decode(m): return chr(int(m.group(1), 16))
+    return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
+def uri_encode(s):
+    """Encode a string to URI-encoded (%xx escapes) form."""
+    def hex_encode(c):
+        if c > '+' and c < '\x7f' and c != '@':
+            return c
+        else:
+            return "%%%02x" % (ord(c),)
+    return ''.join(hex_encode(c) for c in s)
+
 class Struct:
     """A class which merely acts as a data container.
 
@@ -34,11 +74,13 @@ class Struct:
         return "<%s %s>" % (self.__class__, self.__dict__)
 
 CHECKSUM_ALGORITHMS = {
-    'sha1': sha.new
+    'sha1': hashlib.sha1,
+    'sha224': hashlib.sha224,
+    'sha256': hashlib.sha256,
 }
 
 class ChecksumCreator:
-    """Compute an LBS checksum for provided data.
+    """Compute a Cumulus checksum for provided data.
 
     The algorithm used is selectable, but currently defaults to sha1.
     """
@@ -84,7 +126,9 @@ class LowlevelDataStore:
     """
 
     def __init__(self, path):
-        if path.find(":") >= 0:
+        if isinstance(path, cumulus.store.Store):
+            self.store = path
+        elif path.find(":") >= 0:
             self.store = cumulus.store.open(path)
         else:
             self.store = cumulus.store.file.FileStore(path)
@@ -95,6 +139,9 @@ class LowlevelDataStore:
                 return (t, filename)
         return (None, filename)
 
+    def scan(self):
+        self.store.scan()
+
     def lowlevel_open(self, filename):
         """Return a file-like object for reading data from the given file."""
 
@@ -171,19 +218,26 @@ class ObjectStore:
 
     def get_segment(self, segment):
         accessed_segments.add(segment)
-        raw = self.store.lowlevel_open(segment + ".tar.gpg")
 
-        (input, output) = os.popen2("lbs-filter-gpg --decrypt")
-        def copy_thread(src, dst):
-            BLOCK_SIZE = 4096
-            while True:
-                block = src.read(BLOCK_SIZE)
-                if len(block) == 0: break
-                dst.write(block)
-            dst.close()
+        for (extension, filter) in SEGMENT_FILTERS:
+            try:
+                raw = self.store.lowlevel_open(segment + ".tar" + extension)
+
+                (input, output) = os.popen2(filter)
+                def copy_thread(src, dst):
+                    BLOCK_SIZE = 4096
+                    while True:
+                        block = src.read(BLOCK_SIZE)
+                        if len(block) == 0: break
+                        dst.write(block)
+                    dst.close()
 
-        thread.start_new_thread(copy_thread, (raw, input))
-        return output
+                thread.start_new_thread(copy_thread, (raw, input))
+                return output
+            except:
+                pass
+
+        raise cumulus.store.NotFoundError
 
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
@@ -290,7 +344,7 @@ def parse_full(lines):
 def parse_metadata_version(s):
     """Convert a string with the snapshot version format to a tuple."""
 
-    m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
+    m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
     if m is None:
         return ()
     else:
@@ -347,8 +401,7 @@ class MetadataItem:
     @staticmethod
     def decode_str(s):
         """Decode a URI-encoded (%xx escapes) string."""
-        def hex_decode(m): return chr(int(m.group(1), 16))
-        return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
+        return uri_decode(s)
 
     @staticmethod
     def raw_str(s):
@@ -479,7 +532,26 @@ class LocalDatabase:
         schemes.sort()
         return schemes
 
-    def garbage_collect(self, scheme, intent=1.0):
+    def list_snapshots(self, scheme):
+        """Return a list of snapshots for the given scheme."""
+        cur = self.cursor()
+        cur.execute("select name from snapshots")
+        snapshots = [row[0] for row in cur.fetchall()]
+        snapshots.sort()
+        return snapshots
+
+    def delete_snapshot(self, scheme, name):
+        """Remove the specified snapshot from the database.
+
+        Warning: This does not garbage collect all dependent data in the
+        database, so it must be followed by a call to garbage_collect() to make
+        the database consistent.
+        """
+        cur = self.cursor()
+        cur.execute("delete from snapshots where scheme = ? and name = ?",
+                    (scheme, name))
+
+    def prune_old_snapshots(self, scheme, intent=1.0):
         """Delete entries from old snapshots from the database.
 
         Only snapshots with the specified scheme name will be deleted.  If
@@ -526,6 +598,16 @@ class LocalDatabase:
             first = False
             max_intent = max(max_intent, snap_intent)
 
+        self.garbage_collect()
+
+    def garbage_collect(self):
+        """Garbage-collect unreachable segment and object data.
+
+        Remove all segments and checksums which is not reachable from the
+        current set of snapshots stored in the local database.
+        """
+        cur = self.cursor()
+
         # Delete entries in the segments_used table which are for non-existent
         # snapshots.
         cur.execute("""delete from segments_used
@@ -537,16 +619,10 @@ class LocalDatabase:
         cur.execute("""delete from segments where segmentid not in
                            (select segmentid from segments_used)""")
 
-        # Delete unused objects in the block_index table.  By "unused", we mean
-        # any object which was stored in a segment which has been deleted, and
-        # any object in a segment which was marked for cleaning and has had
-        # cleaning performed already (the expired time is less than the current
-        # largest snapshot id).
+        # Delete dangling objects in the block_index table.
         cur.execute("""delete from block_index
-                       where segmentid not in (select segmentid from segments)
-                          or segmentid in (select segmentid from segments
-                                           where expire_time < ?)""",
-                    (last_snapshotid,))
+                       where segmentid not in
+                           (select segmentid from segments)""")
 
         # Remove sub-block signatures for deleted objects.
         cur.execute("""delete from subblock_signatures
@@ -649,7 +725,7 @@ class LocalDatabase:
         """
 
         # The expired column of the block_index table is used when generating a
-        # new LBS snapshot.  A null value indicates that an object may be
+        # new Cumulus snapshot.  A null value indicates that an object may be
         # re-used.  Otherwise, an object must be written into a new segment if
         # needed.  Objects with distinct expired values will be written into
         # distinct segments, to allow for some grouping by age.  The value 0 is