From: Michael Vrable
Date: Fri, 21 Sep 2012 22:50:14 +0000 (-0700)
Subject: Improve tracking of segments and segment utilization.
X-Git-Url: http://git.vrable.net/?p=cumulus.git;a=commitdiff_plain;h=3d780590edec4583eb3ef0ca16120afd0f7451f9

Improve tracking of segments and segment utilization.

Update the local database and cumulus binary to keep better track of
segment utilization data, with an eventual goal of improving cleaning
algorithms.
---

diff --git a/localdb.cc b/localdb.cc
index 3fcbafe..ca83559 100644
--- a/localdb.cc
+++ b/localdb.cc
@@ -38,9 +38,14 @@
 #include "store.h"
 #include "util.h"
 
+using std::max;
 using std::min;
+using std::set;
 using std::string;
 
+static const int SCHEMA_MAJOR = 0;
+static const int SCHEMA_MINOR = 11;
+
 /* Helper function to prepare a statement for execution in the current
  * database. */
 sqlite3_stmt *LocalDb::Prepare(const char *sql)
@@ -85,14 +90,33 @@ void LocalDb::Open(const char *path, const char *snapshot_name,
 
     sqlite3_extended_result_codes(db, 1);
 
+    /* Check that the local database format is the correct version; if not,
+     * report an error. */
+    sqlite3_stmt *stmt = Prepare("select major, minor from schema_version");
+
+    rc = sqlite3_step(stmt);
+    if (rc != SQLITE_ROW) {
+        fatal("Unable to read local database version from database");
+    } else if (rc == SQLITE_ROW) {
+        int major = sqlite3_column_int(stmt, 0);
+        int minor = sqlite3_column_int(stmt, 1);
+        if (major != SCHEMA_MAJOR || minor != SCHEMA_MINOR) {
+            fprintf(stderr,
+                    "Local database does not have required schema version!\n"
+                    " expected: %d.%d, found: %d.%d\n",
+                    SCHEMA_MAJOR, SCHEMA_MINOR, major, minor);
+            fatal("Unable to continue.");
+        }
+    }
+    sqlite3_finalize(stmt);
+
     if (snapshot_scheme == NULL)
         snapshot_scheme = "";
 
     /* Insert this snapshot into the database, and determine the integer key
      * which will be used to identify it. */
-    sqlite3_stmt *stmt = Prepare("insert into "
-                                 "snapshots(name, scheme, timestamp, intent) "
-                                 "values (?, ?, julianday('now'), ?)");
+    stmt = Prepare("insert into snapshots(name, scheme, timestamp, intent) "
+                   "values (?, ?, julianday('now'), ?)");
     sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
                       SQLITE_TRANSIENT);
     sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
@@ -143,22 +167,13 @@ void LocalDb::Close()
 {
     int rc;
 
-    /* Summarize the snapshot_refs table into segments_used. */
+    /* Summarize the snapshot_refs table into segment_utilization. */
     sqlite3_stmt *stmt = Prepare(
-        "insert or replace into segments_used "
-        "select ? as snapshotid, segmentid, max(utilization) from ("
-        "  select segmentid, cast(used as real) / size as utilization "
-        "  from "
-        "  (select segmentid, sum(size) as used from snapshot_refs "
-        "   group by segmentid) "
-        "  join segments using (segmentid) "
-        "  union "
-        "  select segmentid, utilization from segments_used "
-        "  where snapshotid = ? "
-        ") group by segmentid"
+        "insert or replace into segment_utilization "
+        "select ? as snapshotid, segmentid, sum(size) "
+        "from snapshot_refs group by segmentid"
     );
     sqlite3_bind_int64(stmt, 1, snapshotid);
-    sqlite3_bind_int64(stmt, 2, snapshotid);
     rc = sqlite3_step(stmt);
     if (rc != SQLITE_OK && rc != SQLITE_DONE) {
         ReportError(rc);
     }
@@ -232,13 +247,16 @@ string LocalDb::IdToSegment(int64_t segmentid)
     return result;
 }
 
