Improve tracking of segments and segment utilization.
[cumulus.git] / localdb.cc
index 3fcbafe..ca83559 100644 (file)
 #include "store.h"
 #include "util.h"
 
+using std::max;
 using std::min;
+using std::set;
 using std::string;
 
+static const int SCHEMA_MAJOR = 0;
+static const int SCHEMA_MINOR = 11;
+
 /* Helper function to prepare a statement for execution in the current
  * database. */
 sqlite3_stmt *LocalDb::Prepare(const char *sql)
@@ -85,14 +90,33 @@ void LocalDb::Open(const char *path, const char *snapshot_name,
 
     sqlite3_extended_result_codes(db, 1);
 
+    /* Check that the local database format is the correct version; if not,
+     * report an error. */
+    sqlite3_stmt *stmt = Prepare("select major, minor from schema_version");
+
+    rc = sqlite3_step(stmt);
+    if (rc != SQLITE_ROW) {
+        fatal("Unable to read local database version from database");
+    } else if (rc == SQLITE_ROW) {
+        int major = sqlite3_column_int(stmt, 0);
+        int minor = sqlite3_column_int(stmt, 1);
+        if (major != SCHEMA_MAJOR || minor != SCHEMA_MINOR) {
+            fprintf(stderr,
+                    "Local database does not have required schema version!\n"
+                    "  expected: %d.%d, found: %d.%d\n",
+                    SCHEMA_MAJOR, SCHEMA_MINOR, major, minor);
+            fatal("Unable to continue.");
+        }
+    }
+    sqlite3_finalize(stmt);
+
     if (snapshot_scheme == NULL)
         snapshot_scheme = "";
 
     /* Insert this snapshot into the database, and determine the integer key
      * which will be used to identify it. */
-    sqlite3_stmt *stmt = Prepare("insert into "
-                                 "snapshots(name, scheme, timestamp, intent) "
-                                 "values (?, ?, julianday('now'), ?)");
+    stmt = Prepare("insert into snapshots(name, scheme, timestamp, intent) "
+                   "values (?, ?, julianday('now'), ?)");
     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
                       SQLITE_TRANSIENT);
     sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
