From: Michael Vrable Date: Thu, 24 Feb 2011 23:54:18 +0000 (-0800) Subject: Add Windows Azure support to BlueSky. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=fe8e013eff37a1f6df8f5a55fba30ede4f9618be;p=bluesky.git Add Windows Azure support to BlueSky. This is not much tested but seems to work at a basic level. --- diff --git a/bluesky/CMakeLists.txt b/bluesky/CMakeLists.txt index 2891f93..41d4b90 100644 --- a/bluesky/CMakeLists.txt +++ b/bluesky/CMakeLists.txt @@ -4,8 +4,9 @@ link_directories("${LIBS3_BUILD_DIR}/lib" ${KVSTORE_DIR}) add_library(bluesky SHARED cache.c cleaner.c cloudlog.c crc32c.c crypto.c debug.c dir.c file.c - imap.c init.c inode.c log.c serialize.c store.c store-bdb.c - store-kv.cc store-multi.c store-s3.c store-simple.c util.c) + imap.c init.c inode.c log.c serialize.c store.c store-azure.c + store-bdb.c store-kv.cc store-multi.c store-s3.c store-simple.c + util.c) add_executable(bluesky-test main.c) set(CMAKE_C_FLAGS "-Wall -std=gnu99 ${CMAKE_C_FLAGS}") diff --git a/bluesky/init.c b/bluesky/init.c index df33709..e2b35c6 100644 --- a/bluesky/init.c +++ b/bluesky/init.c @@ -56,6 +56,7 @@ static struct { /* BlueSky library initialization. */ void bluesky_store_init_s3(void); +void bluesky_store_init_azure(void); void bluesky_store_init_kv(void); void bluesky_store_init_multi(void); void bluesky_store_init_bdb(void); @@ -79,6 +80,7 @@ void bluesky_init(void) bluesky_store_init(); bluesky_store_init_kv(); bluesky_store_init_s3(); + bluesky_store_init_azure(); bluesky_store_init_multi(); bluesky_store_init_bdb(); bluesky_store_init_simple(); diff --git a/bluesky/store-azure.c b/bluesky/store-azure.c new file mode 100644 index 0000000..e7a2b71 --- /dev/null +++ b/bluesky/store-azure.c @@ -0,0 +1,603 @@ +/* Blue Sky: File Systems in the Cloud + * + * Copyright (C) 2009 The Regents of the University of California + * Written by Michael Vrable + * + * TODO: Licensing + */ + +#include +#include +#include +#include +#include +#include + +#include "bluesky-private.h" +#include "libs3.h" + +#define AZURE_API_VERSION "2009-09-19" + +/* Fixed headers that are used in calculating the request signature for Azure, + * in the order that they are included. */ +static const char *signature_headers[] = { + "Content-Encoding", "Content-Language", "Content-Length", "Content-MD5", + "Content-Type", "Date", "If-Modified-Since", "If-Match", "If-None-Match", + "If-Unmodified-Since", "Range", NULL +}; + +/* Prototype Windows Azure backend for BlueSky. This is intended to be + * minimally functional, but could use additional work for production use. */ + +typedef struct { + GThreadPool *thread_pool; + char *account, *container; + uint8_t *key; + size_t key_len; +} AzureStore; + +static void get_extra_headers(gchar *key, gchar *value, GList **headers) +{ + key = g_ascii_strdown(key, strlen(key)); + if (strncmp(key, "x-ms-", strlen("x-ms-")) == 0) { + *headers = g_list_prepend(*headers, + g_strdup_printf("%s:%s\n", key, value)); + } + g_free(key); +} + +static void get_curl_headers(gchar *key, gchar *value, + struct curl_slist **curl_headers) +{ + char *line = g_strdup_printf("%s: %s", key, value); + *curl_headers = curl_slist_append(*curl_headers, line); + g_free(line); +} + +struct curlbuf { + /* For reading */ + const char *readbuf; + size_t readsize; +}; + +static size_t curl_readfunc(void *ptr, size_t size, size_t nmemb, + void *userdata) +{ + struct curlbuf *buf = (struct curlbuf *)userdata; + + if (buf == NULL) + return 0; + + size_t copied = size * nmemb; + if (copied > buf->readsize) + copied = buf->readsize; + + memcpy(ptr, buf->readbuf, copied); + buf->readbuf += copied; + buf->readsize -= copied; + + return copied; +} + +static size_t curl_writefunc(void *ptr, size_t size, size_t nmemb, + void *userdata) +{ + GString *buf = (GString *)userdata; + if (buf != NULL) + g_string_append_len(buf, ptr, size * nmemb); + return size * nmemb; +} + +/* Compute the signature for a request to Azure and add it to the headers. */ +static void azure_compute_signature(AzureStore *store, + GHashTable *headers, + const char *method, const char *path) +{ + if (g_hash_table_lookup(headers, "Date") == NULL) { + time_t t; + struct tm now; + char timebuf[4096]; + time(&t); + gmtime_r(&t, &now); + strftime(timebuf, sizeof(timebuf), "%a, %d %b %Y %H:%M:%S GMT", &now); + g_hash_table_insert(headers, g_strdup("Date"), g_strdup(timebuf)); + } + + g_hash_table_insert(headers, g_strdup("x-ms-version"), + g_strdup(AZURE_API_VERSION)); + + GString *to_sign = g_string_new(""); + g_string_append_printf(to_sign, "%s\n", method); + for (const char **h = signature_headers; *h != NULL; h++) { + const char *val = g_hash_table_lookup(headers, *h); + g_string_append_printf(to_sign, "%s\n", val ? val : ""); + } + + GList *extra_headers = NULL; + g_hash_table_foreach(headers, (GHFunc)get_extra_headers, &extra_headers); + extra_headers = g_list_sort(extra_headers, (GCompareFunc)g_strcmp0); + while (extra_headers != NULL) { + g_string_append(to_sign, extra_headers->data); + g_free(extra_headers->data); + extra_headers = g_list_delete_link(extra_headers, extra_headers); + } + + /* FIXME: Doesn't handle query parameters (after '?') */ + g_string_append_printf(to_sign, "/%s/%s/%s", + store->account, store->container, path); + + /* Compute an HMAC-SHA-256 of the encoded parameters */ + gcry_error_t status; + gcry_md_hd_t handle; + status = gcry_md_open(&handle, GCRY_MD_SHA256, GCRY_MD_FLAG_HMAC); + g_assert(status == 0); + status = gcry_md_setkey(handle, store->key, store->key_len); + g_assert(status == 0); + gcry_md_write(handle, to_sign->str, to_sign->len); + unsigned char *digest = gcry_md_read(handle, GCRY_MD_SHA256); + gchar *signature = g_base64_encode(digest, + gcry_md_get_algo_dlen(GCRY_MD_SHA256)); + g_hash_table_insert(headers, g_strdup("Authorization"), + g_strdup_printf("SharedKey %s:%s", + store->account, signature)); + g_free(signature); + gcry_md_close(handle); + g_string_free(to_sign, TRUE); +} + +/* Submit an HTTP request using CURL. Takes as input the Azure storage backend + * we are acting for, as well as the method (GET, PUT, etc.), HTTP path, other + * HTTP headers, and an optional body. If body is not NULL, an empty body is + * sent. This will compute an Azure authentication signature before sending + * the request. */ +static BlueSkyRCStr *submit_request(AzureStore *store, + const char *method, + const char *path, + GHashTable *headers, + BlueSkyRCStr *body) +{ + BlueSkyRCStr *result = NULL; + + g_hash_table_insert(headers, + g_strdup("Content-Length"), + g_strdup_printf("%zd", body != NULL ? body->len : 0)); + + if (body != 0 && body->len > 0) { + GChecksum *csum = g_checksum_new(G_CHECKSUM_MD5); + g_checksum_update(csum, (uint8_t *)body->data, body->len); + uint8_t md5[16]; + gsize md5_len = sizeof(md5); + g_checksum_get_digest(csum, md5, &md5_len); + g_hash_table_insert(headers, + g_strdup("Content-MD5"), + g_base64_encode(md5, md5_len)); + g_checksum_free(csum); + } + + azure_compute_signature(store, headers, method, path); + + CURLcode status; + CURL *curl = curl_easy_init(); + +#define curl_easy_setopt_safe(opt, val) \ + if ((status = curl_easy_setopt(curl, (opt), (val))) != CURLE_OK) { \ + fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status)); \ + goto cleanup; \ + } + + curl_easy_setopt_safe(CURLOPT_NOSIGNAL, 1); + curl_easy_setopt_safe(CURLOPT_NOPROGRESS, 1); + curl_easy_setopt_safe(CURLOPT_NETRC, CURL_NETRC_IGNORED); + curl_easy_setopt_safe(CURLOPT_FOLLOWLOCATION, 1); + curl_easy_setopt_safe(CURLOPT_MAXREDIRS, 10); + + curl_easy_setopt_safe(CURLOPT_HEADERFUNCTION, curl_writefunc); + curl_easy_setopt_safe(CURLOPT_HEADERDATA, NULL); + + struct curlbuf readbuf; + if (body != NULL) { + readbuf.readbuf = body->data; + readbuf.readsize = body->len; + } + curl_easy_setopt_safe(CURLOPT_READFUNCTION, curl_readfunc); + curl_easy_setopt_safe(CURLOPT_READDATA, body ? &readbuf : NULL); + + GString *result_body = g_string_new(""); + curl_easy_setopt_safe(CURLOPT_WRITEFUNCTION, curl_writefunc); + curl_easy_setopt_safe(CURLOPT_WRITEDATA, result_body); + + struct curl_slist *curl_headers = NULL; + g_hash_table_foreach(headers, (GHFunc)get_curl_headers, &curl_headers); + curl_easy_setopt_safe(CURLOPT_HTTPHEADER, curl_headers); + + char *uri = g_strdup_printf("http://%s.blob.core.windows.net/%s/%s", + store->account, store->container, path); + printf("URI: %s\n", uri); + curl_easy_setopt_safe(CURLOPT_URL, uri); + + if (strcmp(method, "GET") == 0) { + /* nothing special needed */ + } else if (strcmp(method, "PUT") == 0) { + curl_easy_setopt_safe(CURLOPT_UPLOAD, 1); + } else if (strcmp(method, "DELETE") == 0) { + curl_easy_setopt_safe(CURLOPT_CUSTOMREQUEST, "DELETE"); + } + + status = curl_easy_perform(curl); + if (status != 0) { + fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status)); + goto cleanup; + } + + long response_code = 0; + status = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code); + if (status != 0) { + fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status)); + goto cleanup; + } + + if (response_code / 100 == 2) { + result = bluesky_string_new_from_gstring(result_body); + result_body = NULL; + } else { + fprintf(stderr, "HTTP response code: %ld!\n", response_code); + goto cleanup; + } + +cleanup: + if (result != NULL && result_body != NULL) + g_string_free(result_body, TRUE); + curl_easy_cleanup(curl); + curl_slist_free_all(curl_headers); + g_free(uri); + + return result; +} + +static void azurestore_task(gpointer a, gpointer s) +{ + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; + AzureStore *store = (AzureStore *)s; + + async->status = ASYNC_RUNNING; + async->exec_time = bluesky_now_hires(); + + GHashTable *headers = g_hash_table_new_full(g_str_hash, g_str_equal, + g_free, g_free); + + BlueSkyRCStr *result = NULL; + + if (async->op == STORE_OP_GET) { + result = submit_request(store, "GET", async->key, headers, NULL); + if (result != NULL) { + async->data = result; + async->result = 0; + } + } else if (async->op == STORE_OP_PUT) { + g_hash_table_insert(headers, + g_strdup("x-ms-blob-type"), + g_strdup("BlockBlob")); + g_hash_table_insert(headers, + g_strdup("Transfer-Encoding"), + g_strdup("")); + result = submit_request(store, "PUT", async->key, + headers, async->data); + if (result != NULL) { + async->result = 0; + } + bluesky_string_unref(result); + } + + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); + g_hash_table_unref(headers); +} + +static gpointer azurestore_new(const gchar *path) +{ + AzureStore *store = g_new(AzureStore, 1); + store->thread_pool = g_thread_pool_new(azurestore_task, store, -1, FALSE, + NULL); + if (path == NULL || strlen(path) == 0) + store->container = g_strdup("bluesky"); + else + store->container = g_strdup(path); + + store->account = g_strdup(getenv("AZURE_ACCOUNT_NAME")); + + const char *key = getenv("AZURE_SECRET_KEY"); + store->key = g_base64_decode(key, &store->key_len); + + g_print("Initializing Azure with account %s, container %s\n", + store->account, store->container); + + return store; +} + +static void azurestore_destroy(gpointer store) +{ + /* TODO: Clean up resources */ +} + +static void azurestore_submit(gpointer s, BlueSkyStoreAsync *async) +{ + AzureStore *store = (AzureStore *)s; + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + bluesky_store_async_ref(async); + g_thread_pool_push(store->thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for AzureStore: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } +} + +static void azurestore_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ +} + +static char *azurestore_lookup_last(gpointer s, const char *prefix) +{ + return NULL; +} + +static BlueSkyStoreImplementation store_impl = { + .create = azurestore_new, + .destroy = azurestore_destroy, + .submit = azurestore_submit, + .cleanup = azurestore_cleanup, + .lookup_last = azurestore_lookup_last, +}; + +void bluesky_store_init_azure(void) +{ + bluesky_store_register(&store_impl, "azure"); +} + +#if 0 +typedef struct { + enum { S3_GET, S3_PUT } op; + gchar *key; + BlueSkyRCStr *data; +} S3Op; + +struct get_info { + int success; + GString *buf; +}; + +struct put_info { + int success; + BlueSkyRCStr *val; + gint offset; +}; + +struct list_info { + int success; + char *last_entry; + gboolean truncated; +}; + +static S3Status s3store_get_handler(int bufferSize, const char *buffer, + void *callbackData) +{ + struct get_info *info = (struct get_info *)callbackData; + g_string_append_len(info->buf, buffer, bufferSize); + return S3StatusOK; +} + +static int s3store_put_handler(int bufferSize, char *buffer, + void *callbackData) +{ + struct put_info *info = (struct put_info *)callbackData; + gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset)); + memcpy(buffer, (char *)info->val->data + info->offset, bytes); + info->offset += bytes; + return bytes; +} + +static S3Status s3store_properties_callback(const S3ResponseProperties *properties, + void *callbackData) +{ + return S3StatusOK; +} + +static void s3store_response_callback(S3Status status, + const S3ErrorDetails *errorDetails, + void *callbackData) +{ + struct get_info *info = (struct get_info *)callbackData; + + if (status == 0) { + info->success = 1; + } + + if (errorDetails != NULL && errorDetails->message != NULL) { + g_print(" Error message: %s\n", errorDetails->message); + } +} + +static void s3store_task(gpointer a, gpointer s) +{ + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; + S3Store *store = (S3Store *)s; + + async->status = ASYNC_RUNNING; + async->exec_time = bluesky_now_hires(); + + if (async->op == STORE_OP_GET) { + struct get_info info; + info.buf = g_string_new(""); + info.success = 0; + + struct S3GetObjectHandler handler; + handler.responseHandler.propertiesCallback = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.getObjectDataCallback = s3store_get_handler; + + S3_get_object(&store->bucket, async->key, NULL, + async->start, async->len, NULL, &handler, &info); + async->range_done = TRUE; + + if (info.success) { + async->data = bluesky_string_new_from_gstring(info.buf); + async->result = 0; + } else { + g_string_free(info.buf, TRUE); + } + + } else if (async->op == STORE_OP_PUT) { + struct put_info info; + info.success = 0; + info.val = async->data; + info.offset = 0; + + struct S3PutObjectHandler handler; + handler.responseHandler.propertiesCallback + = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.putObjectDataCallback = s3store_put_handler; + + S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL, + &handler, &info); + + if (info.success) { + async->result = 0; + } else { + g_warning("Error completing S3 put operation; client must retry!"); + } + } + + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); +} + +static S3Status s3store_list_handler(int isTruncated, + const char *nextMarker, + int contentsCount, + const S3ListBucketContent *contents, + int commonPrefixesCount, + const char **commonPrefixes, + void *callbackData) +{ + struct list_info *info = (struct list_info *)callbackData; + if (contentsCount > 0) { + g_free(info->last_entry); + info->last_entry = g_strdup(contents[contentsCount - 1].key); + } + info->truncated = isTruncated; + return S3StatusOK; +} + +static char *s3store_lookup_last(gpointer s, const char *prefix) +{ + S3Store *store = (S3Store *)s; + struct list_info info = {0, NULL, FALSE}; + + struct S3ListBucketHandler handler; + handler.responseHandler.propertiesCallback + = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.listBucketCallback = s3store_list_handler; + + char *marker = NULL; + + do { + S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL, + &handler, &info); + g_free(marker); + marker = g_strdup(info.last_entry); + g_print("Last key: %s\n", info.last_entry); + } while (info.truncated); + + g_free(marker); + + return info.last_entry; +} + +static gpointer s3store_new(const gchar *path) +{ + S3Store *store = g_new(S3Store, 1); + store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE, + NULL); + if (path == NULL || strlen(path) == 0) + store->bucket.bucketName = "mvrable-bluesky"; + else + store->bucket.bucketName = g_strdup(path); + store->bucket.protocol = S3ProtocolHTTP; + store->bucket.uriStyle = S3UriStylePath; + store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID"); + store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY"); + + const char *key = getenv("BLUESKY_KEY"); + if (key == NULL) { + g_error("Encryption key not defined; please set BLUESKY_KEY environment variable"); + exit(1); + } + + bluesky_crypt_hash_key(key, store->encryption_key); + + g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n", + store->bucket.bucketName, store->bucket.accessKeyId, key); + + return store; +} + +static void s3store_destroy(gpointer store) +{ + g_free(store); +} + +static void s3store_submit(gpointer s, BlueSkyStoreAsync *async) +{ + S3Store *store = (S3Store *)s; + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + bluesky_store_async_ref(async); + g_thread_pool_push(store->thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for S3Store: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } +} + +static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ + GString *buf = (GString *)async->store_private; + + if (buf != NULL) { + g_string_free(buf, TRUE); + async->store_private = NULL; + } +} + +static BlueSkyStoreImplementation store_impl = { + .create = s3store_new, + .destroy = s3store_destroy, + .submit = s3store_submit, + .cleanup = s3store_cleanup, + .lookup_last = s3store_lookup_last, +}; + +void bluesky_store_init_s3(void) +{ + S3_initialize(NULL, S3_INIT_ALL); + bluesky_store_register(&store_impl, "s3"); +} +#endif diff --git a/cleaner/azure.py b/cleaner/azure.py index d976e0f..d7ee338 100644 --- a/cleaner/azure.py +++ b/cleaner/azure.py @@ -91,7 +91,7 @@ class AzureError(RuntimeError): class AzureConnection: def __init__(self, account=None, key=None): if account is None: - account = os.environ['AZURE_ACCOUNT_NAME'] + account = os.environ['AZURE_ACCOUNT_NAME'] self.account = account self.host = account + ".blob.core.windows.net" #self.conn = httplib.HTTPConnection(self.host)