The map::at method does not always exist, so instead use map::find.
[cumulus.git] / remote.cc
1 /* Cumulus: Smart Filesystem Backup to Dumb Servers
2  *
3  * Copyright (C) 2008  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this program; if not, write to the Free Software Foundation, Inc.,
18  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19  */
20
21 /* Backup data (segments and backup descriptors) may be stored on a remote
22  * fileserver instead of locally.  The only local storage needed is for the
23  * local database and some temporary space for staging files before they are
24  * transferred to the remote server.
25  *
26  * Like encryption, remote storage is handled through the use of external
27  * scripts that are called when a file is to be transferred. */
28
29 #include <assert.h>
30 #include <fcntl.h>
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <string.h>
34 #include <unistd.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #include <sys/wait.h>
38
39 #include <list>
40 #include <string>
41
42 #include "remote.h"
43 #include "store.h"
44 #include "util.h"
45
46 using std::string;
47
48 RemoteStore::RemoteStore(const string &stagedir, const string &script)
49 {
50     staging_dir = stagedir;
51     backup_script = script;
52
53     /* A background thread is created for each RemoteStore to manage the actual
54      * transfers to a remote server.  The main program thread can enqueue
55      * RemoteFile objects to be transferred asynchronously. */
56     pthread_mutex_init(&lock, NULL);
57     pthread_cond_init(&cond, NULL);
58     terminate = false;
59     busy = true;
60     files_outstanding = 0;
61
62     if (pthread_create(&thread, NULL, RemoteStore::start_transfer_thread,
63                        (void *)this) != 0) {
64         fprintf(stderr, "Cannot create remote storage thread: %m\n");
65         fatal("pthread_create");
66     }
67 }
68
69 /* The RemoteStore destructor will terminate the background transfer thread.
70  * It will wait for all work to finish. */
71 RemoteStore::~RemoteStore()
72 {
73     pthread_mutex_lock(&lock);
74     terminate = true;
75     pthread_cond_broadcast(&cond);
76     pthread_mutex_unlock(&lock);
77
78     if (pthread_join(thread, NULL) != 0) {
79         fprintf(stderr, "Warning: Unable to join storage thread: %m\n");
80     }
81
82     assert(files_outstanding == 0);
83
84     pthread_cond_destroy(&cond);
85     pthread_mutex_destroy(&lock);
86 }
87
88 /* Prepare to write out a new file.  Returns a RemoteFile object.  The file
89  * will initially be created in a temporary directory.  When the file is
90  * written out, the RemoteFile object should be passed to RemoteStore::enqueue,
91  * which will upload it to the remote server. */
92 RemoteFile *RemoteStore::alloc_file(const string &name, const string &type)
93 {
94     pthread_mutex_lock(&lock);
95     files_outstanding++;
96     pthread_mutex_unlock(&lock);
97     return new RemoteFile(this, name, type, staging_dir + "/" + name);
98 }
99
100 /* Request that a file be transferred to the remote server.  The actual
101  * transfer will happen asynchronously in another thread.  The call to enqueue
102  * may block, however, if there is a backlog of data to be transferred.
103  * Ownership of the RemoteFile object is transferred; the RemoteStore will be
104  * responsible for its destruction. */
105 void RemoteStore::enqueue(RemoteFile *file)
106 {
107     pthread_mutex_lock(&lock);
108
109     while (transfer_queue.size() >= MAX_QUEUE_SIZE)
110         pthread_cond_wait(&cond, &lock);
111
112     transfer_queue.push_back(file);
113     files_outstanding--;
114     busy = true;
115
116     pthread_cond_broadcast(&cond);
117     pthread_mutex_unlock(&lock);
118 }
119
120 /* Wait for all transfers to finish. */
121 void RemoteStore::sync()
122 {
123     pthread_mutex_lock(&lock);
124
125     while (busy)
126         pthread_cond_wait(&cond, &lock);
127
128     pthread_mutex_unlock(&lock);
129 }
130
131 void *RemoteStore::start_transfer_thread(void *arg)
132 {
133     RemoteStore *store = static_cast<RemoteStore *>(arg);
134     store->transfer_thread();
135     return NULL;
136 }
137
138 /* Background thread for transferring backups to a remote server. */
139 void RemoteStore::transfer_thread()
140 {
141     /* If a transfer script was specified, launch it and connect to both stdin
142      * and stdout.  fd_in is stdin of the child, and fd_out is stdout for the
143      * child. */
144     pid_t pid = 0;
145     FILE *fd_in = NULL, *fd_out = NULL;
146
147     if (backup_script != "") {
148         int fds[4];
149
150         if (pipe(&fds[0]) < 0) {
151             fatal("Unable to create pipe for upload script");
152         }
153         if (pipe(&fds[2]) < 0) {
154             fatal("Unable to create pipe for upload script");
155         }
156
157         pid = fork();
158         if (pid < 0) {
159             fprintf(stderr, "Unable to fork for upload script: %m\n");
160             fatal("fork: upload script");
161         }
162
163         if (pid > 0) {
164             /* Parent */
165             close(fds[0]);
166             close(fds[3]);
167             cloexec(fds[1]); fd_in = fdopen(fds[1], "w");
168             cloexec(fds[2]); fd_out = fdopen(fds[2], "r");
169         } else if (pid == 0) {
170             /* Child */
171             if (dup2(fds[0], 0) < 0)
172                 exit(1);
173             if (dup2(fds[3], 1) < 0)
174                 exit(1);
175             for (int i = 0; i < 3; i++)
176                 close(fds[i]);
177
178             execlp("/bin/sh", "/bin/sh", "-c", backup_script.c_str(), NULL);
179             fatal("exec failed");
180         }
181     }
182
183     while (true) {
184         RemoteFile *file = NULL;
185
186         // Wait for a file to transfer
187         pthread_mutex_lock(&lock);
188         while (transfer_queue.empty() && !terminate) {
189             busy = false;
190             pthread_cond_broadcast(&cond);
191             pthread_cond_wait(&cond, &lock);
192         }
193         if (terminate && transfer_queue.empty()) {
194             busy = false;
195             pthread_cond_broadcast(&cond);
196             pthread_mutex_unlock(&lock);
197             break;
198         }
199         busy = true;
200         file = transfer_queue.front();
201         transfer_queue.pop_front();
202         pthread_cond_broadcast(&cond);
203         pthread_mutex_unlock(&lock);
204
205         // Transfer the file
206         if (backup_script != "") {
207             string cmd = "PUT ";
208             cmd += uri_encode(file->type) + " ";
209             cmd += uri_encode(file->remote_path) + " ";
210             cmd += uri_encode(file->local_path) + "\n";
211
212             fputs(cmd.c_str(), fd_in);
213             fflush(fd_in);
214
215             char *resp = NULL;
216             size_t n;
217             if (getline(&resp, &n, fd_out) < 0 || resp == NULL)
218                 fatal("error reading response from upload script");
219             if (strchr(resp, '\n'))
220                 *strchr(resp, '\n') = '\0';
221             if (strcmp(resp, "OK") != 0)
222                 fatal("error response from upload script");
223
224             if (unlink(file->local_path.c_str()) < 0) {
225                 fprintf(stderr, "Warning: Deleting temporary file %s: %m\n",
226                         file->local_path.c_str());
227             }
228         }
229
230         delete file;
231     }
232
233     if (fd_in) fclose(fd_in);
234
235     if (pid) {
236         int status = 0;
237         waitpid(pid, &status, 0);
238         if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
239             fprintf(stderr, "Warning: error code from upload script: %d\n",
240                     status);
241         }
242     }
243
244     if (fd_out) fclose(fd_out);
245 }
246
247 RemoteFile::RemoteFile(RemoteStore *remote,
248                        const string &name, const string &type,
249                        const string &local_path)
250 {
251     remote_store = remote;
252     this->type = type;
253     this->local_path = local_path;
254     this->remote_path = name;
255
256     fd = open(local_path.c_str(), O_WRONLY | O_CREAT, 0666);
257     if (fd < 0)
258         fatal("Error opening output file");
259 }