import os, re, sha, shutil, tarfile, tempfile, thread
from pysqlite2 import dbapi2 as sqlite3
+# The largest supported snapshot format that can be understood.
+FORMAT_VERSION = (0, 6) # LBS Snapshot v0.6
+
# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3
+# All segments which have been accessed this session.
+accessed_segments = set()
+
class Struct:
"""A class which merely acts as a data container.
def cleanup(self):
if self.cachedir is not None:
# TODO: Avoid use of system, make this safer
- os.system("rm -rv " + self.cachedir)
+ os.system("rm -rf " + self.cachedir)
self.cachedir = None
@staticmethod
def parse_ref(refstr):
- m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
+ m = re.match(r"^zero\[(\d+)\]$", refstr)
+ if m:
+ return ("zero", None, None, (0, int(m.group(1))))
+
+ m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[((\d+)\+)?(\d+)\])?$", refstr)
if not m: return
segment = m.group(1)
checksum = checksum.lstrip("(").rstrip(")")
if slice is not None:
- slice = (int(m.group(5)), int(m.group(6)))
+ if m.group(5) is None:
+ # Abbreviated slice
+ slice = (0, int(m.group(7)))
+ else:
+ slice = (int(m.group(6)), int(m.group(7)))
return (segment, object, checksum, slice)
def get_segment(self, segment):
+ accessed_segments.add(segment)
raw = self.store.lowlevel_open(segment + ".tar.gpg")
(input, output) = os.popen2("lbs-filter-gpg --decrypt")
f.close()
def load_object(self, segment, object):
+ accessed_segments.add(segment)
path = os.path.join(self.get_cachedir(), segment, object)
if not os.access(path, os.R_OK):
- print "Extracting", segment
self.extract_segment(segment)
if segment in self.lru_list: self.lru_list.remove(segment)
self.lru_list.append(segment)
while len(self.lru_list) > self.CACHE_SIZE:
- os.system("rm -rv " + os.path.join(self.cachedir, self.lru_list[0]))
+ os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
self.lru_list = self.lru_list[1:]
return open(path, 'rb').read()
(segment, object, checksum, slice) = self.parse_ref(refstr)
+ if segment == "zero":
+ return "\0" * slice[1]
+
data = self.load_object(segment, object)
if checksum is not None:
except StopIteration:
return {}
def parse_metadata_version(s):
    """Parse a snapshot format string such as "LBS Snapshot v0.6".

    Returns the version as a tuple of integers (e.g. (0, 6)), or the
    empty tuple if the string is not a valid version identifier.
    """
    match = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
    if match is None:
        return ()
    return tuple(int(part) for part in match.group(1).split("."))
+
def read_metadata(object_store, root):
"""Iterate through all lines in the metadata log, following references."""
class MetadataItem:
"""Metadata for a single file (or directory or...) from a snapshot."""
+ # Functions for parsing various datatypes that can appear in a metadata log
+ # item.
+ @staticmethod
+ def decode_int(s):
+ """Decode an integer, expressed in decimal, octal, or hexadecimal."""
+ if s.startswith("0x"):
+ return int(s, 16)
+ elif s.startswith("0"):
+ return int(s, 8)
+ else:
+ return int(s, 10)
+
+ @staticmethod
+ def decode_str(s):
+ """Decode a URI-encoded (%xx escapes) string."""
+ def hex_decode(m): return chr(int(m.group(1), 16))
+ return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
+
+ @staticmethod
+ def raw_str(s):
+ """An unecoded string."""
+ return s
+
+ @staticmethod
+ def decode_user(s):
+ """Decode a user/group to a tuple of uid/gid followed by name."""
+ items = s.split()
+ uid = MetadataItem.decode_int(items[0])
+ name = None
+ if len(items) > 1:
+ if items[1].startswith("(") and items[1].endswith(")"):
+ name = MetadataItem.decode_str(items[1][1:-1])
+ return (uid, name)
+
+ @staticmethod
+ def decode_device(s):
+ """Decode a device major/minor number."""
+ (major, minor) = map(MetadataItem.decode_int, s.split("/"))
+ return (major, minor)
+
+ class Items: pass
+
def __init__(self, fields, object_store):
"""Initialize from a dictionary of key/value pairs from metadata log."""
self.fields = fields
self.object_store = object_store
+ self.keys = []
+ self.items = self.Items()
+ for (k, v) in fields.items():
+ if k in self.field_types:
+ decoder = self.field_types[k]
+ setattr(self.items, k, decoder(v))
+ self.keys.append(k)
def data(self):
"""Return an iterator for the data blocks that make up a file."""
else:
yield ref
# Description of fields that might appear, and how they should be parsed.
# Attached as a class attribute after the class body, since the decoder
# functions are referenced through the finished class object.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'ctime': MetadataItem.decode_int,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
    'target': MetadataItem.decode_str,
}
+
def iterate_metadata(object_store, root):
    """Yield a MetadataItem for each entry in the metadata log at root.

    Entries in the log are separated by blank lines, which is what the
    predicate passed to parse() detects.
    """
    entries = parse(read_metadata(object_store, root), lambda l: len(l) == 0)
    for entry in entries:
        yield MetadataItem(entry, object_store)
where snapshotid < (select max(snapshotid)
from snapshots)""")
- # Delete entries in the snapshot_contents table which are for
- # non-existent snapshots.
- cur.execute("""delete from snapshot_contents
+ # Delete entries in the segments_used table which are for non-existent
+ # snapshots.
+ cur.execute("""delete from segments_used
where snapshotid not in
(select snapshotid from snapshots)""")
# Find segments which contain no objects used by any current snapshots,
# and delete them from the segment table.
cur.execute("""delete from segments where segmentid not in
- (select distinct segmentid from snapshot_contents
- natural join block_index)""")
+ (select segmentid from segments_used)""")
# Finally, delete objects contained in non-existent segments. We can't
# simply delete unused objects, since we use the set of unused objects
cur = self.cursor()
- # First step: Mark all unused-and-expired objects with expired = -1,
- # which will cause us to mostly ignore these objects when rebalancing.
- # At the end, we will set these objects to be in group expired = 0.
- # Mark expired objects which still seem to be in use with expired = 0;
- # these objects will later have values set to indicate groupings of
- # objects when repacking.
- cur.execute("""update block_index set expired = -1
- where expired is not null""")
-
+ # Mark all expired objects with expired = 0; these objects will later
+ # have values set to indicate groupings of objects when repacking.
cur.execute("""update block_index set expired = 0
- where expired is not null and blockid in
- (select blockid from snapshot_contents)""")
+ where expired is not null""")
# We will want to aim for at least one full segment for each bucket
# that we eventually create, but don't know how many bytes that should
# segments, but for now don't worry too much about that.) If we can't
# compute an average, it's probably because there are no expired
# segments, so we have no more work to do.
- cur.execute("""select avg(size) from segment_info
+ cur.execute("""select avg(size) from segments
where segmentid in
(select distinct segmentid from block_index
where expired is not null)""")
cutoffs.reverse()
for i in range(len(cutoffs)):
cur.execute("""update block_index set expired = ?
- where round(? - timestamp) > ? and expired >= 0""",
+ where round(? - timestamp) > ?""",
(i, now, cutoffs[i]))
- cur.execute("update block_index set expired = 0 where expired = -1")