assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE);
this->file = file;
- real_fd = file->get_fd();
- filter_fd = spawn_filter(real_fd, filter_program, &filter_pid);
+ this->filter.reset(FileFilter::New(file->get_fd(), filter_program));
}
Tarfile::~Tarfile()
tar_write(buf, TAR_BLOCK_SIZE);
tar_write(buf, TAR_BLOCK_SIZE);
- if (close(filter_fd) != 0)
+ if (close(filter->get_wrapped_fd()) != 0)
fatal("Error closing Tarfile");
/* ...and wait for filter process to finish. */
+ if (filter->wait() != 0) {
+ fatal("Filter process error");
+ }
+}
+
+FileFilter::FileFilter(int raw, int wrapped, pid_t pid)
+ : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { }
+
+FileFilter *FileFilter::New(int fd, const char *program)
+{
+ if (program == NULL || strlen(program) == 0) {
+ return new FileFilter(fd, fd, -1);
+ }
+
+ pid_t pid;
+ int wrapped_fd = spawn_filter(fd, program, &pid);
+ return new FileFilter(fd, wrapped_fd, pid);
+}
+
+int FileFilter::wait()
+{
+ // No filter program was launched implies no need to wait.
+ if (pid == -1)
+ return 0;
+
+ // The raw file descriptor was held open to track the output file size, but
+ // is not needed any longer.
+ close(fd_raw);
+
int status;
- waitpid(filter_pid, &status, 0);
+ if (waitpid(pid, &status, 0) < 0) {
+ fprintf(stderr, "Error waiting for filter process: %m\n");
+ return -1;
+ }
if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
- fatal("Filter process error");
+ fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status));
}
- close(real_fd);
+ return status;
}
/* Launch a child process which can act as a filter (compress, encrypt, etc.)
* on the TAR output. The file descriptor to which output should be written
* must be specified; the return value is the file descriptor which will be
* attached to the standard input of the filter program. */
-int spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
+int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid)
{
int fds[2];
pid_t pid;
size += len;
while (len > 0) {
- int res = write(filter_fd, data, len);
+ int res = write(filter->get_wrapped_fd(), data, len);
if (res < 0) {
if (errno == EINTR)
{
struct stat statbuf;
- if (fstat(real_fd, &statbuf) == 0)
+ if (fstat(filter->get_raw_fd(), &statbuf) == 0)
return max((int64_t)statbuf.st_size, (int64_t)(size / 128));
/* Couldn't stat the file on disk, so just return the actual number of
segment->basename += filter_extension;
segment->count = 0;
segment->data_size = 0;
- segment->rf = remote->alloc_file(segment->basename, "segments");
+ segment->rf = remote->alloc_file(segment->basename,
+ group == "metadata" ? "segments0"
+ : "segments1");
segment->file = new Tarfile(segment->rf, segment->name);
segments[group] = segment;
group_sizes[segment->group].second += disk_size;
}
- SHA1Checksum segment_checksum;
- string checksum;
- if (segment_checksum.process_file(segment->rf->get_local_path().c_str())) {
- checksum = segment_checksum.checksum_str();
- }
+ string checksum
+ = Hash::hash_file(segment->rf->get_local_path().c_str());
- db->SetSegmentChecksum(segment->name, segment->basename, checksum,
- segment->data_size, disk_size);
+ db->SetSegmentMetadata(segment->name, segment->rf->get_remote_path(),
+ checksum, group, segment->data_size, disk_size);
}
segment->rf->send();