Add Windows Azure support to BlueSky.
author Michael Vrable <mvrable@cs.ucsd.edu>
Thu, 24 Feb 2011 23:54:18 +0000 (15:54 -0800)
committer Michael Vrable <mvrable@cs.ucsd.edu>
Thu, 24 Feb 2011 23:54:18 +0000 (15:54 -0800)
This is not much tested but seems to work at a basic level.

bluesky/CMakeLists.txt
bluesky/init.c
bluesky/store-azure.c [new file with mode: 0644]
cleaner/azure.py

index 2891f93..41d4b90 100644 (file)
@@ -4,8 +4,9 @@ link_directories("${LIBS3_BUILD_DIR}/lib" ${KVSTORE_DIR})
 
 add_library(bluesky SHARED
             cache.c cleaner.c cloudlog.c crc32c.c crypto.c debug.c dir.c file.c
-            imap.c init.c inode.c log.c serialize.c store.c store-bdb.c
-            store-kv.cc store-multi.c store-s3.c store-simple.c util.c)
+            imap.c init.c inode.c log.c serialize.c store.c store-azure.c
+            store-bdb.c store-kv.cc store-multi.c store-s3.c store-simple.c
+            util.c)
 add_executable(bluesky-test main.c)
 
 set(CMAKE_C_FLAGS "-Wall -std=gnu99 ${CMAKE_C_FLAGS}")
