Initial framework for direct transfer of backups to remote storage.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 2 Apr 2008 03:58:27 +0000 (20:58 -0700)
committerMichael Vrable <mvrable@turin.ucsd.edu>
Wed, 2 Apr 2008 03:58:27 +0000 (20:58 -0700)
Add a layer of indirection in the writing of files to the backup store, and
create a background thread to handle the processing of files to be stored.
Right now this secondary thread does not do much, but will easily be able
to launch a helper script for transferring data to a remote server.

Files are processed by the background thread one at a time.  Multiple files
can be queued up for processing, but the size of the queue is limited so
that the production of backup data will be throttled to the speed at which
the data can be transferred (to bound the temporary space needed for
storing files).

Makefile
remote.cc [new file with mode: 0644]
remote.h [new file with mode: 0644]
scandir.cc
store.cc
store.h

index 32b5033..06cbc30 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ CXXFLAGS=-O -Wall -D_FILE_OFFSET_BITS=64 $(DEBUG) \
         `pkg-config --cflags $(PACKAGES)` -DLBS_VERSION=`cat version`
 LDFLAGS=$(DEBUG) `pkg-config --libs $(PACKAGES)`
 
-SRCS=localdb.cc metadata.cc ref.cc scandir.cc sha1.cc store.cc util.cc
+SRCS=localdb.cc metadata.cc ref.cc remote.cc scandir.cc sha1.cc store.cc util.cc
 OBJS=$(SRCS:.cc=.o)
 
 lbs : $(OBJS)
