Ensure metadata file is written prior to calculating checksum.
[cumulus.git] / remote.cc
1 /* Cumulus: Efficient Filesystem Backup to the Cloud
2  * Copyright (C) 2008-2009 The Cumulus Developers
3  * See the AUTHORS file for a list of contributors.
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18  */
19
20 /* Backup data (segments and backup descriptors) may be stored on a remote
21  * fileserver instead of locally.  The only local storage needed is for the
22  * local database and some temporary space for staging files before they are
23  * transferred to the remote server.
24  *
25  * Like encryption, remote storage is handled through the use of external
26  * scripts that are called when a file is to be transferred. */
27
28 #include <assert.h>
29 #include <errno.h>
30 #include <fcntl.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <unistd.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/wait.h>
38
39 #include <list>
40 #include <string>
41
42 #include "remote.h"
43 #include "store.h"
44 #include "util.h"
45
46 using std::string;
47
48 static const char *backup_directories[] = {
49     "meta",
50     "segments0",
51     "segments1",
52     "snapshots",
53     NULL
54 };
55
56 RemoteStore::RemoteStore(const string &stagedir, const string &script)
57 {
58     staging_dir = stagedir;
59     backup_script = script;
60
61     /* Ensure all necessary directories exist for each type of backup file. */
62     for (size_t i = 0; backup_directories[i]; i++) {
63         string path = stagedir + "/" + backup_directories[i];
64         if (mkdir(path.c_str(), 0777) < 0) {
65             /* Ignore errors for already-existing directories. */
66             if (errno != EEXIST) {
67                 fprintf(stderr,
68                         "Warning: Cannot create backup directory %s: %m!",
69                         path.c_str());
70             }
71         }
72     }
73
74     /* A background thread is created for each RemoteStore to manage the actual
75      * transfers to a remote server.  The main program thread can enqueue
76      * RemoteFile objects to be transferred asynchronously. */
77     pthread_mutex_init(&lock, NULL);
78     pthread_cond_init(&cond, NULL);
79     terminate = false;
80     busy = true;
81     files_outstanding = 0;
82
83     if (pthread_create(&thread, NULL, RemoteStore::start_transfer_thread,
84                        (void *)this) != 0) {
85         fprintf(stderr, "Cannot create remote storage thread: %m\n");
86         fatal("pthread_create");
87     }
88 }
89
90 /* The RemoteStore destructor will terminate the background transfer thread.
91  * It will wait for all work to finish. */
92 RemoteStore::~RemoteStore()
93 {
94     pthread_mutex_lock(&lock);
95     terminate = true;
96     pthread_cond_broadcast(&cond);
97     pthread_mutex_unlock(&lock);
98
99     if (pthread_join(thread, NULL) != 0) {
100         fprintf(stderr, "Warning: Unable to join storage thread: %m\n");
101     }
102
103     assert(files_outstanding == 0);
104
105     pthread_cond_destroy(&cond);
106     pthread_mutex_destroy(&lock);
107 }
108
109 /* Prepare to write out a new file.  Returns a RemoteFile object.  The file
110  * will initially be created in a temporary directory.  When the file is
111  * written out, the RemoteFile object should be passed to RemoteStore::enqueue,
112  * which will upload it to the remote server. */
113 RemoteFile *RemoteStore::alloc_file(const string &name, const string &type)
114 {
115     pthread_mutex_lock(&lock);
116     files_outstanding++;
117     pthread_mutex_unlock(&lock);
118     return new RemoteFile(this, name, type,
119                           staging_dir + "/" + type + "/" + name);
120 }
121
122 /* Request that a file be transferred to the remote server.  The actual
123  * transfer will happen asynchronously in another thread.  The call to enqueue
124  * may block, however, if there is a backlog of data to be transferred.
125  * Ownership of the RemoteFile object is transferred; the RemoteStore will be
126  * responsible for its destruction. */
127 void RemoteStore::enqueue(RemoteFile *file)
128 {
129     pthread_mutex_lock(&lock);
130
131     while (transfer_queue.size() >= MAX_QUEUE_SIZE)
132         pthread_cond_wait(&cond, &lock);
133
134     transfer_queue.push_back(file);
135     files_outstanding--;
136     busy = true;
137
138     pthread_cond_broadcast(&cond);
139     pthread_mutex_unlock(&lock);
140 }
141
142 /* Wait for all transfers to finish. */
143 void RemoteStore::sync()
144 {
145     pthread_mutex_lock(&lock);
146
147     while (busy)
148         pthread_cond_wait(&cond, &lock);
149
150     pthread_mutex_unlock(&lock);
151 }
152
153 void *RemoteStore::start_transfer_thread(void *arg)
154 {
155     RemoteStore *store = static_cast<RemoteStore *>(arg);
156     store->transfer_thread();
157     return NULL;
158 }
159
160 /* Background thread for transferring backups to a remote server. */
161 void RemoteStore::transfer_thread()
162 {
163     /* If a transfer script was specified, launch it and connect to both stdin
164      * and stdout.  fd_in is stdin of the child, and fd_out is stdout for the
165      * child. */
166     pid_t pid = 0;
167     FILE *fd_in = NULL, *fd_out = NULL;
168
169     if (backup_script != "") {
170         int fds[4];
171
172         if (pipe(&fds[0]) < 0) {
173             fatal("Unable to create pipe for upload script");
174         }
175         if (pipe(&fds[2]) < 0) {
176             fatal("Unable to create pipe for upload script");
177         }
178
179         pid = fork();
180         if (pid < 0) {
181             fprintf(stderr, "Unable to fork for upload script: %m\n");
182             fatal("fork: upload script");
183         }
184
185         if (pid > 0) {
186             /* Parent */
187             close(fds[0]);
188             close(fds[3]);
189             cloexec(fds[1]); fd_in = fdopen(fds[1], "w");
190             cloexec(fds[2]); fd_out = fdopen(fds[2], "r");
191         } else if (pid == 0) {
192             /* Child */
193             if (dup2(fds[0], 0) < 0)
194                 exit(1);
195             if (dup2(fds[3], 1) < 0)
196                 exit(1);
197             for (int i = 0; i < 3; i++)
198                 close(fds[i]);
199
200             execlp("/bin/sh", "/bin/sh", "-c", backup_script.c_str(), NULL);
201             fatal("exec failed");
202         }
203     }
204
205     while (true) {
206         RemoteFile *file = NULL;
207
208         // Wait for a file to transfer
209         pthread_mutex_lock(&lock);
210         while (transfer_queue.empty() && !terminate) {
211             busy = false;
212             pthread_cond_broadcast(&cond);
213             pthread_cond_wait(&cond, &lock);
214         }
215         if (terminate && transfer_queue.empty()) {
216             busy = false;
217             pthread_cond_broadcast(&cond);
218             pthread_mutex_unlock(&lock);
219             break;
220         }
221         busy = true;
222         file = transfer_queue.front();
223         transfer_queue.pop_front();
224         pthread_cond_broadcast(&cond);
225         pthread_mutex_unlock(&lock);
226
227         // Transfer the file
228         if (backup_script != "") {
229             string cmd = "PUT ";
230             cmd += uri_encode(file->type) + " ";
231             cmd += uri_encode(file->remote_path) + " ";
232             cmd += uri_encode(file->local_path) + "\n";
233
234             fputs(cmd.c_str(), fd_in);
235             fflush(fd_in);
236
237             char *resp = NULL;
238             size_t n;
239             if (getline(&resp, &n, fd_out) < 0 || resp == NULL)
240                 fatal("error reading response from upload script");
241             if (strchr(resp, '\n'))
242                 *strchr(resp, '\n') = '\0';
243             if (strcmp(resp, "OK") != 0)
244                 fatal("error response from upload script");
245
246             if (unlink(file->local_path.c_str()) < 0) {
247                 fprintf(stderr, "Warning: Deleting temporary file %s: %m\n",
248                         file->local_path.c_str());
249             }
250         }
251
252         delete file;
253     }
254
255     if (fd_in) fclose(fd_in);
256
257     if (pid) {
258         int status = 0;
259         waitpid(pid, &status, 0);
260         if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
261             fprintf(stderr, "Warning: error code from upload script: %d\n",
262                     status);
263         }
264     }
265
266     if (fd_out) fclose(fd_out);
267 }
268
269 RemoteFile::RemoteFile(RemoteStore *remote,
270                        const string &name, const string &type,
271                        const string &local_path)
272 {
273     remote_store = remote;
274     this->type = type;
275     this->local_path = local_path;
276     this->remote_path = type + "/" + name;
277
278     fd = open(local_path.c_str(), O_WRONLY | O_CREAT, 0666);
279     if (fd < 0)
280         fatal("Error opening output file");
281 }