From 3d780590edec4583eb3ef0ca16120afd0f7451f9 Mon Sep 17 00:00:00 2001
From: Michael Vrable
Date: Fri, 21 Sep 2012 15:50:14 -0700
Subject: [PATCH] Improve tracking of segments and segment utilization.

Update the local database and cumulus binary to keep better track of
segment utilization data, with an eventual goal of improving cleaning
algorithms.
---
 localdb.cc                  | 165 ++++++++++++++++++++++--------------
 localdb.h                   |   9 +-
 main.cc                     |  23 ++---
 metadata.cc                 |  16 ++--
 python/cumulus/__init__.py  |  43 +++++++---
 python/cumulus/cmd_util.py  |   2 +-
 python/cumulus/config.py    |   1 -
 python/cumulus/main.py      |  47 +++++++++-
 python/cumulus/retention.py |  13 ++-
 ref.h                       |   2 +-
 schema.sql                  |  46 +++++-----
 store.cc                    |  61 ++++++++-----
 store.h                     |  27 +++---
 subfile.cc                  |  13 +--
 14 files changed, 295 insertions(+), 173 deletions(-)
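One possible use of the new data, as a sketch only (nothing below is added by
this patch): a future cleaner could rank segments by their live-byte fraction
using the new segments.data_size and segment_utilization.bytes_referenced
columns. This assumes LocalDb::Prepare from localdb.cc; the 40% threshold is
just an example value:

    sqlite3_stmt *stmt = Prepare(
        "select segmentid, "
        "cast(max(bytes_referenced) as real) / data_size as utilization "
        "from segment_utilization join segments using (segmentid) "
        "group by segmentid "
        "having cast(max(bytes_referenced) as real) / data_size < ?");
    sqlite3_bind_double(stmt, 1, 0.4);  /* clean segments under 40% live */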
diff --git a/localdb.cc b/localdb.cc
index 3fcbafe..ca83559 100644
--- a/localdb.cc
+++ b/localdb.cc
@@ -38,9 +38,14 @@
 #include "store.h"
 #include "util.h"
 
+using std::max;
 using std::min;
+using std::set;
 using std::string;
 
+static const int SCHEMA_MAJOR = 0;
+static const int SCHEMA_MINOR = 11;
+
 /* Helper function to prepare a statement for execution in the current
  * database. */
 sqlite3_stmt *LocalDb::Prepare(const char *sql)
@@ -85,14 +90,33 @@ void LocalDb::Open(const char *path, const char *snapshot_name,
 
     sqlite3_extended_result_codes(db, 1);
 
+    /* Check that the local database format is the correct version; if not,
+     * report an error. */
+    sqlite3_stmt *stmt = Prepare("select major, minor from schema_version");
+
+    rc = sqlite3_step(stmt);
+    if (rc != SQLITE_ROW) {
+        fatal("Unable to read local database version from database");
+    } else {
+        int major = sqlite3_column_int(stmt, 0);
+        int minor = sqlite3_column_int(stmt, 1);
+        if (major != SCHEMA_MAJOR || minor != SCHEMA_MINOR) {
+            fprintf(stderr,
+                    "Local database does not have required schema version!\n"
+                    "  expected: %d.%d, found: %d.%d\n",
+                    SCHEMA_MAJOR, SCHEMA_MINOR, major, minor);
+            fatal("Unable to continue.");
+        }
+    }
+    sqlite3_finalize(stmt);
+
     if (snapshot_scheme == NULL)
         snapshot_scheme = "";
 
     /* Insert this snapshot into the database, and determine the integer key
      * which will be used to identify it. */
-    sqlite3_stmt *stmt = Prepare("insert into "
-                                 "snapshots(name, scheme, timestamp, intent) "
-                                 "values (?, ?, julianday('now'), ?)");
+    stmt = Prepare("insert into snapshots(name, scheme, timestamp, intent) "
+                   "values (?, ?, julianday('now'), ?)");
     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
                       SQLITE_TRANSIENT);
     sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
@@ -143,22 +167,13 @@ void LocalDb::Close()
 {
     int rc;
 
-    /* Summarize the snapshot_refs table into segments_used. */
+    /* Summarize the snapshot_refs table into segment_utilization. */
     sqlite3_stmt *stmt = Prepare(
-        "insert or replace into segments_used "
-        "select ? as snapshotid, segmentid, max(utilization) from ("
-        "    select segmentid, cast(used as real) / size as utilization "
-        "    from "
-        "    (select segmentid, sum(size) as used from snapshot_refs "
-        "     group by segmentid) "
-        "    join segments using (segmentid) "
-        "  union "
-        "    select segmentid, utilization from segments_used "
-        "    where snapshotid = ? "
-        ") group by segmentid"
+        "insert or replace into segment_utilization "
+        "select ? as snapshotid, segmentid, sum(size) "
+        "from snapshot_refs group by segmentid"
    );
    sqlite3_bind_int64(stmt, 1, snapshotid);
-    sqlite3_bind_int64(stmt, 2, snapshotid);
    rc = sqlite3_step(stmt);
    if (rc != SQLITE_OK && rc != SQLITE_DONE) {
        ReportError(rc);
@@ -232,13 +247,16 @@ string LocalDb::IdToSegment(int64_t segmentid)
     return result;
 }
 
-void LocalDb::StoreObject(const ObjectReference& ref,
-                          const string &checksum, int64_t size,
-                          double age)
+void LocalDb::StoreObject(const ObjectReference& ref, double age)
 {
     int rc;
     sqlite3_stmt *stmt;
 
+    assert(ref.has_checksum());
+    string checksum = ref.get_checksum();
+    assert(ref.range_is_exact());
+    int64_t size = ref.get_range_length();
+
     if (age == 0.0) {
         stmt = Prepare("insert into block_index("
                        "segmentid, object, checksum, size, timestamp) "
@@ -370,6 +388,33 @@ bool LocalDb::IsAvailable(const ObjectReference &ref)
     return found;
 }
 
+set<string> LocalDb::GetUsedSegments()
+{
+    int rc;
+    sqlite3_stmt *stmt;
+    set<string> result;
+
+    stmt = Prepare("select segment from segments "
+                   "where segmentid in (select segmentid from snapshot_refs)");
+
+    while (true) {
+        rc = sqlite3_step(stmt);
+        if (rc == SQLITE_ROW) {
+            const char *segment
+                = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
+            result.insert(segment);
+        } else if (rc == SQLITE_DONE) {
+            break;
+        } else {
+            ReportError(rc);
+        }
+    }
+
+    sqlite3_finalize(stmt);
+
+    return result;
+}
+
 void LocalDb::UseObject(const ObjectReference& ref)
 {
     int rc;
@@ -390,31 +435,46 @@ void LocalDb::UseObject(const ObjectReference& ref)
     }
     sqlite3_finalize(stmt);
 
-    int64_t block_size = 0;
-    stmt = Prepare("select size from block_index "
-                   "where segmentid = ? and object = ?");
-    sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
-    obj = ref.get_sequence();
-    sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
-    rc = sqlite3_step(stmt);
-    if (rc == SQLITE_ROW) {
-        block_size = sqlite3_column_int64(stmt, 0);
+    // Attempt to determine the underlying size of the object.  This may
+    // require a database lookup if the length is not encoded into the object
+    // reference already.
+    int64_t object_size = 0;
+    if (ref.range_is_exact()) {
+        object_size = ref.get_range_length();
     } else {
-        string refstr = ref.to_string();
-        fprintf(stderr, "No block found in block_index for %s\n",
-                refstr.c_str());
+        stmt = Prepare("select size from block_index "
+                       "where segmentid = ? and object = ?");
+        sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
+        obj = ref.get_sequence();
+        sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
+        rc = sqlite3_step(stmt);
+        if (rc == SQLITE_ROW) {
+            object_size = sqlite3_column_int64(stmt, 0);
+        } else {
+            fprintf(stderr, "Warning: No block found in block_index for %s\n",
+                    ref.to_string().c_str());
+        }
        sqlite3_finalize(stmt);
-        return;
    }
-    sqlite3_finalize(stmt);
 
-    int64_t new_size = old_size;
+    // Possibly mark additional bytes as being referenced.  The number of bytes
+    // referenced can only be increased (up to the object size).  The bytes
+    // referenced will be set to the object size only if the entire object is
+    // referenced at once: a series of partial ranges that add up to the total
+    // size will have a reference size capped at just less than the full object
+    // size (we can't tell if some bytes were referenced multiple times, and
+    // thus we conservatively assume some bytes might still be unreferenced).
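+    // For example (illustrative): with a 100-byte object, referencing bytes
+    // [0,50) and later [50,100) in two separate calls leaves the count capped
+    // at 99, since those ranges might have overlapped; only a reference to
+    // the whole object at once raises the count to the full 100.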
+    int64_t new_refs;
     if (ref.has_range()) {
-        new_size += ref.get_range_length();
-        new_size = min(new_size, block_size);
+        new_refs = ref.get_range_length();
     } else {
-        new_size = block_size;
+        new_refs = object_size;
     }
+    int64_t new_size = old_size + new_refs;
+    if (old_size < object_size && new_refs < object_size)
+        new_size = min(new_size, object_size - 1);
+    new_size = min(object_size, new_size);
+    new_size = max(new_size, (int64_t)0);
 
     if (new_size != old_size) {
         stmt = Prepare("insert or replace "
@@ -435,44 +495,25 @@ void LocalDb::UseObject(const ObjectReference& ref)
     }
 }
 
-void LocalDb::UseSegment(const std::string &segment, double utilization)
-{
-    int rc;
-    sqlite3_stmt *stmt;
-
-    stmt = Prepare("insert or replace "
-                   "into segments_used(snapshotid, segmentid, utilization) "
-                   "values (?, ?, ?)");
-    sqlite3_bind_int64(stmt, 1, snapshotid);
-    sqlite3_bind_int64(stmt, 2, SegmentToId(segment));
-    sqlite3_bind_double(stmt, 3, utilization);
-
-    rc = sqlite3_step(stmt);
-    if (rc != SQLITE_DONE) {
-        fprintf(stderr, "Could not insert segment use record!\n");
-        ReportError(rc);
-    }
-
-    sqlite3_finalize(stmt);
-}
-
 void LocalDb::SetSegmentChecksum(const std::string &segment,
                                  const std::string &path,
                                  const std::string &checksum,
-                                 int size)
+                                 int data_size, int disk_size)
 {
     int rc;
     sqlite3_stmt *stmt;
 
-    stmt = Prepare("update segments set path = ?, checksum = ?, size = ?, "
+    stmt = Prepare("update segments set path = ?, checksum = ?, "
+                   "data_size = ?, disk_size = ?, "
                    "mtime = coalesce(mtime, julianday('now')) "
                    "where segmentid = ?");
     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(), SQLITE_TRANSIENT);
     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
                       SQLITE_TRANSIENT);
-    sqlite3_bind_int64(stmt, 3, size);
-    sqlite3_bind_int64(stmt, 4, SegmentToId(segment));
+    sqlite3_bind_int64(stmt, 3, data_size);
+    sqlite3_bind_int64(stmt, 4, disk_size);
+    sqlite3_bind_int64(stmt, 5, SegmentToId(segment));
 
     rc = sqlite3_step(stmt);
     if (rc != SQLITE_DONE) {
diff --git a/localdb.h b/localdb.h
index 5901836..7764aed 100644
--- a/localdb.h
+++ b/localdb.h
@@ -31,6 +31,7 @@
 
 #include <sqlite3.h>
 
+#include <set>
 #include <string>
 
 #include "ref.h"
@@ -40,17 +41,17 @@ public:
     void Open(const char *path, const char *snapshot_name,
               const char *snapshot_scheme, double intent);
     void Close();
-    void StoreObject(const ObjectReference& ref,
-                     const std::string &checksum, int64_t size, double age);
+    void StoreObject(const ObjectReference& ref, double age);
     ObjectReference FindObject(const std::string &checksum, int64_t size);
     bool IsOldObject(const std::string &checksum, int64_t size, double *age,
                      int *group);
     bool IsAvailable(const ObjectReference &ref);
     void UseObject(const ObjectReference& ref);
 
-    void UseSegment(const std::string &segment, double utilization);
+    std::set<std::string> GetUsedSegments();
     void SetSegmentChecksum(const std::string &segment,
                             const std::string &path,
-                            const std::string &checksum, int size);
+                            const std::string &checksum,
+                            int data_size, int disk_size);
     bool GetSegmentChecksum(const std::string &segment,
                             std::string *seg_path, std::string *seg_checksum);
diff --git a/main.cc b/main.cc
index c3f8c59..e10a04a 100644
--- a/main.cc
+++ b/main.cc
@@ -82,9 +82,6 @@ static char *block_buf;
  * invocations to help in creating incremental snapshots. */
 LocalDb *db;
 
-/* Keep track of all segments which are needed to reconstruct the snapshot. */
-std::set<string> segment_list;
-
 /* Snapshot intent: 1=daily, 7=weekly, etc.  This is not used directly, but is
  * stored in the local database and can help guide segment cleaning and
  * snapshot expiration policies. */
@@ -98,13 +95,6 @@ bool flag_rebuild_statcache = false;
 /* Whether verbose output is enabled. */
 bool verbose = false;
 
-/* Ensure that the given segment is listed as a dependency of the current
- * snapshot. */
-void add_segment(const string& segment)
-{
-    segment_list.insert(segment);
-}
-
 /* Attempts to open a regular file read-only, but with safety checks for files
  * that might not be fully trusted. */
 int safe_open(const string& path, struct stat *stat_buf)
@@ -232,8 +222,6 @@ int64_t dumpfile(int fd, dictionary &file_info, const string &path,
              i != old_blocks.end(); ++i) {
             const ObjectReference &ref = *i;
             object_list.push_back(ref.to_string());
-            if (ref.is_normal())
-                add_segment(ref.get_segment());
             db->UseObject(ref);
         }
         size = stat_buf.st_size;
@@ -274,9 +262,10 @@ int64_t dumpfile(int fd, dictionary &file_info, const string &path,
             double block_age = 0.0;
             ObjectReference ref;
 
-            SHA1Checksum block_hash;
-            block_hash.process(block_buf, bytes);
-            string block_csum = block_hash.checksum_str();
+            Hash *hash = Hash::New();
+            hash->update(block_buf, bytes);
+            string block_csum = hash->digest_str();
+            delete hash;
 
             if (all_zero) {
                 ref = ObjectReference(ObjectReference::REF_ZERO);
@@ -332,8 +321,6 @@ int64_t dumpfile(int fd, dictionary &file_info, const string &path,
                 while (!refs.empty()) {
                     ref = refs.front(); refs.pop_front();
                     object_list.push_back(ref.to_string());
-                    if (ref.is_normal())
-                        add_segment(ref.get_segment());
                     db->UseObject(ref);
                 }
                 size += bytes;
@@ -859,7 +846,6 @@ int main(int argc, char *argv[])
     }
 
     ObjectReference root_ref = metawriter->close();
-    add_segment(root_ref.get_segment());
     string backup_root = root_ref.to_string();
 
     delete metawriter;
@@ -880,6 +866,7 @@ int main(int argc, char *argv[])
                                                     "checksums");
     FILE *checksums = fdopen(checksum_file->get_fd(), "w");
 
+    std::set<string> segment_list = db->GetUsedSegments();
     for (std::set<string>::iterator i = segment_list.begin();
          i != segment_list.end(); ++i) {
         string seg_path, seg_csum;
diff --git a/metadata.cc b/metadata.cc
index 2933ce9..15bd7a5 100644
--- a/metadata.cc
+++ b/metadata.cc
@@ -49,7 +49,6 @@ bool flag_full_metadata = false;
 
 /* TODO: Move to header file */
 extern LocalDb *db;
-void add_segment(const string& segment);
 
 /* Like strcmp, but sorts in the order that files will be visited in the
  * filesystem.  That is, we break paths apart at slashes, and compare path
@@ -296,8 +295,7 @@ void MetadataWriter::metadata_flush()
         // If indirectly referencing any other metadata logs, be sure those
         // segments are properly referenced.
         if (i->reused) {
-            add_segment(i->ref.get_segment());
-            db->UseSegment(i->ref.get_segment(), 1.0);
+            db->UseObject(i->ref);
         }
 
         // Write out an indirect reference to any previous objects which could
@@ -338,15 +336,13 @@ void MetadataWriter::metadata_flush()
     /* Write current metadata information to a new object. */
     LbsObject *meta = new LbsObject;
     meta->set_group("metadata");
-    meta->set_data(m.data(), m.size());
+    meta->set_data(m.data(), m.size(), NULL);
     meta->write(store);
-    meta->checksum();
 
     /* Write a reference to this block in the root. */
     ObjectReference ref = meta->get_ref();
     metadata_root << "@" << ref.to_string() << "\n";
-    add_segment(ref.get_segment());
-    db->UseSegment(ref.get_segment(), 1.0);
+    db->UseObject(ref);
 
     delete meta;
 
@@ -397,11 +393,9 @@ ObjectReference MetadataWriter::close()
 
     LbsObject *root = new LbsObject;
     root->set_group("metadata");
-    root->set_data(root_data.data(), root_data.size());
+    root->set_data(root_data.data(), root_data.size(), NULL);
     root->write(store);
-    root->checksum();
-    add_segment(root->get_ref().get_segment());
-    db->UseSegment(root->get_ref().get_segment(), 1.0);
+    db->UseObject(root->get_ref());
 
     ObjectReference ref = root->get_ref();
     delete root;
diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py
index 51755c7..a40c58d 100644
--- a/python/cumulus/__init__.py
+++ b/python/cumulus/__init__.py
@@ -532,7 +532,26 @@ class LocalDatabase:
         schemes.sort()
         return schemes
 
-    def garbage_collect(self, scheme, intent=1.0):
+    def list_snapshots(self, scheme):
+        """Return a list of snapshots for the given scheme."""
+        cur = self.cursor()
+        cur.execute("select name from snapshots where scheme = ?", (scheme,))
+        snapshots = [row[0] for row in cur.fetchall()]
+        snapshots.sort()
+        return snapshots
+
+    def delete_snapshot(self, scheme, name):
+        """Remove the specified snapshot from the database.
+
+        Warning: This does not garbage collect all dependent data in the
+        database, so it must be followed by a call to garbage_collect() to make
+        the database consistent.
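+
+        Example (illustrative):
+            db.delete_snapshot(scheme, name)
+            db.garbage_collect()
+            db.commit()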
+        """
+        cur = self.cursor()
+        cur.execute("delete from snapshots where scheme = ? and name = ?",
+                    (scheme, name))
+
+    def prune_old_snapshots(self, scheme, intent=1.0):
         """Delete entries from old snapshots from the database.
 
         Only snapshots with the specified scheme name will be deleted.  If
@@ -579,6 +598,16 @@ class LocalDatabase:
             first = False
             max_intent = max(max_intent, snap_intent)
 
+        self.garbage_collect()
+
+    def garbage_collect(self):
+        """Garbage-collect unreachable segment and object data.
+
+        Remove all segments and checksums which are not reachable from the
+        current set of snapshots stored in the local database.
+        """
+        cur = self.cursor()
+
         # Delete entries in the segments_used table which are for non-existent
         # snapshots.
         cur.execute("""delete from segments_used
@@ -590,16 +619,10 @@ class LocalDatabase:
         cur.execute("""delete from segments
                        where segmentid not in (select segmentid from segments_used)""")
 
-        # Delete unused objects in the block_index table.  By "unused", we mean
-        # any object which was stored in a segment which has been deleted, and
-        # any object in a segment which was marked for cleaning and has had
-        # cleaning performed already (the expired time is less than the current
-        # largest snapshot id).
+        # Delete dangling objects in the block_index table.
         cur.execute("""delete from block_index
-                       where segmentid not in (select segmentid from segments)
-                          or segmentid in (select segmentid from segments
-                                           where expire_time < ?)""",
-                    (last_snapshotid,))
+                       where segmentid not in
+                           (select segmentid from segments)""")
 
         # Remove sub-block signatures for deleted objects.
         cur.execute("""delete from subblock_signatures
diff --git a/python/cumulus/cmd_util.py b/python/cumulus/cmd_util.py
index da2fb47..e0b094d 100644
--- a/python/cumulus/cmd_util.py
+++ b/python/cumulus/cmd_util.py
@@ -59,7 +59,7 @@ def cmd_clean(args, clean_threshold=7.0):
     # Delete old snapshots from the local database.
     intent = float(options.intent)
     for s in db.list_schemes():
-        db.garbage_collect(s, intent)
+        db.prune_old_snapshots(s, intent)
 
     # Expire segments which are poorly-utilized.
     for s in db.get_segment_cleaning_list():
diff --git a/python/cumulus/config.py b/python/cumulus/config.py
index ea55b1f..cf721bf 100644
--- a/python/cumulus/config.py
+++ b/python/cumulus/config.py
@@ -51,7 +51,6 @@ def _build_retention_engine(spec):
         seconds = int(m.group(1)) * _TIME_UNITS[m.group(2)]
         period = period + datetime.timedelta(seconds=seconds)
         intervalspec = m.group(3)
-        print classname, period
         policy.add_policy(classname, period)
     return policy
 
diff --git a/python/cumulus/main.py b/python/cumulus/main.py
index 7700148..62f2665 100644
--- a/python/cumulus/main.py
+++ b/python/cumulus/main.py
@@ -22,6 +22,7 @@ This implements maintenance functions and is a wrapper around the C++
 cumulus-backup program.
 """
 
+import datetime
 import re
 import sys
 
@@ -59,11 +60,55 @@ def prune_backups(backup_config, scheme):
     cmd_util.options = options
     cmd_util.cmd_garbage_collect([])
 
+def prune_localdb(backup_config, scheme, next_snapshot=None):
+    """Clean old snapshots out of the local database.
+
+    Clear old snapshots out of the local database, possibly in preparation for
+    running a new backup.  One snapshot of each configured retention period is
+    kept (e.g., one weekly and one daily), and the most recent snapshot is
+    always retained.  If next_snapshot is not None, it should be the timestamp
+    when (approximately) the next snapshot will be taken; if that snapshot
+    would be a daily, weekly, etc. snapshot, then it may result in the previous
+    snapshot of the same duration being evicted from the local database.
+
+    Note that in this sense, "evict" merely refers to tracking the snapshots in
+    the local database; this function does not delete backups from the backup
+    storage.
+    """
+    # Fetch the list of existing snapshots in the local database.  Pruning only
+    # makes sense if more than one snapshot is present.
+    db = cumulus.LocalDatabase(backup_config.get_global("localdb"))
+    snapshots = sorted(db.list_snapshots(scheme))
+    if len(snapshots) <= 1:
+        return
+
+    # Classify the snapshots (daily, weekly, etc.) and keep the most recent one
+    # of each category.  Also ensure that the most recent snapshot is retained.
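+    # For example (illustrative): with daily and weekly classes and snapshots
+    # taken 2012-09-17 (a Monday), 2012-09-20, and 2012-09-21, the weekly
+    # class keeps 2012-09-17, the daily class keeps 2012-09-21 (also the most
+    # recent snapshot), and 2012-09-20 is evicted, assuming all of them fall
+    # within their retention periods.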
+    retention = backup_config.get_retention_for_scheme(scheme)
+    for snapshot in snapshots:
+        retention.consider_snapshot(snapshot)
+    if next_snapshot is not None:
+        retention.consider_snapshot(next_snapshot)
+    retained = set(retention.last_snapshots().values())
+    retained.add(snapshots[-1])
+
+    evicted = [s for s in snapshots if s not in retained]
+    for s in evicted:
+        db.delete_snapshot(scheme, s)
+    db.garbage_collect()
+    db.commit()
+
 def main(argv):
     backup_config = config.CumulusConfig(argv[1])
     for scheme in backup_config.backup_schemes():
         print scheme
-        prune_backups(backup_config, scheme)
+        #prune_backups(backup_config, scheme)
+        prune_localdb(backup_config, scheme, datetime.datetime.utcnow())
 
 if __name__ == "__main__":
     main(sys.argv)
diff --git a/python/cumulus/retention.py b/python/cumulus/retention.py
index a7ae983..b0e0078 100644
--- a/python/cumulus/retention.py
+++ b/python/cumulus/retention.py
@@ -102,10 +102,12 @@ class RetentionEngine(object):
 
     def add_policy(self, backup_class, retention_period):
         self._policies[backup_class] = retention_period
-        self._last_snapshots[backup_class] = (None, None)
+        self._last_snapshots[backup_class] = (None, None, False)
 
     @staticmethod
     def parse_timestamp(s):
+        if isinstance(s, datetime.datetime):
+            return s
         return datetime.datetime.strptime(s, TIMESTAMP_FORMAT)
 
     def consider_snapshot(self, snapshot):
@@ -134,9 +136,11 @@ class RetentionEngine(object):
             partition = _backup_classes[backup_class](timestamp_policy)
             last_snapshot = self._last_snapshots[backup_class]
             if self._last_snapshots[backup_class][0] != partition:
-                self._last_snapshots[backup_class] = (partition, snapshot)
                 self._labels.add(backup_class)
-                if snapshot_age < retention_period: retain = True
+                retain_label = snapshot_age < retention_period
+                self._last_snapshots[backup_class] = (partition, snapshot,
+                                                      retain_label)
+                if retain_label: retain = True
 
         return retain
 
     def last_labels(self):
@@ -148,4 +152,5 @@ class RetentionEngine(object):
 
     def last_snapshots(self):
-        """Returns the most recent snapshot in each backup class."""
-        return dict((k, v[1]) for (k, v) in self._last_snapshots.iteritems())
+        """Returns the most recent retained snapshot in each backup class."""
+        return dict((k, v[1]) for (k, v)
+                    in self._last_snapshots.iteritems() if v[2])
diff --git a/ref.h b/ref.h
index d997133..bbbf5b8 100644
--- a/ref.h
+++ b/ref.h
@@ -110,7 +110,7 @@ public:
     bool has_range() const { return range_valid; }
     size_t get_range_start() const { return range_start; }
     size_t get_range_length() const { return range_length; }
-    size_t get_range_exact() const { return range_exact; }
+    bool range_is_exact() const { return range_exact; }
 
     void clear_range()
         { range_start = range_length = 0; range_valid = false;
          range_exact = false; }
diff --git a/schema.sql b/schema.sql
index 35b2c9d..d898272 100644
--- a/schema.sql
+++ b/schema.sql
@@ -3,13 +3,25 @@
 --
 -- The index is stored in an SQLite3 database.  This is its schema.
 
--- List of snapshots which have been created.
+-- Versioning information, describing the revision for which the table schema
+-- was set up.
+create table schema_version(
+    version text,               -- Program version, dotted decimal string
+    major integer,              -- Major version number
+    minor integer               -- Minor version number
+);
+insert into schema_version values ('0.11', 0, 11);
+
+-- List of snapshots which have been created and which we are still tracking.
+-- There may be more snapshots actually stored at the remote server than are
+-- listed here, but the reverse should never be true: Cumulus may depend on
+-- data stored in these snapshots when writing a new snapshot.
 create table snapshots (
     snapshotid integer primary key,
     name text not null,
     scheme text not null,
     timestamp real,
-    intent real
+    intent real                 -- TODO: deprecated, should be removed
 );
 
 -- List of segments which have been created.
@@ -19,11 +31,13 @@ create table segments (
     path text,
     checksum text,
     mtime real,
-    size integer,
-    expire_time integer         -- snapshotid of latest snapshot when expired
+    data_size integer,          -- sum of bytes in all objects in the segment
+    disk_size integer           -- size of segment on disk, after compression
+                                -- TODO: group?  metadata vs. non-metadata?
 );
 
--- Index of all blocks which have been stored, by checksum.
+-- Index of all data blocks in stored segments.  This is indexed by content
+-- hash to allow for coarse block-level data deduplication.
 create table block_index (
     blockid integer primary key,
     segmentid integer not null,
@@ -57,20 +71,14 @@ create table subblock_signatures (
     signatures blob not null
 );
 
--- Summary of segment utilization for each snapshots.
-create table segments_used (
+-- Summary of segment utilization for each snapshot.
+create table segment_utilization (
     snapshotid integer not null,
     segmentid integer not null,
-    utilization real
-);
-create unique index segments_used_index
-    on segments_used(snapshotid, segmentid);
-
--- Overall estimate of segment utilization, for all snapshots combined.
-create view segment_info as
-select segmentid, mtime, size, expire_time,
-       cast(size * utilization as integer) as used, utilization
-from segments join
-     (select segmentid, max(utilization) as utilization
-      from segments_used group by segmentid)
-using (segmentid);
+
+    -- Estimate for the number of live bytes in data objects: this is capped at
+    -- segments.data_size if all data in the segment is referenced.
+    bytes_referenced integer
+);
+create unique index segment_utilization_index
+    on segment_utilization(snapshotid, segmentid);
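+
+-- Example (illustrative; no such view is created by this schema, in the
+-- spirit of the removed segment_info view): overall utilization of a segment
+-- across all tracked snapshots can be estimated with a query such as
+--   select segmentid,
+--          cast(max(bytes_referenced) as real) / data_size as utilization
+--   from segment_utilization join segments using (segmentid)
+--   group by segmentid;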
diff --git a/store.cc b/store.cc
index 9634e7b..772f9c5 100644
--- a/store.cc
+++ b/store.cc
@@ -41,6 +41,7 @@
 #include <iostream>
 
 #include "hash.h"
+#include "localdb.h"
 #include "store.h"
 #include "ref.h"
 #include "util.h"
@@ -226,7 +227,9 @@ static const size_t SEGMENT_SIZE = 4 * 1024 * 1024;
 static map<string, pair<int64_t, int64_t> > group_sizes;
 
 ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
-                                              const std::string &group)
+                                              const std::string &group,
+                                              const std::string &checksum,
+                                              double age)
 {
     struct segment_info *segment;
 
@@ -240,7 +243,7 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
         segment->basename = segment->name + ".tar";
         segment->basename += filter_extension;
         segment->count = 0;
-        segment->size = 0;
+        segment->data_size = 0;
         segment->rf = remote->alloc_file(segment->basename, "segments");
         segment->file = new Tarfile(segment->rf, segment->name);
 
@@ -255,11 +258,16 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
 
     segment->file->write_object(id, data, len);
     segment->count++;
-    segment->size += len;
+    segment->data_size += len;
     group_sizes[group].first += len;
 
     ObjectReference ref(segment->name, id_buf);
+    ref.set_range(0, len, true);
+    if (checksum.size() > 0)
+        ref.set_checksum(checksum);
+    if (db != NULL)
+        db->StoreObject(ref, age);
 
     // If this segment meets or exceeds the size target, close it so that
     // future objects will go into a new segment.
@@ -292,17 +300,21 @@ void TarSegmentStore::close_segment(const string &group)
     delete segment->file;
 
     if (db != NULL) {
+        struct stat stat_buf;
+        int disk_size = 0;
+        if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
+            disk_size = stat_buf.st_size;
+            group_sizes[segment->group].second += disk_size;
+        }
+
         SHA1Checksum segment_checksum;
+        string checksum;
         if (segment_checksum.process_file(segment->rf->get_local_path().c_str())) {
-            string checksum = segment_checksum.checksum_str();
-            db->SetSegmentChecksum(segment->name, segment->basename, checksum,
-                                   segment->size);
+            checksum = segment_checksum.checksum_str();
         }
 
-        struct stat stat_buf;
-        if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
-            group_sizes[segment->group].second += stat_buf.st_size;
-        }
+        db->SetSegmentChecksum(segment->name, segment->basename, checksum,
+                               segment->data_size, disk_size);
     }
 
     segment->rf->send();
@@ -317,7 +329,7 @@ string TarSegmentStore::object_reference_to_segment(const string &object)
 }
 
 LbsObject::LbsObject()
-    : group(""), data(NULL), data_len(0), written(false)
+    : group(""), age(0.0), data(NULL), data_len(0), written(false)
 {
 }
 
@@ -325,21 +337,26 @@ LbsObject::~LbsObject()
 {
 }
 
-void LbsObject::write(TarSegmentStore *store)
+void LbsObject::set_data(const char *d, size_t len, const char *checksum)
 {
-    assert(data != NULL);
-    assert(!written);
+    data = d;
+    data_len = len;
 
-    ref = store->write_object(data, data_len, group);
-    written = true;
+    if (checksum != NULL) {
+        this->checksum = checksum;
+    } else {
+        Hash *hash = Hash::New();
+        hash->update(data, data_len);
+        this->checksum = hash->digest_str();
+        delete hash;
+    }
 }
 
-void LbsObject::checksum()
+void LbsObject::write(TarSegmentStore *store)
 {
-    assert(written);
+    assert(data != NULL);
+    assert(!written);
 
-    Hash *hash = Hash::New();
-    hash->update(data, data_len);
-    ref.set_checksum(hash->digest_str());
-    delete hash;
+    ref = store->write_object(data, data_len, group, checksum, age);
+    written = true;
 }
diff --git a/store.h b/store.h
index 428c418..016e275 100644
--- a/store.h
+++ b/store.h
@@ -108,7 +108,9 @@ public:
     // used to control object placement; objects with different group
     // parameters are kept in separate segments.
     ObjectReference write_object(const char *data, size_t len,
-                                 const std::string &group = "");
+                                 const std::string &group = "",
+                                 const std::string &checksum = "",
+                                 double age = 0.0);
 
     // Ensure all segments have been fully written.
     void sync();
@@ -122,7 +124,7 @@ private:
         std::string group;
         std::string name;           // UUID
         int count;                  // Objects written to this segment
-        int size;                   // Combined size of objects written
+        int data_size;              // Combined size of objects written
         std::string basename;       // Name of segment without directory
         RemoteFile *rf;
     };
@@ -155,27 +157,32 @@ public:
     // Data in an object must be written all at once, and cannot be generated
     // incrementally.  Data can be an arbitrary block of binary data of any
     // size.  The pointer to the data need only remain valid until write() is
-    // called.
-    void set_data(const char *d, size_t len) { data = d; data_len = len; }
+    // called.  If checksum is non-NULL then it is assumed to contain a hash
+    // value for the data; this provides an optimization in case the caller
+    // has already checksummed the data.  Otherwise set_data will compute a
+    // hash of the data itself.
+    void set_data(const char *d, size_t len, const char *checksum);
+
+    // Explicitly sets the age of the data, for later garbage-collection or
+    // repacking purposes.  If not set, the age defaults to the current time.
+    // The age is stored in the database as a floating point value, expressing
+    // the time in Julian days.
+    void set_age(double age) { this->age = age; }
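+
+    // Typical usage (illustrative):
+    //     LbsObject *o = new LbsObject;
+    //     o->set_group("data");
+    //     o->set_data(buf, len, NULL);    // NULL: checksum computed here
+    //     o->write(store);
+    //     ObjectReference ref = o->get_ref();
+    //     delete o;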
 
     // Write an object to a segment, thus making it permanent.  This function
     // can be called at most once.
     void write(TarSegmentStore *store);
 
-    // Compute the checksum of an object, and include it in the object
-    // reference.  This should be called after write(), and the data specified
-    // by set_data() must remain valid through the call to checksum().
-    void checksum();
-
     // An object is assigned a permanent name once it has been written to a
     // segment.  Until that time, its name cannot be determined.
-    std::string get_name() const { return ref.to_string(); }
     ObjectReference get_ref() { return ref; }
 
 private:
     std::string group;
+    double age;
     const char *data;
     size_t data_len;
+    std::string checksum;
     bool written;
 
     ObjectReference ref;
diff --git a/subfile.cc b/subfile.cc
index acde97c..70e1727 100644
--- a/subfile.cc
+++ b/subfile.cc
@@ -262,16 +262,11 @@ list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
     // No data was matched.  The entire block can be written out as is into a
     // new object, and the new_block_summary used to save chunk signatures.
     if (!matched_old) {
-        SHA1Checksum block_hash;
-        block_hash.process(analyzed_buf, analyzed_len);
-        string block_csum = block_hash.checksum_str();
-
-        o->set_data(analyzed_buf, analyzed_len);
+        o->set_age(block_age);
+        o->set_data(analyzed_buf, analyzed_len, NULL);
         o->write(tss);
         ObjectReference ref = o->get_ref();
-        db->StoreObject(ref, block_csum, analyzed_len, block_age);
         store_analyzed_signatures(ref);
-        ref.set_range(0, analyzed_len, true);
         refs.push_back(ref);
         delete o;
         return refs;
@@ -295,7 +290,7 @@ list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
     string block_csum = block_hash.checksum_str();
 
     o->set_group("data");
-    o->set_data(literal_buf, new_data);
+    o->set_data(literal_buf, new_data, NULL);
     o->write(tss);
     ObjectReference ref = o->get_ref();
     for (i = items.begin(); i != items.end(); ++i) {
@@ -305,7 +300,7 @@ list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
         }
     }
 
-    db->StoreObject(ref, block_csum, new_data, 0.0);
-
     block_summary summary;
     summary.ref = ref;
-- 
2.20.1