Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / bluesky / store-s3.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 #include <stdint.h>
32 #include <stdlib.h>
33 #include <glib.h>
34 #include <string.h>
35
36 #include "bluesky-private.h"
37 #include "libs3.h"
38
39 /* Interface to Amazon S3 storage. */
40
41 typedef struct {
42     GThreadPool *thread_pool;
43     S3BucketContext bucket;
44     uint8_t encryption_key[CRYPTO_KEY_SIZE];
45 } S3Store;
46
47 typedef struct {
48     enum { S3_GET, S3_PUT } op;
49     gchar *key;
50     BlueSkyRCStr *data;
51 } S3Op;
52
53 struct get_info {
54     int success;
55     GString *buf;
56 };
57
58 struct put_info {
59     int success;
60     BlueSkyRCStr *val;
61     gint offset;
62 };
63
64 struct list_info {
65     int success;
66     char *last_entry;
67     gboolean truncated;
68 };
69
70 static S3Status s3store_get_handler(int bufferSize, const char *buffer,
71                                     void *callbackData)
72 {
73     struct get_info *info = (struct get_info *)callbackData;
74     g_string_append_len(info->buf, buffer, bufferSize);
75     return S3StatusOK;
76 }
77
78 static int s3store_put_handler(int bufferSize, char *buffer,
79                                void *callbackData)
80 {
81     struct put_info *info = (struct put_info *)callbackData;
82     gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
83     memcpy(buffer, (char *)info->val->data + info->offset, bytes);
84     info->offset += bytes;
85     return bytes;
86 }
87
88 static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
89                                      void *callbackData)
90 {
91     return S3StatusOK;
92 }
93
94 static void s3store_response_callback(S3Status status,
95                                const S3ErrorDetails *errorDetails,
96                                void *callbackData)
97 {
98     struct get_info *info = (struct get_info *)callbackData;
99
100     if (status == 0) {
101         info->success = 1;
102     }
103
104     if (errorDetails != NULL && errorDetails->message != NULL) {
105         g_print("  Error message: %s\n", errorDetails->message);
106     }
107 }
108
109 static void s3store_task(gpointer a, gpointer s)
110 {
111     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
112     S3Store *store = (S3Store *)s;
113
114     async->status = ASYNC_RUNNING;
115     async->exec_time = bluesky_now_hires();
116
117     if (async->op == STORE_OP_GET) {
118         struct get_info info;
119         info.buf = g_string_new("");
120         info.success = 0;
121
122         struct S3GetObjectHandler handler;
123         handler.responseHandler.propertiesCallback = s3store_properties_callback;
124         handler.responseHandler.completeCallback = s3store_response_callback;
125         handler.getObjectDataCallback = s3store_get_handler;
126
127         S3_get_object(&store->bucket, async->key, NULL,
128                       async->start, async->len, NULL, &handler, &info);
129         async->range_done = TRUE;
130
131         if (info.success) {
132             async->data = bluesky_string_new_from_gstring(info.buf);
133             async->result = 0;
134         } else {
135             g_string_free(info.buf, TRUE);
136         }
137
138     } else if (async->op == STORE_OP_PUT) {
139         struct put_info info;
140         info.success = 0;
141         info.val = async->data;
142         info.offset = 0;
143
144         struct S3PutObjectHandler handler;
145         handler.responseHandler.propertiesCallback
146             = s3store_properties_callback;
147         handler.responseHandler.completeCallback = s3store_response_callback;
148         handler.putObjectDataCallback = s3store_put_handler;
149
150         S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL,
151                       &handler, &info);
152
153         if (info.success) {
154             async->result = 0;
155         } else {
156             g_warning("Error completing S3 put operation; client must retry!");
157         }
158     }
159
160     bluesky_store_async_mark_complete(async);
161     bluesky_store_async_unref(async);
162 }
163
164 static S3Status s3store_list_handler(int isTruncated,
165                                      const char *nextMarker,
166                                      int contentsCount,
167                                      const S3ListBucketContent *contents,
168                                      int commonPrefixesCount,
169                                      const char **commonPrefixes,
170                                      void *callbackData)
171 {
172     struct list_info *info = (struct list_info *)callbackData;
173     if (contentsCount > 0) {
174         g_free(info->last_entry);
175         info->last_entry = g_strdup(contents[contentsCount - 1].key);
176     }
177     info->truncated = isTruncated;
178     return S3StatusOK;
179 }
180
181 static char *s3store_lookup_last(gpointer s, const char *prefix)
182 {
183     S3Store *store = (S3Store *)s;
184     struct list_info info = {0, NULL, FALSE};
185
186     struct S3ListBucketHandler handler;
187     handler.responseHandler.propertiesCallback
188         = s3store_properties_callback;
189     handler.responseHandler.completeCallback = s3store_response_callback;
190     handler.listBucketCallback = s3store_list_handler;
191
192     char *marker = NULL;
193
194     do {
195         S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
196                        &handler, &info);
197         g_free(marker);
198         marker = g_strdup(info.last_entry);
199         g_print("Last key: %s\n", info.last_entry);
200     } while (info.truncated);
201
202     g_free(marker);
203
204     return info.last_entry;
205 }
206
207 static gpointer s3store_new(const gchar *path)
208 {
209     S3Store *store = g_new(S3Store, 1);
210     store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
211                                            NULL);
212     if (path == NULL || strlen(path) == 0)
213         store->bucket.bucketName = "mvrable-bluesky";
214     else
215         store->bucket.bucketName = g_strdup(path);
216     store->bucket.protocol = S3ProtocolHTTP;
217     store->bucket.uriStyle = S3UriStyleVirtualHost;
218     store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
219     store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
220
221     const char *key = getenv("BLUESKY_KEY");
222     if (key == NULL) {
223         g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
224         exit(1);
225     }
226
227     bluesky_crypt_hash_key(key, store->encryption_key);
228
229     g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
230             store->bucket.bucketName, store->bucket.accessKeyId, key);
231
232     return store;
233 }
234
235 static void s3store_destroy(gpointer store)
236 {
237     g_free(store);
238 }
239
240 static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
241 {
242     S3Store *store = (S3Store *)s;
243     g_return_if_fail(async->status == ASYNC_NEW);
244     g_return_if_fail(async->op != STORE_OP_NONE);
245
246     switch (async->op) {
247     case STORE_OP_GET:
248     case STORE_OP_PUT:
249         async->status = ASYNC_PENDING;
250         bluesky_store_async_ref(async);
251         g_thread_pool_push(store->thread_pool, async, NULL);
252         break;
253
254     default:
255         g_warning("Uknown operation type for S3Store: %d\n", async->op);
256         bluesky_store_async_mark_complete(async);
257         break;
258     }
259 }
260
261 static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
262 {
263     GString *buf = (GString *)async->store_private;
264
265     if (buf != NULL) {
266         g_string_free(buf, TRUE);
267         async->store_private = NULL;
268     }
269 }
270
271 static BlueSkyStoreImplementation store_impl = {
272     .create = s3store_new,
273     .destroy = s3store_destroy,
274     .submit = s3store_submit,
275     .cleanup = s3store_cleanup,
276     .lookup_last = s3store_lookup_last,
277 };
278
279 void bluesky_store_init_s3(void)
280 {
281     S3_initialize(NULL, S3_INIT_ALL, NULL);
282     bluesky_store_register(&store_impl, "s3");
283 }