@@ -143,22 +167,13 @@ void LocalDb::Close()
 {
     int rc;
 
-    /* Summarize the snapshot_refs table into segments_used. */
+    /* Summarize the snapshot_refs table into segment_utilization. */
     sqlite3_stmt *stmt = Prepare(
-        "insert or replace into segments_used "
-        "select ? as snapshotid, segmentid, max(utilization) from ("
-        "    select segmentid, cast(used as real) / size as utilization "
-        "    from "
-        "    (select segmentid, sum(size) as used from snapshot_refs "
-        "       group by segmentid) "
-        "    join segments using (segmentid) "
-        "  union "
-        "    select segmentid, utilization from segments_used "
-        "    where snapshotid = ? "
-        ") group by segmentid"
+        "insert or replace into segment_utilization "
+        "select ? as snapshotid, segmentid, sum(size) "
+        "from snapshot_refs group by segmentid"
     );
     sqlite3_bind_int64(stmt, 1, snapshotid);
-    sqlite3_bind_int64(stmt, 2, snapshotid);
     rc = sqlite3_step(stmt);
     if (rc != SQLITE_OK && rc != SQLITE_DONE) {
         ReportError(rc);
@@ -232,13 +247,16 @@ string LocalDb::IdToSegment(int64_t segmentid)
     return result;
 }
 
-void LocalDb::StoreObject(const ObjectReference& ref,
-                          const string &checksum, int64_t size,
-                          double age)
+void LocalDb::StoreObject(const ObjectReference& ref, double age)
 {
     int rc;
     sqlite3_stmt *stmt;
 
+    assert(ref.has_checksum());
+    string checksum = ref.get_checksum();
+    assert(ref.range_is_exact());
+    int64_t size = ref.get_range_length();
+
     if (age == 0.0) {
         stmt = Prepare("insert into block_index("
                        "segmentid, object, checksum, size, timestamp) "
@@ -370,6 +388,33 @@ bool LocalDb::IsAvailable(const ObjectReference &ref)
     return found;
 }
 
+set<string> LocalDb::GetUsedSegments()
+{
+    int rc;
+    sqlite3_stmt *stmt;
+    set<string> result;
+
+    stmt = Prepare("select segment from segments "
+                   "where segmentid in (select segmentid from snapshot_refs)");
+
+    while (true) {
+        rc = sqlite3_step(stmt);
+        if (rc == SQLITE_ROW) {
+            const char *segment
+                = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
+            result.insert(segment);
+        } else if (rc == SQLITE_DONE) {
+            break;
+        } else {
+            ReportError(rc);
+        }
+    }
+
+    sqlite3_finalize(stmt);
+
+    return result;
+}
+
 void LocalDb::UseObject(const ObjectReference& ref)
 {
     int rc;
@@ -390,31 +435,46 @@ void LocalDb::UseObject(const ObjectReference& ref)
     }
     sqlite3_finalize(stmt);
 
-    int64_t block_size = 0;
-    stmt = Prepare("select size from block_index "
-                   "where segmentid = ? and object = ?");
-    sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
-    obj = ref.get_sequence();
-    sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
-    rc = sqlite3_step(stmt);
-    if (rc == SQLITE_ROW) {
-        block_size = sqlite3_column_int64(stmt, 0);
+    // Attempt to determine the underlying size of the object.  This may
+    // require a database lookup if the length is not encoded into the object
+    // reference already.
+    int64_t object_size = 0;
+    if (ref.range_is_exact()) {
+        object_size = ref.get_range_length();
     } else {
-        string refstr = ref.to_string();
-        fprintf(stderr, "No block found in block_index for %s\n",
-                refstr.c_str());
+        stmt = Prepare("select size from block_index "
+                       "where segmentid = ? and object = ?");
+        sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
+        obj = ref.get_sequence();
+        sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
+        rc = sqlite3_step(stmt);
+        if (rc == SQLITE_ROW) {
+            object_size = sqlite3_column_int64(stmt, 0);
+        } else {
+            fprintf(stderr, "Warning: No block found in block_index for %s\n",
+                    ref.to_string().c_str());
+        }
         sqlite3_finalize(stmt);
-        return;
     }
-    sqlite3_finalize(stmt);
 
-    int64_t new_size = old_size;
+    // Possibly mark additional bytes as being referenced.  The number of bytes
+    // referenced can only be increased (up to the object size).  The bytes
+    // referenced will be set to the object size only if the entire object is
+    // referenced at once: a series of partial ranges that add up to the total
+    // size will have a reference size capped at just less than the full object
+    // size (we can't tell if some bytes were referenced multiple times, and
+    // thus we conservatively assume some bytes might still be unreferenced).
+    int64_t new_refs = old_size;
     if (ref.has_range()) {
-        new_size += ref.get_range_length();
-        new_size = min(new_size, block_size);
+        new_refs = ref.get_range_length();
     } else {
-        new_size = block_size;
+        new_refs = object_size;
     }
+    int64_t new_size = old_size + new_refs;
+    if (old_size < object_size && new_refs < object_size)
+        new_size = min(new_size, object_size - 1);
+    new_size = min(object_size, new_size);
+    new_size = max(new_size, (int64_t)0);
 
     if (new_size != old_size) {
         stmt = Prepare("insert or replace "
@@ -435,44 +495,25 @@ void LocalDb::UseObject(const ObjectReference& ref)
     }
 }
 
-void LocalDb::UseSegment(const std::string &segment, double utilization)
-{
-    int rc;
-    sqlite3_stmt *stmt;
-
-    stmt = Prepare("insert or replace "
-                   "into segments_used(snapshotid, segmentid, utilization) "
-                   "values (?, ?, ?)");
-    sqlite3_bind_int64(stmt, 1, snapshotid);
-    sqlite3_bind_int64(stmt, 2, SegmentToId(segment));
-    sqlite3_bind_double(stmt, 3, utilization);
-
-    rc = sqlite3_step(stmt);
-    if (rc != SQLITE_DONE) {
-        fprintf(stderr, "Could not insert segment use record!\n");
-        ReportError(rc);
-    }
-
-    sqlite3_finalize(stmt);
-}
-
 void LocalDb::SetSegmentChecksum(const std::string &segment,
                                  const std::string &path,
                                  const std::string &checksum,
-                                 int size)
+                                 int data_size, int disk_size)
 {
     int rc;
     sqlite3_stmt *stmt;
 
-    stmt = Prepare("update segments set path = ?, checksum = ?, size = ?, "
+    stmt = Prepare("update segments set path = ?, checksum = ?, "
+                   "data_size = ?, disk_size = ?, "
                    "mtime = coalesce(mtime, julianday('now')) "
                    "where segmentid = ?");
     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(),
                       SQLITE_TRANSIENT);
     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
                       SQLITE_TRANSIENT);
-    sqlite3_bind_int64(stmt, 3, size);
-    sqlite3_bind_int64(stmt, 4, SegmentToId(segment));
+    sqlite3_bind_int64(stmt, 3, data_size);
+    sqlite3_bind_int64(stmt, 4, disk_size);
+    sqlite3_bind_int64(stmt, 5, SegmentToId(segment));
 
     rc = sqlite3_step(stmt);
     if (rc != SQLITE_DONE) {