1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
14 #include <curl/curl.h>
16 #include "bluesky-private.h"
19 #define AZURE_API_VERSION "2009-09-19"
21 /* Fixed headers that are used in calculating the request signature for Azure,
22 * in the order that they are included. */
/* The list is NULL-terminated: azure_compute_signature() walks it until the
 * NULL entry and appends one line per header to the string being signed,
 * using an empty line for any header that is not set. */
23 static const char *signature_headers[] = {
24 "Content-Encoding", "Content-Language", "Content-Length", "Content-MD5",
25 "Content-Type", "Date", "If-Modified-Since", "If-Match", "If-None-Match",
26 "If-Unmodified-Since", "Range", NULL
29 /* Prototype Windows Azure backend for BlueSky. This is intended to be
30 * minimally functional, but could use additional work for production use. */
/* Worker pool created in azurestore_new(); it runs azurestore_task() for each
 * request pushed by azurestore_submit(). */
33 GThreadPool *thread_pool;
/* account: copy of the AZURE_ACCOUNT_NAME environment variable.
 * container: copy of the store path, with "bluesky" used when the path is NULL
 * or empty.  Both are used to build the request URI and the signed resource
 * path. */
34 char *account, *container;
/* Hash-table iteration callback (invoked through g_hash_table_foreach() in
 * azure_compute_signature()).  Lower-cases the header name and, when it starts
 * with "x-ms-", prepends a newly allocated "name:value\n" string to *headers.
 *
 * key     - header name
 * value   - header value
 * headers - in/out list of allocated strings; the caller frees each entry
 *
 * NOTE(review): g_ascii_strdown() returns a newly allocated string; the line
 * releasing the lower-cased key is not visible in this excerpt -- confirm it
 * is freed on both the matching and non-matching paths. */
39 static void get_extra_headers(gchar *key, gchar *value, GList **headers)
41 key = g_ascii_strdown(key, strlen(key));
42 if (strncmp(key, "x-ms-", strlen("x-ms-")) == 0) {
43 *headers = g_list_prepend(*headers,
44 g_strdup_printf("%s:%s\n", key, value));
/* Hash-table iteration callback (invoked through g_hash_table_foreach() in
 * submit_request()).  Formats one header as "key: value" and appends it to
 * the libcurl header list.
 *
 * key          - header name
 * value        - header value
 * curl_headers - in/out head of the curl_slist being built
 *
 * NOTE(review): curl_slist_append() stores its own copy of the string; the
 * release of `line` is not visible in this excerpt -- confirm it is freed
 * after the append. */
49 static void get_curl_headers(gchar *key, gchar *value,
50 struct curl_slist **curl_headers)
52 char *line = g_strdup_printf("%s: %s", key, value);
53 *curl_headers = curl_slist_append(*curl_headers, line);
/* libcurl read callback (installed with CURLOPT_READFUNCTION in
 * submit_request()).  Copies the next chunk of the request body into ptr.
 *
 * ptr         - destination buffer supplied by libcurl
 * size, nmemb - the destination holds size * nmemb bytes
 * userdata    - a struct curlbuf tracking the unread part of the body
 *
 * NOTE(review): submit_request() passes NULL as the read data when there is
 * no body; the handling of a NULL userdata is not visible in this excerpt. */
63 static size_t curl_readfunc(void *ptr, size_t size, size_t nmemb,
66 struct curlbuf *buf = (struct curlbuf *)userdata;
    /* Hand out at most what libcurl asked for, and never more than remains. */
71 size_t copied = size * nmemb;
72 if (copied > buf->readsize)
73 copied = buf->readsize;
    /* Copy the chunk, then advance the cursor past the bytes just consumed. */
75 memcpy(ptr, buf->readbuf, copied);
76 buf->readbuf += copied;
77 buf->readsize -= copied;
/* libcurl write callback: appends the size * nmemb bytes received at ptr to
 * the GString passed as userdata.  submit_request() installs it both for the
 * response body (with a GString) and for response headers.
 *
 * NOTE(review): for response headers the callback data is set to NULL; the
 * guard for a NULL buffer is not visible in this excerpt -- confirm the
 * append is skipped in that case. */
82 static size_t curl_writefunc(void *ptr, size_t size, size_t nmemb,
85 GString *buf = (GString *)userdata;
87 g_string_append_len(buf, ptr, size * nmemb);
91 /* Compute the signature for a request to Azure and add it to the headers. */
/* The string that gets signed is assembled from, in order: the HTTP method,
 * the values of the fixed headers in signature_headers, every "x-ms-*" header
 * (lower-cased name, sorted), and the resource "/account/container/path".
 * It is authenticated with HMAC-SHA-256 under store->key and the base64 result
 * is stored in the table as the "Authorization" header.
 *
 * store   - supplies the account, container and signing key
 * headers - request headers; "Date" (if absent), "x-ms-version" and
 *           "Authorization" are inserted into this table
 * method  - HTTP method name, written as the first signed line
 * path    - blob path inside the container */
92 static void azure_compute_signature(AzureStore *store,
94 const char *method, const char *path)
    /* Supply a Date header, formatted with strftime(), unless the caller
     * already provided one. */
96 if (g_hash_table_lookup(headers, "Date") == NULL) {
102 strftime(timebuf, sizeof(timebuf), "%a, %d %b %Y %H:%M:%S GMT", &now);
103 g_hash_table_insert(headers, g_strdup("Date"), g_strdup(timebuf));
    /* Always declare the API version; as an x-ms-* header it also becomes
     * part of the signed string below. */
106 g_hash_table_insert(headers, g_strdup("x-ms-version"),
107 g_strdup(AZURE_API_VERSION));
    /* Method line, then one line per fixed header (empty when unset). */
109 GString *to_sign = g_string_new("");
110 g_string_append_printf(to_sign, "%s\n", method);
111 for (const char **h = signature_headers; *h != NULL; h++) {
112 const char *val = g_hash_table_lookup(headers, *h);
113 g_string_append_printf(to_sign, "%s\n", val ? val : "");
    /* Collect the x-ms-* headers as "name:value\n" strings, sort them with
     * g_strcmp0, and append them; each string is freed as it is consumed. */
116 GList *extra_headers = NULL;
117 g_hash_table_foreach(headers, (GHFunc)get_extra_headers, &extra_headers);
118 extra_headers = g_list_sort(extra_headers, (GCompareFunc)g_strcmp0);
119 while (extra_headers != NULL) {
120 g_string_append(to_sign, extra_headers->data);
121 g_free(extra_headers->data);
122 extra_headers = g_list_delete_link(extra_headers, extra_headers);
    /* The signed string ends with the resource being addressed. */
125 /* FIXME: Doesn't handle query parameters (after '?') */
126 g_string_append_printf(to_sign, "/%s/%s/%s",
127 store->account, store->container, path);
129 /* Compute an HMAC-SHA-256 of the encoded parameters */
132 status = gcry_md_open(&handle, GCRY_MD_SHA256, GCRY_MD_FLAG_HMAC);
133 g_assert(status == 0);
134 status = gcry_md_setkey(handle, store->key, store->key_len);
135 g_assert(status == 0);
136 gcry_md_write(handle, to_sign->str, to_sign->len);
137 unsigned char *digest = gcry_md_read(handle, GCRY_MD_SHA256);
    /* The raw digest is base64-encoded and published as
     * "SharedKey <account>:<signature>".
     * NOTE(review): g_base64_encode() returns an allocated string; the release
     * of `signature` is not visible in this excerpt -- confirm it is freed. */
138 gchar *signature = g_base64_encode(digest,
139 gcry_md_get_algo_dlen(GCRY_MD_SHA256));
140 g_hash_table_insert(headers, g_strdup("Authorization"),
141 g_strdup_printf("SharedKey %s:%s",
142 store->account, signature));
144 gcry_md_close(handle);
145 g_string_free(to_sign, TRUE);
148 /* Submit an HTTP request using CURL. Takes as input the Azure storage backend
149 * we are acting for, as well as the method (GET, PUT, etc.), HTTP path, other
150 * HTTP headers, and an optional body. If body is NULL, an empty body is
151 * sent. This will compute an Azure authentication signature before sending
153 static BlueSkyRCStr *submit_request(AzureStore *store,
159 BlueSkyRCStr *result = NULL;
    /* Content-Length is the body size, or 0 when no body was supplied.
     * NOTE(review): "%zd" expects a signed size; the type of body->len is not
     * visible here -- confirm the conversion matches. */
161 g_hash_table_insert(headers,
162 g_strdup("Content-Length"),
163 g_strdup_printf("%zd", body != NULL ? body->len : 0));
    /* Non-empty bodies additionally carry a base64-encoded MD5 of the data. */
165 if (body != 0 && body->len > 0) {
166 GChecksum *csum = g_checksum_new(G_CHECKSUM_MD5);
167 g_checksum_update(csum, (uint8_t *)body->data, body->len);
169 gsize md5_len = sizeof(md5);
170 g_checksum_get_digest(csum, md5, &md5_len);
171 g_hash_table_insert(headers,
172 g_strdup("Content-MD5"),
173 g_base64_encode(md5, md5_len));
174 g_checksum_free(csum);
    /* Sign only after Content-Length and Content-MD5 are in the table: both
     * are among the fixed headers covered by the signature. */
177 azure_compute_signature(store, headers, method, path);
    /* NOTE(review): no check of the curl_easy_init() result is visible in
     * this excerpt -- confirm a NULL handle is handled. */
180 CURL *curl = curl_easy_init();
    /* Helper macro: set one option on `curl` and report a failure on stderr
     * (the remainder of the macro body is not visible in this excerpt). */
182 #define curl_easy_setopt_safe(opt, val) \
183 if ((status = curl_easy_setopt(curl, (opt), (val))) != CURLE_OK) { \
184 fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status)); \
188 curl_easy_setopt_safe(CURLOPT_NOSIGNAL, 1);
189 curl_easy_setopt_safe(CURLOPT_NOPROGRESS, 1);
190 curl_easy_setopt_safe(CURLOPT_NETRC, CURL_NETRC_IGNORED);
191 curl_easy_setopt_safe(CURLOPT_FOLLOWLOCATION, 1);
192 curl_easy_setopt_safe(CURLOPT_MAXREDIRS, 10);
    /* Response headers go to curl_writefunc with no destination buffer. */
194 curl_easy_setopt_safe(CURLOPT_HEADERFUNCTION, curl_writefunc);
195 curl_easy_setopt_safe(CURLOPT_HEADERDATA, NULL);
    /* Cursor over the request body consumed by curl_readfunc.
     * NOTE(review): the two assignments dereference `body`; the guard for a
     * NULL body is not visible in this excerpt -- confirm it exists. */
197 struct curlbuf readbuf;
199 readbuf.readbuf = body->data;
200 readbuf.readsize = body->len;
202 curl_easy_setopt_safe(CURLOPT_READFUNCTION, curl_readfunc);
203 curl_easy_setopt_safe(CURLOPT_READDATA, body ? &readbuf : NULL);
    /* The response body accumulates in result_body. */
205 GString *result_body = g_string_new("");
206 curl_easy_setopt_safe(CURLOPT_WRITEFUNCTION, curl_writefunc);
207 curl_easy_setopt_safe(CURLOPT_WRITEDATA, result_body);
    /* Convert the header table into the list form libcurl expects. */
209 struct curl_slist *curl_headers = NULL;
210 g_hash_table_foreach(headers, (GHFunc)get_curl_headers, &curl_headers);
211 curl_easy_setopt_safe(CURLOPT_HTTPHEADER, curl_headers);
    /* NOTE(review): the URI uses plain http:// and is echoed to stdout; no
     * release of `uri` is visible in this excerpt -- confirm it is freed. */
213 char *uri = g_strdup_printf("http://%s.blob.core.windows.net/%s/%s",
214 store->account, store->container, path);
215 printf("URI: %s\n", uri);
216 curl_easy_setopt_safe(CURLOPT_URL, uri);
    /* Method selection: GET is left at the handle's default, PUT enables
     * upload mode, DELETE is sent as a custom request. */
218 if (strcmp(method, "GET") == 0) {
219 /* nothing special needed */
220 } else if (strcmp(method, "PUT") == 0) {
221 curl_easy_setopt_safe(CURLOPT_UPLOAD, 1);
222 } else if (strcmp(method, "DELETE") == 0) {
223 curl_easy_setopt_safe(CURLOPT_CUSTOMREQUEST, "DELETE");
226 status = curl_easy_perform(curl);
228 fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
232 long response_code = 0;
233 status = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
235 fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
    /* Only 2xx responses produce a result; the body buffer is wrapped into
     * the returned BlueSkyRCStr. */
239 if (response_code / 100 == 2) {
240 result = bluesky_string_new_from_gstring(result_body);
243 fprintf(stderr, "HTTP response code: %ld!\n", response_code);
    /* NOTE(review): the buffer is freed here only when `result` is non-NULL.
     * On failure paths `result` is still NULL, so result_body appears never
     * to be released; on success the buffer has been handed to `result`.
     * Confirm against the ownership rules of
     * bluesky_string_new_from_gstring() -- the intended test is probably on
     * result_body alone. */
248 if (result != NULL && result_body != NULL)
249 g_string_free(result_body, TRUE);
250 curl_easy_cleanup(curl);
251 curl_slist_free_all(curl_headers);
/* Thread-pool worker for the Azure backend (registered with
 * g_thread_pool_new() in azurestore_new()).
 *
 * a - the BlueSkyStoreAsync request to execute
 * s - the AzureStore the pool was created for
 *
 * Marks the request as running and records its start time, builds a header
 * table, then issues a GET or PUT through submit_request() depending on
 * async->op.  For a GET that returns data, the result is stored in
 * async->data.  Finally the request is marked complete, the reference taken
 * by azurestore_submit() is dropped, and the header table is released.
 *
 * NOTE(review): the range value is formatted as "start-(start+len)" with no
 * unit prefix.  HTTP byte ranges are normally written "bytes=first-last" with
 * an inclusive last offset, which would make the end start+len-1 -- confirm
 * against the Azure Blob service documentation.
 * NOTE(review): a range is only added when async->start is non-zero, so a
 * request for the first `len` bytes (start == 0, len != 0) sends no range and
 * leaves range_done unset -- confirm that callers then trim the full object.
 * NOTE(review): for PUT only the unref of a non-NULL result is visible; how
 * success or failure is recorded on `async` is not visible in this excerpt. */
257 static void azurestore_task(gpointer a, gpointer s)
259 BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
260 AzureStore *store = (AzureStore *)s;
262 async->status = ASYNC_RUNNING;
263 async->exec_time = bluesky_now_hires();
    /* Per-request header table, handed to submit_request() below. */
265 GHashTable *headers = g_hash_table_new_full(g_str_hash, g_str_equal,
268 BlueSkyRCStr *result = NULL;
270 if (async->op == STORE_OP_GET) {
271 /* FIXME: We ought to check that the response returned the requested
273 if (async->start != 0 && async->len != 0) {
274 g_hash_table_insert(headers,
276 g_strdup_printf("%zd-%zd", async->start,
277 async->start + async->len));
278 async->range_done = TRUE;
279 } else if (async->start != 0) {
280 g_hash_table_insert(headers,
282 g_strdup_printf("%zd-", async->start));
283 async->range_done = TRUE;
285 result = submit_request(store, "GET", async->key, headers, NULL);
286 if (result != NULL) {
287 async->data = result;
290 } else if (async->op == STORE_OP_PUT) {
291 g_hash_table_insert(headers,
292 g_strdup("x-ms-blob-type"),
293 g_strdup("BlockBlob"));
294 g_hash_table_insert(headers,
295 g_strdup("Transfer-Encoding"),
297 result = submit_request(store, "PUT", async->key,
298 headers, async->data);
299 if (result != NULL) {
302 bluesky_string_unref(result);
305 bluesky_store_async_mark_complete(async);
306 bluesky_store_async_unref(async);
307 g_hash_table_unref(headers);
/* Constructor for the Azure backend (the .create hook of store_impl).
 *
 * path - container name; "bluesky" is used when it is NULL or empty
 *
 * Allocates an AzureStore, starts a thread pool whose workers run
 * azurestore_task(), and reads the credentials from the environment:
 * AZURE_ACCOUNT_NAME is copied as the account name and AZURE_SECRET_KEY is
 * base64-decoded into the signing key.
 *
 * NOTE(review): neither getenv() result is checked in the visible lines.  An
 * unset AZURE_SECRET_KEY would pass NULL to g_base64_decode(), and an unset
 * AZURE_ACCOUNT_NAME would leave store->account NULL for later URI and
 * signature formatting -- confirm how missing credentials should be
 * reported. */
310 static gpointer azurestore_new(const gchar *path)
312 AzureStore *store = g_new(AzureStore, 1);
313 store->thread_pool = g_thread_pool_new(azurestore_task, store, -1, FALSE,
315 if (path == NULL || strlen(path) == 0)
316 store->container = g_strdup("bluesky");
318 store->container = g_strdup(path);
320 store->account = g_strdup(getenv("AZURE_ACCOUNT_NAME"));
    /* The secret is supplied base64-encoded; keep the decoded bytes and
     * their length for the HMAC in azure_compute_signature(). */
322 const char *key = getenv("AZURE_SECRET_KEY");
323 store->key = g_base64_decode(key, &store->key_len);
325 g_print("Initializing Azure with account %s, container %s\n",
326 store->account, store->container);
/* Destructor hook for the Azure backend (the .destroy hook of store_impl).
 * NOTE(review): per the TODO below nothing is released yet; azurestore_new()
 * allocates the store, its thread pool, the account and container strings
 * and the decoded key, all of which would need freeing here. */
331 static void azurestore_destroy(gpointer store)
333 /* TODO: Clean up resources */
/* Submission hook for the Azure backend (the .submit hook of store_impl).
 *
 * s     - the AzureStore
 * async - a request that must still be in state ASYNC_NEW and carry an
 *         operation other than STORE_OP_NONE; otherwise the function returns
 *         without doing anything
 *
 * Accepted requests are marked ASYNC_PENDING, given an extra reference (which
 * azurestore_task() drops) and queued on the store's thread pool.  Requests
 * with an unrecognised operation are logged and completed immediately.  The
 * case labels selecting between the two paths are not visible in this
 * excerpt.
 *
 * NOTE(review): the warning text misspells "Unknown" as "Uknown". */
336 static void azurestore_submit(gpointer s, BlueSkyStoreAsync *async)
338 AzureStore *store = (AzureStore *)s;
339 g_return_if_fail(async->status == ASYNC_NEW);
340 g_return_if_fail(async->op != STORE_OP_NONE);
345 async->status = ASYNC_PENDING;
346 bluesky_store_async_ref(async);
347 g_thread_pool_push(store->thread_pool, async, NULL);
351 g_warning("Uknown operation type for AzureStore: %d\n", async->op);
352 bluesky_store_async_mark_complete(async);
/* Per-request cleanup hook for the Azure backend (the .cleanup hook of
 * store_impl).  The body is not visible in this excerpt. */
357 static void azurestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
/* Key lookup hook for the Azure backend (the .lookup_last hook of
 * store_impl).  The body is not visible in this excerpt.
 * NOTE(review): presumably mirrors s3store_lookup_last(), which returns the
 * last key listed under `prefix` -- confirm what this backend returns. */
361 static char *azurestore_lookup_last(gpointer s, const char *prefix)
/* Operations table that plugs the Azure callbacks into the generic BlueSky
 * store interface; bluesky_store_init_azure() registers it as "azure". */
366 static BlueSkyStoreImplementation store_impl = {
367 .create = azurestore_new,
368 .destroy = azurestore_destroy,
369 .submit = azurestore_submit,
370 .cleanup = azurestore_cleanup,
371 .lookup_last = azurestore_lookup_last,
/* Registers the Azure backend with the BlueSky store layer under the name
 * "azure". */
374 void bluesky_store_init_azure(void)
376 bluesky_store_register(&store_impl, "azure");
/* Operation selector with two values, one for get and one for put.
 * NOTE(review): the enclosing declaration is not visible in this excerpt and
 * no visible code reads S3_GET or S3_PUT -- confirm this member is still
 * used. */
381 enum { S3_GET, S3_PUT } op;
/* libs3 data callback for GET requests (installed as getObjectDataCallback in
 * s3store_task()).  Appends the bufferSize bytes at `buffer` to the GString
 * held in the struct get_info passed as callback data. */
403 static S3Status s3store_get_handler(int bufferSize, const char *buffer,
406 struct get_info *info = (struct get_info *)callbackData;
407 g_string_append_len(info->buf, buffer, bufferSize);
/* libs3 data callback for PUT requests (installed as putObjectDataCallback in
 * s3store_task()).  Supplies the next chunk of the value being uploaded.
 *
 * bufferSize - capacity of `buffer`
 * buffer     - destination for the next chunk
 *
 * The struct put_info passed as callback data tracks the value and how much
 * of it has already been handed out. */
411 static int s3store_put_handler(int bufferSize, char *buffer,
414 struct put_info *info = (struct put_info *)callbackData;
    /* Send whichever is smaller: the buffer capacity or the unsent tail. */
415 gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
416 memcpy(buffer, (char *)info->val->data + info->offset, bytes);
417 info->offset += bytes;
/* libs3 response-properties callback shared by the get, put and list-bucket
 * handlers in this file.  The body is not visible in this excerpt. */
421 static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
/* libs3 completion callback shared by the get, put and list-bucket handlers
 * in this file.  When libs3 supplies error details with a message, the
 * message is printed.
 *
 * NOTE(review): the callback data is cast to struct get_info, but the same
 * callback is also installed for put and list requests, which use
 * struct put_info and struct list_info.  The struct definitions are not
 * visible in this excerpt -- confirm the field accessed here sits at a
 * compatible position in all three. */
427 static void s3store_response_callback(S3Status status,
428 const S3ErrorDetails *errorDetails,
431 struct get_info *info = (struct get_info *)callbackData;
437 if (errorDetails != NULL && errorDetails->message != NULL) {
438 g_print(" Error message: %s\n", errorDetails->message);
/* Thread-pool worker for the S3 backend (registered with g_thread_pool_new()
 * in s3store_new()).
 *
 * a - the BlueSkyStoreAsync request to execute
 * s - the S3Store the pool was created for
 *
 * Marks the request as running and records its start time, performs the GET
 * or PUT through libs3, then marks the request complete and drops the
 * reference taken by s3store_submit(). */
442 static void s3store_task(gpointer a, gpointer s)
444 BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
445 S3Store *store = (S3Store *)s;
447 async->status = ASYNC_RUNNING;
448 async->exec_time = bluesky_now_hires();
450 if (async->op == STORE_OP_GET) {
    /* Downloaded bytes accumulate in info.buf via s3store_get_handler(). */
451 struct get_info info;
452 info.buf = g_string_new("");
455 struct S3GetObjectHandler handler;
456 handler.responseHandler.propertiesCallback = s3store_properties_callback;
457 handler.responseHandler.completeCallback = s3store_response_callback;
458 handler.getObjectDataCallback = s3store_get_handler;
    /* The requested byte range is passed straight to libs3, so the range is
     * always reported as already applied. */
460 S3_get_object(&store->bucket, async->key, NULL,
461 async->start, async->len, NULL, &handler, &info);
462 async->range_done = TRUE;
    /* The buffer is either wrapped as the request's data or freed; the
     * condition choosing between the two is not visible in this excerpt. */
465 async->data = bluesky_string_new_from_gstring(info.buf);
468 g_string_free(info.buf, TRUE);
471 } else if (async->op == STORE_OP_PUT) {
    /* s3store_put_handler() streams async->data out through `info`. */
472 struct put_info info;
474 info.val = async->data;
477 struct S3PutObjectHandler handler;
478 handler.responseHandler.propertiesCallback
479 = s3store_properties_callback;
480 handler.responseHandler.completeCallback = s3store_response_callback;
481 handler.putObjectDataCallback = s3store_put_handler;
483 S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL,
489 g_warning("Error completing S3 put operation; client must retry!");
493 bluesky_store_async_mark_complete(async);
494 bluesky_store_async_unref(async);
/* libs3 list-bucket callback (installed as listBucketCallback in
 * s3store_lookup_last()).  For each batch of results it remembers a copy of
 * the last key in the batch, replacing the previously remembered one, and
 * records whether libs3 reported the listing as truncated. */
497 static S3Status s3store_list_handler(int isTruncated,
498 const char *nextMarker,
500 const S3ListBucketContent *contents,
501 int commonPrefixesCount,
502 const char **commonPrefixes,
505 struct list_info *info = (struct list_info *)callbackData;
    /* An empty batch leaves the previously remembered key untouched. */
506 if (contentsCount > 0) {
507 g_free(info->last_entry);
508 info->last_entry = g_strdup(contents[contentsCount - 1].key);
510 info->truncated = isTruncated;
/* Key lookup hook for the S3 backend (the .lookup_last hook of store_impl).
 *
 * s      - the S3Store
 * prefix - key prefix passed to the bucket listing
 *
 * Lists the bucket in pages of up to 1024 keys, continuing from the last key
 * seen for as long as libs3 reports the listing as truncated.
 *
 * Returns the last key recorded by s3store_list_handler(), which is a
 * g_strdup() copy, or NULL if no key was recorded.
 *
 * NOTE(review): `marker` receives a fresh g_strdup() copy on every pass; no
 * release of the previous or final copy is visible in this excerpt -- confirm
 * it is freed. */
514 static char *s3store_lookup_last(gpointer s, const char *prefix)
516 S3Store *store = (S3Store *)s;
517 struct list_info info = {0, NULL, FALSE};
519 struct S3ListBucketHandler handler;
520 handler.responseHandler.propertiesCallback
521 = s3store_properties_callback;
522 handler.responseHandler.completeCallback = s3store_response_callback;
523 handler.listBucketCallback = s3store_list_handler;
528 S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
    /* Resume the next page after the last key of this one. */
531 marker = g_strdup(info.last_entry);
532 g_print("Last key: %s\n", info.last_entry);
533 } while (info.truncated);
537 return info.last_entry;
/* Constructor for the S3 backend (the .create hook of store_impl).
 *
 * path - bucket name; "mvrable-bluesky" is used when it is NULL or empty
 *
 * Allocates an S3Store, starts a thread pool whose workers run
 * s3store_task(), and configures the bucket for HTTP with path-style URIs.
 * The access credentials come from AWS_ACCESS_KEY_ID and
 * AWS_SECRET_ACCESS_KEY.  The encryption key is derived from BLUESKY_KEY via
 * bluesky_crypt_hash_key(), and its absence is reported with g_error().
 *
 * NOTE(review): the start-up message prints the raw BLUESKY_KEY value, and
 * the access key id, to stdout; secrets should not be logged.
 * NOTE(review): bucketName is a string literal in the default case but a
 * g_strdup() copy otherwise, so a destructor cannot free it unconditionally.
 * NOTE(review): the two AWS getenv() results are stored without a NULL check
 * in the visible lines. */
540 static gpointer s3store_new(const gchar *path)
542 S3Store *store = g_new(S3Store, 1);
543 store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
545 if (path == NULL || strlen(path) == 0)
546 store->bucket.bucketName = "mvrable-bluesky";
548 store->bucket.bucketName = g_strdup(path);
549 store->bucket.protocol = S3ProtocolHTTP;
550 store->bucket.uriStyle = S3UriStylePath;
551 store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
552 store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
554 const char *key = getenv("BLUESKY_KEY");
556 g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
560 bluesky_crypt_hash_key(key, store->encryption_key);
562 g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
563 store->bucket.bucketName, store->bucket.accessKeyId, key);
/* Destructor hook for the S3 backend (the .destroy hook of store_impl).  The
 * body is not visible in this excerpt. */
568 static void s3store_destroy(gpointer store)
/* Submission hook for the S3 backend (the .submit hook of store_impl).
 *
 * s     - the S3Store
 * async - a request that must still be in state ASYNC_NEW and carry an
 *         operation other than STORE_OP_NONE; otherwise the function returns
 *         without doing anything
 *
 * Accepted requests are marked ASYNC_PENDING, given an extra reference (which
 * s3store_task() drops) and queued on the store's thread pool.  Requests with
 * an unrecognised operation are logged and completed immediately.  The case
 * labels selecting between the two paths are not visible in this excerpt.
 *
 * NOTE(review): the warning text misspells "Unknown" as "Uknown". */
573 static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
575 S3Store *store = (S3Store *)s;
576 g_return_if_fail(async->status == ASYNC_NEW);
577 g_return_if_fail(async->op != STORE_OP_NONE);
582 async->status = ASYNC_PENDING;
583 bluesky_store_async_ref(async);
584 g_thread_pool_push(store->thread_pool, async, NULL);
588 g_warning("Uknown operation type for S3Store: %d\n", async->op);
589 bluesky_store_async_mark_complete(async);
/* Per-request cleanup hook for the S3 backend (the .cleanup hook of
 * store_impl).  Treats async->store_private as a GString, frees it together
 * with its character data, and clears the pointer.
 *
 * NOTE(review): any guard for a NULL store_private is not visible in this
 * excerpt; no visible code in this file assigns store_private -- confirm it
 * is still populated somewhere. */
594 static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
596 GString *buf = (GString *)async->store_private;
599 g_string_free(buf, TRUE);
600 async->store_private = NULL;
/* Operations table that plugs the S3 callbacks into the generic BlueSky store
 * interface; bluesky_store_init_s3() registers it as "s3".
 *
 * NOTE(review): an identically named static `store_impl` appears earlier in
 * this excerpt for the Azure backend.  Presumably the two backends live in
 * separate source files -- confirm, since a single translation unit cannot
 * define both. */
604 static BlueSkyStoreImplementation store_impl = {
605 .create = s3store_new,
606 .destroy = s3store_destroy,
607 .submit = s3store_submit,
608 .cleanup = s3store_cleanup,
609 .lookup_last = s3store_lookup_last,
/* Initializes libs3 and registers the S3 backend with the BlueSky store layer
 * under the name "s3".
 * NOTE(review): the status returned by S3_initialize() is not checked. */
612 void bluesky_store_init_s3(void)
614 S3_initialize(NULL, S3_INIT_ALL);
615 bluesky_store_register(&store_impl, "s3");