Rework filter process code to make the interface simpler.
authorMichael Vrable <vrable@cs.hmc.edu>
Tue, 18 Jun 2013 16:55:51 +0000 (09:55 -0700)
committerMichael Vrable <vrable@cs.hmc.edu>
Sun, 26 Jan 2014 20:43:43 +0000 (12:43 -0800)
main.cc
store.cc
store.h

diff --git a/main.cc b/main.cc
index 5ac0f8c..4c8f2bb 100644 (file)
--- a/main.cc
+++ b/main.cc
@@ -896,10 +896,15 @@ int main(int argc, char *argv[])
     string dbmeta_filename = "snapshot-";
     if (backup_scheme.size() > 0)
         dbmeta_filename += backup_scheme + "-";
-    dbmeta_filename += timestamp + ".meta";
-    RemoteFile *dbmeta_file = remote->alloc_file(dbmeta_filename,
-                                                   "meta");
-    FILE *dbmeta = fdopen(dbmeta_file->get_fd(), "w");
+    dbmeta_filename += timestamp + ".meta" + filter_extension;
+    RemoteFile *dbmeta_file = remote->alloc_file(dbmeta_filename, "meta");
+    FileFilter *dbmeta_filter = FileFilter::New(dbmeta_file->get_fd(),
+                                                filter_program);
+    if (dbmeta_filter == NULL) {
+        fprintf(stderr, "Unable to open descriptor output file: %m\n");
+        return 1;
+    }
+    FILE *dbmeta = fdopen(dbmeta_filter->get_wrapped_fd(), "w");
 
     for (std::set<string>::iterator i = segment_list.begin();
          i != segment_list.end(); ++i) {
@@ -940,19 +945,13 @@ int main(int argc, char *argv[])
 
     RemoteFile *descriptor_file = remote->alloc_file(desc_filename,
                                                      "snapshots");
-    int descriptor_fd = descriptor_file->get_fd();
-    if (descriptor_fd < 0) {
+    FileFilter *descriptor_filter = FileFilter::New(descriptor_file->get_fd(),
+                                                    signature_filter.c_str());
+    if (descriptor_filter == NULL) {
         fprintf(stderr, "Unable to open descriptor output file: %m\n");
         return 1;
     }
-    pid_t signature_pid = 0;
-    if (signature_filter.size() > 0) {
-        int new_fd = spawn_filter(descriptor_fd, signature_filter.c_str(),
-                                  &signature_pid);
-        close(descriptor_fd);
-        descriptor_fd = new_fd;
-    }
-    FILE *descriptor = fdopen(descriptor_fd, "w");
+    FILE *descriptor = fdopen(descriptor_filter->get_wrapped_fd(), "w");
 
     fprintf(descriptor, "Format: Cumulus Snapshot v0.11\n");
     fprintf(descriptor, "Producer: Cumulus %s\n", cumulus_version);
@@ -978,14 +977,8 @@ int main(int argc, char *argv[])
     }
 
     fclose(descriptor);
-
-    if (signature_pid) {
-        int status;
-        waitpid(signature_pid, &status, 0);
-
-        if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
-            fatal("Signature filter process error");
-        }
+    if (descriptor_filter->wait() < 0) {
+        fatal("Signature filter process error");
     }
 
     descriptor_file->send();
index 31ff40f..0b03493 100644 (file)
--- a/store.cc
+++ b/store.cc
@@ -64,8 +64,7 @@ Tarfile::Tarfile(RemoteFile *file, const string &segment)
     assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE);
 
     this->file = file;
-    real_fd = file->get_fd();
-    filter_fd = spawn_filter(real_fd, filter_program, &filter_pid);
+    this->filter = FileFilter::New(file->get_fd(), filter_program);
 }
 
 Tarfile::~Tarfile()
@@ -77,25 +76,54 @@ Tarfile::~Tarfile()
     tar_write(buf, TAR_BLOCK_SIZE);
     tar_write(buf, TAR_BLOCK_SIZE);
 
-    if (close(filter_fd) != 0)
+    if (close(filter->get_wrapped_fd()) != 0)
         fatal("Error closing Tarfile");
 
     /* ...and wait for filter process to finish. */
