0c9e33d660f3b0498587fee31267f3a2b97df5e9
[bluesky.git] / bluesky / store-s3.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdint.h>
10 #include <stdlib.h>
11 #include <glib.h>
12 #include <string.h>
13
14 #include "bluesky-private.h"
15 #include "libs3.h"
16
17 /* Interface to Amazon S3 storage. */
18
19 typedef struct {
20     GThreadPool *thread_pool;
21     S3BucketContext bucket;
22     uint8_t encryption_key[CRYPTO_KEY_SIZE];
23 } S3Store;
24
25 typedef struct {
26     enum { S3_GET, S3_PUT } op;
27     gchar *key;
28     BlueSkyRCStr *data;
29 } S3Op;
30
31 struct get_info {
32     int success;
33     GString *buf;
34 };
35
36 struct put_info {
37     int success;
38     BlueSkyRCStr *val;
39     gint offset;
40 };
41
42 struct list_info {
43     int success;
44     char *last_entry;
45     gboolean truncated;
46 };
47
48 static S3Status s3store_get_handler(int bufferSize, const char *buffer,
49                                     void *callbackData)
50 {
51     struct get_info *info = (struct get_info *)callbackData;
52     g_string_append_len(info->buf, buffer, bufferSize);
53     return S3StatusOK;
54 }
55
56 static int s3store_put_handler(int bufferSize, char *buffer,
57                                void *callbackData)
58 {
59     struct put_info *info = (struct put_info *)callbackData;
60     gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
61     memcpy(buffer, (char *)info->val->data + info->offset, bytes);
62     info->offset += bytes;
63     return bytes;
64 }
65
66 static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
67                                      void *callbackData)
68 {
69     return S3StatusOK;
70 }
71
72 static void s3store_response_callback(S3Status status,
73                                const S3ErrorDetails *errorDetails,
74                                void *callbackData)
75 {
76     struct get_info *info = (struct get_info *)callbackData;
77
78     if (status == 0) {
79         info->success = 1;
80     }
81
82     if (errorDetails != NULL && errorDetails->message != NULL) {
83         g_print("  Error message: %s\n", errorDetails->message);
84     }
85 }
86
87 static void s3store_task(gpointer a, gpointer s)
88 {
89     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
90     S3Store *store = (S3Store *)s;
91
92     async->status = ASYNC_RUNNING;
93     async->exec_time = bluesky_now_hires();
94
95     if (async->op == STORE_OP_GET) {
96         struct get_info info;
97         info.buf = g_string_new("");
98         info.success = 0;
99
100         struct S3GetObjectHandler handler;
101         handler.responseHandler.propertiesCallback = s3store_properties_callback;
102         handler.responseHandler.completeCallback = s3store_response_callback;
103         handler.getObjectDataCallback = s3store_get_handler;
104
105         S3_get_object(&store->bucket, async->key, NULL,
106                       async->start, async->len, NULL, &handler, &info);
107         async->range_done = TRUE;
108
109         if (info.success) {
110             async->data = bluesky_string_new_from_gstring(info.buf);
111             async->result = 0;
112         } else {
113             g_string_free(info.buf, TRUE);
114         }
115
116     } else if (async->op == STORE_OP_PUT) {
117         struct put_info info;
118         info.success = 0;
119         info.val = async->data;
120         info.offset = 0;
121
122         struct S3PutObjectHandler handler;
123         handler.responseHandler.propertiesCallback
124             = s3store_properties_callback;
125         handler.responseHandler.completeCallback = s3store_response_callback;
126         handler.putObjectDataCallback = s3store_put_handler;
127
128         S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL,
129                       &handler, &info);
130
131         if (info.success) {
132             async->result = 0;
133         } else {
134             g_warning("Error completing S3 put operation; client must retry!");
135         }
136     }
137
138     bluesky_store_async_mark_complete(async);
139     bluesky_store_async_unref(async);
140 }
141
142 static S3Status s3store_list_handler(int isTruncated,
143                                      const char *nextMarker,
144                                      int contentsCount,
145                                      const S3ListBucketContent *contents,
146                                      int commonPrefixesCount,
147                                      const char **commonPrefixes,
148                                      void *callbackData)
149 {
150     struct list_info *info = (struct list_info *)callbackData;
151     if (contentsCount > 0) {
152         g_free(info->last_entry);
153         info->last_entry = g_strdup(contents[contentsCount - 1].key);
154     }
155     info->truncated = isTruncated;
156     return S3StatusOK;
157 }
158
159 static char *s3store_lookup_last(gpointer s, const char *prefix)
160 {
161     S3Store *store = (S3Store *)s;
162     struct list_info info = {0, NULL, FALSE};
163
164     struct S3ListBucketHandler handler;
165     handler.responseHandler.propertiesCallback
166         = s3store_properties_callback;
167     handler.responseHandler.completeCallback = s3store_response_callback;
168     handler.listBucketCallback = s3store_list_handler;
169
170     char *marker = NULL;
171
172     do {
173         S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
174                        &handler, &info);
175         g_free(marker);
176         marker = g_strdup(info.last_entry);
177         g_print("Last key: %s\n", info.last_entry);
178     } while (info.truncated);
179
180     g_free(marker);
181
182     return info.last_entry;
183 }
184
185 static gpointer s3store_new(const gchar *path)
186 {
187     S3Store *store = g_new(S3Store, 1);
188     store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
189                                            NULL);
190     if (path == NULL || strlen(path) == 0)
191         store->bucket.bucketName = "mvrable-bluesky";
192     else
193         store->bucket.bucketName = g_strdup(path);
194     store->bucket.protocol = S3ProtocolHTTP;
195     store->bucket.uriStyle = S3UriStyleVirtualHost;
196     store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
197     store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
198
199     const char *key = getenv("BLUESKY_KEY");
200     if (key == NULL) {
201         g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
202         exit(1);
203     }
204
205     bluesky_crypt_hash_key(key, store->encryption_key);
206
207     g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
208             store->bucket.bucketName, store->bucket.accessKeyId, key);
209
210     return store;
211 }
212
213 static void s3store_destroy(gpointer store)
214 {
215     g_free(store);
216 }
217
218 static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
219 {
220     S3Store *store = (S3Store *)s;
221     g_return_if_fail(async->status == ASYNC_NEW);
222     g_return_if_fail(async->op != STORE_OP_NONE);
223
224     switch (async->op) {
225     case STORE_OP_GET:
226     case STORE_OP_PUT:
227         async->status = ASYNC_PENDING;
228         bluesky_store_async_ref(async);
229         g_thread_pool_push(store->thread_pool, async, NULL);
230         break;
231
232     default:
233         g_warning("Uknown operation type for S3Store: %d\n", async->op);
234         bluesky_store_async_mark_complete(async);
235         break;
236     }
237 }
238
239 static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
240 {
241     GString *buf = (GString *)async->store_private;
242
243     if (buf != NULL) {
244         g_string_free(buf, TRUE);
245         async->store_private = NULL;
246     }
247 }
248
249 static BlueSkyStoreImplementation store_impl = {
250     .create = s3store_new,
251     .destroy = s3store_destroy,
252     .submit = s3store_submit,
253     .cleanup = s3store_cleanup,
254     .lookup_last = s3store_lookup_last,
255 };
256
257 void bluesky_store_init_s3(void)
258 {
259     S3_initialize(NULL, S3_INIT_ALL, NULL);
260     bluesky_store_register(&store_impl, "s3");
261 }