Improve tracking of segments and segment utilization.
authorMichael Vrable <vrable@cs.hmc.edu>
Fri, 21 Sep 2012 22:50:14 +0000 (15:50 -0700)
committerMichael Vrable <vrable@cs.hmc.edu>
Wed, 22 Jan 2014 05:11:09 +0000 (21:11 -0800)
Update the local database and cumulus binary to keep better track of
segment utilization data, with an eventual goal of improving cleaning
algorithms.

14 files changed:
localdb.cc
localdb.h
main.cc
metadata.cc
python/cumulus/__init__.py
python/cumulus/cmd_util.py
python/cumulus/config.py
python/cumulus/main.py
python/cumulus/retention.py
ref.h
schema.sql
store.cc
store.h
subfile.cc

index 3fcbafe..ca83559 100644 (file)
 #include "store.h"
 #include "util.h"
 
+using std::max;
 using std::min;
+using std::set;
 using std::string;
 
+static const int SCHEMA_MAJOR = 0;
+static const int SCHEMA_MINOR = 11;
+
 /* Helper function to prepare a statement for execution in the current
  * database. */
 sqlite3_stmt *LocalDb::Prepare(const char *sql)
@@ -85,14 +90,33 @@ void LocalDb::Open(const char *path, const char *snapshot_name,
 
     sqlite3_extended_result_codes(db, 1);
 
+    /* Check that the local database format is the correct version; if not,
+     * report an error. */
+    sqlite3_stmt *stmt = Prepare("select major, minor from schema_version");
+
+    rc = sqlite3_step(stmt);
+    if (rc != SQLITE_ROW) {
+        fatal("Unable to read local database version from database");
+    } else if (rc == SQLITE_ROW) {
+        int major = sqlite3_column_int(stmt, 0);
+        int minor = sqlite3_column_int(stmt, 1);
+        if (major != SCHEMA_MAJOR || minor != SCHEMA_MINOR) {
+            fprintf(stderr,
+                    "Local database does not have required schema version!\n"
+                    "  expected: %d.%d, found: %d.%d\n",
+                    SCHEMA_MAJOR, SCHEMA_MINOR, major, minor);
+            fatal("Unable to continue.");
+        }
+    }
+    sqlite3_finalize(stmt);
+
     if (snapshot_scheme == NULL)
         snapshot_scheme = "";
 
     /* Insert this snapshot into the database, and determine the integer key
      * which will be used to identify it. */
-    sqlite3_stmt *stmt = Prepare("insert into "
-                                 "snapshots(name, scheme, timestamp, intent) "
-                                 "values (?, ?, julianday('now'), ?)");
+    stmt = Prepare("insert into snapshots(name, scheme, timestamp, intent) "
+                   "values (?, ?, julianday('now'), ?)");
     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
                       SQLITE_TRANSIENT);
     sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
