#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
+#include <sys/resource.h>
+#include <sys/wait.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
using std::set;
using std::string;
+static char *const filter_program[] = {"bzip2", "-c", NULL};
+
+static void cloexec(int fd)
+{
+ long flags = fcntl(fd, F_GETFD);
+
+ if (flags < 0)
+ return;
+
+ fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
+}
+
Tarfile::Tarfile(const string &path, const string &segment)
: size(0),
segment_name(segment)
{
- if (tar_open(&t, (char *)path.c_str(), NULL, O_WRONLY | O_CREAT, 0600,
- TAR_VERBOSE | TAR_GNU) == -1)
+ real_fd = open(path.c_str(), O_WRONLY | O_CREAT, 0600);
+ if (real_fd < 0)
+ throw IOException("Error opening output file");
+
+ filter_fd = spawn_filter(real_fd);
+
+ if (tar_fdopen(&t, filter_fd, (char *)path.c_str(), NULL,
+ O_WRONLY | O_CREAT, 0600, TAR_VERBOSE | TAR_GNU) == -1)
throw IOException("Error opening Tarfile");
}
Tarfile::~Tarfile()
{
+ /* Close the tar file... */
tar_append_eof(t);
if (tar_close(t) != 0)
throw IOException("Error closing Tarfile");
+
+ /* ...and wait for filter process to finish. */
+ int status;
+ waitpid(filter_pid, &status, 0);
+
+ if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+ throw IOException("Filter process error");
+ }
+
+ close(real_fd);
+}
+
+/* Launch a child process which can act as a filter (compress, encrypt, etc.)
+ * on the TAR output. The file descriptor to which output should be written
+ * must be specified; the return value is the file descriptor which will be
+ * attached to the standard input of the filter program. */
+int Tarfile::spawn_filter(int fd_out)
+{
+ int fds[2];
+
+ /* Create a pipe for communicating with the filter process. */
+ if (pipe(fds) < 0) {
+ throw IOException("Unable to create pipe for filter");
+ }
+
+ /* Create a child process which can exec() the filter program. */
+ filter_pid = fork();
+ if (filter_pid < 0)
+ throw IOException("Unable to fork filter process");
+
+ if (filter_pid > 0) {
+ /* Parent process */
+ close(fds[0]);
+ cloexec(fds[1]);
+ } else {
+ /* Child process. Rearrange file descriptors. stdin is fds[0], stdout
+ * is fd_out, stderr is unchanged. */
+ close(fds[1]);
+
+ if (dup2(fds[0], 0) < 0)
+ exit(1);
+ close(fds[0]);
+
+ if (dup2(fd_out, 1) < 0)
+ exit(1);
+ close(fd_out);
+
+ /* Exec the filter program. */
+ execvp(filter_program[0], filter_program);
+
+ /* Should not reach here except for error cases. */
+ fprintf(stderr, "Could not exec filter: %m\n");
+ exit(1);
+ }
+
+ return fds[1];
}
void Tarfile::write_object(int id, const char *data, size_t len)
size += blocks * T_BLOCKSIZE;
}
+/* Estimate the size based on the size of the actual output file on disk.
+ * However, the filter may not have written all data yet, and in the event that
+ * it is buffering data to a large extent, also use */
+size_t Tarfile::size_estimate()
+{
+ struct stat statbuf;
+
+ if (fstat(real_fd, &statbuf) == 0) {
+ size_t disk_size = statbuf.st_size;
+
+ if (disk_size >= size / 128)
+ return disk_size;
+ }
+
+ return size;
+}
+
static const size_t SEGMENT_SIZE = 4 * 1024 * 1024;
ObjectReference TarSegmentStore::write_object(const char *data, size_t len,
segment->name = generate_uuid();
- string filename = path + "/" + segment->name + ".tar";
+ string filename = path + "/" + segment->name + ".tar.bz2";
segment->file = new Tarfile(filename, segment->name);
segment->count = 0;