1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
14 #include <curl/curl.h>
16 #include "bluesky-private.h"
19 #define AZURE_API_VERSION "2009-09-19"
21 /* Fixed headers that are used in calculating the request signature for Azure,
22 * in the order that they are included. */
/* The list is NULL-terminated: azure_compute_signature() iterates until the
 * sentinel and emits one line per entry, left empty when the header is not
 * present in the request. */
23 static const char *signature_headers[] = {
24 "Content-Encoding", "Content-Language", "Content-Length", "Content-MD5",
25 "Content-Type", "Date", "If-Modified-Since", "If-Match", "If-None-Match",
26 "If-Unmodified-Since", "Range", NULL
29 /* Prototype Windows Azure backend for BlueSky. This is intended to be
30 * minimally functional, but could use additional work for production use. */
32 #define MAX_IDLE_CONNECTIONS 8
/* State of one Azure store instance; the functions below reach these fields
 * through an AzureStore pointer. */
35 GThreadPool *thread_pool; /* Workers running azurestore_task(); fed by azurestore_submit(). */
36 char *account, *container; /* Storage account and container names used in the signed path and the URI. */
40 /* A pool of available idle connections that could be used. */
42 GMutex *curl_pool_lock; /* Taken by get_connection()/put_connection() around curl_pool accesses. */
/* Obtain a CURL handle for one request. An idle handle is taken from the head
 * of store->curl_pool, under curl_pool_lock, when the pool is not empty; a
 * fresh handle is created with curl_easy_init().
 *
 * NOTE(review): the condition guarding curl_easy_init() is not visible here;
 * presumably it only runs when no pooled handle was found -- confirm. The
 * result of curl_easy_init() is not visibly checked for NULL. */
45 static CURL *get_connection(AzureStore *store)
49 g_mutex_lock(store->curl_pool_lock);
50 if (!g_queue_is_empty(store->curl_pool)) {
51 curl = (CURL *)(g_queue_pop_head(store->curl_pool));
53 g_mutex_unlock(store->curl_pool_lock);
56 curl = curl_easy_init();
/* Hand a CURL handle back to the idle pool. The handle is pushed at the head
 * of the queue, which is also where get_connection() pops from, so the most
 * recently used handle is reused first. Handles are then destroyed from the
 * tail for as long as the pool holds more than MAX_IDLE_CONNECTIONS entries.
 * The whole operation runs under curl_pool_lock. */
61 static void put_connection(AzureStore *store, CURL *curl)
63 g_mutex_lock(store->curl_pool_lock);
64 g_queue_push_head(store->curl_pool, curl);
65 while (g_queue_get_length(store->curl_pool) > MAX_IDLE_CONNECTIONS) {
66 curl = (CURL *)(g_queue_pop_tail(store->curl_pool));
67 curl_easy_cleanup(curl);
69 g_mutex_unlock(store->curl_pool_lock);
/* g_hash_table_foreach() callback used by azure_compute_signature() to gather
 * the "x-ms-*" headers. The header name is lower-cased before the prefix
 * test, so matching is case-insensitive. Each match is prepended to *headers
 * as a newly allocated "name:value\n" string; the caller frees those strings.
 *
 * NOTE(review): g_ascii_strdown() returns a newly allocated copy that replaces
 * the local `key`; confirm the copy is released before returning. */
72 static void get_extra_headers(gchar *key, gchar *value, GList **headers)
74 key = g_ascii_strdown(key, strlen(key));
75 if (strncmp(key, "x-ms-", strlen("x-ms-")) == 0) {
76 *headers = g_list_prepend(*headers,
77 g_strdup_printf("%s:%s\n", key, value));
/* g_hash_table_foreach() callback used by submit_request(): formats one entry
 * of the header table as "Name: value" and appends it to the curl_slist that
 * *curl_headers points to.
 *
 * NOTE(review): curl_slist_append() stores its own copy of the string, so
 * `line` stays owned by this function; confirm it is freed after the append. */
82 static void get_curl_headers(gchar *key, gchar *value,
83 struct curl_slist **curl_headers)
85 char *line = g_strdup_printf("%s: %s", key, value);
86 *curl_headers = curl_slist_append(*curl_headers, line);
/* CURLOPT_READFUNCTION callback: feeds the request body to libcurl from the
 * struct curlbuf passed as userdata. At most size * nmemb bytes are copied
 * into ptr, clamped to the bytes still unread, and the read cursor and the
 * remaining size are then advanced by the amount copied.
 *
 * NOTE(review): submit_request() passes NULL as CURLOPT_READDATA when there is
 * no body; confirm a NULL check precedes the dereferences of `buf`. */
96 static size_t curl_readfunc(void *ptr, size_t size, size_t nmemb,
99 struct curlbuf *buf = (struct curlbuf *)userdata;
104 size_t copied = size * nmemb;
105 if (copied > buf->readsize)
106 copied = buf->readsize;
108 memcpy(ptr, buf->readbuf, copied);
109 buf->readbuf += copied;
110 buf->readsize -= copied;
/* Receive callback shared by CURLOPT_WRITEFUNCTION and CURLOPT_HEADERFUNCTION:
 * appends the size * nmemb received bytes to the GString passed as userdata.
 *
 * NOTE(review): submit_request() installs this as the header callback with
 * NULL userdata, so the append has to be skipped when buf is NULL -- confirm
 * such a guard exists. */
115 static size_t curl_writefunc(void *ptr, size_t size, size_t nmemb,
118 GString *buf = (GString *)userdata;
120 g_string_append_len(buf, ptr, size * nmemb);
124 /* Compute the signature for a request to Azure and add it to the headers.
 *
 * The string to sign is assembled in this order:
 *   1. the HTTP method;
 *   2. one line per entry of signature_headers, empty when the header is not
 *      set;
 *   3. every "x-ms-*" header as "name:value", lower-cased and sorted;
 *   4. the resource "/<account>/<container>/<path>".
 * It is signed with HMAC-SHA-256 keyed by store->key, base64-encoded, and
 * stored in `headers` as "Authorization" = "SharedKey <account>:<signature>".
 *
 * Side effects on `headers`: a "Date" entry is added when the caller did not
 * supply one, and "x-ms-version" is set to AZURE_API_VERSION. */
125 static void azure_compute_signature(AzureStore *store,
127 const char *method, const char *path)
/* Supply a Date header when the caller did not provide one. */
129 if (g_hash_table_lookup(headers, "Date") == NULL) {
135 strftime(timebuf, sizeof(timebuf), "%a, %d %b %Y %H:%M:%S GMT", &now);
136 g_hash_table_insert(headers, g_strdup("Date"), g_strdup(timebuf));
139 g_hash_table_insert(headers, g_strdup("x-ms-version"),
140 g_strdup(AZURE_API_VERSION));
142 GString *to_sign = g_string_new("");
143 g_string_append_printf(to_sign, "%s\n", method);
/* Fixed headers: always one line each, even when the header is missing. */
144 for (const char **h = signature_headers; *h != NULL; h++) {
145 const char *val = g_hash_table_lookup(headers, *h);
146 g_string_append_printf(to_sign, "%s\n", val ? val : "");
/* x-ms-* headers: collect, sort, then append while freeing the list. */
149 GList *extra_headers = NULL;
150 g_hash_table_foreach(headers, (GHFunc)get_extra_headers, &extra_headers);
151 extra_headers = g_list_sort(extra_headers, (GCompareFunc)g_strcmp0);
152 while (extra_headers != NULL) {
153 g_string_append(to_sign, extra_headers->data);
154 g_free(extra_headers->data);
155 extra_headers = g_list_delete_link(extra_headers, extra_headers);
158 /* FIXME: Doesn't handle query parameters (after '?') */
159 g_string_append_printf(to_sign, "/%s/%s/%s",
160 store->account, store->container, path);
162 /* Compute an HMAC-SHA-256 of the encoded parameters */
/* A libgcrypt failure here aborts the process through g_assert(). */
165 status = gcry_md_open(&handle, GCRY_MD_SHA256, GCRY_MD_FLAG_HMAC);
166 g_assert(status == 0);
167 status = gcry_md_setkey(handle, store->key, store->key_len);
168 g_assert(status == 0);
169 gcry_md_write(handle, to_sign->str, to_sign->len);
170 unsigned char *digest = gcry_md_read(handle, GCRY_MD_SHA256);
/* NOTE(review): g_base64_encode() returns a newly allocated string and the
 * g_strdup_printf() below copies it; confirm `signature` is freed. */
171 gchar *signature = g_base64_encode(digest,
172 gcry_md_get_algo_dlen(GCRY_MD_SHA256));
173 g_hash_table_insert(headers, g_strdup("Authorization"),
174 g_strdup_printf("SharedKey %s:%s",
175 store->account, signature));
177 gcry_md_close(handle);
178 g_string_free(to_sign, TRUE);
181 /* Submit an HTTP request using CURL. Takes as input the Azure storage backend
182 * we are acting for, as well as the method (GET, PUT, etc.), HTTP path, other
183 * HTTP headers, and an optional body. If body is NULL, an empty body is
184 * sent. This will compute an Azure authentication signature before sending
/* As used in the body: `store` supplies the account and container, `curl` is
 * the handle to configure, `method` is "GET", "PUT" or "DELETE", `headers` is
 * a table of header name -> value that this function extends, and `body` may
 * be NULL. On a 2xx response `result` is set to the response body; otherwise
 * it keeps its initial NULL value (presumably this is what is returned --
 * confirm).
 *
 * NOTE(review): libcurl documents CURLOPT_NOSIGNAL, CURLOPT_NOPROGRESS,
 * CURLOPT_NETRC, CURLOPT_FOLLOWLOCATION, CURLOPT_MAXREDIRS and CURLOPT_UPLOAD
 * as taking a `long`; the int literals passed below go through varargs
 * unconverted. Consider 1L / 10L. */
186 static BlueSkyRCStr *submit_request(AzureStore *store,
193 BlueSkyRCStr *result = NULL;
/* Content-Length is always sent: 0 when there is no body.
 * NOTE(review): "%zd" is the signed conversion; if body->len is a size_t the
 * matching conversion is "%zu" -- confirm the field type. */
195 g_hash_table_insert(headers,
196 g_strdup("Content-Length"),
197 g_strdup_printf("%zd", body != NULL ? body->len : 0));
/* Content-MD5 is only added for a non-empty body; the base64 string is handed
 * to the table directly, without an extra copy. */
199 if (body != 0 && body->len > 0) {
200 GChecksum *csum = g_checksum_new(G_CHECKSUM_MD5);
201 g_checksum_update(csum, (uint8_t *)body->data, body->len);
203 gsize md5_len = sizeof(md5);
204 g_checksum_get_digest(csum, md5, &md5_len);
205 g_hash_table_insert(headers,
206 g_strdup("Content-MD5"),
207 g_base64_encode(md5, md5_len));
208 g_checksum_free(csum);
/* Sign only after Content-Length and Content-MD5 are in place: both are part
 * of the signed header list. */
211 azure_compute_signature(store, headers, method, path);
/* Wrapper around curl_easy_setopt() on the local `curl` handle: keeps the
 * result in `status` and reports a failure on stderr. */
215 #define curl_easy_setopt_safe(opt, val) \
216 if ((status = curl_easy_setopt(curl, (opt), (val))) != CURLE_OK) { \
217 fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status)); \
221 curl_easy_setopt_safe(CURLOPT_NOSIGNAL, 1);
222 curl_easy_setopt_safe(CURLOPT_NOPROGRESS, 1);
223 curl_easy_setopt_safe(CURLOPT_NETRC, CURL_NETRC_IGNORED);
224 curl_easy_setopt_safe(CURLOPT_FOLLOWLOCATION, 1);
225 curl_easy_setopt_safe(CURLOPT_MAXREDIRS, 10);
/* Response headers go to curl_writefunc with NULL userdata, i.e. they are not
 * collected. */
227 curl_easy_setopt_safe(CURLOPT_HEADERFUNCTION, curl_writefunc);
228 curl_easy_setopt_safe(CURLOPT_HEADERDATA, NULL);
/* Request body: curl_readfunc reads from `readbuf`, a cursor over body. */
230 struct curlbuf readbuf;
232 readbuf.readbuf = body->data;
233 readbuf.readsize = body->len;
235 curl_easy_setopt_safe(CURLOPT_READFUNCTION, curl_readfunc);
236 curl_easy_setopt_safe(CURLOPT_READDATA, body ? &readbuf : NULL);
/* Response body is accumulated in result_body. */
238 GString *result_body = g_string_new("");
239 curl_easy_setopt_safe(CURLOPT_WRITEFUNCTION, curl_writefunc);
240 curl_easy_setopt_safe(CURLOPT_WRITEDATA, result_body);
/* Convert the header table into the curl_slist libcurl expects. */
242 struct curl_slist *curl_headers = NULL;
243 g_hash_table_foreach(headers, (GHFunc)get_curl_headers, &curl_headers);
244 curl_easy_setopt_safe(CURLOPT_HTTPHEADER, curl_headers);
/* The request goes over plain http, and the URI is printed to stdout for every
 * request. */
246 char *uri = g_strdup_printf("http://%s.blob.core.windows.net/%s/%s",
247 store->account, store->container, path);
248 printf("URI: %s\n", uri);
249 curl_easy_setopt_safe(CURLOPT_URL, uri);
/* Method selection: GET needs no extra option, PUT is an upload, DELETE uses a
 * custom request verb. */
251 if (strcmp(method, "GET") == 0) {
252 /* nothing special needed */
253 } else if (strcmp(method, "PUT") == 0) {
254 curl_easy_setopt_safe(CURLOPT_UPLOAD, 1);
255 } else if (strcmp(method, "DELETE") == 0) {
256 curl_easy_setopt_safe(CURLOPT_CUSTOMREQUEST, "DELETE");
259 status = curl_easy_perform(curl);
261 fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
265 long response_code = 0;
266 status = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
268 fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
/* Only 2xx responses produce a result. */
272 if (response_code / 100 == 2) {
273 result = bluesky_string_new_from_gstring(result_body);
276 fprintf(stderr, "HTTP response code: %ld!\n", response_code);
/* NOTE(review): result_body is freed only when result != NULL. On the failure
 * paths result is still NULL, so the GString appears to leak; the test was
 * probably meant to be on result_body alone -- confirm against the ownership
 * rules of bluesky_string_new_from_gstring(). */
281 if (result != NULL && result_body != NULL)
282 g_string_free(result_body, TRUE);
/* Reset the handle's options so that it can be reused from the pool. */
283 curl_easy_reset(curl);
284 curl_slist_free_all(curl_headers);
/* Thread-pool worker: executes one queued request against Azure. `a` is the
 * BlueSkyStoreAsync to run and `s` is the AzureStore that azurestore_new()
 * registered as the pool's user data.
 *
 * STORE_OP_GET: issues a GET for async->key. When async->start is non-zero a
 * range value built from async->start / async->len is added to the headers
 * and async->range_done is set. A non-NULL response becomes async->data.
 * STORE_OP_PUT: uploads async->data to async->key with "x-ms-blob-type" set
 * to "BlockBlob".
 * Afterwards the request is marked complete, the reference taken in
 * azurestore_submit() is dropped, the header table is released and the CURL
 * handle goes back to the idle pool.
 *
 * NOTE(review): the range is formatted as "start-(start+len)". HTTP byte
 * ranges name an inclusive last position, so this looks one byte too long --
 * confirm. A range is also only sent when start != 0, so a request with
 * start == 0 and a non-zero len fetches the whole object.
 * NOTE(review): "%zd" is the signed conversion; confirm the type of
 * async->start and async->len. */
290 static void azurestore_task(gpointer a, gpointer s)
292 BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
293 AzureStore *store = (AzureStore *)s;
/* Record that execution has started, and when. */
295 async->status = ASYNC_RUNNING;
296 async->exec_time = bluesky_now_hires();
/* Per-request header table, released at the end of the task. */
298 GHashTable *headers = g_hash_table_new_full(g_str_hash, g_str_equal,
301 BlueSkyRCStr *result = NULL;
302 CURL *curl = get_connection(store);
304 if (async->op == STORE_OP_GET) {
305 /* FIXME: We ought to check that the response returned the requested
307 if (async->start != 0 && async->len != 0) {
308 g_hash_table_insert(headers,
310 g_strdup_printf("%zd-%zd", async->start,
311 async->start + async->len));
312 async->range_done = TRUE;
313 } else if (async->start != 0) {
314 g_hash_table_insert(headers,
316 g_strdup_printf("%zd-", async->start));
317 async->range_done = TRUE;
319 result = submit_request(store, curl, "GET", async->key, headers, NULL);
320 if (result != NULL) {
321 async->data = result;
324 } else if (async->op == STORE_OP_PUT) {
325 g_hash_table_insert(headers,
326 g_strdup("x-ms-blob-type"),
327 g_strdup("BlockBlob"));
328 g_hash_table_insert(headers,
329 g_strdup("Transfer-Encoding"),
331 result = submit_request(store, curl, "PUT", async->key,
332 headers, async->data);
333 if (result != NULL) {
336 bluesky_string_unref(result);
339 bluesky_store_async_mark_complete(async);
340 bluesky_store_async_unref(async);
341 g_hash_table_unref(headers);
342 put_connection(store, curl);
/* Create the state for an Azure store. `path` names the container; NULL or an
 * empty string selects the default "bluesky". The account name is read from
 * AZURE_ACCOUNT_NAME and the base64-encoded secret key from AZURE_SECRET_KEY.
 * The idle-connection pool starts out empty.
 *
 * NOTE(review): neither getenv() result is checked. An unset AZURE_SECRET_KEY
 * passes NULL to g_base64_decode(), and an unset AZURE_ACCOUNT_NAME leaves
 * store->account NULL -- confirm how callers are expected to handle this. */
345 static gpointer azurestore_new(const gchar *path)
347 AzureStore *store = g_new(AzureStore, 1);
/* The store itself is the user data handed to every azurestore_task() call. */
348 store->thread_pool = g_thread_pool_new(azurestore_task, store, -1, FALSE,
350 if (path == NULL || strlen(path) == 0)
351 store->container = g_strdup("bluesky");
353 store->container = g_strdup(path);
355 store->account = g_strdup(getenv("AZURE_ACCOUNT_NAME"));
/* The key is stored decoded; it is the HMAC key used for request signing. */
357 const char *key = getenv("AZURE_SECRET_KEY");
358 store->key = g_base64_decode(key, &store->key_len);
360 g_print("Initializing Azure with account %s, container %s\n",
361 store->account, store->container);
363 store->curl_pool = g_queue_new();
364 store->curl_pool_lock = g_mutex_new();
/* .destroy hook of the Azure backend. Per the TODO below nothing is released
 * yet: the thread pool, the account/container strings, the decoded key, the
 * pool queue and its mutex created in azurestore_new(), and any pooled CURL
 * handles, all stay allocated. */
369 static void azurestore_destroy(gpointer store)
371 /* TODO: Clean up resources */
/* .submit hook of the Azure backend: queue `async` for execution on the
 * store's thread pool. The request must be in state ASYNC_NEW and carry an
 * operation other than STORE_OP_NONE. On the queueing path the request becomes
 * ASYNC_PENDING and a reference is taken on behalf of the worker, which
 * azurestore_task() releases. On the fallback path a warning is logged and the
 * request is completed immediately.
 *
 * NOTE(review): the dispatch on async->op that selects between the two paths
 * is not visible here -- confirm which operations are queued.
 * NOTE(review): "Uknown" in the warning text is a typo for "Unknown". */
374 static void azurestore_submit(gpointer s, BlueSkyStoreAsync *async)
376 AzureStore *store = (AzureStore *)s;
377 g_return_if_fail(async->status == ASYNC_NEW);
378 g_return_if_fail(async->op != STORE_OP_NONE);
383 async->status = ASYNC_PENDING;
384 bluesky_store_async_ref(async);
385 g_thread_pool_push(store->thread_pool, async, NULL);
389 g_warning("Uknown operation type for AzureStore: %d\n", async->op);
390 bluesky_store_async_mark_complete(async);
/* .cleanup hook registered in the Azure store_impl table. */
395 static void azurestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
/* .lookup_last hook registered in the Azure store_impl table. */
399 static char *azurestore_lookup_last(gpointer s, const char *prefix)
/* Operations table of the Azure backend; bluesky_store_init_azure() registers
 * it under the name "azure". */
404 static BlueSkyStoreImplementation store_impl = {
405 .create = azurestore_new,
406 .destroy = azurestore_destroy,
407 .submit = azurestore_submit,
408 .cleanup = azurestore_cleanup,
409 .lookup_last = azurestore_lookup_last,
/* Register the Azure backend with BlueSky under the name "azure". */
412 void bluesky_store_init_azure(void)
414 bluesky_store_register(&store_impl, "azure");
419 enum { S3_GET, S3_PUT } op;
/* Data callback for S3_get_object(), installed as getObjectDataCallback in
 * s3store_task(): appends the bufferSize bytes in `buffer` to the GString held
 * by the struct get_info passed as callbackData. */
441 static S3Status s3store_get_handler(int bufferSize, const char *buffer,
444 struct get_info *info = (struct get_info *)callbackData;
445 g_string_append_len(info->buf, buffer, bufferSize);
/* Data callback for S3_put_object(), installed as putObjectDataCallback in
 * s3store_task(): copies the next piece of info->val into `buffer`. The amount
 * is bufferSize, clamped to the bytes left after info->offset, and the offset
 * is advanced by the amount copied. */
449 static int s3store_put_handler(int bufferSize, char *buffer,
452 struct put_info *info = (struct put_info *)callbackData;
453 gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
454 memcpy(buffer, (char *)info->val->data + info->offset, bytes);
455 info->offset += bytes;
/* Response-properties callback shared by the get, put and list-bucket
 * handlers set up in s3store_task() and s3store_lookup_last(). */
459 static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
/* Request-completion callback shared by the get, put and list-bucket handlers.
 * callbackData is treated as a struct get_info, and the error message is
 * printed when libs3 supplies one.
 *
 * NOTE(review): this callback is also installed for put and list requests,
 * whose callback data is presumably a struct put_info / struct list_info. The
 * cast to struct get_info is only safe if the fields touched here sit at the
 * same offset in all three structures -- confirm. */
465 static void s3store_response_callback(S3Status status,
466 const S3ErrorDetails *errorDetails,
469 struct get_info *info = (struct get_info *)callbackData;
475 if (errorDetails != NULL && errorDetails->message != NULL) {
476 g_print(" Error message: %s\n", errorDetails->message);
/* Thread-pool worker: executes one queued request against S3. `a` is the
 * BlueSkyStoreAsync to run and `s` is the S3Store that s3store_new()
 * registered as the pool's user data.
 *
 * STORE_OP_GET: fetches async->key, passing async->start / async->len to
 * S3_get_object(), sets async->range_done, and turns the accumulated GString
 * into async->data.
 * STORE_OP_PUT: uploads async->data under async->key.
 * Finally the request is marked complete and the reference taken in
 * s3store_submit() is dropped.
 *
 * NOTE(review): the conditions that choose between storing info.buf as the
 * result and freeing it, and the one guarding the put warning, are not visible
 * here -- confirm that exactly one of the two GET branches runs. */
480 static void s3store_task(gpointer a, gpointer s)
482 BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
483 S3Store *store = (S3Store *)s;
/* Record that execution has started, and when. */
485 async->status = ASYNC_RUNNING;
486 async->exec_time = bluesky_now_hires();
488 if (async->op == STORE_OP_GET) {
/* s3store_get_handler() appends the received data to info.buf. */
489 struct get_info info;
490 info.buf = g_string_new("");
493 struct S3GetObjectHandler handler;
494 handler.responseHandler.propertiesCallback = s3store_properties_callback;
495 handler.responseHandler.completeCallback = s3store_response_callback;
496 handler.getObjectDataCallback = s3store_get_handler;
498 S3_get_object(&store->bucket, async->key, NULL,
499 async->start, async->len, NULL, &handler, &info);
500 async->range_done = TRUE;
503 async->data = bluesky_string_new_from_gstring(info.buf);
506 g_string_free(info.buf, TRUE);
509 } else if (async->op == STORE_OP_PUT) {
/* s3store_put_handler() reads the upload data out of info.val. */
510 struct put_info info;
512 info.val = async->data;
515 struct S3PutObjectHandler handler;
516 handler.responseHandler.propertiesCallback
517 = s3store_properties_callback;
518 handler.responseHandler.completeCallback = s3store_response_callback;
519 handler.putObjectDataCallback = s3store_put_handler;
521 S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL,
527 g_warning("Error completing S3 put operation; client must retry!");
531 bluesky_store_async_mark_complete(async);
532 bluesky_store_async_unref(async);
/* Callback for S3_list_bucket(), invoked once per batch of results. When the
 * batch is not empty, the key of its last entry replaces info->last_entry and
 * the previous copy is freed. info->truncated records whether more results
 * remain. */
535 static S3Status s3store_list_handler(int isTruncated,
536 const char *nextMarker,
538 const S3ListBucketContent *contents,
539 int commonPrefixesCount,
540 const char **commonPrefixes,
543 struct list_info *info = (struct list_info *)callbackData;
544 if (contentsCount > 0) {
545 g_free(info->last_entry);
546 info->last_entry = g_strdup(contents[contentsCount - 1].key);
548 info->truncated = isTruncated;
/* .lookup_last hook of the S3 backend: find the last key in the bucket that
 * starts with `prefix`. The bucket is listed in batches of up to 1024 keys,
 * each batch resuming after the last key seen, until a listing is no longer
 * truncated. Returns the g_strdup()ed key stored by s3store_list_handler(),
 * or the initial NULL when no batch contained any entry.
 *
 * NOTE(review): `marker` receives a fresh g_strdup() on every iteration;
 * confirm the previous value is freed both inside and after the loop. */
552 static char *s3store_lookup_last(gpointer s, const char *prefix)
554 S3Store *store = (S3Store *)s;
555 struct list_info info = {0, NULL, FALSE};
557 struct S3ListBucketHandler handler;
558 handler.responseHandler.propertiesCallback
559 = s3store_properties_callback;
560 handler.responseHandler.completeCallback = s3store_response_callback;
561 handler.listBucketCallback = s3store_list_handler;
566 S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
569 marker = g_strdup(info.last_entry);
570 g_print("Last key: %s\n", info.last_entry);
571 } while (info.truncated);
575 return info.last_entry;
/* Create the state for an S3 store. `path` names the bucket; NULL or an empty
 * string selects the default "mvrable-bluesky". Requests use plain HTTP with
 * path-style URIs. Credentials are read from AWS_ACCESS_KEY_ID and
 * AWS_SECRET_ACCESS_KEY, and the encryption key from BLUESKY_KEY; a missing
 * BLUESKY_KEY is reported through g_error().
 *
 * NOTE(review): the final g_print() writes the raw BLUESKY_KEY value to
 * stdout, exposing the encryption key in logs -- consider removing it from the
 * message.
 * NOTE(review): bucketName is a string literal on the default path and a heap
 * copy otherwise, so a future destroy hook cannot free it unconditionally.
 * The two AWS getenv() results are stored without a NULL check. */
578 static gpointer s3store_new(const gchar *path)
580 S3Store *store = g_new(S3Store, 1);
/* The store itself is the user data handed to every s3store_task() call. */
581 store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
583 if (path == NULL || strlen(path) == 0)
584 store->bucket.bucketName = "mvrable-bluesky";
586 store->bucket.bucketName = g_strdup(path);
587 store->bucket.protocol = S3ProtocolHTTP;
588 store->bucket.uriStyle = S3UriStylePath;
589 store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
590 store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
592 const char *key = getenv("BLUESKY_KEY");
594 g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
/* Derive store->encryption_key from the BLUESKY_KEY string. */
598 bluesky_crypt_hash_key(key, store->encryption_key);
600 g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
601 store->bucket.bucketName, store->bucket.accessKeyId, key);
/* .destroy hook registered in the S3 store_impl table. */
606 static void s3store_destroy(gpointer store)
/* .submit hook of the S3 backend: queue `async` for execution on the store's
 * thread pool. The request must be in state ASYNC_NEW and carry an operation
 * other than STORE_OP_NONE. On the queueing path the request becomes
 * ASYNC_PENDING and a reference is taken on behalf of the worker, which
 * s3store_task() releases. On the fallback path a warning is logged and the
 * request is completed immediately.
 *
 * NOTE(review): the dispatch on async->op that selects between the two paths
 * is not visible here -- confirm which operations are queued.
 * NOTE(review): "Uknown" in the warning text is a typo for "Unknown". */
611 static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
613 S3Store *store = (S3Store *)s;
614 g_return_if_fail(async->status == ASYNC_NEW);
615 g_return_if_fail(async->op != STORE_OP_NONE);
620 async->status = ASYNC_PENDING;
621 bluesky_store_async_ref(async);
622 g_thread_pool_push(store->thread_pool, async, NULL);
626 g_warning("Uknown operation type for S3Store: %d\n", async->op);
627 bluesky_store_async_mark_complete(async);
/* .cleanup hook of the S3 backend: treats async->store_private as a GString,
 * frees it together with its character data, and clears the pointer so it is
 * not freed twice.
 * NOTE(review): any guard around the free is not visible here -- confirm that
 * a NULL store_private is tolerated. */
632 static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
634 GString *buf = (GString *)async->store_private;
637 g_string_free(buf, TRUE);
638 async->store_private = NULL;
/* Operations table of the S3 backend; bluesky_store_init_s3() registers it
 * under the name "s3".
 * NOTE(review): an Azure table with the same identifier `store_impl` appears
 * earlier in this text. Two file-scope definitions of one name cannot share a
 * translation unit, so presumably the two backends live in separate source
 * files -- confirm. */
642 static BlueSkyStoreImplementation store_impl = {
643 .create = s3store_new,
644 .destroy = s3store_destroy,
645 .submit = s3store_submit,
646 .cleanup = s3store_cleanup,
647 .lookup_last = s3store_lookup_last,
/* Initialize libs3 and register the S3 backend with BlueSky under the name
 * "s3".
 * NOTE(review): the status returned by S3_initialize() is ignored; the
 * backend is registered even when library initialization fails. */
650 void bluesky_store_init_s3(void)
652 S3_initialize(NULL, S3_INIT_ALL);
653 bluesky_store_register(&store_impl, "s3");