string dbmeta_filename = "snapshot-";
if (backup_scheme.size() > 0)
dbmeta_filename += backup_scheme + "-";
- dbmeta_filename += timestamp + ".meta";
- RemoteFile *dbmeta_file = remote->alloc_file(dbmeta_filename,
- "meta");
- FILE *dbmeta = fdopen(dbmeta_file->get_fd(), "w");
+ dbmeta_filename += timestamp + ".meta" + filter_extension;
+ RemoteFile *dbmeta_file = remote->alloc_file(dbmeta_filename, "meta");
+ FileFilter *dbmeta_filter = FileFilter::New(dbmeta_file->get_fd(),
+ filter_program);
+ if (dbmeta_filter == NULL) {
+ fprintf(stderr, "Unable to open descriptor output file: %m\n");
+ return 1;
+ }
+ FILE *dbmeta = fdopen(dbmeta_filter->get_wrapped_fd(), "w");
for (std::set<string>::iterator i = segment_list.begin();
i != segment_list.end(); ++i) {
RemoteFile *descriptor_file = remote->alloc_file(desc_filename,
"snapshots");
- int descriptor_fd = descriptor_file->get_fd();
- if (descriptor_fd < 0) {
+ FileFilter *descriptor_filter = FileFilter::New(descriptor_file->get_fd(),
+ signature_filter.c_str());
+ if (descriptor_filter == NULL) {
fprintf(stderr, "Unable to open descriptor output file: %m\n");
return 1;
}
- pid_t signature_pid = 0;
- if (signature_filter.size() > 0) {
- int new_fd = spawn_filter(descriptor_fd, signature_filter.c_str(),
- &signature_pid);
- close(descriptor_fd);
- descriptor_fd = new_fd;
- }
- FILE *descriptor = fdopen(descriptor_fd, "w");
+ FILE *descriptor = fdopen(descriptor_filter->get_wrapped_fd(), "w");
fprintf(descriptor, "Format: Cumulus Snapshot v0.11\n");
fprintf(descriptor, "Producer: Cumulus %s\n", cumulus_version);
}
fclose(descriptor);
-
- if (signature_pid) {
- int status;
- waitpid(signature_pid, &status, 0);
-
- if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
- fatal("Signature filter process error");
- }
+ if (descriptor_filter->wait() < 0) {
+ fatal("Signature filter process error");
}
descriptor_file->send();
assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE);
this->file = file;
- real_fd = file->get_fd();
- filter_fd = spawn_filter(real_fd, filter_program, &filter_pid);
+ this->filter = FileFilter::New(file->get_fd(), filter_program);
}
Tarfile::~Tarfile()
tar_write(buf, TAR_BLOCK_SIZE);
tar_write(buf, TAR_BLOCK_SIZE);
- if (close(filter_fd) != 0)
+ if (close(filter->get_wrapped_fd()) != 0)
fatal("Error closing Tarfile");
/* ...and wait for filter process to finish. */
+ if (filter->wait() != 0) {
+ fatal("Filter process error");
+ }
+}
+
+FileFilter::FileFilter(int raw, int wrapped, pid_t pid)
+ : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { }
+
+FileFilter *FileFilter::New(int fd, const char *program)
+{
+ if (program == NULL || strlen(program) == 0) {
+ return new FileFilter(fd, fd, -1);
+ }
+
+ pid_t pid;
+ int wrapped_fd = spawn_filter(fd, program, &pid);
+ close(fd);
+ return new FileFilter(fd, wrapped_fd, pid);
+}
+
+int FileFilter::wait()
+{
+ // No filter program was launched implies no need to wait.
+ if (pid == -1)
+ return 0;
+
int status;
- waitpid(filter_pid, &status, 0);
+ if (waitpid(pid, &status, 0) < 0) {
+ fprintf(stderr, "Error waiting for filter process: %m\n");
+ return -1;
+ }
if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
- fatal("Filter process error");
+ fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status));
}
- close(real_fd);
+ return status;
}
/* Launch a child process which can act as a filter (compress, encrypt, etc.)
* on the TAR output. The file descriptor to which output should be written
* must be specified; the return value is the file descriptor which will be
* attached to the standard input of the filter program. */
-int spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
+int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
{
int fds[2];
pid_t pid;
size += len;
while (len > 0) {
- int res = write(filter_fd, data, len);
+ int res = write(filter->get_wrapped_fd(), data, len);
if (res < 0) {
if (errno == EINTR)
{
struct stat statbuf;
- if (fstat(real_fd, &statbuf) == 0)
+ if (fstat(filter->get_raw_fd(), &statbuf) == 0)
return max((int64_t)statbuf.st_size, (int64_t)(size / 128));
/* Couldn't stat the file on disk, so just return the actual number of
char padding[12];
};
+class FileFilter {
+public:
+ // It is valid for program to be NULL or empty; if so, no filtering is
+ // done.
+ static FileFilter *New(int fd, const char *program);
+
+ // Wait for the filter process to terminate.
+ int wait();
+
+ // Accessors for the file descriptors.
+ int get_raw_fd() const { return fd_raw; }
+ int get_wrapped_fd() const { return fd_wrapped; }
+
+private:
+ FileFilter(int raw, int wrapped, pid_t pid);
+
+ // Launch a process to filter data written to a file descriptor. fd_out is
+ // the file descriptor where the filtered data should be written. program
+ // is the filter program to execute (a single string which will be
+ // interpreted by /bin/sh). The return value is a file descriptor to which
+ // the data to be filtered should be written. The process ID of the filter
+ // process is stored at address filter_pid if non-NULL.
+ static int spawn_filter(int fd_out, const char *program, pid_t *filter_pid);
+
+ // The original file descriptor passed when creating the FileFilter object.
+ int fd_raw;
+
+ // The wrapped file descriptor: writes here are piped through the filter
+ // program.
+ int fd_wrapped;
+
+ // The filter process if one was launched, or -1 if there is no filter
+ // program.
+ pid_t pid;
+};
+
/* A simple wrapper around a single TAR file to represent a segment. Objects
* may only be written out all at once, since the tar header must be written
* first; incremental writing is not supported. */
std::string segment_name;
RemoteFile *file;
-
- /* Filter support. */
- int real_fd, filter_fd;
- pid_t filter_pid;
+ FileFilter *filter;
// Write data to the tar file
void tar_write(const char *data, size_t size);
* included; this adds to it) */
extern const char *filter_extension;
-/* Launch a process to filter data written to a file descriptor. fd_out is the
- * file descriptor where the filtered data should be written. program is the
- * filter program to execute (a single string which will be interpreted by
- * /bin/sh). The return value is a file descriptor to which the data to be
- * filtered should be written. The process ID of the filter process is stored
- * at address filter_pid if non-NULL. */
-int spawn_filter(int fd_out, const char *program, pid_t *filter_pid);
-
#endif // _LBS_STORE_H