Replace boost::scoped_ptr with std::unique_ptr.
[cumulus.git] / store.cc
index f87b671..a77d64a 100644 (file)
--- a/store.cc
+++ b/store.cc
@@ -1,7 +1,6 @@
-/* Cumulus: Smart Filesystem Backup to Dumb Servers
- *
- * Copyright (C) 2008  The Regents of the University of California
- * Written by Michael Vrable <mvrable@cs.ucsd.edu>
+/* Cumulus: Efficient Filesystem Backup to the Cloud
+ * Copyright (C) 2008-2009 The Cumulus Developers
+ * See the AUTHORS file for a list of contributors.
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -42,6 +41,7 @@
 #include <iostream>
 
 #include "hash.h"
+#include "localdb.h"
 #include "store.h"
 #include "ref.h"
 #include "util.h"
@@ -64,8 +64,7 @@ Tarfile::Tarfile(RemoteFile *file, const string &segment)
     assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE);
 
     this->file = file;
-    real_fd = file->get_fd();
-    filter_fd = spawn_filter(real_fd, filter_program, &filter_pid);
+    this->filter.reset(FileFilter::New(file->get_fd(), filter_program));
 }
 
 Tarfile::~Tarfile()
@@ -77,25 +76,57 @@ Tarfile::~Tarfile()
     tar_write(buf, TAR_BLOCK_SIZE);
     tar_write(buf, TAR_BLOCK_SIZE);
 
-    if (close(filter_fd) != 0)
+    if (close(filter->get_wrapped_fd()) != 0)
         fatal("Error closing Tarfile");
 
     /* ...and wait for filter process to finish. */
+    if (filter->wait() != 0) {
+        fatal("Filter process error");
+    }
+}
+
+FileFilter::FileFilter(int raw, int wrapped, pid_t pid)
+    : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { }
+
+FileFilter *FileFilter::New(int fd, const char *program)
+{
+    if (program == NULL || strlen(program) == 0) {
+        return new FileFilter(fd, fd, -1);
+    }
+
+    pid_t pid;
+    int wrapped_fd = spawn_filter(fd, program, &pid);
+    return new FileFilter(fd, wrapped_fd, pid);
+}
+
+int FileFilter::wait()
+{
+    // No filter program was launched implies no need to wait.
+    if (pid == -1)
+        return 0;
+
+    // The raw file descriptor was held open to track the output file size, but
+    // is not needed any longer.
+    close(fd_raw);
+
     int status;
-    waitpid(filter_pid, &status, 0);
+    if (waitpid(pid, &status, 0) < 0) {
+        fprintf(stderr, "Error waiting for filter process: %m\n");
+        return -1;
+    }
 
     if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
-        fatal("Filter process error");
+        fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status));
     }
 
-    close(real_fd);
+    return status;
 }
 
 /* Launch a child process which can act as a filter (compress, encrypt, etc.)
  * on the TAR output.  The file descriptor to which output should be written
  * must be specified; the return value is the file descriptor which will be
  * attached to the standard input of the filter program. */
-int spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
+int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
 {
     int fds[2];
     pid_t pid;
@@ -145,7 +176,7 @@ void Tarfile::tar_write(const char *data, size_t len)
     size += len;
 
     while (len > 0) {
-        int res = write(filter_fd, data, len);
+        int res = write(filter->get_wrapped_fd(), data, len);
 
         if (res < 0) {
             if (errno == EINTR)
@@ -213,7 +244,7 @@ size_t Tarfile::size_estimate()
 {
     struct stat statbuf;
 
-    if (fstat(real_fd, &statbuf) == 0)
+    if (fstat(filter->get_raw_fd(), &statbuf) == 0)
         return max((int64_t)statbuf.st_size, (int64_t)(size / 128));
 
     /* Couldn't stat the file on disk, so just return the actual number of
@@ -227,7 +258,9 @@ static const size_t SEGMENT_SIZE = 4 * 1024 * 1024;
 static map<string, pair<int64_t, int64_t> > group_sizes;
 
 ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
-                                              const std::string &group)
+                                              const std::string &group,
+                                              const std::string &checksum,
+                                              double age)
 {
     struct segment_info *segment;
 
@@ -241,8 +274,10 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
         segment->basename = segment->name + ".tar";
         segment->basename += filter_extension;
         segment->count = 0;
-        segment->size = 0;
-        segment->rf = remote->alloc_file(segment->basename, "segments");
+        segment->data_size = 0;
+        segment->rf = remote->alloc_file(segment->basename,
+                                         group == "metadata" ? "segments0"
+                                                             : "segments1");
         segment->file = new Tarfile(segment->rf, segment->name);
 
         segments[group] = segment;
@@ -256,11 +291,16 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
 
     segment->file->write_object(id, data, len);
     segment->count++;
-    segment->size += len;
+    segment->data_size += len;
 
     group_sizes[group].first += len;
 
     ObjectReference ref(segment->name, id_buf);
+    ref.set_range(0, len, true);
+    if (checksum.size() > 0)
+        ref.set_checksum(checksum);
+    if (db != NULL)
+        db->StoreObject(ref, age);
 
     // If this segment meets or exceeds the size target, close it so that
     // future objects will go into a new segment.
@@ -293,17 +333,18 @@ void TarSegmentStore::close_segment(const string &group)
     delete segment->file;
 
     if (db != NULL) {
-        SHA1Checksum segment_checksum;
-        if (segment_checksum.process_file(segment->rf->get_local_path().c_str())) {
-            string checksum = segment_checksum.checksum_str();
-            db->SetSegmentChecksum(segment->name, segment->basename, checksum,
-                                   segment->size);
-        }
-
         struct stat stat_buf;
+        int disk_size = 0;
         if (stat(segment->rf->get_local_path().c_str(), &stat_buf) == 0) {
-            group_sizes[segment->group].second += stat_buf.st_size;
+            disk_size = stat_buf.st_size;
+            group_sizes[segment->group].second += disk_size;
         }
+
+        string checksum
+            = Hash::hash_file(segment->rf->get_local_path().c_str());
+
+        db->SetSegmentMetadata(segment->name, segment->rf->get_remote_path(),
+                               checksum, group, segment->data_size, disk_size);
     }
 
     segment->rf->send();
@@ -318,7 +359,7 @@ string TarSegmentStore::object_reference_to_segment(const string &object)
 }
 
 LbsObject::LbsObject()
-    : group(""), data(NULL), data_len(0), written(false)
+    : group(""), age(0.0), data(NULL), data_len(0), written(false)
 {
 }
 
@@ -326,21 +367,26 @@ LbsObject::~LbsObject()
 {
 }
 
-void LbsObject::write(TarSegmentStore *store)
+void LbsObject::set_data(const char *d, size_t len, const char *checksum)
 {
-    assert(data != NULL);
-    assert(!written);
+    data = d;
+    data_len = len;
 
-    ref = store->write_object(data, data_len, group);
-    written = true;
+    if (checksum != NULL) {
+        this->checksum = checksum;
+    } else {
+        Hash *hash = Hash::New();
+        hash->update(data, data_len);
+        this->checksum = hash->digest_str();
+        delete hash;
+    }
 }
 
-void LbsObject::checksum()
+void LbsObject::write(TarSegmentStore *store)
 {
-    assert(written);
+    assert(data != NULL);
+    assert(!written);
 
-    Hash *hash = Hash::New();
-    hash->update(data, data_len);
-    ref.set_checksum(hash->digest_str());
-    delete hash;
+    ref = store->write_object(data, data_len, group, checksum, age);
+    written = true;
 }