Start work on tests for Cumulus.
[cumulus.git] / remote.cc
1 /* Cumulus: Efficient Filesystem Backup to the Cloud
2  * Copyright (C) 2008-2009 The Cumulus Developers
3  * See the AUTHORS file for a list of contributors.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19
20 /* Backup data (segments and backup descriptors) may be stored on a remote
21  * fileserver instead of locally.  The only local storage needed is for the
22  * local database and some temporary space for staging files before they are
23  * transferred to the remote server.
24  *
25  * Like encryption, remote storage is handled through the use of external
26  * scripts that are called when a file is to be transferred. */
27
28 #include <assert.h>
29 #include <fcntl.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <unistd.h>
34 #include <sys/types.h>
35 #include <sys/stat.h>
36 #include <sys/wait.h>
37
38 #include <list>
39 #include <string>
40
41 #include "remote.h"
42 #include "store.h"
43 #include "util.h"
44
45 using std::string;
46
47 RemoteStore::RemoteStore(const string &stagedir, const string &script)
48 {
49     staging_dir = stagedir;
50     backup_script = script;
51
52     /* A background thread is created for each RemoteStore to manage the actual
53      * transfers to a remote server.  The main program thread can enqueue
54      * RemoteFile objects to be transferred asynchronously. */
55     pthread_mutex_init(&lock, NULL);
56     pthread_cond_init(&cond, NULL);
57     terminate = false;
58     busy = true;
59     files_outstanding = 0;
60
61     if (pthread_create(&thread, NULL, RemoteStore::start_transfer_thread,
62                        (void *)this) != 0) {
63         fprintf(stderr, "Cannot create remote storage thread: %m\n");
64         fatal("pthread_create");
65     }
66 }
67
68 /* The RemoteStore destructor will terminate the background transfer thread.
69  * It will wait for all work to finish. */
70 RemoteStore::~RemoteStore()
71 {
72     pthread_mutex_lock(&lock);
73     terminate = true;
74     pthread_cond_broadcast(&cond);
75     pthread_mutex_unlock(&lock);
76
77     if (pthread_join(thread, NULL) != 0) {
78         fprintf(stderr, "Warning: Unable to join storage thread: %m\n");
79     }
80
81     assert(files_outstanding == 0);
82
83     pthread_cond_destroy(&cond);
84     pthread_mutex_destroy(&lock);
85 }
86
87 /* Prepare to write out a new file.  Returns a RemoteFile object.  The file
88  * will initially be created in a temporary directory.  When the file is
89  * written out, the RemoteFile object should be passed to RemoteStore::enqueue,
90  * which will upload it to the remote server. */
91 RemoteFile *RemoteStore::alloc_file(const string &name, const string &type)
92 {
93     pthread_mutex_lock(&lock);
94     files_outstanding++;
95     pthread_mutex_unlock(&lock);
96     return new RemoteFile(this, name, type, staging_dir + "/" + name);
97 }
98
99 /* Request that a file be transferred to the remote server.  The actual
100  * transfer will happen asynchronously in another thread.  The call to enqueue
101  * may block, however, if there is a backlog of data to be transferred.
102  * Ownership of the RemoteFile object is transferred; the RemoteStore will be
103  * responsible for its destruction. */
104 void RemoteStore::enqueue(RemoteFile *file)
105 {
106     pthread_mutex_lock(&lock);
107
108     while (transfer_queue.size() >= MAX_QUEUE_SIZE)
109         pthread_cond_wait(&cond, &lock);
110
111     transfer_queue.push_back(file);
112     files_outstanding--;
113     busy = true;
114
115     pthread_cond_broadcast(&cond);
116     pthread_mutex_unlock(&lock);
117 }
118
119 /* Wait for all transfers to finish. */
120 void RemoteStore::sync()
121 {
122     pthread_mutex_lock(&lock);
123
124     while (busy)
125         pthread_cond_wait(&cond, &lock);
126
127     pthread_mutex_unlock(&lock);
128 }
129
130 void *RemoteStore::start_transfer_thread(void *arg)
131 {
132     RemoteStore *store = static_cast<RemoteStore *>(arg);
133     store->transfer_thread();
134     return NULL;
135 }
136
137 /* Background thread for transferring backups to a remote server. */
138 void RemoteStore::transfer_thread()
139 {
140     /* If a transfer script was specified, launch it and connect to both stdin
141      * and stdout.  fd_in is stdin of the child, and fd_out is stdout for the
142      * child. */
143     pid_t pid = 0;
144     FILE *fd_in = NULL, *fd_out = NULL;
145
146     if (backup_script != "") {
147         int fds[4];
148
149         if (pipe(&fds[0]) < 0) {
150             fatal("Unable to create pipe for upload script");
151         }
152         if (pipe(&fds[2]) < 0) {
153             fatal("Unable to create pipe for upload script");
154         }
155
156         pid = fork();
157         if (pid < 0) {
158             fprintf(stderr, "Unable to fork for upload script: %m\n");
159             fatal("fork: upload script");
160         }
161
162         if (pid > 0) {
163             /* Parent */
164             close(fds[0]);
165             close(fds[3]);
166             cloexec(fds[1]); fd_in = fdopen(fds[1], "w");
167             cloexec(fds[2]); fd_out = fdopen(fds[2], "r");
168         } else if (pid == 0) {
169             /* Child */
170             if (dup2(fds[0], 0) < 0)
171                 exit(1);
172             if (dup2(fds[3], 1) < 0)
173                 exit(1);
174             for (int i = 0; i < 3; i++)
175                 close(fds[i]);
176
177             execlp("/bin/sh", "/bin/sh", "-c", backup_script.c_str(), NULL);
178             fatal("exec failed");
179         }
180     }
181
182     while (true) {
183         RemoteFile *file = NULL;
184
185         // Wait for a file to transfer
186         pthread_mutex_lock(&lock);
187         while (transfer_queue.empty() && !terminate) {
188             busy = false;
189             pthread_cond_broadcast(&cond);
190             pthread_cond_wait(&cond, &lock);
191         }
192         if (terminate && transfer_queue.empty()) {
193             busy = false;
194             pthread_cond_broadcast(&cond);
195             pthread_mutex_unlock(&lock);
196             break;
197         }
198         busy = true;
199         file = transfer_queue.front();
200         transfer_queue.pop_front();
201         pthread_cond_broadcast(&cond);
202         pthread_mutex_unlock(&lock);
203
204         // Transfer the file
205         if (backup_script != "") {
206             string cmd = "PUT ";
207             cmd += uri_encode(file->type) + " ";
208             cmd += uri_encode(file->remote_path) + " ";
209             cmd += uri_encode(file->local_path) + "\n";
210
211             fputs(cmd.c_str(), fd_in);
212             fflush(fd_in);
213
214             char *resp = NULL;
215             size_t n;
216             if (getline(&resp, &n, fd_out) < 0 || resp == NULL)
217                 fatal("error reading response from upload script");
218             if (strchr(resp, '\n'))
219                 *strchr(resp, '\n') = '\0';
220             if (strcmp(resp, "OK") != 0)
221                 fatal("error response from upload script");
222
223             if (unlink(file->local_path.c_str()) < 0) {
224                 fprintf(stderr, "Warning: Deleting temporary file %s: %m\n",
225                         file->local_path.c_str());
226             }
227         }
228
229         delete file;
230     }
231
232     if (fd_in) fclose(fd_in);
233
234     if (pid) {
235         int status = 0;
236         waitpid(pid, &status, 0);
237         if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
238             fprintf(stderr, "Warning: error code from upload script: %d\n",
239                     status);
240         }
241     }
242
243     if (fd_out) fclose(fd_out);
244 }
245
246 RemoteFile::RemoteFile(RemoteStore *remote,
247                        const string &name, const string &type,
248                        const string &local_path)
249 {
250     remote_store = remote;
251     this->type = type;
252     this->local_path = local_path;
253     this->remote_path = name;
254
255     fd = open(local_path.c_str(), O_WRONLY | O_CREAT, 0666);
256     if (fd < 0)
257         fatal("Error opening output file");
258 }