-void LocalDb::StoreObject(const ObjectReference& ref,
-                          const string &checksum, int64_t size,
-                          double age)
+void LocalDb::StoreObject(const ObjectReference& ref, double age)
 {
     int rc;
     sqlite3_stmt *stmt;
 
+    assert(ref.has_checksum());
+    string checksum = ref.get_checksum();
+    assert(ref.range_is_exact());
+    int64_t size = ref.get_range_length();
+
     if (age == 0.0) {
         stmt = Prepare("insert into block_index("
                        "segmentid, object, checksum, size, timestamp) "
@@ -370,6 +388,33 @@ bool LocalDb::IsAvailable(const ObjectReference &ref)
     return found;
 }
 
+set<string> LocalDb::GetUsedSegments()
+{
+    int rc;
+    sqlite3_stmt *stmt;
+    set<string> result;
+
+    stmt = Prepare("select segment from segments "
+                   "where segmentid in (select segmentid from snapshot_refs)");
+
+    while (true) {
+        rc = sqlite3_step(stmt);
+        if (rc == SQLITE_ROW) {
+            const char *segment
+                = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
+            result.insert(segment);
+        } else if (rc == SQLITE_DONE) {
+            break;
+        } else {
+            ReportError(rc);
+        }
+    }
+
+    sqlite3_finalize(stmt);
+
+    return result;
+}
+
 void LocalDb::UseObject(const ObjectReference& ref)
 {
     int rc;
@@ -390,31 +435,46 @@ void LocalDb::UseObject(const ObjectReference& ref)
     }
     sqlite3_finalize(stmt);
 
-    int64_t block_size = 0;
-    stmt = Prepare("select size from block_index "
-                   "where segmentid = ? and object = ?");
-    sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
-    obj = ref.get_sequence();
-    sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
-    rc = sqlite3_step(stmt);
-    if (rc == SQLITE_ROW) {
-        block_size = sqlite3_column_int64(stmt, 0);
+    // Attempt to determine the underlying size of the object. This may
+    // require a database lookup if the length is not encoded into the object
+    // reference already.
+    int64_t object_size = 0;
+    if (ref.range_is_exact()) {
+        object_size = ref.get_range_length();
     } else {
-        string refstr = ref.to_string();
-        fprintf(stderr, "No block found in block_index for %s\n",
-                refstr.c_str());
+        stmt = Prepare("select size from block_index "
+                       "where segmentid = ? and object = ?");
+        sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
+        obj = ref.get_sequence();
+        sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
+        rc = sqlite3_step(stmt);
+        if (rc == SQLITE_ROW) {
+            object_size = sqlite3_column_int64(stmt, 0);
+        } else {
+            fprintf(stderr, "Warning: No block found in block_index for %s\n",
+                    ref.to_string().c_str());
+        }
         sqlite3_finalize(stmt);
-        return;
     }
-    sqlite3_finalize(stmt);
 
-    int64_t new_size = old_size;
+    // Possibly mark additional bytes as being referenced. The number of bytes
+    // referenced can only be increased (up to the object size). The bytes
+    // referenced will be set to the object size only if the entire object is
+    // referenced at once: a series of partial ranges that add up to the total
+    // size will have a reference size capped at just less than the full object
+    // size (we can't tell if some bytes were referenced multiple times, and
+    // thus we conservatively assume some bytes might still be unreferenced).
+    int64_t new_refs = old_size;
     if (ref.has_range()) {
-        new_size += ref.get_range_length();
-        new_size = min(new_size, block_size);
+        new_refs = ref.get_range_length();
    } else {
-        new_size = block_size;
+        new_refs = object_size;
     }
+    int64_t new_size = old_size + new_refs;
+    if (old_size < object_size && new_refs < object_size)
+        new_size = min(new_size, object_size - 1);
+    new_size = min(object_size, new_size);
+    new_size = max(new_size, (int64_t)0);
 
     if (new_size != old_size) {
         stmt = Prepare("insert or replace "
@@ -435,44 +495,25 @@ void LocalDb::UseObject(const ObjectReference& ref)
     }
 }
 
-void LocalDb::UseSegment(const std::string &segment, double utilization)
-{
-    int rc;
-    sqlite3_stmt *stmt;
-
-    stmt = Prepare("insert or replace "
-                   "into segments_used(snapshotid, segmentid, utilization) "
-                   "values (?, ?, ?)");
-    sqlite3_bind_int64(stmt, 1, snapshotid);
-    sqlite3_bind_int64(stmt, 2, SegmentToId(segment));
-    sqlite3_bind_double(stmt, 3, utilization);
-
-    rc = sqlite3_step(stmt);
-    if (rc != SQLITE_DONE) {
-        fprintf(stderr, "Could not insert segment use record!\n");
-        ReportError(rc);
-    }
-
-    sqlite3_finalize(stmt);
-}
-
 void LocalDb::SetSegmentChecksum(const std::string &segment,
                                  const std::string &path,
                                  const std::string &checksum,
-                                 int size)
+                                 int data_size, int disk_size)
 {
     int rc;
     sqlite3_stmt *stmt;
 
-    stmt = Prepare("update segments set path = ?, checksum = ?, size = ?, "
+    stmt = Prepare("update segments set path = ?, checksum = ?, "
+                   "data_size = ?, disk_size = ?, "
                    "mtime = coalesce(mtime, julianday('now')) "
                    "where segmentid = ?");
     sqlite3_bind_text(stmt, 1, path.c_str(), path.size(), SQLITE_TRANSIENT);
     sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
                       SQLITE_TRANSIENT);
-    sqlite3_bind_int64(stmt, 3, size);
-    sqlite3_bind_int64(stmt, 4, SegmentToId(segment));
+    sqlite3_bind_int64(stmt, 3, data_size);
+    sqlite3_bind_int64(stmt, 4, disk_size);
+    sqlite3_bind_int64(stmt, 5, SegmentToId(segment));
 
     rc = sqlite3_step(stmt);
     if (rc != SQLITE_DONE) {
diff --git a/localdb.h b/localdb.h
index 5901836..7764aed 100644
--- a/localdb.h
+++ b/localdb.h
@@ -31,6 +31,7 @@
 #include <sqlite3.h>
 
+#include <set>
 #include <string>
 
 #include "ref.h"
@@ -40,17 +41,17 @@ public:
     void Open(const char *path, const char *snapshot_name,
               const char *snapshot_scheme, double intent);
     void Close();
-    void StoreObject(const ObjectReference& ref,
-                     const std::string &checksum, int64_t size, double age);
+    void StoreObject(const ObjectReference& ref, double age);
     ObjectReference FindObject(const std::string &checksum, int64_t size);
     bool IsOldObject(const std::string &checksum, int64_t size, double *age,
                      int *group);
     bool IsAvailable(const ObjectReference &ref);
     void UseObject(const ObjectReference& ref);
-    void UseSegment(const std::string &segment, double utilization);
+    std::set<std::string> GetUsedSegments();
     void SetSegmentChecksum(const std::string &segment,
                             const std::string &path,
-                            const std::string &checksum, int size);
+                            const std::string &checksum,
+                            int data_size, int disk_size);
     bool GetSegmentChecksum(const std::string &segment,
                             std::string *seg_path, std::string *seg_checksum);
diff --git a/main.cc b/main.cc
index c3f8c59..e10a04a 100644
--- a/main.cc
+++ b/main.cc
@@ -82,9 +82,6 @@ static char *block_buf;
  * invocations to help in creating incremental snapshots. */
 LocalDb *db;
 
-/* Keep track of all segments which are needed to reconstruct the snapshot. */
-std::set<string> segment_list;
-
 /* Snapshot intent: 1=daily, 7=weekly, etc. This is not used directly, but is
  * stored in the local database and can help guide segment cleaning and
  * snapshot expiration policies. */
@@ -98,13 +95,6 @@ bool flag_rebuild_statcache = false;
 /* Whether verbose output is enabled. */
 bool verbose = false;
 
-/* Ensure that the given segment is listed as a dependency of the current
- * snapshot. */
-void add_segment(const string& segment)
-{
-    segment_list.insert(segment);
-}
-
 /* Attempts to open a regular file read-only, but with safety checks for files
  * that might not be fully trusted. */
 int safe_open(const string& path, struct stat *stat_buf)
@@ -232,8 +222,6 @@ int64_t dumpfile(int fd, dictionary &file_info, const string &path,
              i != old_blocks.end(); ++i) {
             const ObjectReference &ref = *i;
             object_list.push_back(ref.to_string());
-            if (ref.is_normal())
-                add_segment(ref.get_segment());
             db->UseObject(ref);
         }
         size = stat_buf.st_size;
@@ -274,9 +262,10 @@ int64_t dumpfile(int fd, dictionary &file_info, const string &path,
             double block_age = 0.0;
             ObjectReference ref;
 
-            SHA1Checksum block_hash;
-            block_hash.process(block_buf, bytes);
-            string block_csum = block_hash.checksum_str();
+            Hash *hash = Hash::New();
+            hash->update(block_buf, bytes);
+            string block_csum = hash->digest_str();
+            delete hash;
 
             if (all_zero) {
                 ref = ObjectReference(ObjectReference::REF_ZERO);
@@ -332,8 +321,6 @@ int64_t dumpfile(int fd, dictionary &file_info, const string &path,
             while (!refs.empty()) {
                 ref = refs.front(); refs.pop_front();
                 object_list.push_back(ref.to_string());
-                if (ref.is_normal())
-                    add_segment(ref.get_segment());
                 db->UseObject(ref);
             }
             size += bytes;
@@ -859,7 +846,6 @@ int main(int argc, char *argv[])
     }
 
     ObjectReference root_ref = metawriter->close();
-    add_segment(root_ref.get_segment());
     string backup_root = root_ref.to_string();
 
     delete metawriter;
@@ -880,6 +866,7 @@ int main(int argc, char *argv[])
                                                   "checksums");
     FILE *checksums = fdopen(checksum_file->get_fd(), "w");
 
+    std::set<string> segment_list = db->GetUsedSegments();
     for (std::set<string>::iterator i = segment_list.begin();
          i != segment_list.end(); ++i) {
         string seg_path, seg_csum;
diff --git a/metadata.cc b/metadata.cc
index 2933ce9..15bd7a5 100644
--- a/metadata.cc
+++ b/metadata.cc
@@ -49,7 +49,6 @@ bool flag_full_metadata = false;
 
 /* TODO: Move to header file */
 extern LocalDb *db;
-void add_segment(const string& segment);
 
 /* Like strcmp, but sorts in the order that files will be visited in the
  * filesystem. That is, we break paths apart at slashes, and compare path
@@ -296,8 +295,7 @@ void MetadataWriter::metadata_flush()
         // If indirectly referencing any other metadata logs, be sure those
         // segments are properly referenced.
         if (i->reused) {
-            add_segment(i->ref.get_segment());
-            db->UseSegment(i->ref.get_segment(), 1.0);
+            db->UseObject(i->ref);
         }
 
         // Write out an indirect reference to any previous objects which could
@@ -338,15 +336,13 @@ void MetadataWriter::metadata_flush()
     /* Write current metadata information to a new object. */
     LbsObject *meta = new LbsObject;
     meta->set_group("metadata");
-    meta->set_data(m.data(), m.size());
+    meta->set_data(m.data(), m.size(), NULL);
     meta->write(store);
-    meta->checksum();
 
     /* Write a reference to this block in the root. */
     ObjectReference ref = meta->get_ref();
     metadata_root << "@" << ref.to_string() << "\n";
-    add_segment(ref.get_segment());
-    db->UseSegment(ref.get_segment(), 1.0);
+    db->UseObject(ref);
 
     delete meta;
@@ -397,11 +393,9 @@ ObjectReference MetadataWriter::close()
 
     LbsObject *root = new LbsObject;
     root->set_group("metadata");
-    root->set_data(root_data.data(), root_data.size());
+    root->set_data(root_data.data(), root_data.size(), NULL);
     root->write(store);
-    root->checksum();
-    add_segment(root->get_ref().get_segment());
-    db->UseSegment(root->get_ref().get_segment(), 1.0);
+    db->UseObject(root->get_ref());
 
     ObjectReference ref = root->get_ref();
     delete root;
diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py
index 51755c7..a40c58d 100644
--- a/python/cumulus/__init__.py
+++ b/python/cumulus/__init__.py
@@ -532,7 +532,26 @@ class LocalDatabase:
         schemes.sort()
         return schemes
 
-    def garbage_collect(self, scheme, intent=1.0):
+    def list_snapshots(self, scheme):
+        """Return a list of snapshots for the given scheme."""
+        cur = self.cursor()
+        cur.execute("select name from snapshots")
+        snapshots = [row[0] for row in cur.fetchall()]
+        snapshots.sort()
+        return snapshots
+
+    def delete_snapshot(self, scheme, name):
+        """Remove the specified snapshot from the database.
+
+        Warning: This does not garbage collect all dependent data in the
+        database, so it must be followed by a call to garbage_collect() to make
+        the database consistent.
+        """
+        cur = self.cursor()
+        cur.execute("delete from snapshots where scheme = ? and name = ?",
+                    (scheme, name))
+
+    def prune_old_snapshots(self, scheme, intent=1.0):
         """Delete entries from old snapshots from the database.
 
         Only snapshots with the specified scheme name will be deleted. If
@@ -579,6 +598,16 @@ class LocalDatabase:
                     first = False
                 max_intent = max(max_intent, snap_intent)
 
+        self.garbage_collect()
+
+    def garbage_collect(self):
+        """Garbage-collect unreachable segment and object data.
+
+        Remove all segments and checksums which is not reachable from the
+        current set of snapshots stored in the local database.
+        """
+        cur = self.cursor()
+
         # Delete entries in the segments_used table which are for non-existent
         # snapshots.
         cur.execute("""delete from segments_used
@@ -590,16 +619,10 @@ class LocalDatabase:
         cur.execute("""delete from segments
                        where segmentid not in (select segmentid from segments_used)""")
 
-        # Delete unused objects in the block_index table. By "unused", we mean
-        # any object which was stored in a segment which has been deleted, and
-        # any object in a segment which was marked for cleaning and has had
-        # cleaning performed already (the expired time is less than the current
-        # largest snapshot id).
+        # Delete dangling objects in the block_index table.
         cur.execute("""delete from block_index
-                       where segmentid not in (select segmentid from segments)
-                          or segmentid in (select segmentid from segments
-                                           where expire_time < ?)""",
-                    (last_snapshotid,))
+                       where segmentid not in
+                           (select segmentid from segments)""")
 
         # Remove sub-block signatures for deleted objects.
         cur.execute("""delete from subblock_signatures
diff --git a/python/cumulus/cmd_util.py b/python/cumulus/cmd_util.py
index da2fb47..e0b094d 100644
--- a/python/cumulus/cmd_util.py
+++ b/python/cumulus/cmd_util.py
@@ -59,7 +59,7 @@ def cmd_clean(args, clean_threshold=7.0):
     # Delete old snapshots from the local database.
     intent = float(options.intent)
     for s in db.list_schemes():
-        db.garbage_collect(s, intent)
+        db.prune_old_snapshots(s, intent)
 
     # Expire segments which are poorly-utilized.
     for s in db.get_segment_cleaning_list():
diff --git a/python/cumulus/config.py b/python/cumulus/config.py
index ea55b1f..cf721bf 100644
--- a/python/cumulus/config.py
+++ b/python/cumulus/config.py
@@ -51,7 +51,6 @@ def _build_retention_engine(spec):
             seconds = int(m.group(1)) * _TIME_UNITS[m.group(2)]
             period = period + datetime.timedelta(seconds=seconds)
             intervalspec = m.group(3)
-        print classname, period
         policy.add_policy(classname, period)
     return policy
 
diff --git a/python/cumulus/main.py b/python/cumulus/main.py
index 7700148..62f2665 100644
--- a/python/cumulus/main.py
+++ b/python/cumulus/main.py
@@ -22,6 +22,7 @@ This implements maintenance functions and is a wrapper around the C++
 cumulus-backup program.
 """
 
+import datetime
 import re
 import sys
 
@@ -59,11 +60,55 @@ def prune_backups(backup_config, scheme):
     cmd_util.options = options
     cmd_util.cmd_garbage_collect([])
 
+def prune_localdb(backup_config, scheme, next_snapshot=None):
+    """Clean old snapshots out of the local database.
+
+    Clear old snapshots out of the local database, possibly in preparation for
+    running a new backup. One snapshot of each configured retention period is
+    kept (i.e., one weekly and one daily), and the most recent snapshot is
+    always retained. If next_snapshot is not None, it should be the timestamp
+    when (approximately) the next snapshot will be taken; if that snapshot
+    would be a daily, weekly, etc. snapshot, then it may result in the previous
+    snapshot of the same duration being evicted from the local database.
+
+    Note that in this sense, "evict" merely refers to tracking the snapshots in
+    the local database; this function does not delete backups from the backup
+    storage.
+    """
+    # Fetch the list of existing snapshots in the local database. Pruning only
+    # makes sense if there are more than one snapshots present.
+    db = cumulus.LocalDatabase(backup_config.get_global("localdb"))
+    snapshots = sorted(db.list_snapshots(scheme))
+    if len(snapshots) <= 1:
+        return
+
+    # Classify the snapshots (daily, weekly, etc.) and keep the most recent one
+    # of each category. Also ensure that the most recent snapshot is retained.
+    retention = backup_config.get_retention_for_scheme(scheme)
+    for snapshot in snapshots:
+        retention.consider_snapshot(snapshot)
+    if next_snapshot is not None:
+        retention.consider_snapshot(next_snapshot)
+    retained = set(retention.last_snapshots().values())
+    retained.add(snapshots[-1])
+    print retention.last_snapshots()
+    print retained
+    for s in snapshots:
+        print s, s in retained
+
+    evicted = [s for s in snapshots if s not in retained]
+    for s in evicted:
+        db.delete_snapshot(scheme, s)
+    db.garbage_collect()
+    db.commit()
+
 def main(argv):
     backup_config = config.CumulusConfig(argv[1])
     for scheme in backup_config.backup_schemes():
         print scheme
-        prune_backups(backup_config, scheme)
+        #prune_backups(backup_config, scheme)
+        prune_localdb(backup_config, scheme, datetime.datetime.utcnow())
+        #prune_localdb(backup_config, scheme, datetime.datetime(2013, 1, 1))
 
 if __name__ == "__main__":
     main(sys.argv)
diff --git a/python/cumulus/retention.py b/python/cumulus/retention.py
index a7ae983..b0e0078 100644
--- a/python/cumulus/retention.py
+++ b/python/cumulus/retention.py
@@ -102,10 +102,12 @@ class RetentionEngine(object):
 
     def add_policy(self, backup_class, retention_period):
         self._policies[backup_class] = retention_period
-        self._last_snapshots[backup_class] = (None, None)
+        self._last_snapshots[backup_class] = (None, None, False)
 
     @staticmethod
     def parse_timestamp(s):
+        if isinstance(s, datetime.datetime):
+            return s
         return datetime.datetime.strptime(s, TIMESTAMP_FORMAT)
 
     def consider_snapshot(self, snapshot):
@@ -134,9 +136,11 @@ class RetentionEngine(object):
             partition = _backup_classes[backup_class](timestamp_policy)
             last_snapshot = self._last_snapshots[backup_class]
             if self._last_snapshots[backup_class][0] != partition:
-                self._last_snapshots[backup_class] = (partition, snapshot)
                 self._labels.add(backup_class)
-                if snapshot_age < retention_period: retain = True
+                retain_label = snapshot_age < retention_period
+                self._last_snapshots[backup_class] = (partition, snapshot,
+                                                      retain_label)
+                if retain_label: retain = True
 
         return retain
 
     def last_labels(self):
@@ -148,4 +152,5 @@ class RetentionEngine(object):
 
     def last_snapshots(self):
         """Returns the most recent snapshot in each backup class."""
-        return dict((k, v[1]) for (k, v) in self._last_snapshots.iteritems())
+        return dict((k, v[1]) for (k, v)
+                    in self._last_snapshots.iteritems() if v[2])
diff --git a/ref.h b/ref.h
index d997133..bbbf5b8 100644
--- a/ref.h
+++ b/ref.h
@@ -110,7 +110,7 @@ public:
     bool has_range() const { return range_valid; }
     size_t get_range_start() const { return range_start; }
     size_t get_range_length() const { return range_length; }
-    size_t get_range_exact() const { return range_exact; }
+    bool range_is_exact() const { return range_exact; }
     void clear_range()
         { range_start = range_length = 0;
          range_valid = false; range_exact = false; }
diff --git a/schema.sql b/schema.sql
index 35b2c9d..d898272 100644
--- a/schema.sql
+++ b/schema.sql
@@ -3,13 +3,25 @@
 --
 -- The index is stored in an SQLite3 database. This is its schema.
 
--- List of snapshots which have been created.
+-- Versioning information, describing the revision for which the table schema
+-- was set up.
+create table schema_version(
+    version text,            -- Program version, dotted decimal string
+    major integer,           -- Major version number
+    minor integer            -- Minor version number
+);
+insert into schema_version values ('0.11', 0, 11);
+
+-- List of snapshots which have been created and which we are still tracking.
+-- There may be more snapshots than this actually stored at the remote server,
+-- but the reverse should not ever be true: Cumulus may depend on data stored
+-- in these snapshots when writing a new snapshot.
 create table snapshots (
     snapshotid integer primary key,
     name text not null,
     scheme text not null,
    timestamp real,
-    intent real
+    intent real              -- TODO: deprecated, should be removed
 );
 
-- List of segments which have been created.
@@ -19,11 +31,13 @@ create table segments (
     segmentid integer primary key,
     segment text unique not null,
     path text,
     checksum text,
     mtime real,
-    size integer,
-    expire_time integer      -- snapshotid of latest snapshot when expired
+    data_size integer,       -- sum of bytes in all objects in the segment
+    disk_size integer        -- size of segment on disk, after compression
+                             -- TODO: group? metadata vs. non-metadata?
 );
 
--- Index of all blocks which have been stored, by checksum.
+-- Index of all data blocks in stored segments. This is indexed by content
+-- hash to allow for coarse block-level data deduplication.
 create table block_index (
     blockid integer primary key,
     segmentid integer not null,
@@ -57,20 +71,14 @@ create table subblock_signatures (
     signatures blob not null
 );
 
--- Summary of segment utilization for each snapshots.
-create table segments_used (
+-- Summary of segment utilization for each snapshot.
+create table segment_utilization (
     snapshotid integer not null,
     segmentid integer not null,
-    utilization real
-);
-create unique index segments_used_index
-    on segments_used(snapshotid, segmentid);
-
--- Overall estimate of segment utilization, for all snapshots combined.
-create view segment_info as
-select segmentid, mtime, size, expire_time,
-       cast(size * utilization as integer) as used, utilization
-from segments join
-     (select segmentid, max(utilization) as utilization
-      from segments_used group by segmentid)
-using (segmentid);
+
+    -- Estimate for the number of live bytes in data objects: this is capped at
+    -- segments.data_size if all data in the segment is referenced.
+    bytes_referenced integer
+);
+create unique index segment_utilization_index
+    on segment_utilization(snapshotid, segmentid);
diff --git a/store.cc b/store.cc
index 9634e7b..772f9c5 100644
--- a/store.cc
+++ b/store.cc
@@ -41,6 +41,7 @@
 #include <iostream>
 
 #include "hash.h"
+#include "localdb.h"
 #include "store.h"
 #include "ref.h"
 #include "util.h"
@@ -226,7 +227,9 @@ static const size_t SEGMENT_SIZE = 4 * 1024 * 1024;
 static map<string, pair<int64_t, int64_t> > group_sizes;
 
 ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
-                                              const std::string &group)
+                                              const std::string &group,
+                                              const std::string &checksum,
+                                              double age)
 {
     struct segment_info *segment;
@@ -240,7 +243,7 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
         segment->basename = segment->name + ".tar";
         segment->basename += filter_extension;
         segment->count = 0;
-        segment->size = 0;
+        segment->data_size = 0;
         segment->rf = remote->alloc_file(segment->basename, "segments");
         segment->file = new Tarfile(segment->rf, segment->name);
 
@@ -255,11 +258,16 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
 
     segment->file->write_object(id, data, len);
     segment->count++;
-    segment->size += len;
+    segment->data_size += len;
     group_sizes[group].first += len;
 
     ObjectReference ref(segment->name, id_buf);
+    ref.set_range(0, len, true);
+    if (checksum.size() > 0)
+        ref.set_checksum(checksum);
+    if (db != NULL)
+        db->StoreObject(ref, age);
 
     // If this segment meets or exceeds the size target, close it so that
     // future objects will go into a new segment.
@@ -292,17 +300,21 @@ void TarSegmentStore::close_segment(const string &group)
     delete segment->file;
 
     if (db != NULL) {
+        struct stat stat_buf;
+        int disk_size = 0;
+        if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
+            disk_size = stat_buf.st_size;
+            group_sizes[segment->group].second += disk_size;
+        }
+
         SHA1Checksum segment_checksum;
+        string checksum;
         if (segment_checksum.process_file(segment->rf->get_local_path().c_str())) {
-            string checksum = segment_checksum.checksum_str();
-            db->SetSegmentChecksum(segment->name, segment->basename, checksum,
-                                   segment->size);
+            checksum = segment_checksum.checksum_str();
         }
 
-        struct stat stat_buf;
-        if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
-            group_sizes[segment->group].second += stat_buf.st_size;
-        }
+        db->SetSegmentChecksum(segment->name, segment->basename, checksum,
+                               segment->data_size, disk_size);
     }
 
     segment->rf->send();
@@ -317,7 +329,7 @@ string TarSegmentStore::object_reference_to_segment(const string &object)
 }
 
 LbsObject::LbsObject()
-    : group(""), data(NULL), data_len(0), written(false)
+    : group(""), age(0.0), data(NULL), data_len(0), written(false)
 {
 }
 
@@ -325,21 +337,26 @@ LbsObject::~LbsObject()
 {
 }
 
-void LbsObject::write(TarSegmentStore *store)
+void LbsObject::set_data(const char *d, size_t len, const char *checksum)
 {
-    assert(data != NULL);
-    assert(!written);
+    data = d;
+    data_len = len;
 
-    ref = store->write_object(data, data_len, group);
-    written = true;
+    if (checksum != NULL) {
+        this->checksum = checksum;
+    } else {
+        Hash *hash = Hash::New();
+        hash->update(data, data_len);
+        this->checksum = hash->digest_str();
+        delete hash;
+    }
 }
 
-void LbsObject::checksum()
+void LbsObject::write(TarSegmentStore *store)
 {
-    assert(written);
+    assert(data != NULL);
+    assert(!written);
 
-    Hash *hash = Hash::New();
-    hash->update(data, data_len);
-    ref.set_checksum(hash->digest_str());
-    delete hash;
+    ref = store->write_object(data, data_len, group, checksum, age);
+    written = true;
 }
diff --git a/store.h b/store.h
index 428c418..016e275 100644
--- a/store.h
+++ b/store.h
@@ -108,7 +108,9 @@ public:
     // used to control object placement; objects with different group
     // parameters are kept in separate segments.
     ObjectReference write_object(const char *data, size_t len,
-                                 const std::string &group = "");
+                                 const std::string &group = "",
+                                 const std::string &checksum = "",
+                                 double age = 0.0);
 
     // Ensure all segments have been fully written.
     void sync();
@@ -122,7 +124,7 @@ private:
         std::string group;
         std::string name;           // UUID
        int count;                  // Objects written to this segment
-        int size;                   // Combined size of objects written
+        int data_size;              // Combined size of objects written
         std::string basename;       // Name of segment without directory
         RemoteFile *rf;
     };
@@ -155,27 +157,32 @@ public:
     // Data in an object must be written all at once, and cannot be generated
     // incrementally. Data can be an arbitrary block of binary data of any
     // size. The pointer to the data need only remain valid until write() is
-    // called.
-    void set_data(const char *d, size_t len) { data = d; data_len = len; }
+    // called. If checksum is non-NULL then it is assumed to contain a hash
+    // value for the data; this provides an optimization in case the caller has
+    // already checksummed the data. Otherwise the set_data will compute a
+    // hash of the data itself.
+    void set_data(const char *d, size_t len, const char *checksum);
+
+    // Explicitly sets the age of the data, for later garbage-collection or
+    // repacking purposes. If not set, the age defaults to the current time.
+    // The age is stored in the database as a floating point value, expressing
+    // the time in Julian days.
+    void set_age(double age) { this->age = age; }
 
     // Write an object to a segment, thus making it permanent. This function
     // can be called at most once.
     void write(TarSegmentStore *store);
 
-    // Compute the checksum of an object, and include it in the object
-    // reference. This should be called after write(), and the data specified
-    // by set_data() must remain valid through the call to checksum().
-    void checksum();
-
     // An object is assigned a permanent name once it has been written to a
     // segment. Until that time, its name cannot be determined.
-    std::string get_name() const { return ref.to_string(); }
-
     ObjectReference get_ref() { return ref; }
 
 private:
     std::string group;
+    double age;
     const char *data;
     size_t data_len;
+    std::string checksum;
     bool written;
     ObjectReference ref;
diff --git a/subfile.cc b/subfile.cc
index acde97c..70e1727 100644
--- a/subfile.cc
+++ b/subfile.cc
@@ -262,16 +262,11 @@ list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
     // No data was matched. The entire block can be written out as is into a
     // new object, and the new_block_summary used to save chunk signatures.
     if (!matched_old) {
-        SHA1Checksum block_hash;
-        block_hash.process(analyzed_buf, analyzed_len);
-        string block_csum = block_hash.checksum_str();
-
-        o->set_data(analyzed_buf, analyzed_len);
+        o->set_age(block_age);
+        o->set_data(analyzed_buf, analyzed_len, NULL);
         o->write(tss);
         ObjectReference ref = o->get_ref();
-        db->StoreObject(ref, block_csum, analyzed_len, block_age);
         store_analyzed_signatures(ref);
-        ref.set_range(0, analyzed_len, true);
         refs.push_back(ref);
         delete o;
         return refs;
@@ -295,7 +290,7 @@ list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
     string block_csum = block_hash.checksum_str();
 
     o->set_group("data");
-    o->set_data(literal_buf, new_data);
+    o->set_data(literal_buf, new_data, NULL);
     o->write(tss);
     ObjectReference ref = o->get_ref();
     for (i = items.begin(); i != items.end(); ++i) {
@@ -305,7 +300,7 @@ list<ObjectReference> Subfile::create_incremental(TarSegmentStore *tss,
         }
     }
 
-    db->StoreObject(ref, block_csum, new_data, 0.0);
+    //db->StoreObject(ref, 0.0);
 
     block_summary summary;
     summary.ref = ref;