Add byte range support to the Azure backend.
[bluesky.git] / bluesky / store-azure.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdint.h>
10 #include <stdlib.h>
11 #include <glib.h>
12 #include <string.h>
13 #include <gcrypt.h>
14 #include <curl/curl.h>
15
16 #include "bluesky-private.h"
17 #include "libs3.h"
18
19 #define AZURE_API_VERSION "2009-09-19"
20
21 /* Fixed headers that are used in calculating the request signature for Azure,
22  * in the order that they are included. */
23 static const char *signature_headers[] = {
24     "Content-Encoding", "Content-Language", "Content-Length", "Content-MD5",
25     "Content-Type", "Date", "If-Modified-Since", "If-Match", "If-None-Match",
26     "If-Unmodified-Since", "Range", NULL
27 };
28
29 /* Prototype Windows Azure backend for BlueSky.  This is intended to be
30  * minimally functional, but could use additional work for production use. */
31
32 typedef struct {
33     GThreadPool *thread_pool;
34     char *account, *container;
35     uint8_t *key;
36     size_t key_len;
37 } AzureStore;
38
39 static void get_extra_headers(gchar *key, gchar *value, GList **headers)
40 {
41     key = g_ascii_strdown(key, strlen(key));
42     if (strncmp(key, "x-ms-", strlen("x-ms-")) == 0) {
43         *headers = g_list_prepend(*headers,
44                                   g_strdup_printf("%s:%s\n", key, value));
45     }
46     g_free(key);
47 }
48
49 static void get_curl_headers(gchar *key, gchar *value,
50                              struct curl_slist **curl_headers)
51 {
52     char *line = g_strdup_printf("%s: %s", key, value);
53     *curl_headers = curl_slist_append(*curl_headers, line);
54     g_free(line);
55 }
56
57 struct curlbuf {
58     /* For reading */
59     const char *readbuf;
60     size_t readsize;
61 };
62
63 static size_t curl_readfunc(void *ptr, size_t size, size_t nmemb,
64                             void *userdata)
65 {
66     struct curlbuf *buf = (struct curlbuf *)userdata;
67
68     if (buf == NULL)
69         return 0;
70
71     size_t copied = size * nmemb;
72     if (copied > buf->readsize)
73         copied = buf->readsize;
74
75     memcpy(ptr, buf->readbuf, copied);
76     buf->readbuf += copied;
77     buf->readsize -= copied;
78
79     return copied;
80 }
81
82 static size_t curl_writefunc(void *ptr, size_t size, size_t nmemb,
83                              void *userdata)
84 {
85     GString *buf = (GString *)userdata;
86     if (buf != NULL)
87         g_string_append_len(buf, ptr, size * nmemb);
88     return size * nmemb;
89 }
90
91 /* Compute the signature for a request to Azure and add it to the headers. */
92 static void azure_compute_signature(AzureStore *store,
93                                     GHashTable *headers,
94                                     const char *method, const char *path)
95 {
96     if (g_hash_table_lookup(headers, "Date") == NULL) {
97         time_t t;
98         struct tm now;
99         char timebuf[4096];
100         time(&t);
101         gmtime_r(&t, &now);
102         strftime(timebuf, sizeof(timebuf), "%a, %d %b %Y %H:%M:%S GMT", &now);
103         g_hash_table_insert(headers, g_strdup("Date"), g_strdup(timebuf));
104     }
105
106     g_hash_table_insert(headers, g_strdup("x-ms-version"),
107                         g_strdup(AZURE_API_VERSION));
108
109     GString *to_sign = g_string_new("");
110     g_string_append_printf(to_sign, "%s\n", method);
111     for (const char **h = signature_headers; *h != NULL; h++) {
112         const char *val = g_hash_table_lookup(headers, *h);
113         g_string_append_printf(to_sign, "%s\n", val ? val : "");
114     }
115
116     GList *extra_headers = NULL;
117     g_hash_table_foreach(headers, (GHFunc)get_extra_headers, &extra_headers);
118     extra_headers = g_list_sort(extra_headers, (GCompareFunc)g_strcmp0);
119     while (extra_headers != NULL) {
120         g_string_append(to_sign, extra_headers->data);
121         g_free(extra_headers->data);
122         extra_headers = g_list_delete_link(extra_headers, extra_headers);
123     }
124
125     /* FIXME: Doesn't handle query parameters (after '?') */
126     g_string_append_printf(to_sign, "/%s/%s/%s",
127                            store->account, store->container, path);
128
129     /* Compute an HMAC-SHA-256 of the encoded parameters */
130     gcry_error_t status;
131     gcry_md_hd_t handle;
132     status = gcry_md_open(&handle, GCRY_MD_SHA256, GCRY_MD_FLAG_HMAC);
133     g_assert(status == 0);
134     status = gcry_md_setkey(handle, store->key, store->key_len);
135     g_assert(status == 0);
136     gcry_md_write(handle, to_sign->str, to_sign->len);
137     unsigned char *digest = gcry_md_read(handle, GCRY_MD_SHA256);
138     gchar *signature = g_base64_encode(digest,
139                                        gcry_md_get_algo_dlen(GCRY_MD_SHA256));
140     g_hash_table_insert(headers, g_strdup("Authorization"),
141                         g_strdup_printf("SharedKey %s:%s",
142                                         store->account, signature));
143     g_free(signature);
144     gcry_md_close(handle);
145     g_string_free(to_sign, TRUE);
146 }
147
148 /* Submit an HTTP request using CURL.  Takes as input the Azure storage backend
149  * we are acting for, as well as the method (GET, PUT, etc.), HTTP path, other
150  * HTTP headers, and an optional body.  If body is not NULL, an empty body is
151  * sent.  This will compute an Azure authentication signature before sending
152  * the request. */
153 static BlueSkyRCStr *submit_request(AzureStore *store,
154                                     const char *method,
155                                     const char *path,
156                                     GHashTable *headers,
157                                     BlueSkyRCStr *body)
158 {
159     BlueSkyRCStr *result = NULL;
160
161     g_hash_table_insert(headers,
162                         g_strdup("Content-Length"),
163                         g_strdup_printf("%zd", body != NULL ? body->len : 0));
164
165     if (body != 0 && body->len > 0) {
166         GChecksum *csum = g_checksum_new(G_CHECKSUM_MD5);
167         g_checksum_update(csum, (uint8_t *)body->data, body->len);
168         uint8_t md5[16];
169         gsize md5_len = sizeof(md5);
170         g_checksum_get_digest(csum, md5, &md5_len);
171         g_hash_table_insert(headers,
172                             g_strdup("Content-MD5"),
173                             g_base64_encode(md5, md5_len));
174         g_checksum_free(csum);
175     }
176
177     azure_compute_signature(store, headers, method, path);
178
179     CURLcode status;
180     CURL *curl = curl_easy_init();
181
182 #define curl_easy_setopt_safe(opt, val)                                     \
183     if ((status = curl_easy_setopt(curl, (opt), (val))) != CURLE_OK) {      \
184         fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));   \
185         goto cleanup;                                                       \
186     }
187
188     curl_easy_setopt_safe(CURLOPT_NOSIGNAL, 1);
189     curl_easy_setopt_safe(CURLOPT_NOPROGRESS, 1);
190     curl_easy_setopt_safe(CURLOPT_NETRC, CURL_NETRC_IGNORED);
191     curl_easy_setopt_safe(CURLOPT_FOLLOWLOCATION, 1);
192     curl_easy_setopt_safe(CURLOPT_MAXREDIRS, 10);
193
194     curl_easy_setopt_safe(CURLOPT_HEADERFUNCTION, curl_writefunc);
195     curl_easy_setopt_safe(CURLOPT_HEADERDATA, NULL);
196
197     struct curlbuf readbuf;
198     if (body != NULL) {
199         readbuf.readbuf = body->data;
200         readbuf.readsize = body->len;
201     }
202     curl_easy_setopt_safe(CURLOPT_READFUNCTION, curl_readfunc);
203     curl_easy_setopt_safe(CURLOPT_READDATA, body ? &readbuf : NULL);
204
205     GString *result_body = g_string_new("");
206     curl_easy_setopt_safe(CURLOPT_WRITEFUNCTION, curl_writefunc);
207     curl_easy_setopt_safe(CURLOPT_WRITEDATA, result_body);
208
209     struct curl_slist *curl_headers = NULL;
210     g_hash_table_foreach(headers, (GHFunc)get_curl_headers, &curl_headers);
211     curl_easy_setopt_safe(CURLOPT_HTTPHEADER, curl_headers);
212
213     char *uri = g_strdup_printf("http://%s.blob.core.windows.net/%s/%s",
214                                 store->account, store->container, path);
215     printf("URI: %s\n", uri);
216     curl_easy_setopt_safe(CURLOPT_URL, uri);
217
218     if (strcmp(method, "GET") == 0) {
219         /* nothing special needed */
220     } else if (strcmp(method, "PUT") == 0) {
221         curl_easy_setopt_safe(CURLOPT_UPLOAD, 1);
222     } else if (strcmp(method, "DELETE") == 0) {
223         curl_easy_setopt_safe(CURLOPT_CUSTOMREQUEST, "DELETE");
224     }
225
226     status = curl_easy_perform(curl);
227     if (status != 0) {
228         fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
229         goto cleanup;
230     }
231
232     long response_code = 0;
233     status = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
234     if (status != 0) {
235         fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
236         goto cleanup;
237     }
238
239     if (response_code / 100 == 2) {
240         result = bluesky_string_new_from_gstring(result_body);
241         result_body = NULL;
242     } else {
243         fprintf(stderr, "HTTP response code: %ld!\n", response_code);
244         goto cleanup;
245     }
246
247 cleanup:
248     if (result != NULL && result_body != NULL)
249         g_string_free(result_body, TRUE);
250     curl_easy_cleanup(curl);
251     curl_slist_free_all(curl_headers);
252     g_free(uri);
253
254     return result;
255 }
256
257 static void azurestore_task(gpointer a, gpointer s)
258 {
259     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
260     AzureStore *store = (AzureStore *)s;
261
262     async->status = ASYNC_RUNNING;
263     async->exec_time = bluesky_now_hires();
264
265     GHashTable *headers = g_hash_table_new_full(g_str_hash, g_str_equal,
266                                                 g_free, g_free);
267
268     BlueSkyRCStr *result = NULL;
269
270     if (async->op == STORE_OP_GET) {
271         /* FIXME: We ought to check that the response returned the requested
272          * byte range. */
273         if (async->start != 0 && async->len != 0) {
274             g_hash_table_insert(headers,
275                                 g_strdup("Range"),
276                                 g_strdup_printf("%zd-%zd", async->start,
277                                                 async->start + async->len));
278             async->range_done = TRUE;
279         } else if (async->start != 0) {
280             g_hash_table_insert(headers,
281                                 g_strdup("Range"),
282                                 g_strdup_printf("%zd-", async->start));
283             async->range_done = TRUE;
284         }
285         result = submit_request(store, "GET", async->key, headers, NULL);
286         if (result != NULL) {
287             async->data = result;
288             async->result = 0;
289         }
290     } else if (async->op == STORE_OP_PUT) {
291         g_hash_table_insert(headers,
292                             g_strdup("x-ms-blob-type"),
293                             g_strdup("BlockBlob"));
294         g_hash_table_insert(headers,
295                             g_strdup("Transfer-Encoding"),
296                             g_strdup(""));
297         result = submit_request(store, "PUT", async->key,
298                                 headers, async->data);
299         if (result != NULL) {
300             async->result = 0;
301         }
302         bluesky_string_unref(result);
303     }
304
305     bluesky_store_async_mark_complete(async);
306     bluesky_store_async_unref(async);
307     g_hash_table_unref(headers);
308 }
309
310 static gpointer azurestore_new(const gchar *path)
311 {
312     AzureStore *store = g_new(AzureStore, 1);
313     store->thread_pool = g_thread_pool_new(azurestore_task, store, -1, FALSE,
314                                            NULL);
315     if (path == NULL || strlen(path) == 0)
316         store->container = g_strdup("bluesky");
317     else
318         store->container = g_strdup(path);
319
320     store->account = g_strdup(getenv("AZURE_ACCOUNT_NAME"));
321
322     const char *key = getenv("AZURE_SECRET_KEY");
323     store->key = g_base64_decode(key, &store->key_len);
324
325     g_print("Initializing Azure with account %s, container %s\n",
326             store->account, store->container);
327
328     return store;
329 }
330
331 static void azurestore_destroy(gpointer store)
332 {
333     /* TODO: Clean up resources */
334 }
335
336 static void azurestore_submit(gpointer s, BlueSkyStoreAsync *async)
337 {
338     AzureStore *store = (AzureStore *)s;
339     g_return_if_fail(async->status == ASYNC_NEW);
340     g_return_if_fail(async->op != STORE_OP_NONE);
341
342     switch (async->op) {
343     case STORE_OP_GET:
344     case STORE_OP_PUT:
345         async->status = ASYNC_PENDING;
346         bluesky_store_async_ref(async);
347         g_thread_pool_push(store->thread_pool, async, NULL);
348         break;
349
350     default:
351         g_warning("Uknown operation type for AzureStore: %d\n", async->op);
352         bluesky_store_async_mark_complete(async);
353         break;
354     }
355 }
356
357 static void azurestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
358 {
359 }
360
361 static char *azurestore_lookup_last(gpointer s, const char *prefix)
362 {
363     return NULL;
364 }
365
366 static BlueSkyStoreImplementation store_impl = {
367     .create = azurestore_new,
368     .destroy = azurestore_destroy,
369     .submit = azurestore_submit,
370     .cleanup = azurestore_cleanup,
371     .lookup_last = azurestore_lookup_last,
372 };
373
374 void bluesky_store_init_azure(void)
375 {
376     bluesky_store_register(&store_impl, "azure");
377 }
378
379 #if 0
380 typedef struct {
381     enum { S3_GET, S3_PUT } op;
382     gchar *key;
383     BlueSkyRCStr *data;
384 } S3Op;
385
386 struct get_info {
387     int success;
388     GString *buf;
389 };
390
391 struct put_info {
392     int success;
393     BlueSkyRCStr *val;
394     gint offset;
395 };
396
397 struct list_info {
398     int success;
399     char *last_entry;
400     gboolean truncated;
401 };
402
403 static S3Status s3store_get_handler(int bufferSize, const char *buffer,
404                                     void *callbackData)
405 {
406     struct get_info *info = (struct get_info *)callbackData;
407     g_string_append_len(info->buf, buffer, bufferSize);
408     return S3StatusOK;
409 }
410
411 static int s3store_put_handler(int bufferSize, char *buffer,
412                                void *callbackData)
413 {
414     struct put_info *info = (struct put_info *)callbackData;
415     gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
416     memcpy(buffer, (char *)info->val->data + info->offset, bytes);
417     info->offset += bytes;
418     return bytes;
419 }
420
421 static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
422                                      void *callbackData)
423 {
424     return S3StatusOK;
425 }
426
427 static void s3store_response_callback(S3Status status,
428                                const S3ErrorDetails *errorDetails,
429                                void *callbackData)
430 {
431     struct get_info *info = (struct get_info *)callbackData;
432
433     if (status == 0) {
434         info->success = 1;
435     }
436
437     if (errorDetails != NULL && errorDetails->message != NULL) {
438         g_print("  Error message: %s\n", errorDetails->message);
439     }
440 }
441
442 static void s3store_task(gpointer a, gpointer s)
443 {
444     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
445     S3Store *store = (S3Store *)s;
446
447     async->status = ASYNC_RUNNING;
448     async->exec_time = bluesky_now_hires();
449
450     if (async->op == STORE_OP_GET) {
451         struct get_info info;
452         info.buf = g_string_new("");
453         info.success = 0;
454
455         struct S3GetObjectHandler handler;
456         handler.responseHandler.propertiesCallback = s3store_properties_callback;
457         handler.responseHandler.completeCallback = s3store_response_callback;
458         handler.getObjectDataCallback = s3store_get_handler;
459
460         S3_get_object(&store->bucket, async->key, NULL,
461                       async->start, async->len, NULL, &handler, &info);
462         async->range_done = TRUE;
463
464         if (info.success) {
465             async->data = bluesky_string_new_from_gstring(info.buf);
466             async->result = 0;
467         } else {
468             g_string_free(info.buf, TRUE);
469         }
470
471     } else if (async->op == STORE_OP_PUT) {
472         struct put_info info;
473         info.success = 0;
474         info.val = async->data;
475         info.offset = 0;
476
477         struct S3PutObjectHandler handler;
478         handler.responseHandler.propertiesCallback
479             = s3store_properties_callback;
480         handler.responseHandler.completeCallback = s3store_response_callback;
481         handler.putObjectDataCallback = s3store_put_handler;
482
483         S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL,
484                       &handler, &info);
485
486         if (info.success) {
487             async->result = 0;
488         } else {
489             g_warning("Error completing S3 put operation; client must retry!");
490         }
491     }
492
493     bluesky_store_async_mark_complete(async);
494     bluesky_store_async_unref(async);
495 }
496
497 static S3Status s3store_list_handler(int isTruncated,
498                                      const char *nextMarker,
499                                      int contentsCount,
500                                      const S3ListBucketContent *contents,
501                                      int commonPrefixesCount,
502                                      const char **commonPrefixes,
503                                      void *callbackData)
504 {
505     struct list_info *info = (struct list_info *)callbackData;
506     if (contentsCount > 0) {
507         g_free(info->last_entry);
508         info->last_entry = g_strdup(contents[contentsCount - 1].key);
509     }
510     info->truncated = isTruncated;
511     return S3StatusOK;
512 }
513
514 static char *s3store_lookup_last(gpointer s, const char *prefix)
515 {
516     S3Store *store = (S3Store *)s;
517     struct list_info info = {0, NULL, FALSE};
518
519     struct S3ListBucketHandler handler;
520     handler.responseHandler.propertiesCallback
521         = s3store_properties_callback;
522     handler.responseHandler.completeCallback = s3store_response_callback;
523     handler.listBucketCallback = s3store_list_handler;
524
525     char *marker = NULL;
526
527     do {
528         S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
529                        &handler, &info);
530         g_free(marker);
531         marker = g_strdup(info.last_entry);
532         g_print("Last key: %s\n", info.last_entry);
533     } while (info.truncated);
534
535     g_free(marker);
536
537     return info.last_entry;
538 }
539
540 static gpointer s3store_new(const gchar *path)
541 {
542     S3Store *store = g_new(S3Store, 1);
543     store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
544                                            NULL);
545     if (path == NULL || strlen(path) == 0)
546         store->bucket.bucketName = "mvrable-bluesky";
547     else
548         store->bucket.bucketName = g_strdup(path);
549     store->bucket.protocol = S3ProtocolHTTP;
550     store->bucket.uriStyle = S3UriStylePath;
551     store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
552     store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
553
554     const char *key = getenv("BLUESKY_KEY");
555     if (key == NULL) {
556         g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
557         exit(1);
558     }
559
560     bluesky_crypt_hash_key(key, store->encryption_key);
561
562     g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
563             store->bucket.bucketName, store->bucket.accessKeyId, key);
564
565     return store;
566 }
567
568 static void s3store_destroy(gpointer store)
569 {
570     g_free(store);
571 }
572
573 static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
574 {
575     S3Store *store = (S3Store *)s;
576     g_return_if_fail(async->status == ASYNC_NEW);
577     g_return_if_fail(async->op != STORE_OP_NONE);
578
579     switch (async->op) {
580     case STORE_OP_GET:
581     case STORE_OP_PUT:
582         async->status = ASYNC_PENDING;
583         bluesky_store_async_ref(async);
584         g_thread_pool_push(store->thread_pool, async, NULL);
585         break;
586
587     default:
588         g_warning("Uknown operation type for S3Store: %d\n", async->op);
589         bluesky_store_async_mark_complete(async);
590         break;
591     }
592 }
593
594 static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
595 {
596     GString *buf = (GString *)async->store_private;
597
598     if (buf != NULL) {
599         g_string_free(buf, TRUE);
600         async->store_private = NULL;
601     }
602 }
603
604 static BlueSkyStoreImplementation store_impl = {
605     .create = s3store_new,
606     .destroy = s3store_destroy,
607     .submit = s3store_submit,
608     .cleanup = s3store_cleanup,
609     .lookup_last = s3store_lookup_last,
610 };
611
612 void bluesky_store_init_s3(void)
613 {
614     S3_initialize(NULL, S3_INIT_ALL);
615     bluesky_store_register(&store_impl, "s3");
616 }
617 #endif