Rework filter process code to make the interface simpler.
[cumulus.git] / store.cc
index 31ff40f..0b03493 100644 (file)
--- a/store.cc
+++ b/store.cc
@@ -64,8 +64,7 @@ Tarfile::Tarfile(RemoteFile *file, const string &segment)
     assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE);
 
     this->file = file;
-    real_fd = file->get_fd();
-    filter_fd = spawn_filter(real_fd, filter_program, &filter_pid);
+    this->filter = FileFilter::New(file->get_fd(), filter_program);
 }
 
 Tarfile::~Tarfile()
@@ -77,25 +76,54 @@ Tarfile::~Tarfile()
     tar_write(buf, TAR_BLOCK_SIZE);
     tar_write(buf, TAR_BLOCK_SIZE);
 
-    if (close(filter_fd) != 0)
+    if (close(filter->get_wrapped_fd()) != 0)
         fatal("Error closing Tarfile");
 
     /* ...and wait for filter process to finish. */
+    if (filter->wait() != 0) {
+        fatal("Filter process error");
+    }
+}
+
+FileFilter::FileFilter(int raw, int wrapped, pid_t pid)
+    : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { }
+
+FileFilter *FileFilter::New(int fd, const char *program)
+{
+    if (program == NULL || strlen(program) == 0) {
+        return new FileFilter(fd, fd, -1);
+    }
+
+    pid_t pid;
+    int wrapped_fd = spawn_filter(fd, program, &pid);
+    close(fd);
+    return new FileFilter(fd, wrapped_fd, pid);
+}
+
+int FileFilter::wait()
+{
+    // No filter program was launched implies no need to wait.
+    if (pid == -1)
+        return 0;
+
     int status;
-    waitpid(filter_pid, &status, 0);
+    if (waitpid(pid, &status, 0) < 0) {
+        fprintf(stderr, "Error waiting for filter process: %m\n");
+        return -1;
+    }
 
     if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
-        fatal("Filter process error");
+        fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status));
     }
 
-    close(real_fd);
+    return status;
 }
 
 /* Launch a child process which can act as a filter (compress, encrypt, etc.)
  * on the TAR output.  The file descriptor to which output should be written
  * must be specified; the return value is the file descriptor which will be
  * attached to the standard input of the filter program. */
-int spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
+int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
 {
     int fds[2];
     pid_t pid;
@@ -145,7 +173,7 @@ void Tarfile::tar_write(const char *data, size_t len)
     size += len;
 
     while (len > 0) {
-        int res = write(filter_fd, data, len);
+        int res = write(filter->get_wrapped_fd(), data, len);
 
         if (res < 0) {
             if (errno == EINTR)
@@ -213,7 +241,7 @@ size_t Tarfile::size_estimate()
 {
     struct stat statbuf;
 
-    if (fstat(real_fd, &statbuf) == 0)
+    if (fstat(filter->get_raw_fd(), &statbuf) == 0)
         return max((int64_t)statbuf.st_size, (int64_t)(size / 128));
 
     /* Couldn't stat the file on disk, so just return the actual number of