index df33709..e2b35c6 100644 (file)
@@ -56,6 +56,7 @@ static struct {
 /* BlueSky library initialization. */
 
 void bluesky_store_init_s3(void);
+void bluesky_store_init_azure(void);
 void bluesky_store_init_kv(void);
 void bluesky_store_init_multi(void);
 void bluesky_store_init_bdb(void);
@@ -79,6 +80,7 @@ void bluesky_init(void)
     bluesky_store_init();
     bluesky_store_init_kv();
     bluesky_store_init_s3();
+    bluesky_store_init_azure();
     bluesky_store_init_multi();
     bluesky_store_init_bdb();
     bluesky_store_init_simple();
diff --git a/bluesky/store-azure.c b/bluesky/store-azure.c
new file mode 100644 (file)
index 0000000..e7a2b71
--- /dev/null
@@ -0,0 +1,603 @@
+/* Blue Sky: File Systems in the Cloud
+ *
+ * Copyright (C) 2009  The Regents of the University of California
+ * Written by Michael Vrable <mvrable@cs.ucsd.edu>
+ *
+ * TODO: Licensing
+ */
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <glib.h>
+#include <string.h>
+#include <gcrypt.h>
+#include <curl/curl.h>
+
+#include "bluesky-private.h"
+#include "libs3.h"
+
+#define AZURE_API_VERSION "2009-09-19"
+
+/* Fixed headers that are used in calculating the request signature for Azure,
+ * in the order that they are included.  The list is NULL-terminated.
+ * azure_compute_signature() emits one line per entry, using an empty string
+ * for any header that is not present in the request. */
+static const char *signature_headers[] = {
+    "Content-Encoding", "Content-Language", "Content-Length", "Content-MD5",
+    "Content-Type", "Date", "If-Modified-Since", "If-Match", "If-None-Match",
+    "If-Unmodified-Since", "Range", NULL
+};
+
+/* Prototype Windows Azure backend for BlueSky.  This is intended to be
+ * minimally functional, but could use additional work for production use. */
+
+/* Per-store state, created by azurestore_new(). */
+typedef struct {
+    GThreadPool *thread_pool;   /* Workers that run azurestore_task() */
+    char *account, *container;  /* Account name and blob container name */
+    uint8_t *key;               /* Shared key after base64 decoding */
+    size_t key_len;             /* Length of key, in bytes */
+} AzureStore;
+
+/* Hash-table iteration callback: collect the "x-ms-*" headers of a request.
+ * The header name is lowercased first.  When it carries the "x-ms-" prefix, a
+ * newly allocated "name:value\n" string is prepended to *headers.  The
+ * caller is responsible for freeing the list entries. */
+static void get_extra_headers(gchar *key, gchar *value, GList **headers)
+{
+    static const char prefix[] = "x-ms-";
+    gchar *lower = g_ascii_strdown(key, strlen(key));
+
+    if (strncmp(lower, prefix, strlen(prefix)) == 0) {
+        gchar *line = g_strdup_printf("%s:%s\n", lower, value);
+        *headers = g_list_prepend(*headers, line);
+    }
+
+    g_free(lower);
+}
+
+/* Hash-table iteration callback: append one header, formatted as
+ * "Name: value", to the curl header list.  The formatted string is only a
+ * temporary and is released before returning. */
+static void get_curl_headers(gchar *key, gchar *value,
+                             struct curl_slist **curl_headers)
+{
+    gchar *header = g_strdup_printf("%s: %s", key, value);
+    struct curl_slist *extended = curl_slist_append(*curl_headers, header);
+
+    g_free(header);
+    *curl_headers = extended;
+}
+
+/* Cursor over an in-memory request body; consumed by curl_readfunc(). */
+struct curlbuf {
+    /* For reading */
+    const char *readbuf;    /* Next byte to hand to curl */
+    size_t readsize;        /* Number of bytes remaining at readbuf */
+};
+
+/* Read callback for curl: copy up to size * nmemb bytes of the pending
+ * request body into ptr and advance the cursor in userdata (a struct
+ * curlbuf).  Returns the number of bytes copied, which is 0 when userdata is
+ * NULL or the body has been fully consumed. */
+static size_t curl_readfunc(void *ptr, size_t size, size_t nmemb,
+                            void *userdata)
+{
+    struct curlbuf *src = userdata;
+    if (src == NULL)
+        return 0;
+
+    size_t room = size * nmemb;
+    size_t count = room < src->readsize ? room : src->readsize;
+
+    memcpy(ptr, src->readbuf, count);
+    src->readsize -= count;
+    src->readbuf += count;
+
+    return count;
+}
+
+/* Write/header callback for curl: append the received bytes to the GString
+ * passed as userdata, or discard them when userdata is NULL.  The whole chunk
+ * is always reported as consumed. */
+static size_t curl_writefunc(void *ptr, size_t size, size_t nmemb,
+                             void *userdata)
+{
+    size_t total = size * nmemb;
+    GString *sink = userdata;
+
+    if (sink != NULL)
+        g_string_append_len(sink, ptr, total);
+
+    return total;
+}
+
+/* Compute the signature for a request to Azure and add it to the headers.
+ *
+ * A "Date" header is added if the caller did not supply one, and
+ * "x-ms-version" is always set to AZURE_API_VERSION.  The string to sign is
+ * the method, the values of the headers listed in signature_headers, the
+ * sorted "x-ms-*" header lines, and the resource "/account/container/path".
+ * Its HMAC-SHA-256 under the store key is base64-encoded and stored as the
+ * "Authorization" header in the form "SharedKey account:signature". */
+static void azure_compute_signature(AzureStore *store,
+                                    GHashTable *headers,
+                                    const char *method, const char *path)
+{
+    if (g_hash_table_lookup(headers, "Date") == NULL) {
+        char timebuf[4096];
+        struct tm utc;
+        time_t now = time(NULL);
+        gmtime_r(&now, &utc);
+        strftime(timebuf, sizeof(timebuf), "%a, %d %b %Y %H:%M:%S GMT", &utc);
+        g_hash_table_insert(headers, g_strdup("Date"), g_strdup(timebuf));
+    }
+
+    g_hash_table_insert(headers, g_strdup("x-ms-version"),
+                        g_strdup(AZURE_API_VERSION));
+
+    /* Method, then the fixed headers: one line each, empty when absent. */
+    GString *payload = g_string_new("");
+    g_string_append_printf(payload, "%s\n", method);
+    for (size_t i = 0; signature_headers[i] != NULL; i++) {
+        const char *value = g_hash_table_lookup(headers, signature_headers[i]);
+        if (value != NULL)
+            g_string_append(payload, value);
+        g_string_append_c(payload, '\n');
+    }
+
+    /* The x-ms-* header lines in sorted order; each already ends in '\n'. */
+    GList *ms_headers = NULL;
+    g_hash_table_foreach(headers, (GHFunc)get_extra_headers, &ms_headers);
+    ms_headers = g_list_sort(ms_headers, (GCompareFunc)g_strcmp0);
+    for (GList *item = ms_headers; item != NULL; item = item->next) {
+        g_string_append(payload, item->data);
+        g_free(item->data);
+    }
+    g_list_free(ms_headers);
+
+    /* FIXME: Doesn't handle query parameters (after '?') */
+    g_string_append_printf(payload, "/%s/%s/%s",
+                           store->account, store->container, path);
+
+    /* Compute an HMAC-SHA-256 of the encoded parameters */
+    gcry_md_hd_t hmac;
+    gcry_error_t err = gcry_md_open(&hmac, GCRY_MD_SHA256, GCRY_MD_FLAG_HMAC);
+    g_assert(err == 0);
+    err = gcry_md_setkey(hmac, store->key, store->key_len);
+    g_assert(err == 0);
+    gcry_md_write(hmac, payload->str, payload->len);
+
+    /* The digest is encoded before the handle is closed. */
+    unsigned char *mac = gcry_md_read(hmac, GCRY_MD_SHA256);
+    gchar *encoded = g_base64_encode(mac,
+                                     gcry_md_get_algo_dlen(GCRY_MD_SHA256));
+    gchar *auth = g_strdup_printf("SharedKey %s:%s", store->account, encoded);
+    g_hash_table_insert(headers, g_strdup("Authorization"), auth);
+
+    g_free(encoded);
+    gcry_md_close(hmac);
+    g_string_free(payload, TRUE);
+}
+
+/* Submit an HTTP request using CURL.  Takes as input the Azure storage backend
+ * we are acting for, as well as the method (GET, PUT, etc.), HTTP path, other
+ * HTTP headers, and an optional body.  If body is NULL, an empty body is
+ * sent.  This will compute an Azure authentication signature before sending
+ * the request.
+ *
+ * As a side effect, Content-Length, Content-MD5 (for a non-empty body) and
+ * the headers set by azure_compute_signature() are added to the caller's
+ * headers table.
+ *
+ * Returns the response body when the HTTP status is 2xx.  Returns NULL on any
+ * CURL error or other HTTP status, after printing a message to stderr. */
+static BlueSkyRCStr *submit_request(AzureStore *store,
+                                    const char *method,
+                                    const char *path,
+                                    GHashTable *headers,
+                                    BlueSkyRCStr *body)
+{
+    BlueSkyRCStr *result = NULL;
+
+    /* Everything released at "cleanup" is initialized up front, so that an
+     * early failure never passes an indeterminate pointer to a free
+     * function. */
+    CURL *curl = NULL;
+    CURLcode status;
+    GString *result_body = NULL;
+    struct curl_slist *curl_headers = NULL;
+    char *uri = NULL;
+    struct curlbuf readbuf = { NULL, 0 };
+    long response_code = 0;
+    size_t body_len = body != NULL ? body->len : 0;
+
+    g_hash_table_insert(headers,
+                        g_strdup("Content-Length"),
+                        g_strdup_printf("%zu", body_len));
+
+    if (body != NULL && body->len > 0) {
+        GChecksum *csum = g_checksum_new(G_CHECKSUM_MD5);
+        g_checksum_update(csum, (uint8_t *)body->data, body->len);
+        uint8_t md5[16];
+        gsize md5_len = sizeof(md5);
+        g_checksum_get_digest(csum, md5, &md5_len);
+        g_hash_table_insert(headers,
+                            g_strdup("Content-MD5"),
+                            g_base64_encode(md5, md5_len));
+        g_checksum_free(csum);
+    }
+
+    azure_compute_signature(store, headers, method, path);
+
+    curl = curl_easy_init();
+    if (curl == NULL) {
+        fprintf(stderr, "CURL error: could not create a CURL handle!\n");
+        goto cleanup;
+    }
+
+/* Set one option on "curl"; on failure report the error and bail out. */
+#define curl_easy_setopt_safe(opt, val)                                     \
+    do {                                                                    \
+        status = curl_easy_setopt(curl, (opt), (val));                      \
+        if (status != CURLE_OK) {                                           \
+            fprintf(stderr, "CURL error: %s!\n",                            \
+                    curl_easy_strerror(status));                            \
+            goto cleanup;                                                   \
+        }                                                                   \
+    } while (0)
+
+    /* Numeric options are read by libcurl as long, so pass long values. */
+    curl_easy_setopt_safe(CURLOPT_NOSIGNAL, 1L);
+    curl_easy_setopt_safe(CURLOPT_NOPROGRESS, 1L);
+    curl_easy_setopt_safe(CURLOPT_NETRC, (long)CURL_NETRC_IGNORED);
+    curl_easy_setopt_safe(CURLOPT_FOLLOWLOCATION, 1L);
+    curl_easy_setopt_safe(CURLOPT_MAXREDIRS, 10L);
+
+    /* Response headers are discarded: curl_writefunc ignores NULL data. */
+    curl_easy_setopt_safe(CURLOPT_HEADERFUNCTION, curl_writefunc);
+    curl_easy_setopt_safe(CURLOPT_HEADERDATA, NULL);
+
+    if (body != NULL) {
+        readbuf.readbuf = body->data;
+        readbuf.readsize = body->len;
+    }
+    curl_easy_setopt_safe(CURLOPT_READFUNCTION, curl_readfunc);
+    curl_easy_setopt_safe(CURLOPT_READDATA, body ? &readbuf : NULL);
+
+    result_body = g_string_new("");
+    curl_easy_setopt_safe(CURLOPT_WRITEFUNCTION, curl_writefunc);
+    curl_easy_setopt_safe(CURLOPT_WRITEDATA, result_body);
+
+    g_hash_table_foreach(headers, (GHFunc)get_curl_headers, &curl_headers);
+    curl_easy_setopt_safe(CURLOPT_HTTPHEADER, curl_headers);
+
+    uri = g_strdup_printf("http://%s.blob.core.windows.net/%s/%s",
+                          store->account, store->container, path);
+    printf("URI: %s\n", uri);
+    curl_easy_setopt_safe(CURLOPT_URL, uri);
+
+    /* Any method other than PUT or DELETE gets no extra options. */
+    if (strcmp(method, "GET") == 0) {
+        /* nothing special needed */
+    } else if (strcmp(method, "PUT") == 0) {
+        curl_easy_setopt_safe(CURLOPT_UPLOAD, 1L);
+    } else if (strcmp(method, "DELETE") == 0) {
+        curl_easy_setopt_safe(CURLOPT_CUSTOMREQUEST, "DELETE");
+    }
+
+#undef curl_easy_setopt_safe
+
+    status = curl_easy_perform(curl);
+    if (status != CURLE_OK) {
+        fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
+        goto cleanup;
+    }
+
+    status = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
+    if (status != CURLE_OK) {
+        fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
+        goto cleanup;
+    }
+
+    if (response_code / 100 == 2) {
+        /* The buffer is handed over to result; clear the local pointer so
+         * that it is not freed below. */
+        result = bluesky_string_new_from_gstring(result_body);
+        result_body = NULL;
+    } else {
+        fprintf(stderr, "HTTP response code: %ld!\n", response_code);
+    }
+
+cleanup:
+    /* Still set only when the response was not handed over to result. */
+    if (result_body != NULL)
+        g_string_free(result_body, TRUE);
+    if (curl != NULL)
+        curl_easy_cleanup(curl);
+    curl_slist_free_all(curl_headers);
+    g_free(uri);
+
+    return result;
+}
+
+/* Thread-pool worker: run one queued GET or PUT against Azure.  On success
+ * async->result is set to 0, and for a GET async->data receives the fetched
+ * data.  On failure those fields are left untouched.  The operation is then
+ * marked complete and the reference taken in azurestore_submit() is
+ * dropped. */
+static void azurestore_task(gpointer a, gpointer s)
+{
+    BlueSkyStoreAsync *async = a;
+    AzureStore *store = s;
+    BlueSkyRCStr *response = NULL;
+    GHashTable *headers;
+
+    async->status = ASYNC_RUNNING;
+    async->exec_time = bluesky_now_hires();
+
+    /* Keys and values are heap strings that the table frees on unref. */
+    headers = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, g_free);
+
+    switch (async->op) {
+    case STORE_OP_GET:
+        response = submit_request(store, "GET", async->key, headers, NULL);
+        if (response != NULL) {
+            async->data = response;
+            async->result = 0;
+        }
+        break;
+
+    case STORE_OP_PUT:
+        g_hash_table_insert(headers, g_strdup("x-ms-blob-type"),
+                            g_strdup("BlockBlob"));
+        /* NOTE(review): the empty value presumably keeps curl from sending
+         * its own Transfer-Encoding header -- confirm. */
+        g_hash_table_insert(headers, g_strdup("Transfer-Encoding"),
+                            g_strdup(""));
+        response = submit_request(store, "PUT", async->key, headers,
+                                  async->data);
+        if (response != NULL)
+            async->result = 0;
+        bluesky_string_unref(response);
+        break;
+
+    default:
+        break;
+    }
+
+    bluesky_store_async_mark_complete(async);
+    bluesky_store_async_unref(async);
+    g_hash_table_unref(headers);
+}
+
+/* Create an Azure store.  path names the blob container to use; NULL or an
+ * empty string selects the default container "bluesky".  The account name is
+ * read from the AZURE_ACCOUNT_NAME environment variable and the
+ * base64-encoded shared key from AZURE_SECRET_KEY.  Both are required:
+ * g_error() is raised if either is unset, or if the key decodes to nothing,
+ * since no request could be signed without them.  Returns a newly allocated
+ * AzureStore. */
+static gpointer azurestore_new(const gchar *path)
+{
+    const char *account = getenv("AZURE_ACCOUNT_NAME");
+    const char *key = getenv("AZURE_SECRET_KEY");
+
+    /* Check the credentials before allocating anything. */
+    if (account == NULL || key == NULL) {
+        g_error("Azure credentials not defined; please set the "
+                "AZURE_ACCOUNT_NAME and AZURE_SECRET_KEY environment "
+                "variables");
+    }
+
+    AzureStore *store = g_new(AzureStore, 1);
+    store->thread_pool = g_thread_pool_new(azurestore_task, store, -1, FALSE,
+                                           NULL);
+    if (path == NULL || strlen(path) == 0)
+        store->container = g_strdup("bluesky");
+    else
+        store->container = g_strdup(path);
+
+    store->account = g_strdup(account);
+
+    /* Start from a defined length in case decoding produces no output. */
+    store->key_len = 0;
+    store->key = g_base64_decode(key, &store->key_len);
+    if (store->key == NULL || store->key_len == 0)
+        g_error("AZURE_SECRET_KEY is not a valid base64-encoded key");
+
+    g_print("Initializing Azure with account %s, container %s\n",
+            store->account, store->container);
+
+    return store;
+}
+
+/* Release an AzureStore created by azurestore_new().  Worker threads hold a
+ * pointer to the store, so queued and running operations are allowed to
+ * finish before anything is freed.  A NULL store is ignored. */
+static void azurestore_destroy(gpointer store)
+{
+    AzureStore *azure = (AzureStore *)store;
+    if (azure == NULL)
+        return;
+
+    /* immediate = FALSE: drain the queue; wait = TRUE: block until done. */
+    g_thread_pool_free(azure->thread_pool, FALSE, TRUE);
+
+    g_free(azure->account);
+    g_free(azure->container);
+    g_free(azure->key);
+    g_free(azure);
+}
+
+/* Queue an asynchronous operation on the store's thread pool.  Only GET and
+ * PUT are supported; a reference is taken on async for the worker, which
+ * drops it in azurestore_task().  Any other operation is logged with a
+ * warning and marked complete immediately. */
+static void azurestore_submit(gpointer s, BlueSkyStoreAsync *async)
+{
+    AzureStore *store = s;
+
+    g_return_if_fail(async->status == ASYNC_NEW);
+    g_return_if_fail(async->op != STORE_OP_NONE);
+
+    if (async->op == STORE_OP_GET || async->op == STORE_OP_PUT) {
+        async->status = ASYNC_PENDING;
+        bluesky_store_async_ref(async);
+        g_thread_pool_push(store->thread_pool, async, NULL);
+        return;
+    }
+
+    g_warning("Uknown operation type for AzureStore: %d\n", async->op);
+    bluesky_store_async_mark_complete(async);
+}
+
+/* Cleanup hook for the store interface.  Intentionally does nothing: this
+ * backend keeps no state of its own on the async object. */
+static void azurestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
+{
+}
+
+/* Not implemented for Azure: always returns NULL, whatever the prefix. */
+static char *azurestore_lookup_last(gpointer s, const char *prefix)
+{
+    return NULL;
+}
+
+/* Operations table for this backend; registered under the name "azure" by
+ * bluesky_store_init_azure(). */
+static BlueSkyStoreImplementation store_impl = {
+    .create = azurestore_new,
+    .destroy = azurestore_destroy,
+    .submit = azurestore_submit,
+    .cleanup = azurestore_cleanup,
+    .lookup_last = azurestore_lookup_last,
+};
+
+/* Register the Azure backend with the store layer under the name "azure".
+ * Called from bluesky_init(). */
+void bluesky_store_init_azure(void)
+{
+    bluesky_store_register(&store_impl, "azure");
+}
+
+/* Everything from here to the matching #endif is compiled out.  The
+ * definitions are all named s3store_* and register a store called "s3", so
+ * this looks like a copy of the S3 backend kept for reference while writing
+ * the Azure code -- TODO confirm, and remove once it is no longer needed. */
+#if 0
+typedef struct {
+    enum { S3_GET, S3_PUT } op;
+    gchar *key;
+    BlueSkyRCStr *data;
+} S3Op;
+
+struct get_info {
+    int success;
+    GString *buf;
+};
+
+struct put_info {
+    int success;
+    BlueSkyRCStr *val;
+    gint offset;
+};
+
+struct list_info {
+    int success;
+    char *last_entry;
+    gboolean truncated;
+};
+
+static S3Status s3store_get_handler(int bufferSize, const char *buffer,
+                                    void *callbackData)
+{
+    struct get_info *info = (struct get_info *)callbackData;
+    g_string_append_len(info->buf, buffer, bufferSize);
+    return S3StatusOK;
+}
+
+static int s3store_put_handler(int bufferSize, char *buffer,
+                               void *callbackData)
+{
+    struct put_info *info = (struct put_info *)callbackData;
+    gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
+    memcpy(buffer, (char *)info->val->data + info->offset, bytes);
+    info->offset += bytes;
+    return bytes;
+}
+
+static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
+                                     void *callbackData)
+{
+    return S3StatusOK;
+}
+
+static void s3store_response_callback(S3Status status,
+                               const S3ErrorDetails *errorDetails,
+                               void *callbackData)
+{
+    struct get_info *info = (struct get_info *)callbackData;
+
+    if (status == 0) {
+        info->success = 1;
+    }
+
+    if (errorDetails != NULL && errorDetails->message != NULL) {
+        g_print("  Error message: %s\n", errorDetails->message);
+    }
+}
+
+static void s3store_task(gpointer a, gpointer s)
+{
+    BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
+    S3Store *store = (S3Store *)s;
+
+    async->status = ASYNC_RUNNING;
+    async->exec_time = bluesky_now_hires();
+
+    if (async->op == STORE_OP_GET) {
+        struct get_info info;
+        info.buf = g_string_new("");
+        info.success = 0;
+
+        struct S3GetObjectHandler handler;
+        handler.responseHandler.propertiesCallback = s3store_properties_callback;
+        handler.responseHandler.completeCallback = s3store_response_callback;
+        handler.getObjectDataCallback = s3store_get_handler;
+
+        S3_get_object(&store->bucket, async->key, NULL,
+                      async->start, async->len, NULL, &handler, &info);
+        async->range_done = TRUE;
+
+        if (info.success) {
+            async->data = bluesky_string_new_from_gstring(info.buf);
+            async->result = 0;
+        } else {
+            g_string_free(info.buf, TRUE);
+        }
+
+    } else if (async->op == STORE_OP_PUT) {
+        struct put_info info;
+        info.success = 0;
+        info.val = async->data;
+        info.offset = 0;
+
+        struct S3PutObjectHandler handler;
+        handler.responseHandler.propertiesCallback
+            = s3store_properties_callback;
+        handler.responseHandler.completeCallback = s3store_response_callback;
+        handler.putObjectDataCallback = s3store_put_handler;
+
+        S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL,
+                      &handler, &info);
+
+        if (info.success) {
+            async->result = 0;
+        } else {
+            g_warning("Error completing S3 put operation; client must retry!");
+        }
+    }
+
+    bluesky_store_async_mark_complete(async);
+    bluesky_store_async_unref(async);
+}
+
+static S3Status s3store_list_handler(int isTruncated,
+                                     const char *nextMarker,
+                                     int contentsCount,
+                                     const S3ListBucketContent *contents,
+                                     int commonPrefixesCount,
+                                     const char **commonPrefixes,
+                                     void *callbackData)
+{
+    struct list_info *info = (struct list_info *)callbackData;
+    if (contentsCount > 0) {
+        g_free(info->last_entry);
+        info->last_entry = g_strdup(contents[contentsCount - 1].key);
+    }
+    info->truncated = isTruncated;
+    return S3StatusOK;
+}
+
+static char *s3store_lookup_last(gpointer s, const char *prefix)
+{
+    S3Store *store = (S3Store *)s;
+    struct list_info info = {0, NULL, FALSE};
+
+    struct S3ListBucketHandler handler;
+    handler.responseHandler.propertiesCallback
+        = s3store_properties_callback;
+    handler.responseHandler.completeCallback = s3store_response_callback;
+    handler.listBucketCallback = s3store_list_handler;
+
+    char *marker = NULL;
+
+    do {
+        S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
+                       &handler, &info);
+        g_free(marker);
+        marker = g_strdup(info.last_entry);
+        g_print("Last key: %s\n", info.last_entry);
+    } while (info.truncated);
+
+    g_free(marker);
+
+    return info.last_entry;
+}
+
+static gpointer s3store_new(const gchar *path)
+{
+    S3Store *store = g_new(S3Store, 1);
+    store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
+                                           NULL);
+    if (path == NULL || strlen(path) == 0)
+        store->bucket.bucketName = "mvrable-bluesky";
+    else
+        store->bucket.bucketName = g_strdup(path);
+    store->bucket.protocol = S3ProtocolHTTP;
+    store->bucket.uriStyle = S3UriStylePath;
+    store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
+    store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
+
+    const char *key = getenv("BLUESKY_KEY");
+    if (key == NULL) {
+        g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
+        exit(1);
+    }
+
+    bluesky_crypt_hash_key(key, store->encryption_key);
+
+    g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
+            store->bucket.bucketName, store->bucket.accessKeyId, key);
+
+    return store;
+}
+
+static void s3store_destroy(gpointer store)
+{
+    g_free(store);
+}
+
+static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
+{
+    S3Store *store = (S3Store *)s;
+    g_return_if_fail(async->status == ASYNC_NEW);
+    g_return_if_fail(async->op != STORE_OP_NONE);
+
+    switch (async->op) {
+    case STORE_OP_GET:
+    case STORE_OP_PUT:
+        async->status = ASYNC_PENDING;
+        bluesky_store_async_ref(async);
+        g_thread_pool_push(store->thread_pool, async, NULL);
+        break;
+
+    default:
+        g_warning("Uknown operation type for S3Store: %d\n", async->op);
+        bluesky_store_async_mark_complete(async);
+        break;
+    }
+}
+
+static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
+{
+    GString *buf = (GString *)async->store_private;
+
+    if (buf != NULL) {
+        g_string_free(buf, TRUE);
+        async->store_private = NULL;
+    }
+}
+
+static BlueSkyStoreImplementation store_impl = {
+    .create = s3store_new,
+    .destroy = s3store_destroy,
+    .submit = s3store_submit,
+    .cleanup = s3store_cleanup,
+    .lookup_last = s3store_lookup_last,
+};
+
+void bluesky_store_init_s3(void)
+{
+    S3_initialize(NULL, S3_INIT_ALL);
+    bluesky_store_register(&store_impl, "s3");
+}
+#endif
index d976e0f..d7ee338 100644 (file)
@@ -91,7 +91,7 @@ class AzureError(RuntimeError):
 class AzureConnection:
     def __init__(self, account=None, key=None):
         if account is None:
-            account = os.environ['AZURE_ACCOUNT_NAME'] 
+            account = os.environ['AZURE_ACCOUNT_NAME']
         self.account = account
         self.host = account + ".blob.core.windows.net"
         #self.conn = httplib.HTTPConnection(self.host)