#include "store.h"
#include "util.h"
+using std::max;
using std::min;
+using std::set;
using std::string;
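+
+/* Expected schema version of the local database; these values must match the
+ * version row inserted into the schema_version table by the SQL schema. */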
+static const int SCHEMA_MAJOR = 0;
+static const int SCHEMA_MINOR = 11;
+
/* Helper function to prepare a statement for execution in the current
* database. */
sqlite3_stmt *LocalDb::Prepare(const char *sql)
sqlite3_extended_result_codes(db, 1);
+ /* Check that the local database format is the correct version; if not,
+ * report an error. */
+ sqlite3_stmt *stmt = Prepare("select major, minor from schema_version");
+
+ rc = sqlite3_step(stmt);
+ if (rc != SQLITE_ROW) {
+        fatal("Unable to read schema version from local database");
+    } else {
+ int major = sqlite3_column_int(stmt, 0);
+ int minor = sqlite3_column_int(stmt, 1);
+ if (major != SCHEMA_MAJOR || minor != SCHEMA_MINOR) {
+ fprintf(stderr,
+ "Local database does not have required schema version!\n"
+ " expected: %d.%d, found: %d.%d\n",
+ SCHEMA_MAJOR, SCHEMA_MINOR, major, minor);
+ fatal("Unable to continue.");
+ }
+ }
+ sqlite3_finalize(stmt);
+
if (snapshot_scheme == NULL)
snapshot_scheme = "";
/* Insert this snapshot into the database, and determine the integer key
* which will be used to identify it. */
- sqlite3_stmt *stmt = Prepare("insert into "
- "snapshots(name, scheme, timestamp, intent) "
- "values (?, ?, julianday('now'), ?)");
+ stmt = Prepare("insert into snapshots(name, scheme, timestamp, intent) "
+ "values (?, ?, julianday('now'), ?)");
sqlite3_bind_text(stmt, 1, snapshot_name, strlen(snapshot_name),
SQLITE_TRANSIENT);
sqlite3_bind_text(stmt, 2, snapshot_scheme, strlen(snapshot_scheme),
{
int rc;
- /* Summarize the snapshot_refs table into segments_used. */
+ /* Summarize the snapshot_refs table into segment_utilization. */
sqlite3_stmt *stmt = Prepare(
- "insert or replace into segments_used "
- "select ? as snapshotid, segmentid, max(utilization) from ("
- " select segmentid, cast(used as real) / size as utilization "
- " from "
- " (select segmentid, sum(size) as used from snapshot_refs "
- " group by segmentid) "
- " join segments using (segmentid) "
- " union "
- " select segmentid, utilization from segments_used "
- " where snapshotid = ? "
- ") group by segmentid"
+ "insert or replace into segment_utilization "
+ "select ? as snapshotid, segmentid, sum(size) "
+ "from snapshot_refs group by segmentid"
);
sqlite3_bind_int64(stmt, 1, snapshotid);
- sqlite3_bind_int64(stmt, 2, snapshotid);
rc = sqlite3_step(stmt);
if (rc != SQLITE_OK && rc != SQLITE_DONE) {
ReportError(rc);
return result;
}
-void LocalDb::StoreObject(const ObjectReference& ref,
- const string &checksum, int64_t size,
- double age)
+void LocalDb::StoreObject(const ObjectReference& ref, double age)
{
int rc;
sqlite3_stmt *stmt;
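+    /* The checksum and exact object size must now be carried in the object
+     * reference itself, rather than passed as separate arguments. */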
+ assert(ref.has_checksum());
+ string checksum = ref.get_checksum();
+ assert(ref.range_is_exact());
+ int64_t size = ref.get_range_length();
+
if (age == 0.0) {
stmt = Prepare("insert into block_index("
"segmentid, object, checksum, size, timestamp) "
return found;
}
+set<string> LocalDb::GetUsedSegments()
+{
+ int rc;
+ sqlite3_stmt *stmt;
+ set<string> result;
+
+ stmt = Prepare("select segment from segments "
+ "where segmentid in (select segmentid from snapshot_refs)");
+
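+    /* Step through the result set: sqlite3_step returns SQLITE_ROW for each
+     * result row and SQLITE_DONE once all rows have been consumed. */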
+ while (true) {
+ rc = sqlite3_step(stmt);
+ if (rc == SQLITE_ROW) {
+ const char *segment
+ = reinterpret_cast<const char *>(sqlite3_column_text(stmt, 0));
+ result.insert(segment);
+ } else if (rc == SQLITE_DONE) {
+ break;
+ } else {
+            ReportError(rc);
+            break;
+ }
+ }
+
+ sqlite3_finalize(stmt);
+
+ return result;
+}
+
void LocalDb::UseObject(const ObjectReference& ref)
{
int rc;
}
sqlite3_finalize(stmt);
- int64_t block_size = 0;
- stmt = Prepare("select size from block_index "
- "where segmentid = ? and object = ?");
- sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
- obj = ref.get_sequence();
- sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
- rc = sqlite3_step(stmt);
- if (rc == SQLITE_ROW) {
- block_size = sqlite3_column_int64(stmt, 0);
+ // Attempt to determine the underlying size of the object. This may
+ // require a database lookup if the length is not encoded into the object
+ // reference already.
+ int64_t object_size = 0;
+ if (ref.range_is_exact()) {
+ object_size = ref.get_range_length();
} else {
- string refstr = ref.to_string();
- fprintf(stderr, "No block found in block_index for %s\n",
- refstr.c_str());
+ stmt = Prepare("select size from block_index "
+ "where segmentid = ? and object = ?");
+ sqlite3_bind_int64(stmt, 1, SegmentToId(ref.get_segment()));
+ obj = ref.get_sequence();
+ sqlite3_bind_text(stmt, 2, obj.c_str(), obj.size(), SQLITE_TRANSIENT);
+ rc = sqlite3_step(stmt);
+ if (rc == SQLITE_ROW) {
+ object_size = sqlite3_column_int64(stmt, 0);
+ } else {
+ fprintf(stderr, "Warning: No block found in block_index for %s\n",
+ ref.to_string().c_str());
+ }
sqlite3_finalize(stmt);
- return;
}
- sqlite3_finalize(stmt);
- int64_t new_size = old_size;
+ // Possibly mark additional bytes as being referenced. The number of bytes
+ // referenced can only be increased (up to the object size). The bytes
+ // referenced will be set to the object size only if the entire object is
+ // referenced at once: a series of partial ranges that add up to the total
+ // size will have a reference size capped at just less than the full object
+ // size (we can't tell if some bytes were referenced multiple times, and
+ // thus we conservatively assume some bytes might still be unreferenced).
+ int64_t new_refs = old_size;
if (ref.has_range()) {
- new_size += ref.get_range_length();
- new_size = min(new_size, block_size);
+ new_refs = ref.get_range_length();
} else {
- new_size = block_size;
+ new_refs = object_size;
}
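+    // Combine the old and new reference counts, applying the caps described
+    // above.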
+ int64_t new_size = old_size + new_refs;
+ if (old_size < object_size && new_refs < object_size)
+ new_size = min(new_size, object_size - 1);
+ new_size = min(object_size, new_size);
+ new_size = max(new_size, (int64_t)0);
if (new_size != old_size) {
stmt = Prepare("insert or replace "
}
}
-void LocalDb::UseSegment(const std::string &segment, double utilization)
-{
- int rc;
- sqlite3_stmt *stmt;
-
- stmt = Prepare("insert or replace "
- "into segments_used(snapshotid, segmentid, utilization) "
- "values (?, ?, ?)");
- sqlite3_bind_int64(stmt, 1, snapshotid);
- sqlite3_bind_int64(stmt, 2, SegmentToId(segment));
- sqlite3_bind_double(stmt, 3, utilization);
-
- rc = sqlite3_step(stmt);
- if (rc != SQLITE_DONE) {
- fprintf(stderr, "Could not insert segment use record!\n");
- ReportError(rc);
- }
-
- sqlite3_finalize(stmt);
-}
-
void LocalDb::SetSegmentChecksum(const std::string &segment,
const std::string &path,
const std::string &checksum,
- int size)
+ int data_size, int disk_size)
{
int rc;
sqlite3_stmt *stmt;
- stmt = Prepare("update segments set path = ?, checksum = ?, size = ?, "
+ stmt = Prepare("update segments set path = ?, checksum = ?, "
+ "data_size = ?, disk_size = ?, "
"mtime = coalesce(mtime, julianday('now')) "
"where segmentid = ?");
sqlite3_bind_text(stmt, 1, path.c_str(), path.size(),
SQLITE_TRANSIENT);
sqlite3_bind_text(stmt, 2, checksum.c_str(), checksum.size(),
SQLITE_TRANSIENT);
- sqlite3_bind_int64(stmt, 3, size);
- sqlite3_bind_int64(stmt, 4, SegmentToId(segment));
+ sqlite3_bind_int64(stmt, 3, data_size);
+ sqlite3_bind_int64(stmt, 4, disk_size);
+ sqlite3_bind_int64(stmt, 5, SegmentToId(segment));
rc = sqlite3_step(stmt);
if (rc != SQLITE_DONE) {
#include <sqlite3.h>
+#include <set>
#include <string>
#include "ref.h"
void Open(const char *path, const char *snapshot_name,
const char *snapshot_scheme, double intent);
void Close();
- void StoreObject(const ObjectReference& ref,
- const std::string &checksum, int64_t size, double age);
+ void StoreObject(const ObjectReference& ref, double age);
ObjectReference FindObject(const std::string &checksum, int64_t size);
bool IsOldObject(const std::string &checksum, int64_t size, double *age,
int *group);
bool IsAvailable(const ObjectReference &ref);
void UseObject(const ObjectReference& ref);
- void UseSegment(const std::string &segment, double utilization);
+ std::set<std::string> GetUsedSegments();
void SetSegmentChecksum(const std::string &segment, const std::string &path,
- const std::string &checksum, int size);
+ const std::string &checksum,
+ int data_size, int disk_size);
bool GetSegmentChecksum(const std::string &segment,
std::string *seg_path, std::string *seg_checksum);
* invocations to help in creating incremental snapshots. */
LocalDb *db;
-/* Keep track of all segments which are needed to reconstruct the snapshot. */
-std::set<string> segment_list;
-
/* Snapshot intent: 1=daily, 7=weekly, etc. This is not used directly, but is
* stored in the local database and can help guide segment cleaning and
* snapshot expiration policies. */
/* Whether verbose output is enabled. */
bool verbose = false;
-/* Ensure that the given segment is listed as a dependency of the current
- * snapshot. */
-void add_segment(const string& segment)
-{
- segment_list.insert(segment);
-}
-
/* Attempts to open a regular file read-only, but with safety checks for files
* that might not be fully trusted. */
int safe_open(const string& path, struct stat *stat_buf)
i != old_blocks.end(); ++i) {
const ObjectReference &ref = *i;
object_list.push_back(ref.to_string());
- if (ref.is_normal())
- add_segment(ref.get_segment());
db->UseObject(ref);
}
size = stat_buf.st_size;
double block_age = 0.0;
ObjectReference ref;
- SHA1Checksum block_hash;
- block_hash.process(block_buf, bytes);
- string block_csum = block_hash.checksum_str();
+ Hash *hash = Hash::New();
+ hash->update(block_buf, bytes);
+ string block_csum = hash->digest_str();
+ delete hash;
if (all_zero) {
ref = ObjectReference(ObjectReference::REF_ZERO);
while (!refs.empty()) {
ref = refs.front(); refs.pop_front();
object_list.push_back(ref.to_string());
- if (ref.is_normal())
- add_segment(ref.get_segment());
db->UseObject(ref);
}
size += bytes;
}
ObjectReference root_ref = metawriter->close();
- add_segment(root_ref.get_segment());
string backup_root = root_ref.to_string();
delete metawriter;
"checksums");
FILE *checksums = fdopen(checksum_file->get_fd(), "w");
+ std::set<string> segment_list = db->GetUsedSegments();
for (std::set<string>::iterator i = segment_list.begin();
i != segment_list.end(); ++i) {
string seg_path, seg_csum;
/* TODO: Move to header file */
extern LocalDb *db;
-void add_segment(const string& segment);
/* Like strcmp, but sorts in the order that files will be visited in the
* filesystem. That is, we break paths apart at slashes, and compare path
// If indirectly referencing any other metadata logs, be sure those
// segments are properly referenced.
if (i->reused) {
- add_segment(i->ref.get_segment());
- db->UseSegment(i->ref.get_segment(), 1.0);
+ db->UseObject(i->ref);
}
// Write out an indirect reference to any previous objects which could
/* Write current metadata information to a new object. */
LbsObject *meta = new LbsObject;
meta->set_group("metadata");
- meta->set_data(m.data(), m.size());
+ meta->set_data(m.data(), m.size(), NULL);
meta->write(store);
- meta->checksum();
/* Write a reference to this block in the root. */
ObjectReference ref = meta->get_ref();
metadata_root << "@" << ref.to_string() << "\n";
- add_segment(ref.get_segment());
- db->UseSegment(ref.get_segment(), 1.0);
+ db->UseObject(ref);
delete meta;
LbsObject *root = new LbsObject;
root->set_group("metadata");
- root->set_data(root_data.data(), root_data.size());
+ root->set_data(root_data.data(), root_data.size(), NULL);
root->write(store);
- root->checksum();
- add_segment(root->get_ref().get_segment());
- db->UseSegment(root->get_ref().get_segment(), 1.0);
+ db->UseObject(root->get_ref());
ObjectReference ref = root->get_ref();
delete root;
schemes.sort()
return schemes
- def garbage_collect(self, scheme, intent=1.0):
+ def list_snapshots(self, scheme):
+ """Return a list of snapshots for the given scheme."""
+ cur = self.cursor()
+        cur.execute("select name from snapshots where scheme = ?",
+                    (scheme,))
+ snapshots = [row[0] for row in cur.fetchall()]
+ snapshots.sort()
+ return snapshots
+
+ def delete_snapshot(self, scheme, name):
+ """Remove the specified snapshot from the database.
+
+ Warning: This does not garbage collect all dependent data in the
+ database, so it must be followed by a call to garbage_collect() to make
+ the database consistent.
+ """
+ cur = self.cursor()
+ cur.execute("delete from snapshots where scheme = ? and name = ?",
+ (scheme, name))
+
+ def prune_old_snapshots(self, scheme, intent=1.0):
"""Delete entries from old snapshots from the database.
Only snapshots with the specified scheme name will be deleted. If
first = False
max_intent = max(max_intent, snap_intent)
+ self.garbage_collect()
+
+ def garbage_collect(self):
+ """Garbage-collect unreachable segment and object data.
+
+        Remove all segments and checksums which are not reachable from the
+ current set of snapshots stored in the local database.
+ """
+ cur = self.cursor()
+
# Delete entries in the segments_used table which are for non-existent
# snapshots.
cur.execute("""delete from segments_used
cur.execute("""delete from segments where segmentid not in
(select segmentid from segments_used)""")
- # Delete unused objects in the block_index table. By "unused", we mean
- # any object which was stored in a segment which has been deleted, and
- # any object in a segment which was marked for cleaning and has had
- # cleaning performed already (the expired time is less than the current
- # largest snapshot id).
+ # Delete dangling objects in the block_index table.
cur.execute("""delete from block_index
- where segmentid not in (select segmentid from segments)
- or segmentid in (select segmentid from segments
- where expire_time < ?)""",
- (last_snapshotid,))
+ where segmentid not in
+ (select segmentid from segments)""")
# Remove sub-block signatures for deleted objects.
cur.execute("""delete from subblock_signatures
# Delete old snapshots from the local database.
intent = float(options.intent)
for s in db.list_schemes():
- db.garbage_collect(s, intent)
+ db.prune_old_snapshots(s, intent)
# Expire segments which are poorly-utilized.
for s in db.get_segment_cleaning_list():
seconds = int(m.group(1)) * _TIME_UNITS[m.group(2)]
period = period + datetime.timedelta(seconds=seconds)
intervalspec = m.group(3)
- print classname, period
policy.add_policy(classname, period)
return policy
cumulus-backup program.
"""
+import datetime
import re
import sys
cmd_util.options = options
cmd_util.cmd_garbage_collect([])
+def prune_localdb(backup_config, scheme, next_snapshot=None):
+ """Clean old snapshots out of the local database.
+
+ Clear old snapshots out of the local database, possibly in preparation for
+ running a new backup. One snapshot of each configured retention period is
+    kept (e.g., one weekly and one daily), and the most recent snapshot is
+ always retained. If next_snapshot is not None, it should be the timestamp
+ when (approximately) the next snapshot will be taken; if that snapshot
+ would be a daily, weekly, etc. snapshot, then it may result in the previous
+ snapshot of the same duration being evicted from the local database.
+
+ Note that in this sense, "evict" merely refers to tracking the snapshots in
+ the local database; this function does not delete backups from the backup
+ storage.
+ """
+ # Fetch the list of existing snapshots in the local database. Pruning only
+    # makes sense if more than one snapshot is present.
+ db = cumulus.LocalDatabase(backup_config.get_global("localdb"))
+ snapshots = sorted(db.list_snapshots(scheme))
+ if len(snapshots) <= 1:
+ return
+
+ # Classify the snapshots (daily, weekly, etc.) and keep the most recent one
+ # of each category. Also ensure that the most recent snapshot is retained.
+ retention = backup_config.get_retention_for_scheme(scheme)
+ for snapshot in snapshots:
+ retention.consider_snapshot(snapshot)
+ if next_snapshot is not None:
+ retention.consider_snapshot(next_snapshot)
+ retained = set(retention.last_snapshots().values())
+ retained.add(snapshots[-1])
+
+ evicted = [s for s in snapshots if s not in retained]
+ for s in evicted:
+ db.delete_snapshot(scheme, s)
+ db.garbage_collect()
+ db.commit()
+
def main(argv):
backup_config = config.CumulusConfig(argv[1])
for scheme in backup_config.backup_schemes():
print scheme
- prune_backups(backup_config, scheme)
+ #prune_backups(backup_config, scheme)
+ prune_localdb(backup_config, scheme, datetime.datetime.utcnow())
if __name__ == "__main__":
main(sys.argv)
def add_policy(self, backup_class, retention_period):
self._policies[backup_class] = retention_period
- self._last_snapshots[backup_class] = (None, None)
+ self._last_snapshots[backup_class] = (None, None, False)
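+        # Each entry is a tuple (partition, snapshot, retain): the partition
+        # value, the most recent snapshot seen in it, and whether that
+        # snapshot fell within the retention period.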
@staticmethod
def parse_timestamp(s):
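+        # Timestamps may be given either as formatted strings or directly as
+        # datetime objects (as when a projected next-snapshot time is passed
+        # in).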
+ if isinstance(s, datetime.datetime):
+ return s
return datetime.datetime.strptime(s, TIMESTAMP_FORMAT)
def consider_snapshot(self, snapshot):
partition = _backup_classes[backup_class](timestamp_policy)
last_snapshot = self._last_snapshots[backup_class]
if self._last_snapshots[backup_class][0] != partition:
- self._last_snapshots[backup_class] = (partition, snapshot)
self._labels.add(backup_class)
- if snapshot_age < retention_period: retain = True
+ retain_label = snapshot_age < retention_period
+ self._last_snapshots[backup_class] = (partition, snapshot,
+ retain_label)
+ if retain_label: retain = True
return retain
def last_labels(self):
def last_snapshots(self):
"""Returns the most recent snapshot in each backup class."""
- return dict((k, v[1]) for (k, v) in self._last_snapshots.iteritems())
+ return dict((k, v[1]) for (k, v)
+ in self._last_snapshots.iteritems() if v[2])
bool has_range() const { return range_valid; }
size_t get_range_start() const { return range_start; }
size_t get_range_length() const { return range_length; }
- size_t get_range_exact() const { return range_exact; }
+ bool range_is_exact() const { return range_exact; }
void clear_range()
{ range_start = range_length = 0;
range_valid = false; range_exact = false; }
--
-- The index is stored in an SQLite3 database. This is its schema.
--- List of snapshots which have been created.
+-- Versioning information, describing the revision for which the table schema
+-- was set up.
+create table schema_version(
+ version text, -- Program version, dotted decimal string
+ major integer, -- Major version number
+ minor integer -- Minor version number
+);
+insert into schema_version values ('0.11', 0, 11);
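+-- The version inserted above must match the schema version that the LocalDb
+-- code checks for when it opens the database.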
+
+-- List of snapshots which have been created and which we are still tracking.
+-- There may be more snapshots than these stored at the remote server, but the
+-- reverse should never be true: Cumulus may depend on data stored in these
+-- snapshots when writing a new snapshot.
create table snapshots (
snapshotid integer primary key,
name text not null,
scheme text not null,
timestamp real,
- intent real
+ intent real -- TODO: deprecated, should be removed
);
-- List of segments which have been created.
path text,
checksum text,
mtime real,
- size integer,
- expire_time integer -- snapshotid of latest snapshot when expired
+ data_size integer, -- sum of bytes in all objects in the segment
+ disk_size integer -- size of segment on disk, after compression
+ -- TODO: group? metadata vs. non-metadata?
);
--- Index of all blocks which have been stored, by checksum.
+-- Index of all data blocks in stored segments. This is indexed by content
+-- hash to allow for coarse block-level data deduplication.
create table block_index (
blockid integer primary key,
segmentid integer not null,
signatures blob not null
);
--- Summary of segment utilization for each snapshots.
-create table segments_used (
+-- Summary of segment utilization for each snapshot.
+create table segment_utilization (
snapshotid integer not null,
segmentid integer not null,
- utilization real
-);
-create unique index segments_used_index
- on segments_used(snapshotid, segmentid);
--- Overall estimate of segment utilization, for all snapshots combined.
-create view segment_info as
-select segmentid, mtime, size, expire_time,
- cast(size * utilization as integer) as used, utilization
-from segments join
- (select segmentid, max(utilization) as utilization
- from segments_used group by segmentid)
-using (segmentid);
+    -- Estimate of the number of live bytes in data objects; this equals
+    -- segments.data_size only when all data in the segment is referenced.
+ bytes_referenced integer
+);
+create unique index segment_utilization_index
+ on segment_utilization(snapshotid, segmentid);
#include <iostream>
#include "hash.h"
+#include "localdb.h"
#include "store.h"
#include "ref.h"
#include "util.h"
static map<string, pair<int64_t, int64_t> > group_sizes;
ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
- const std::string &group)
+ const std::string &group,
+ const std::string &checksum,
+ double age)
{
struct segment_info *segment;
segment->basename = segment->name + ".tar";
segment->basename += filter_extension;
segment->count = 0;
- segment->size = 0;
+ segment->data_size = 0;
segment->rf = remote->alloc_file(segment->basename, "segments");
segment->file = new Tarfile(segment->rf, segment->name);
segment->file->write_object(id, data, len);
segment->count++;
- segment->size += len;
+ segment->data_size += len;
group_sizes[group].first += len;
ObjectReference ref(segment->name, id_buf);
+ ref.set_range(0, len, true);
+ if (checksum.size() > 0)
+ ref.set_checksum(checksum);
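+    // Record the object in the local database index now that its complete
+    // reference (segment, object name, range, checksum) is known.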
+ if (db != NULL)
+ db->StoreObject(ref, age);
// If this segment meets or exceeds the size target, close it so that
// future objects will go into a new segment.
delete segment->file;
if (db != NULL) {
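+        // Measure the segment's final size on disk (after any filtering or
+        // compression) for the disk_size column in the database.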
+ struct stat stat_buf;
+ int disk_size = 0;
+ if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
+ disk_size = stat_buf.st_size;
+ group_sizes[segment->group].second += disk_size;
+ }
+
SHA1Checksum segment_checksum;
+ string checksum;
if (segment_checksum.process_file(segment->rf->get_local_path().c_str())) {
- string checksum = segment_checksum.checksum_str();
- db->SetSegmentChecksum(segment->name, segment->basename, checksum,
- segment->size);
+ checksum = segment_checksum.checksum_str();
}
- struct stat stat_buf;
- if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
- group_sizes[segment->group].second += stat_buf.st_size;
- }
+ db->SetSegmentChecksum(segment->name, segment->basename, checksum,
+ segment->data_size, disk_size);
}
segment->rf->send();
}
LbsObject::LbsObject()
- : group(""), data(NULL), data_len(0), written(false)
+ : group(""), age(0.0), data(NULL), data_len(0), written(false)
{
}
{
}
-void LbsObject::write(TarSegmentStore *store)
+void LbsObject::set_data(const char *d, size_t len, const char *checksum)
{
- assert(data != NULL);
- assert(!written);
+ data = d;
+ data_len = len;
- ref = store->write_object(data, data_len, group);
- written = true;
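+    // Use a caller-supplied checksum when one is available; otherwise
+    // compute one now, while the data pointer is known to be valid.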
+ if (checksum != NULL) {
+ this->checksum = checksum;
+ } else {
+ Hash *hash = Hash::New();
+ hash->update(data, data_len);
+ this->checksum = hash->digest_str();
+ delete hash;
+ }
}
-void LbsObject::checksum()
+void LbsObject::write(TarSegmentStore *store)
{
- assert(written);
+ assert(data != NULL);
+ assert(!written);
- Hash *hash = Hash::New();
- hash->update(data, data_len);
- ref.set_checksum(hash->digest_str());
- delete hash;
+ ref = store->write_object(data, data_len, group, checksum, age);
+ written = true;
}
// used to control object placement; objects with different group
// parameters are kept in separate segments.
ObjectReference write_object(const char *data, size_t len,
- const std::string &group = "");
+ const std::string &group = "",
+ const std::string &checksum = "",
+ double age = 0.0);
// Ensure all segments have been fully written.
void sync();
std::string group;
std::string name; // UUID
int count; // Objects written to this segment
- int size; // Combined size of objects written
+ int data_size; // Combined size of objects written
std::string basename; // Name of segment without directory
RemoteFile *rf;
};
// Data in an object must be written all at once, and cannot be generated
// incrementally. Data can be an arbitrary block of binary data of any
// size. The pointer to the data need only remain valid until write() is
- // called.
- void set_data(const char *d, size_t len) { data = d; data_len = len; }
+ // called. If checksum is non-NULL then it is assumed to contain a hash
+ // value for the data; this provides an optimization in case the caller has
+    // already checksummed the data. Otherwise, set_data will compute a hash
+    // of the data itself.
+ void set_data(const char *d, size_t len, const char *checksum);
+
+ // Explicitly sets the age of the data, for later garbage-collection or
+ // repacking purposes. If not set, the age defaults to the current time.
+ // The age is stored in the database as a floating point value, expressing
+ // the time in Julian days.
+ void set_age(double age) { this->age = age; }
// Write an object to a segment, thus making it permanent. This function
// can be called at most once.
void write(TarSegmentStore *store);
- // Compute the checksum of an object, and include it in the object
- // reference. This should be called after write(), and the data specified
- // by set_data() must remain valid through the call to checksum().
- void checksum();
-
// An object is assigned a permanent name once it has been written to a
// segment. Until that time, its name cannot be determined.
- std::string get_name() const { return ref.to_string(); }
ObjectReference get_ref() { return ref; }
private:
std::string group;
+ double age;
const char *data;
size_t data_len;
+ std::string checksum;
bool written;
ObjectReference ref;
// No data was matched. The entire block can be written out as is into a
// new object, and the new_block_summary used to save chunk signatures.
if (!matched_old) {
- SHA1Checksum block_hash;
- block_hash.process(analyzed_buf, analyzed_len);
- string block_csum = block_hash.checksum_str();
-
- o->set_data(analyzed_buf, analyzed_len);
+ o->set_age(block_age);
+ o->set_data(analyzed_buf, analyzed_len, NULL);
o->write(tss);
ObjectReference ref = o->get_ref();
- db->StoreObject(ref, block_csum, analyzed_len, block_age);
store_analyzed_signatures(ref);
- ref.set_range(0, analyzed_len, true);
refs.push_back(ref);
delete o;
return refs;
string block_csum = block_hash.checksum_str();
o->set_group("data");
- o->set_data(literal_buf, new_data);
+ o->set_data(literal_buf, new_data, NULL);
o->write(tss);
ObjectReference ref = o->get_ref();
for (i = items.begin(); i != items.end(); ++i) {
}
}
- db->StoreObject(ref, block_csum, new_data, 0.0);
block_summary summary;
summary.ref = ref;