diff --git a/remote.cc b/remote.cc
new file mode 100644 (file)
index 0000000..1e77333
--- /dev/null
+++ b/remote.cc
@@ -0,0 +1,164 @@
+/* LBS: An LFS-inspired filesystem backup system
+ * Copyright (C) 2006  Michael Vrable
+ *
+ * Backup data (segments and backup descriptors) may be stored on a remote
+ * fileserver instead of locally.  The only local storage needed is for the
+ * local database and some temporary space for staging files before they are
+ * transferred to the remote server.
+ *
+ * Like encryption, remote storage is handled through the use of external
+ * scripts that are called when a file is to be transferred. */
+
+#include <assert.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include <list>
+#include <string>
+
+#include "remote.h"
+#include "store.h"
+
+using std::string;
+
+RemoteStore::RemoteStore(const string &stagedir)
+{
+    staging_dir = stagedir;
+
+    /* A background thread is created for each RemoteStore to manage the actual
+     * transfers to a remote server.  The main program thread can enqueue
+     * RemoteFile objects to be transferred asynchronously. */
+    pthread_mutex_init(&lock, NULL);
+    pthread_cond_init(&cond, NULL);
+    terminate = false;
+    busy = true;
+    files_outstanding = 0;
+
+    if (pthread_create(&thread, NULL, RemoteStore::start_transfer_thread,
+                       (void *)this) != 0) {
+        fprintf(stderr, "Cannot create remote storage thread: %m\n");
+        throw IOException("pthread_create");
+    }
+}
+
+/* The RemoteStore destructor will terminate the background transfer thread.
+ * It will wait for all work to finish. */
+RemoteStore::~RemoteStore()
+{
+    pthread_mutex_lock(&lock);
+    terminate = true;
+    pthread_cond_broadcast(&cond);
+    pthread_mutex_unlock(&lock);
+
+    if (pthread_join(thread, NULL) != 0) {
+        fprintf(stderr, "Warning: Unable to join storage thread: %m\n");
+    }
+
+    assert(files_outstanding == 0);
+
+    pthread_cond_destroy(&cond);
+    pthread_mutex_destroy(&lock);
+}
+
+/* Prepare to write out a new file.  Returns a RemoteFile object.  The file
+ * will initially be created in a temporary directory.  When the file is
+ * written out, the RemoteFile object should be passed to RemoteStore::enqueue,
+ * which will upload it to the remote server. */
+RemoteFile *RemoteStore::alloc_file(const string &name)
+{
+    fprintf(stderr, "Allocate file: %s\n", name.c_str());
+    pthread_mutex_lock(&lock);
+    files_outstanding++;
+    pthread_mutex_unlock(&lock);
+    return new RemoteFile(this, name, staging_dir + "/" + name);
+}
+
+/* Request that a file be transferred to the remote server.  The actual
+ * transfer will happen asynchronously in another thread.  The call to enqueue
+ * may block, however, if there is a backlog of data to be transferred.
+ * Ownership of the RemoteFile object is transferred; the RemoteStore will be
+ * responsible for its destruction. */
+void RemoteStore::enqueue(RemoteFile *file)
+{
+    fprintf(stderr, "Enqueue: %s\n", file->remote_path.c_str());
+
+    pthread_mutex_lock(&lock);
+
+    while (transfer_queue.size() >= MAX_QUEUE_SIZE)
+        pthread_cond_wait(&cond, &lock);
+
+    transfer_queue.push_back(file);
+    files_outstanding--;
+    busy = true;
+
+    pthread_cond_broadcast(&cond);
+    pthread_mutex_unlock(&lock);
+}
+
+/* Wait for all transfers to finish. */
+void RemoteStore::sync()
+{
+    fprintf(stderr, "RemoteStore::sync() start\n");
+    pthread_mutex_lock(&lock);
+
+    while (busy)
+        pthread_cond_wait(&cond, &lock);
+
+    pthread_mutex_unlock(&lock);
+    fprintf(stderr, "RemoteStore::sync() end\n");
+}
+
+void *RemoteStore::start_transfer_thread(void *arg)
+{
+    RemoteStore *store = static_cast<RemoteStore *>(arg);
+    store->transfer_thread();
+    return NULL;
+}
+
+/* Background thread for transferring backups to a remote server. */
+void RemoteStore::transfer_thread()
+{
+    while (true) {
+        RemoteFile *file = NULL;
+
+        // Wait for a file to transfer
+        pthread_mutex_lock(&lock);
+        while (transfer_queue.empty() && !terminate) {
+            busy = false;
+            pthread_cond_broadcast(&cond);
+            pthread_cond_wait(&cond, &lock);
+        }
+        if (terminate && transfer_queue.empty()) {
+            busy = false;
+            pthread_cond_broadcast(&cond);
+            pthread_mutex_unlock(&lock);
+            break;
+        }
+        busy = true;
+        file = transfer_queue.front();
+        transfer_queue.pop_front();
+        pthread_cond_broadcast(&cond);
+        pthread_mutex_unlock(&lock);
+
+        // Transfer the file
+        fprintf(stderr, "Start transfer: %s\n", file->remote_path.c_str());
+        // TODO
+        fprintf(stderr, "Finish transfer: %s\n", file->remote_path.c_str());
+
+        delete file;
+    }
+}
+
+RemoteFile::RemoteFile(RemoteStore *remote,
+                       const string &name, const string &local_path)
+{
+    remote_store = remote;
+    this->local_path = local_path;
+    this->remote_path = name;
+
+    fd = open(local_path.c_str(), O_WRONLY | O_CREAT, 0666);
+    if (fd < 0)
+        throw IOException("Error opening output file");
+}
diff --git a/remote.h b/remote.h
new file mode 100644 (file)
index 0000000..705fe80
--- /dev/null
+++ b/remote.h
@@ -0,0 +1,75 @@
+/* LBS: An LFS-inspired filesystem backup system
+ * Copyright (C) 2006  Michael Vrable
+ *
+ * Backup data (segments and backup descriptors) may be stored on a remote
+ * fileserver instead of locally.  The only local storage needed is for the
+ * local database and some temporary space for staging files before they are
+ * transferred to the remote server.
+ *
+ * Like encryption, remote storage is handled through the use of external
+ * scripts that are called when a file is to be transferred. */
+
+#ifndef _LBS_REMOTE_H
+#define _LBS_REMOTE_H
+
+#include <list>
+#include <string>
+#include <pthread.h>
+
+class RemoteFile;
+
+class RemoteStore {
+public:
+    static const size_t MAX_QUEUE_SIZE = 4;
+
+    RemoteStore(const std::string &stagedir);
+    ~RemoteStore();
+    RemoteFile *alloc_file(const std::string &name);
+    void enqueue(RemoteFile *file);
+    void sync();
+
+private:
+    pthread_t thread;
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+
+    std::string staging_dir;
+    bool terminate;             // Set when thread should shut down
+    bool busy;                  // True while there are pending transfers
+    std::list<RemoteFile *> transfer_queue;
+
+    /* For error-checking purposes, track the number of files which have been
+     * allocated but not yet queued to be sent.  This should be zero when the
+     * RemoteStore is destroyed. */
+    int files_outstanding;
+
+    void transfer_thread();
+    static void *start_transfer_thread(void *arg);
+};
+
+class RemoteFile {
+public:
+    /* Get the file descriptor for writing to the (staging copy of the) file.
+     * The _caller_ is responsible for closing this file descriptor once all
+     * data is written, and before send() is called. */
+    int get_fd() const { return fd; }
+
+    const std::string &get_local_path() const { return local_path; }
+
+    /* Called when the file is finished--request that it be sent to the remote
+     * server.  This will delete the RemoteFile object. */
+    void send() { remote_store->enqueue(this); }
+private:
+    friend class RemoteStore;
+
+    RemoteFile(RemoteStore *remote,
+               const std::string &name, const std::string &local_path);
+
+    RemoteStore *remote_store;
+
+    int fd;
+    std::string local_path;
+    std::string remote_path;
+};
+
+#endif // _LBS_REMOTE_H
index 65151bc..92c9a4d 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "localdb.h"
 #include "metadata.h"