+    if (filter->wait() != 0) {
+        fatal("Filter process error");
+    }
+}
+
+FileFilter::FileFilter(int raw, int wrapped, pid_t pid)
+    : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { }
+
+FileFilter *FileFilter::New(int fd, const char *program)
+{
+    if (program == NULL || strlen(program) == 0) {
+        return new FileFilter(fd, fd, -1);
+    }
+
+    pid_t pid;
+    int wrapped_fd = spawn_filter(fd, program, &pid);
+    close(fd);
+    return new FileFilter(fd, wrapped_fd, pid);
+}
+
+int FileFilter::wait()
+{
+    // No filter program was launched implies no need to wait.
+    if (pid == -1)
+        return 0;
+
     int status;
-    waitpid(filter_pid, &status, 0);
+    if (waitpid(pid, &status, 0) < 0) {
+        fprintf(stderr, "Error waiting for filter process: %m\n");
+        return -1;
+    }
 
     if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
-        fatal("Filter process error");
+        fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status));
     }
 
-    close(real_fd);
+    return status;
 }
 
 /* Launch a child process which can act as a filter (compress, encrypt, etc.)
  * on the TAR output.  The file descriptor to which output should be written
  * must be specified; the return value is the file descriptor which will be
  * attached to the standard input of the filter program. */
-int spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
+int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
 {
     int fds[2];
     pid_t pid;
@@ -145,7 +173,7 @@ void Tarfile::tar_write(const char *data, size_t len)
     size += len;
 
     while (len > 0) {
-        int res = write(filter_fd, data, len);
+        int res = write(filter->get_wrapped_fd(), data, len);
 
         if (res < 0) {
             if (errno == EINTR)
@@ -213,7 +241,7 @@ size_t Tarfile::size_estimate()
 {
     struct stat statbuf;
 
-    if (fstat(real_fd, &statbuf) == 0)
+    if (fstat(filter->get_raw_fd(), &statbuf) == 0)
         return max((int64_t)statbuf.st_size, (int64_t)(size / 128));
 
     /* Couldn't stat the file on disk, so just return the actual number of
diff --git a/store.h b/store.h
index 016e275..122a4ac 100644 (file)
--- a/store.h
+++ b/store.h
@@ -68,6 +68,42 @@ struct tar_header
     char padding[12];
 };
 
+class FileFilter {
+public:
+    // It is valid for program to be NULL or empty; if so, no filtering is
+    // done.
+    static FileFilter *New(int fd, const char *program);
+
+    // Wait for the filter process to terminate.
+    int wait();
+
+    // Accessors for the file descriptors.
+    int get_raw_fd() const { return fd_raw; }
+    int get_wrapped_fd() const { return fd_wrapped; }
+
+private:
+    FileFilter(int raw, int wrapped, pid_t pid);
+
+    // Launch a process to filter data written to a file descriptor.  fd_out is
+    // the file descriptor where the filtered data should be written.  program
+    // is the filter program to execute (a single string which will be
+    // interpreted by /bin/sh).  The return value is a file descriptor to which
+    // the data to be filtered should be written.  The process ID of the filter
+    // process is stored at address filter_pid if non-NULL.
+    static int spawn_filter(int fd_out, const char *program, pid_t *filter_pid);
+
+    // The original file descriptor passed when creating the FileFilter object.
+    int fd_raw;
+
+    // The wrapped file descriptor: writes here are piped through the filter
+    // program.
+    int fd_wrapped;
+
+    // The filter process if one was launched, or -1 if there is no filter
+    // program.
+    pid_t pid;
+};
+
 /* A simple wrapper around a single TAR file to represent a segment.  Objects
  * may only be written out all at once, since the tar header must be written
  * first; incremental writing is not supported. */
@@ -86,10 +122,7 @@ private:
     std::string segment_name;
 
     RemoteFile *file;
-
-    /* Filter support. */
-    int real_fd, filter_fd;
-    pid_t filter_pid;
+    FileFilter *filter;
 
     // Write data to the tar file
     void tar_write(const char *data, size_t size);
@@ -195,12 +228,4 @@ extern const char *filter_program;
  * included; this adds to it) */
 extern const char *filter_extension;
 
-/* Launch a process to filter data written to a file descriptor.  fd_out is the
- * file descriptor where the filtered data should be written.  program is the
- * filter program to execute (a single string which will be interpreted by
- * /bin/sh).  The return value is a file descriptor to which the data to be
- * filtered should be written.  The process ID of the filter process is stored
- * at address filter_pid if non-NULL. */
-int spawn_filter(int fd_out, const char *program, pid_t *filter_pid);
-
 #endif // _LBS_STORE_H