@@ -143,22 +167,13 @@ void LocalDb::Close()
 {
     int rc;
 
-    /* Summarize the snapshot_refs table into segments_used. */
+    /* Summarize the snapshot_refs table into segment_utilization. */
     sqlite3_stmt *stmt = Prepare(
-        "insert or replace into segments_used "
-        "select ? as snapshotid, segmentid, max(utilization) from ("
-        "    select segmentid, cast(used as real) / size as utilization "
-        "    from "
-        "    (select segmentid, sum(size) as used from snapshot_refs "
-        "       group by segmentid) "
-        "    join segments using (segmentid) "
-        "  union "
-        "    select segmentid, utilization from segments_used "
-        "    where snapshotid = ? "
-        ") group by segmentid"
+        "insert or replace into segment_utilization "
+        "select ? as snapshotid, segmentid, sum(size) "
+        "from snapshot_refs group by segmentid"
     );
     sqlite3_bind_int64(stmt, 1, snapshotid);
-    sqlite3_bind_int64(stmt, 2, snapshotid);
     rc = sqlite3_step(stmt);
     if (rc != SQLITE_OK && rc != SQLITE_DONE) {
         ReportError(rc);
@@ -232,13 +247,16 @@ string LocalDb::IdToSegment(int64_t segmentid)
     return result;
 }
 
-void LocalDb::StoreObject(const ObjectReference& ref,
-                          const string &checksum, int64_t size,
-                          double age)
+void LocalDb::StoreObject(const ObjectReference& ref, double age)
 {
     int rc;
     sqlite3_stmt *stmt;
 
+    assert(ref.has_checksum());
+    string checksum = ref.get_checksum();
+    assert(ref.range_is_exact());
+    int64_t size = ref.get_range_length();
+
     if (age == 0.0) {
         stmt = Prepare("insert into block_index("
                        "segmentid, object, checksum, size, timestamp) "
@@ -370,6 +388,33 @@ bool LocalDb::IsAvailable(const ObjectReference &ref)
     return found;
 }
 
+set<string> LocalDb::GetUsedSegments()
+{
+    int rc;
+    sqlite3_stmt *stmt;
+    set<string> result;
+
+    stmt = Prepare("select segment from segments "
+                   "where segmentid in (select segmentid from snapshot_refs)");
+
+    while (true) {
+        rc = sqlite3_step(stmt);
+        if (rc == SQLITE_ROW) {
+            const char *segment
+                = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
+            result.insert(segment);
+        } else if (rc == SQLITE_DONE) {
+            break;
+        } else {
+            ReportError(rc);
+        }
+    }
+
+    sqlite3_finalize(stmt);
+
+    return result;
+}
+
 void LocalDb::UseObject(const ObjectReference& ref)
 {
     int rc;
@@ -390,31 +435,46 @@ void LocalDb::UseObject(const ObjectReference& ref)
     }
     sqlite3_finalize(stmt);
 
-    int64_t block_size = 0;
-    stmt = Prepare("select size from block_index "
-                   "where segmentid = ? and object = ?");
-    sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
-    obj = ref.get_sequence();
-    sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
-    rc = sqlite3_step(stmt);
-    if (rc == SQLITE_ROW) {
-        block_size = sqlite3_column_int64(stmt, 0);
+    // Attempt to determine the underlying size of the object.  This may
+    // require a database lookup if the length is not encoded into the object
+    // reference already.
+    int64_t object_size = 0;
+    if (ref.range_is_exact()) {
+        object_size = ref.get_range_length();
     } else {
-        string refstr = ref.to_string();
-        fprintf(stderr, "No block found in block_index for %s\n",
-                refstr.c_str());
+        stmt = Prepare("select size from block_index "
+                       "where segmentid = ? and object = ?");
+        sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
+        obj = ref.get_sequence();
+        sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
+        rc = sqlite3_step(stmt);
+        if (rc == SQLITE_ROW) {
+            object_size = sqlite3_column_int64(stmt, 0);
+        } else {
+            fprintf(stderr, "Warning: No block found in block_index for %s\n",
+                    ref.to_string().c_str());
+        }
         sqlite3_finalize(stmt);
-        return;
     }
-    sqlite3_finalize(stmt);
 
-    int64_t new_size = old_size;
+    // Possibly mark additional bytes as being referenced.  The number of bytes
+    // referenced can only be increased (up to the object size).  The bytes
+    // referenced will be set to the object size only if the entire object is
+    // referenced at once: a series of partial ranges that add up to the total
+    // size will have a reference size capped at just less than the full object
+    // size (we can't tell if some bytes were referenced multiple times, and
+    // thus we conservatively assume some bytes might still be unreferenced).
+    int64_t new_refs = old_size;
     if (ref.has_range()) {
-        new_size += ref.get_range_length();
-        new_size = min(new_size, block_size);
+        new_refs = ref.get_range_length();
     } else {
-        new_size = block_size;
+        new_refs = object_size;
     }
+    int64_t new_size = old_size + new_refs;
+    if (old_size < object_size && new_refs < object_size)
+        new_size = min(new_size, object_size - 1);
+    new_size = min(object_size, new_size);
+    new_size = max(new_size, (int64_t)0);
 
     if (new_size != old_size) {
         stmt = Prepare("insert or replace "
@@ -435,44 +495,25 @@ void LocalDb::UseObject(const ObjectReference& ref)
     }
 }
 
-void LocalDb::UseSegment(const std::string &segment, double utilization)
-{
-    int rc;
-    sqlite3_stmt *stmt;
-
-    stmt = Prepare("insert or replace "
-                   "into segments_used(snapshotid, segmentid, utilization) "
-                   "values (?, ?, ?)");
-    sqlite3_bind_int64(stmt, 1, snapshotid);
-    sqlite3_bind_int64(stmt, 2, SegmentToId(segment));
-    sqlite3_bind_double(stmt, 3, utilization);
-
-    rc = sqlite3_step(stmt);
-    if (rc != SQLITE_DONE) {
-        fprintf(stderr, "Could not insert segment use record!\n");
-        ReportError(rc);
-    }
-
-    sqlite3_finalize(stmt);
-}
-
 void LocalDb::SetSegmentChecksum(const std::string &segment,
                                  const std::string &path,
                                  const std::string &checksum,
-                                 int size)
+                                 int data_size, int disk_size)
 {
     int rc;
     sqlite3_stmt *stmt;
 
-    stmt = Prepare("update segments set path = ?, checksum = ?, size = ?, "
+    stmt = Prepare("update segments set path = ?, checksum = ?, "
+                   "data_size = ?, disk_size = ?, "
                    "mtime = coalesce(mtime, julianday('now')) "
                    "where segmentid = ?");
     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(),
                       SQLITE_TRANSIENT);
     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
                       SQLITE_TRANSIENT);
-    sqlite3_bind_int64(stmt, 3, size);
-    sqlite3_bind_int64(stmt, 4, SegmentToId(segment));
+    sqlite3_bind_int64(stmt, 3, data_size);
+    sqlite3_bind_int64(stmt, 4, disk_size);
+    sqlite3_bind_int64(stmt, 5, SegmentToId(segment));
 
     rc = sqlite3_step(stmt);
     if (rc != SQLITE_DONE) {
index 5901836..7764aed 100644 (file)
--- a/localdb.h
+++ b/localdb.h
@@ -31,6 +31,7 @@
 
 #include <sqlite3.h>
 
+#include <set>
 #include <string>
 
 #include "ref.h"
@@ -40,17 +41,17 @@ public:
     void Open(const char *path, const char *snapshot_name,
               const char *snapshot_scheme, double intent);
     void Close();
-    void StoreObject(const ObjectReference& ref,
-                     const std::string &checksum, int64_t size, double age);
+    void StoreObject(const ObjectReference& ref, double age);
     ObjectReference FindObject(const std::string &checksum, int64_t size);
     bool IsOldObject(const std::string &checksum, int64_t size, double *age,
                      int *group);
     bool IsAvailable(const ObjectReference &ref);
     void UseObject(const ObjectReference& ref);
-    void UseSegment(const std::string &segment, double utilization);
 
+    std::set<std::string> GetUsedSegments();
     void SetSegmentChecksum(const std::string &segment, const std::string &path,
-                            const std::string &checksum, int size);
+                            const std::string &checksum,
+                            int data_size, int disk_size);
     bool GetSegmentChecksum(const std::string &segment,
                             std::string *seg_path, std::string *seg_checksum);
 
diff --git a/main.cc b/main.cc
index c3f8c59..e10a04a 100644 (file)
--- a/main.cc
+++ b/main.cc
@@ -82,9 +82,6 @@ static char *block_buf;
  * invocations to help in creating incremental snapshots. */
 LocalDb *db;
 
-/* Keep track of all segments which are needed to reconstruct the snapshot. */
-std::set<string> segment_list;
-
 /* Snapshot intent: 1=daily, 7=weekly, etc.  This is not used directly, but is
  * stored in the local database and can help guide segment cleaning and
  * snapshot expiration policies. */
@@ -98,13 +95,6 @@ bool flag_rebuild_statcache = false;
 /* Whether verbose output is enabled. */
 bool verbose = false;
 
-/* Ensure that the given segment is listed as a dependency of the current
- * snapshot. */
-void add_segment(const string& segment)
-{
-    segment_list.insert(segment);
-}
-
 /* Attempts to open a regular file read-only, but with safety checks for files
  * that might not be fully trusted. */
 int safe_open(const string& path, struct stat *stat_buf)
@@ -232,8 +222,6 @@ int64_t dumpfile(int fd, dictionary &file_info, const string &path,
                  i != old_blocks.end(); ++i) {
                 const ObjectReference &ref = *i;
                 object_list.push_back(ref.to_string());
-                if (ref.is_normal())
-                    add_segment(ref.get_segment());
                 db->UseObject(ref);
             }
             size = stat_buf.st_size;
@@ -274,9 +262,10 @@ int64_t dumpfile(int fd, dictionary &file_info, const string &path,
             double block_age = 0.0;
             ObjectReference ref;
 
-            SHA1Checksum block_hash;
-            block_hash.process(block_buf, bytes);
-            string block_csum = block_hash.checksum_str();
+            Hash *hash = Hash::New();
+            hash->update(block_buf, bytes);
+            string block_csum = hash->digest_str();
+            delete hash;
 
             if (all_zero) {
                 ref = ObjectReference(ObjectReference::REF_ZERO);
@@ -332,8 +321,6 @@ int64_t dumpfile(int fd, dictionary &file_info, const string &path,
             while (!refs.empty()) {
                 ref = refs.front(); refs.pop_front();
                 object_list.push_back(ref.to_string());
-                if (ref.is_normal())
-                    add_segment(ref.get_segment());
                 db->UseObject(ref);
             }
             size += bytes;
@@ -859,7 +846,6 @@ int main(int argc, char *argv[])
     }
 
     ObjectReference root_ref = metawriter->close();
-    add_segment(root_ref.get_segment());
     string backup_root = root_ref.to_string();
 
     delete metawriter;
@@ -880,6 +866,7 @@ int main(int argc, char *argv[])
                                                    "checksums");
     FILE *checksums = fdopen(checksum_file->get_fd(), "w");
 
+    std::set<string> segment_list = db->GetUsedSegments();
     for (std::set<string>::iterator i = segment_list.begin();
          i != segment_list.end(); ++i) {
         string seg_path, seg_csum;
index 2933ce9..15bd7a5 100644 (file)
@@ -49,7 +49,6 @@ bool flag_full_metadata = false;
 
 /* TODO: Move to header file */
 extern LocalDb *db;
-void add_segment(const string& segment);
 
 /* Like strcmp, but sorts in the order that files will be visited in the
  * filesystem.  That is, we break paths apart at slashes, and compare path
@@ -296,8 +295,7 @@ void MetadataWriter::metadata_flush()
         // If indirectly referencing any other metadata logs, be sure those
         // segments are properly referenced.
         if (i->reused) {
-            add_segment(i->ref.get_segment());
-            db->UseSegment(i->ref.get_segment(), 1.0);
+            db->UseObject(i->ref);
         }
 
         // Write out an indirect reference to any previous objects which could
@@ -338,15 +336,13 @@ void MetadataWriter::metadata_flush()
     /* Write current metadata information to a new object. */
     LbsObject *meta = new LbsObject;
     meta->set_group("metadata");
-    meta->set_data(m.data(), m.size());
+    meta->set_data(m.data(), m.size(), NULL);
     meta->write(store);
-    meta->checksum();
 
     /* Write a reference to this block in the root. */
     ObjectReference ref = meta->get_ref();
     metadata_root << "@" << ref.to_string() << "\n";
-    add_segment(ref.get_segment());
-    db->UseSegment(ref.get_segment(), 1.0);
+    db->UseObject(ref);
 
     delete meta;
 
@@ -397,11 +393,9 @@ ObjectReference MetadataWriter::close()
 
     LbsObject *root = new LbsObject;
     root->set_group("metadata");
-    root->set_data(root_data.data(), root_data.size());
+    root->set_data(root_data.data(), root_data.size(), NULL);
     root->write(store);
-    root->checksum();
-    add_segment(root->get_ref().get_segment());
-    db->UseSegment(root->get_ref().get_segment(), 1.0);
+    db->UseObject(root->get_ref());
 
     ObjectReference ref = root->get_ref();
     delete root;
index 51755c7..a40c58d 100644 (file)
@@ -532,7 +532,26 @@ class LocalDatabase:
         schemes.sort()
         return schemes
 
-    def garbage_collect(self, scheme, intent=1.0):
+    def list_snapshots(self, scheme):
+        """Return a list of snapshots for the given scheme."""
+        cur = self.cursor()
+        cur.execute("select name from snapshots")
+        snapshots = [row[0] for row in cur.fetchall()]
+        snapshots.sort()
+        return snapshots
+
+    def delete_snapshot(self, scheme, name):
+        """Remove the specified snapshot from the database.
+
+        Warning: This does not garbage collect all dependent data in the
+        database, so it must be followed by a call to garbage_collect() to make
+        the database consistent.
+        """
+        cur = self.cursor()
+        cur.execute("delete from snapshots where scheme = ? and name = ?",
+                    (scheme, name))
+
+    def prune_old_snapshots(self, scheme, intent=1.0):
         """Delete entries from old snapshots from the database.
 
         Only snapshots with the specified scheme name will be deleted.  If
@@ -579,6 +598,16 @@ class LocalDatabase:
             first = False
             max_intent = max(max_intent, snap_intent)
 
+        self.garbage_collect()
+
+    def garbage_collect(self):
+        """Garbage-collect unreachable segment and object data.
+
+        Remove all segments and checksums which is not reachable from the
+        current set of snapshots stored in the local database.
+        """
+        cur = self.cursor()
+
         # Delete entries in the segments_used table which are for non-existent
         # snapshots.
         cur.execute("""delete from segments_used
@@ -590,16 +619,10 @@ class LocalDatabase:
         cur.execute("""delete from segments where segmentid not in
                            (select segmentid from segments_used)""")
 
-        # Delete unused objects in the block_index table.  By "unused", we mean
-        # any object which was stored in a segment which has been deleted, and
-        # any object in a segment which was marked for cleaning and has had
-        # cleaning performed already (the expired time is less than the current
-        # largest snapshot id).
+        # Delete dangling objects in the block_index table.
         cur.execute("""delete from block_index
-                       where segmentid not in (select segmentid from segments)
-                          or segmentid in (select segmentid from segments
-                                           where expire_time < ?)""",
-                    (last_snapshotid,))
+                       where segmentid not in
+                           (select segmentid from segments)""")
 
         # Remove sub-block signatures for deleted objects.
         cur.execute("""delete from subblock_signatures
index da2fb47..e0b094d 100644 (file)
@@ -59,7 +59,7 @@ def cmd_clean(args, clean_threshold=7.0):
     # Delete old snapshots from the local database.
     intent = float(options.intent)
     for s in db.list_schemes():
-        db.garbage_collect(s, intent)
+        db.prune_old_snapshots(s, intent)
 
     # Expire segments which are poorly-utilized.
     for s in db.get_segment_cleaning_list():
index ea55b1f..cf721bf 100644 (file)
@@ -51,7 +51,6 @@ def _build_retention_engine(spec):
             seconds = int(m.group(1)) * _TIME_UNITS[m.group(2)]
             period = period + datetime.timedelta(seconds=seconds)
             intervalspec = m.group(3)
-        print classname, period
         policy.add_policy(classname, period)
     return policy
 
index 7700148..62f2665 100644 (file)
@@ -22,6 +22,7 @@ This implements maintenance functions and is a wrapper around the C++
 cumulus-backup program.
 """
 
+import datetime
 import re
 import sys
 
@@ -59,11 +60,55 @@ def prune_backups(backup_config, scheme):
     cmd_util.options = options
     cmd_util.cmd_garbage_collect([])
 
+def prune_localdb(backup_config, scheme, next_snapshot=None):
+    """Clean old snapshots out of the local database.
+
+    Clear old snapshots out of the local database, possibly in preparation for
+    running a new backup.  One snapshot of each configured retention period is
+    kept (i.e., one weekly and one daily), and the most recent snapshot is
+    always retained.  If next_snapshot is not None, it should be the timestamp
+    when (approximately) the next snapshot will be taken; if that snapshot
+    would be a daily, weekly, etc. snapshot, then it may result in the previous
+    snapshot of the same duration being evicted from the local database.
+
+    Note that in this sense, "evict" merely refers to tracking the snapshots in
+    the local database; this function does not delete backups from the backup
+    storage.
+    """
+    # Fetch the list of existing snapshots in the local database.  Pruning only
+    # makes sense if there are more than one snapshots present.
+    db = cumulus.LocalDatabase(backup_config.get_global("localdb"))
+    snapshots = sorted(db.list_snapshots(scheme))
+    if len(snapshots) <= 1:
+        return
+
+    # Classify the snapshots (daily, weekly, etc.) and keep the most recent one
+    # of each category.  Also ensure that the most recent snapshot is retained.
+    retention = backup_config.get_retention_for_scheme(scheme)
+    for snapshot in snapshots:
+        retention.consider_snapshot(snapshot)
+    if next_snapshot is not None:
+        retention.consider_snapshot(next_snapshot)
+    retained = set(retention.last_snapshots().values())
+    retained.add(snapshots[-1])
+    print retention.last_snapshots()
+    print retained
+    for s in snapshots:
+        print s, s in retained
+
+    evicted = [s for s in snapshots if s not in retained]
+    for s in evicted:
+        db.delete_snapshot(scheme, s)
+    db.garbage_collect()
+    db.commit()
+
 def main(argv):
     backup_config = config.CumulusConfig(argv[1])
     for scheme in backup_config.backup_schemes():
         print scheme
-        prune_backups(backup_config, scheme)
+        #prune_backups(backup_config, scheme)
+        prune_localdb(backup_config, scheme, datetime.datetime.utcnow())
+        #prune_localdb(backup_config, scheme, datetime.datetime(2013, 1, 1))
 
 if __name__ == "__main__":
     main(sys.argv)
index a7ae983..b0e0078 100644 (file)
@@ -102,10 +102,12 @@ class RetentionEngine(object):
 
     def add_policy(self, backup_class, retention_period):
         self._policies[backup_class] = retention_period
-        self._last_snapshots[backup_class] = (None, None)
+        self._last_snapshots[backup_class] = (None, None, False)
 
     @staticmethod
     def parse_timestamp(s):
+        if isinstance(s, datetime.datetime):
+            return s
         return datetime.datetime.strptime(s, TIMESTAMP_FORMAT)
 
     def consider_snapshot(self, snapshot):
@@ -134,9 +136,11 @@ class RetentionEngine(object):
             partition = _backup_classes[backup_class](timestamp_policy)
             last_snapshot = self._last_snapshots[backup_class]
             if self._last_snapshots[backup_class][0] != partition:
-                self._last_snapshots[backup_class] = (partition, snapshot)
                 self._labels.add(backup_class)
-                if snapshot_age < retention_period: retain = True
+                retain_label = snapshot_age < retention_period
+                self._last_snapshots[backup_class] = (partition, snapshot,
+                                                      retain_label)
+                if retain_label: retain = True
         return retain
 
     def last_labels(self):
@@ -148,4 +152,5 @@ class RetentionEngine(object):
 
     def last_snapshots(self):
         """Returns the most recent snapshot in each backup class."""
-        return dict((k, v[1]) for (k, v) in self._last_snapshots.iteritems())
+        return dict((k, v[1]) for (k, v)
+                    in self._last_snapshots.iteritems() if v[2])
diff --git a/ref.h b/ref.h
index d997133..bbbf5b8 100644 (file)
--- a/ref.h
+++ b/ref.h
@@ -110,7 +110,7 @@ public:
     bool has_range() const { return range_valid; }
     size_t get_range_start() const { return range_start; }
     size_t get_range_length() const { return range_length; }
-    size_t get_range_exact() const { return range_exact; }
+    bool range_is_exact() const { return range_exact; }
     void clear_range()
         { range_start = range_length = 0;
           range_valid = false; range_exact = false; }
index 35b2c9d..d898272 100644 (file)
@@ -3,13 +3,25 @@
 --
 -- The index is stored in an SQLite3 database.  This is its schema.
 
--- List of snapshots which have been created.
+-- Versioning information, describing the revision for which the table schema
+-- was set up.
+create table schema_version(
+    version text,               -- Program version, dotted decimal string
+    major integer,              -- Major version number
+    minor integer               -- Minor version number
+);
+insert into schema_version values ('0.11', 0, 11);
+
+-- List of snapshots which have been created and which we are still tracking.
+-- There may be more snapshots than this actually stored at the remote server,
+-- but the reverse should not ever be true: Cumulus may depend on data stored
+-- in these snapshots when writing a new snapshot.
 create table snapshots (
     snapshotid integer primary key,
     name text not null,
     scheme text not null,
     timestamp real,
-    intent real
+    intent real                 -- TODO: deprecated, should be removed
 );
 
 -- List of segments which have been created.
@@ -19,11 +31,13 @@ create table segments (
     path text,
     checksum text,
     mtime real,
-    size integer,
-    expire_time integer         -- snapshotid of latest snapshot when expired
+    data_size integer,          -- sum of bytes in all objects in the segment
+    disk_size integer           -- size of segment on disk, after compression
+    -- TODO: group? metadata vs. non-metadata?
 );
 
--- Index of all blocks which have been stored, by checksum.
+-- Index of all data blocks in stored segments.  This is indexed by content
+-- hash to allow for coarse block-level data deduplication.
 create table block_index (
     blockid integer primary key,
     segmentid integer not null,
@@ -57,20 +71,14 @@ create table subblock_signatures (
     signatures blob not null
 );
 
--- Summary of segment utilization for each snapshots.
-create table segments_used (
+-- Summary of segment utilization for each snapshot.
+create table segment_utilization (
     snapshotid integer not null,
     segmentid integer not null,
-    utilization real
-);
-create unique index segments_used_index
-    on segments_used(snapshotid, segmentid);
 
--- Overall estimate of segment utilization, for all snapshots combined.
-create view segment_info as
-select segmentid, mtime, size, expire_time,
-       cast(size * utilization as integer) as used, utilization
-from segments join
-     (select segmentid, max(utilization) as utilization
-      from segments_used group by segmentid)
-using (segmentid);
+    -- Estimate for the number of live bytes in data objects: this is capped at
+    -- segments.data_size if all data in the segment is referenced.
+    bytes_referenced integer
+);
+create unique index segment_utilization_index
+    on segment_utilization(snapshotid, segmentid);
index 9634e7b..772f9c5 100644 (file)
--- a/store.cc
+++ b/store.cc
@@ -41,6 +41,7 @@
 #include <iostream>
 
 #include "hash.h"
+#include "localdb.h"
 #include "store.h"
 #include "ref.h"
 #include "util.h"
@@ -226,7 +227,9 @@ static const size_t SEGMENT_SIZE = 4 * 1024 * 1024;
 static map<string, pair<int64_t, int64_t> > group_sizes;
 
 ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
-                                              const std::string &group)
+                                              const std::string &group,
+                                              const std::string &checksum,
+                                              double age)
 {
     struct segment_info *segment;
 
@@ -240,7 +243,7 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
         segment->basename = segment->name + ".tar";
         segment->basename += filter_extension;
         segment->count = 0;
-        segment->size = 0;
+        segment->data_size = 0;
         segment->rf = remote->alloc_file(segment->basename, "segments");
         segment->file = new Tarfile(segment->rf, segment->name);
 
@@ -255,11 +258,16 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
 
     segment->file->write_object(id, data, len);
     segment->count++;
-    segment->size += len;
+    segment->data_size += len;
 
     group_sizes[group].first += len;
 
     ObjectReference ref(segment->name, id_buf);
+    ref.set_range(0, len, true);
+    if (checksum.size() > 0)
+        ref.set_checksum(checksum);
+    if (db != NULL)
+        db->StoreObject(ref, age);
 
     // If this segment meets or exceeds the size target, close it so that
     // future objects will go into a new segment.
@@ -292,17 +300,21 @@ void TarSegmentStore::close_segment(const string &group)
     delete segment->file;
 
     if (db != NULL) {
+        struct stat stat_buf;
+        int disk_size = 0;
+        if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
+            disk_size = stat_buf.st_size;
+            group_sizes[segment->group].second += disk_size;
+        }
+
         SHA1Checksum segment_checksum;
+        string checksum;
         if (segment_checksum.process_file(segment->rf->get_local_path().c_str())) {
-            string checksum = segment_checksum.checksum_str();
-            db->SetSegmentChecksum(segment->name, segment->basename, checksum,
-                                   segment->size);
+            checksum = segment_checksum.checksum_str();
         }
 
-        struct stat stat_buf;
-        if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
-            group_sizes[segment->group].second += stat_buf.st_size;
-        }
+        db->SetSegmentChecksum(segment->name, segment->basename, checksum,
+                               segment->data_size, disk_size);
     }
 
     segment->rf->send();
@@ -317,7 +329,7 @@ string TarSegmentStore::object_reference_to_segment(const string &object)
 }
 
 LbsObject::LbsObject()
-    : group(""), data(NULL), data_len(0), written(false)
+    : group(""), age(0.0), data(NULL), data_len(0), written(false)
 {
 }
 
@@ -325,21 +337,26 @@ LbsObject::~LbsObject()
 {
 }
 
-void LbsObject::write(TarSegmentStore *store)
+void LbsObject::set_data(const char *d, size_t len, const char *checksum)
 {
-    assert(data != NULL);
-    assert(!written);
+    data = d;
+    data_len = len;
 
-    ref = store->write_object(data, data_len, group);
-    written = true;
+    if (checksum != NULL) {
+        this->checksum = checksum;
+    } else {
+        Hash *hash = Hash::New();
+        hash->update(data, data_len);
+        this->checksum = hash->digest_str();
+        delete hash;
+    }
 }
 
-void LbsObject::checksum()
+void LbsObject::write(TarSegmentStore *store)
 {
-    assert(written);
+    assert(data != NULL);
+    assert(!written);
 
-    Hash *hash = Hash::New();
-    hash->update(data, data_len);
-    ref.set_checksum(hash->digest_str());
-    delete hash;
+    ref = store->write_object(data, data_len, group, checksum, age);
+    written = true;
 }
diff --git a/store.h b/store.h
index 428c418..016e275 100644 (file)
--- a/store.h
+++ b/store.h
@@ -108,7 +108,9 @@ public:
     // used to control object placement; objects with different group
     // parameters are kept in separate segments.
     ObjectReference write_object(const char *data, size_t len,
-                                 const std::string &group = "");
+                                 const std::string &group = "",
+                                 const std::string &checksum = "",
+                                 double age = 0.0);
 
     // Ensure all segments have been fully written.
     void sync();
@@ -122,7 +124,7 @@ private:
         std::string group;
         std::string name;           // UUID
         int count;                  // Objects written to this segment
-        int size;                   // Combined size of objects written
+        int data_size;              // Combined size of objects written
         std::string basename;       // Name of segment without directory
         RemoteFile *rf;
     };
@@ -155,27 +157,32 @@ public:
     // Data in an object must be written all at once, and cannot be generated
     // incrementally.  Data can be an arbitrary block of binary data of any
     // size.  The pointer to the data need only remain valid until write() is
-    // called.
-    void set_data(const char *d, size_t len) { data = d; data_len = len; }
+    // called.  If checksum is non-NULL then it is assumed to contain a hash
+    // value for the data; this provides an optimization in case the caller has
+    // already checksummed the data.  Otherwise the set_data will compute a
+    // hash of the data itself.
+    void set_data(const char *d, size_t len, const char *checksum);
+
+    // Explicitly sets the age of the data, for later garbage-collection or
+    // repacking purposes.  If not set, the age defaults to the current time.
+    // The age is stored in the database as a floating point value, expressing
+    // the time in Julian days.
+    void set_age(double age) { this->age = age; }
 
     // Write an object to a segment, thus making it permanent.  This function
     // can be called at most once.
     void write(TarSegmentStore *store);
 
-    // Compute the checksum of an object, and include it in the object
-    // reference.  This should be called after write(), and the data specified
-    // by set_data() must remain valid through the call to checksum().
-    void checksum();
-
     // An object is assigned a permanent name once it has been written to a
     // segment.  Until that time, its name cannot be determined.
-    std::string get_name() const { return ref.to_string(); }
     ObjectReference get_ref() { return ref; }
 
 private:
     std::string group;
+    double age;
     const char *data;
     size_t data_len;
+    std::string checksum;
 
     bool written;
     ObjectReference ref;
index acde97c..70e1727 100644 (file)
@@ -262,16 +262,11 @@ list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
     // No data was matched.  The entire block can be written out as is into a
     // new object, and the new_block_summary used to save chunk signatures.
     if (!matched_old) {
-        SHA1Checksum block_hash;
-        block_hash.process(analyzed_buf, analyzed_len);
-        string block_csum = block_hash.checksum_str();
-
-        o->set_data(analyzed_buf, analyzed_len);
+        o->set_age(block_age);
+        o->set_data(analyzed_buf, analyzed_len, NULL);
         o->write(tss);
         ObjectReference ref = o->get_ref();
-        db->StoreObject(ref, block_csum, analyzed_len, block_age);
         store_analyzed_signatures(ref);
-        ref.set_range(0, analyzed_len, true);
         refs.push_back(ref);
         delete o;
         return refs;
@@ -295,7 +290,7 @@ list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
         string block_csum = block_hash.checksum_str();
 
         o->set_group("data");
-        o->set_data(literal_buf, new_data);
+        o->set_data(literal_buf, new_data, NULL);
         o->write(tss);
         ObjectReference ref = o->get_ref();
         for (i = items.begin(); i != items.end(); ++i) {
@@ -305,7 +300,7 @@ list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
             }
         }
 
-        db->StoreObject(ref, block_csum, new_data, 0.0);
+        //db->StoreObject(ref, 0.0);
 
         block_summary summary;
         summary.ref = ref;