+#include "remote.h"
 #include "store.h"
 #include "sha1.h"
 #include "util.h"
@@ -43,6 +44,7 @@ using std::ostream;
 #define LBS_STRINGIFY2(s) #s
 static const char lbs_version[] = LBS_STRINGIFY(LBS_VERSION);
 
+static RemoteStore *remote = NULL;
 static TarSegmentStore *tss = NULL;
 static MetadataWriter *metawriter = NULL;
 
@@ -687,6 +689,9 @@ int main(int argc, char *argv[])
 
     block_buf = new char[LBS_BLOCK_SIZE];
 
+    /* Initialize the remote storage layer. */
+    remote = new RemoteStore(backup_dest);
+
     /* Store the time when the backup started, so it can be included in the
      * snapshot name. */
     time_t now;
@@ -705,7 +710,7 @@ int main(int argc, char *argv[])
              backup_scheme.size() ? backup_scheme.c_str() : NULL,
              snapshot_intent);
 
-    tss = new TarSegmentStore(backup_dest, db);
+    tss = new TarSegmentStore(remote, db);
 
     /* Initialize the stat cache, for skipping over unchanged files. */
     metawriter = new MetadataWriter(tss, localdb_dir.c_str(), desc_buf,
@@ -729,48 +734,54 @@ int main(int argc, char *argv[])
      * segments included in this snapshot.  The format is designed so that it
      * may be easily verified using the sha1sums command. */
     const char csum_type[] = "sha1";
-    string checksum_filename = backup_dest + "/snapshot-";
+    string checksum_filename = "snapshot-";
     if (backup_scheme.size() > 0)
         checksum_filename += backup_scheme + "-";
     checksum_filename = checksum_filename + desc_buf + "." + csum_type + "sums";
-    FILE *checksums = fopen(checksum_filename.c_str(), "w");
-    if (checksums != NULL) {
-        for (std::set<string>::iterator i = segment_list.begin();
-             i != segment_list.end(); ++i) {
-            string seg_path, seg_csum;
-            if (db->GetSegmentChecksum(*i, &seg_path, &seg_csum)) {
-                const char *raw_checksum = NULL;
-                if (strncmp(seg_csum.c_str(), csum_type,
-                            strlen(csum_type)) == 0) {
-                    raw_checksum = seg_csum.c_str() + strlen(csum_type);
-                    if (*raw_checksum == '=')
-                        raw_checksum++;
-                    else
-                        raw_checksum = NULL;
-                }
+    RemoteFile *checksum_file = remote->alloc_file(checksum_filename);
+    FILE *checksums = fdopen(checksum_file->get_fd(), "w");
 
-                if (raw_checksum != NULL)
-                    fprintf(checksums, "%s *%s\n",
-                            raw_checksum, seg_path.c_str());
+    for (std::set<string>::iterator i = segment_list.begin();
+         i != segment_list.end(); ++i) {
+        string seg_path, seg_csum;
+        if (db->GetSegmentChecksum(*i, &seg_path, &seg_csum)) {
+            const char *raw_checksum = NULL;
+            if (strncmp(seg_csum.c_str(), csum_type,
+                        strlen(csum_type)) == 0) {
+                raw_checksum = seg_csum.c_str() + strlen(csum_type);
+                if (*raw_checksum == '=')
+                    raw_checksum++;
+                else
+                    raw_checksum = NULL;
             }
+
+            if (raw_checksum != NULL)
+                fprintf(checksums, "%s *%s\n",
+                        raw_checksum, seg_path.c_str());
         }
-        fclose(checksums);
-    } else {
-        fprintf(stderr, "ERROR: Unable to write checksums file: %m\n");
     }
+    fclose(checksums);
+    checksum_file->send();
 
     db->Close();
 
+    /* All other files should be flushed to remote storage before writing the
+     * backup descriptor below, so that it is not possible to have a backup
+     * descriptor written out depending on non-existent (not yet written)
+     * files. */
+    remote->sync();
+
     /* Write a backup descriptor file, which says which segments are needed and
      * where to start to restore this snapshot.  The filename is based on the
      * current time.  If a signature filter program was specified, filter the
      * data through that to give a chance to sign the descriptor contents. */
-    string desc_filename = backup_dest + "/snapshot-";
+    string desc_filename = "snapshot-";
     if (backup_scheme.size() > 0)
         desc_filename += backup_scheme + "-";
     desc_filename = desc_filename + desc_buf + ".lbs";
 
-    int descriptor_fd = open(desc_filename.c_str(), O_WRONLY | O_CREAT, 0666);
+    RemoteFile *descriptor_file = remote->alloc_file(desc_filename);
+    int descriptor_fd = descriptor_file->get_fd();
     if (descriptor_fd < 0) {
         fprintf(stderr, "Unable to open descriptor output file: %m\n");
         return 1;
@@ -816,5 +827,10 @@ int main(int argc, char *argv[])
         }
     }
 
+    descriptor_file->send();
+
+    remote->sync();
+    delete remote;
+
     return 0;
 }
index 1ea2439..3dcfcb4 100644 (file)
--- a/store.cc
+++ b/store.cc
@@ -46,16 +46,14 @@ static void cloexec(int fd)
     fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
 }
 
-Tarfile::Tarfile(const string &path, const string &segment)
+Tarfile::Tarfile(RemoteFile *file, const string &segment)
     : size(0),
       segment_name(segment)
 {
     assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE);
 
-    real_fd = open(path.c_str(), O_WRONLY | O_CREAT, 0666);
-    if (real_fd < 0)
-        throw IOException("Error opening output file");
-
+    this->file = file;
+    real_fd = file->get_fd();
     filter_fd = spawn_filter(real_fd, filter_program, &filter_pid);
 }
 
