From 7efae40a865fce46b74538745b17901785062e5f Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Tue, 18 Jun 2013 09:55:51 -0700 Subject: [PATCH] Rework filter process code to make the interface simpler. --- main.cc | 37 +++++++++++++++---------------------- store.cc | 46 +++++++++++++++++++++++++++++++++++++--------- store.h | 49 +++++++++++++++++++++++++++++++++++++------------ 3 files changed, 89 insertions(+), 43 deletions(-) diff --git a/main.cc b/main.cc index 5ac0f8c..4c8f2bb 100644 --- a/main.cc +++ b/main.cc @@ -896,10 +896,15 @@ int main(int argc, char *argv[]) string dbmeta_filename = "snapshot-"; if (backup_scheme.size() > 0) dbmeta_filename += backup_scheme + "-"; - dbmeta_filename += timestamp + ".meta"; - RemoteFile *dbmeta_file = remote->alloc_file(dbmeta_filename, - "meta"); - FILE *dbmeta = fdopen(dbmeta_file->get_fd(), "w"); + dbmeta_filename += timestamp + ".meta" + filter_extension; + RemoteFile *dbmeta_file = remote->alloc_file(dbmeta_filename, "meta"); + FileFilter *dbmeta_filter = FileFilter::New(dbmeta_file->get_fd(), + filter_program); + if (dbmeta_filter == NULL) { + fprintf(stderr, "Unable to open descriptor output file: %m\n"); + return 1; + } + FILE *dbmeta = fdopen(dbmeta_filter->get_wrapped_fd(), "w"); for (std::set::iterator i = segment_list.begin(); i != segment_list.end(); ++i) { @@ -940,19 +945,13 @@ int main(int argc, char *argv[]) RemoteFile *descriptor_file = remote->alloc_file(desc_filename, "snapshots"); - int descriptor_fd = descriptor_file->get_fd(); - if (descriptor_fd < 0) { + FileFilter *descriptor_filter = FileFilter::New(descriptor_file->get_fd(), + signature_filter.c_str()); + if (descriptor_filter == NULL) { fprintf(stderr, "Unable to open descriptor output file: %m\n"); return 1; } - pid_t signature_pid = 0; - if (signature_filter.size() > 0) { - int new_fd = spawn_filter(descriptor_fd, signature_filter.c_str(), - &signature_pid); - close(descriptor_fd); - descriptor_fd = new_fd; - } - FILE *descriptor = fdopen(descriptor_fd, "w"); + FILE *descriptor = fdopen(descriptor_filter->get_wrapped_fd(), "w"); fprintf(descriptor, "Format: Cumulus Snapshot v0.11\n"); fprintf(descriptor, "Producer: Cumulus %s\n", cumulus_version); @@ -978,14 +977,8 @@ int main(int argc, char *argv[]) } fclose(descriptor); - - if (signature_pid) { - int status; - waitpid(signature_pid, &status, 0); - - if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { - fatal("Signature filter process error"); - } + if (descriptor_filter->wait() < 0) { + fatal("Signature filter process error"); } descriptor_file->send(); diff --git a/store.cc b/store.cc index 31ff40f..0b03493 100644 --- a/store.cc +++ b/store.cc @@ -64,8 +64,7 @@ Tarfile::Tarfile(RemoteFile *file, const string &segment) assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE); this->file = file; - real_fd = file->get_fd(); - filter_fd = spawn_filter(real_fd, filter_program, &filter_pid); + this->filter = FileFilter::New(file->get_fd(), filter_program); } Tarfile::~Tarfile() @@ -77,25 +76,54 @@ Tarfile::~Tarfile() tar_write(buf, TAR_BLOCK_SIZE); tar_write(buf, TAR_BLOCK_SIZE); - if (close(filter_fd) != 0) + if (close(filter->get_wrapped_fd()) != 0) fatal("Error closing Tarfile"); /* ...and wait for filter process to finish. */ + if (filter->wait() != 0) { + fatal("Filter process error"); + } +} + +FileFilter::FileFilter(int raw, int wrapped, pid_t pid) + : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { } + +FileFilter *FileFilter::New(int fd, const char *program) +{ + if (program == NULL || strlen(program) == 0) { + return new FileFilter(fd, fd, -1); + } + + pid_t pid; + int wrapped_fd = spawn_filter(fd, program, &pid); + close(fd); + return new FileFilter(fd, wrapped_fd, pid); +} + +int FileFilter::wait() +{ + // No filter program was launched implies no need to wait. + if (pid == -1) + return 0; + int status; - waitpid(filter_pid, &status, 0); + if (waitpid(pid, &status, 0) < 0) { + fprintf(stderr, "Error waiting for filter process: %m\n"); + return -1; + } if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { - fatal("Filter process error"); + fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status)); } - close(real_fd); + return status; } /* Launch a child process which can act as a filter (compress, encrypt, etc.) * on the TAR output. The file descriptor to which output should be written * must be specified; the return value is the file descriptor which will be * attached to the standard input of the filter program. */ -int spawn_filter(int fd_out, const char *program, pid_t *filter_pid) +int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid) { int fds[2]; pid_t pid; @@ -145,7 +173,7 @@ void Tarfile::tar_write(const char *data, size_t len) size += len; while (len > 0) { - int res = write(filter_fd, data, len); + int res = write(filter->get_wrapped_fd(), data, len); if (res < 0) { if (errno == EINTR) @@ -213,7 +241,7 @@ size_t Tarfile::size_estimate() { struct stat statbuf; - if (fstat(real_fd, &statbuf) == 0) + if (fstat(filter->get_raw_fd(), &statbuf) == 0) return max((int64_t)statbuf.st_size, (int64_t)(size / 128)); /* Couldn't stat the file on disk, so just return the actual number of diff --git a/store.h b/store.h index 016e275..122a4ac 100644 --- a/store.h +++ b/store.h @@ -68,6 +68,42 @@ struct tar_header char padding[12]; }; +class FileFilter { +public: + // It is valid for program to be NULL or empty; if so, no filtering is + // done. + static FileFilter *New(int fd, const char *program); + + // Wait for the filter process to terminate. + int wait(); + + // Accessors for the file descriptors. + int get_raw_fd() const { return fd_raw; } + int get_wrapped_fd() const { return fd_wrapped; } + +private: + FileFilter(int raw, int wrapped, pid_t pid); + + // Launch a process to filter data written to a file descriptor. fd_out is + // the file descriptor where the filtered data should be written. program + // is the filter program to execute (a single string which will be + // interpreted by /bin/sh). The return value is a file descriptor to which + // the data to be filtered should be written. The process ID of the filter + // process is stored at address filter_pid if non-NULL. + static int spawn_filter(int fd_out, const char *program, pid_t *filter_pid); + + // The original file descriptor passed when creating the FileFilter object. + int fd_raw; + + // The wrapped file descriptor: writes here are piped through the filter + // program. + int fd_wrapped; + + // The filter process if one was launched, or -1 if there is no filter + // program. + pid_t pid; +}; + /* A simple wrapper around a single TAR file to represent a segment. Objects * may only be written out all at once, since the tar header must be written * first; incremental writing is not supported. */ @@ -86,10 +122,7 @@ private: std::string segment_name; RemoteFile *file; - - /* Filter support. */ - int real_fd, filter_fd; - pid_t filter_pid; + FileFilter *filter; // Write data to the tar file void tar_write(const char *data, size_t size); @@ -195,12 +228,4 @@ extern const char *filter_program; * included; this adds to it) */ extern const char *filter_extension; -/* Launch a process to filter data written to a file descriptor. fd_out is the - * file descriptor where the filtered data should be written. program is the - * filter program to execute (a single string which will be interpreted by - * /bin/sh). The return value is a file descriptor to which the data to be - * filtered should be written. The process ID of the filter process is stored - * at address filter_pid if non-NULL. */ -int spawn_filter(int fd_out, const char *program, pid_t *filter_pid); - #endif // _LBS_STORE_H -- 2.20.1