X-Git-Url: http://git.vrable.net/?p=cumulus.git;a=blobdiff_plain;f=store.cc;h=a77d64af0a144ef197116aab55ac93f3e3be98bb;hp=115529fc6e46e7c9b5a13a57eebc5e7a10713afd;hb=HEAD;hpb=fbe7425ae37564a99eb49133561eea5f1a6c7877 diff --git a/store.cc b/store.cc index 115529f..a77d64a 100644 --- a/store.cc +++ b/store.cc @@ -64,8 +64,7 @@ Tarfile::Tarfile(RemoteFile *file, const string &segment) assert(sizeof(struct tar_header) == TAR_BLOCK_SIZE); this->file = file; - real_fd = file->get_fd(); - filter_fd = spawn_filter(real_fd, filter_program, &filter_pid); + this->filter.reset(FileFilter::New(file->get_fd(), filter_program)); } Tarfile::~Tarfile() @@ -77,25 +76,57 @@ Tarfile::~Tarfile() tar_write(buf, TAR_BLOCK_SIZE); tar_write(buf, TAR_BLOCK_SIZE); - if (close(filter_fd) != 0) + if (close(filter->get_wrapped_fd()) != 0) fatal("Error closing Tarfile"); /* ...and wait for filter process to finish. */ + if (filter->wait() != 0) { + fatal("Filter process error"); + } +} + +FileFilter::FileFilter(int raw, int wrapped, pid_t pid) + : fd_raw(raw), fd_wrapped(wrapped), pid(pid) { } + +FileFilter *FileFilter::New(int fd, const char *program) +{ + if (program == NULL || strlen(program) == 0) { + return new FileFilter(fd, fd, -1); + } + + pid_t pid; + int wrapped_fd = spawn_filter(fd, program, &pid); + return new FileFilter(fd, wrapped_fd, pid); +} + +int FileFilter::wait() +{ + // No filter program was launched implies no need to wait. + if (pid == -1) + return 0; + + // The raw file descriptor was held open to track the output file size, but + // is not needed any longer. + close(fd_raw); + int status; - waitpid(filter_pid, &status, 0); + if (waitpid(pid, &status, 0) < 0) { + fprintf(stderr, "Error waiting for filter process: %m\n"); + return -1; + } if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { - fatal("Filter process error"); + fprintf(stderr, "Filter process error: %d\n", WEXITSTATUS(status)); } - close(real_fd); + return status; } /* Launch a child process which can act as a filter (compress, encrypt, etc.) * on the TAR output. The file descriptor to which output should be written * must be specified; the return value is the file descriptor which will be * attached to the standard input of the filter program. */ -int spawn_filter(int fd_out, const char *program, pid_t *filter_pid) +int FileFilter::spawn_filter(int fd_out, const char *program, pid_t *filter_pid) { int fds[2]; pid_t pid; @@ -145,7 +176,7 @@ void Tarfile::tar_write(const char *data, size_t len) size += len; while (len > 0) { - int res = write(filter_fd, data, len); + int res = write(filter->get_wrapped_fd(), data, len); if (res < 0) { if (errno == EINTR) @@ -213,7 +244,7 @@ size_t Tarfile::size_estimate() { struct stat statbuf; - if (fstat(real_fd, &statbuf) == 0) + if (fstat(filter->get_raw_fd(), &statbuf) == 0) return max((int64_t)statbuf.st_size, (int64_t)(size / 128)); /* Couldn't stat the file on disk, so just return the actual number of @@ -309,14 +340,11 @@ void TarSegmentStore::close_segment(const string &group) group_sizes[segment->group].second += disk_size; } - SHA1Checksum segment_checksum; - string checksum; - if (segment_checksum.process_file(segment->rf->get_local_path().c_str())) { - checksum = segment_checksum.checksum_str(); - } + string checksum + = Hash::hash_file(segment->rf->get_local_path().c_str()); - db->SetSegmentMetadata(segment->name, segment->basename, checksum, - group, segment->data_size, disk_size); + db->SetSegmentMetadata(segment->name, segment->rf->get_remote_path(), + checksum, group, segment->data_size, disk_size); } segment->rf->send();