@@ -229,10 +227,10 @@ ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
         segment->name = generate_uuid();
         segment->basename = segment->name + ".tar";
         segment->basename += filter_extension;
-        segment->fullname = path + "/" + segment->basename;
-        segment->file = new Tarfile(segment->fullname, segment->name);
         segment->count = 0;
         segment->size = 0;
+        segment->rf = remote->alloc_file(segment->basename);
+        segment->file = new Tarfile(segment->rf, segment->name);
 
         segments[group] = segment;
     } else {
@@ -282,13 +280,15 @@ void TarSegmentStore::close_segment(const string &group)
 
     if (db != NULL) {
         SHA1Checksum segment_checksum;
-        if (segment_checksum.process_file(segment->fullname.c_str())) {
+        if (segment_checksum.process_file(segment->rf->get_local_path().c_str())) {
             string checksum = segment_checksum.checksum_str();
             db->SetSegmentChecksum(segment->name, segment->basename, checksum,
                                    segment->size);
         }
     }
 
+    segment->rf->send();
+
     segments.erase(segments.find(group));
     delete segment;
 }
diff --git a/store.h b/store.h
index d447031..580881c 100644 (file)
--- a/store.h
+++ b/store.h
@@ -18,6 +18,7 @@
 #include <sstream>
 
 #include "localdb.h"
+#include "remote.h"
 #include "sha1.h"
 #include "ref.h"
 
@@ -69,7 +70,7 @@ struct tar_header
  * first; incremental writing is not supported. */
 class Tarfile {
 public:
-    Tarfile(const std::string &path, const std::string &segment);
+    Tarfile(RemoteFile *file, const std::string &segment);
     ~Tarfile();
 
     void write_object(int id, const char *data, size_t len);
@@ -81,6 +82,8 @@ private:
     size_t size;
     std::string segment_name;
 
+    RemoteFile *file;
+
     /* Filter support. */
     int real_fd, filter_fd;
     pid_t filter_pid;
@@ -92,9 +95,9 @@ private:
 class TarSegmentStore {
 public:
     // New segments will be stored in the given directory.
-    TarSegmentStore(const std::string &path,
+    TarSegmentStore(RemoteStore *remote,
                     LocalDb *db = NULL)
-        { this->path = path; this->db = db; }
+        { this->remote = remote; this->db = db; }
     ~TarSegmentStore() { sync(); }
 
     // Writes an object to segment in the store, and returns the name
@@ -117,10 +120,10 @@ private:
         int count;                  // Objects written to this segment
         int size;                   // Combined size of objects written
         std::string basename;       // Name of segment without directory
-        std::string fullname;       // Full path to stored segment
+        RemoteFile *rf;
     };
 
-    std::string path;
+    RemoteStore *remote;
     std::map<std::string, struct segment_info *> segments;
     LocalDb *db;