X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fstore-azure.c;h=9b601ed1ffbe9296012e9d9709a245c54180f4c0;hb=8ff0fd08d6e1cc97cdb7e94b7cd97dc28c29e674;hp=e7a2b7134da1f72d0f6c280f33befb4cb9b4e88a;hpb=fe8e013eff37a1f6df8f5a55fba30ede4f9618be;p=bluesky.git diff --git a/bluesky/store-azure.c b/bluesky/store-azure.c index e7a2b71..9b601ed 100644 --- a/bluesky/store-azure.c +++ b/bluesky/store-azure.c @@ -3,7 +3,29 @@ * Copyright (C) 2009 The Regents of the University of California * Written by Michael Vrable * - * TODO: Licensing + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. */ #include @@ -29,13 +51,46 @@ static const char *signature_headers[] = { /* Prototype Windows Azure backend for BlueSky. This is intended to be * minimally functional, but could use additional work for production use. */ +#define MAX_IDLE_CONNECTIONS 8 + typedef struct { GThreadPool *thread_pool; char *account, *container; uint8_t *key; size_t key_len; + + /* A pool of available idle connections that could be used. */ + GQueue *curl_pool; + GMutex *curl_pool_lock; } AzureStore; +static CURL *get_connection(AzureStore *store) +{ + CURL *curl = NULL; + + g_mutex_lock(store->curl_pool_lock); + if (!g_queue_is_empty(store->curl_pool)) { + curl = (CURL *)(g_queue_pop_head(store->curl_pool)); + } + g_mutex_unlock(store->curl_pool_lock); + + if (curl == NULL) + curl = curl_easy_init(); + + return curl; +} + +static void put_connection(AzureStore *store, CURL *curl) +{ + g_mutex_lock(store->curl_pool_lock); + g_queue_push_head(store->curl_pool, curl); + while (g_queue_get_length(store->curl_pool) > MAX_IDLE_CONNECTIONS) { + curl = (CURL *)(g_queue_pop_tail(store->curl_pool)); + curl_easy_cleanup(curl); + } + g_mutex_unlock(store->curl_pool_lock); +} + static void get_extra_headers(gchar *key, gchar *value, GList **headers) { key = g_ascii_strdown(key, strlen(key)); @@ -151,6 +206,7 @@ static void azure_compute_signature(AzureStore *store, * sent. This will compute an Azure authentication signature before sending * the request. */ static BlueSkyRCStr *submit_request(AzureStore *store, + CURL *curl, const char *method, const char *path, GHashTable *headers, @@ -177,7 +233,6 @@ static BlueSkyRCStr *submit_request(AzureStore *store, azure_compute_signature(store, headers, method, path); CURLcode status; - CURL *curl = curl_easy_init(); #define curl_easy_setopt_safe(opt, val) \ if ((status = curl_easy_setopt(curl, (opt), (val))) != CURLE_OK) { \ @@ -247,7 +302,7 @@ static BlueSkyRCStr *submit_request(AzureStore *store, cleanup: if (result != NULL && result_body != NULL) g_string_free(result_body, TRUE); - curl_easy_cleanup(curl); + curl_easy_reset(curl); curl_slist_free_all(curl_headers); g_free(uri); @@ -266,9 +321,24 @@ static void azurestore_task(gpointer a, gpointer s) g_free, g_free); BlueSkyRCStr *result = NULL; + CURL *curl = get_connection(store); if (async->op == STORE_OP_GET) { - result = submit_request(store, "GET", async->key, headers, NULL); + /* FIXME: We ought to check that the response returned the requested + * byte range. */ + if (async->start != 0 && async->len != 0) { + g_hash_table_insert(headers, + g_strdup("Range"), + g_strdup_printf("bytes=%zd-%zd", async->start, + async->start + async->len)); + async->range_done = TRUE; + } else if (async->start != 0) { + g_hash_table_insert(headers, + g_strdup("Range"), + g_strdup_printf("bytes=%zd-", async->start)); + async->range_done = TRUE; + } + result = submit_request(store, curl, "GET", async->key, headers, NULL); if (result != NULL) { async->data = result; async->result = 0; @@ -280,7 +350,7 @@ static void azurestore_task(gpointer a, gpointer s) g_hash_table_insert(headers, g_strdup("Transfer-Encoding"), g_strdup("")); - result = submit_request(store, "PUT", async->key, + result = submit_request(store, curl, "PUT", async->key, headers, async->data); if (result != NULL) { async->result = 0; @@ -291,6 +361,7 @@ static void azurestore_task(gpointer a, gpointer s) bluesky_store_async_mark_complete(async); bluesky_store_async_unref(async); g_hash_table_unref(headers); + put_connection(store, curl); } static gpointer azurestore_new(const gchar *path) @@ -311,6 +382,9 @@ static gpointer azurestore_new(const gchar *path) g_print("Initializing Azure with account %s, container %s\n", store->account, store->container); + store->curl_pool = g_queue_new(); + store->curl_pool_lock = g_mutex_new(); + return store; } @@ -361,243 +435,3 @@ void bluesky_store_init_azure(void) { bluesky_store_register(&store_impl, "azure"); } - -#if 0 -typedef struct { - enum { S3_GET, S3_PUT } op; - gchar *key; - BlueSkyRCStr *data; -} S3Op; - -struct get_info { - int success; - GString *buf; -}; - -struct put_info { - int success; - BlueSkyRCStr *val; - gint offset; -}; - -struct list_info { - int success; - char *last_entry; - gboolean truncated; -}; - -static S3Status s3store_get_handler(int bufferSize, const char *buffer, - void *callbackData) -{ - struct get_info *info = (struct get_info *)callbackData; - g_string_append_len(info->buf, buffer, bufferSize); - return S3StatusOK; -} - -static int s3store_put_handler(int bufferSize, char *buffer, - void *callbackData) -{ - struct put_info *info = (struct put_info *)callbackData; - gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset)); - memcpy(buffer, (char *)info->val->data + info->offset, bytes); - info->offset += bytes; - return bytes; -} - -static S3Status s3store_properties_callback(const S3ResponseProperties *properties, - void *callbackData) -{ - return S3StatusOK; -} - -static void s3store_response_callback(S3Status status, - const S3ErrorDetails *errorDetails, - void *callbackData) -{ - struct get_info *info = (struct get_info *)callbackData; - - if (status == 0) { - info->success = 1; - } - - if (errorDetails != NULL && errorDetails->message != NULL) { - g_print(" Error message: %s\n", errorDetails->message); - } -} - -static void s3store_task(gpointer a, gpointer s) -{ - BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; - S3Store *store = (S3Store *)s; - - async->status = ASYNC_RUNNING; - async->exec_time = bluesky_now_hires(); - - if (async->op == STORE_OP_GET) { - struct get_info info; - info.buf = g_string_new(""); - info.success = 0; - - struct S3GetObjectHandler handler; - handler.responseHandler.propertiesCallback = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.getObjectDataCallback = s3store_get_handler; - - S3_get_object(&store->bucket, async->key, NULL, - async->start, async->len, NULL, &handler, &info); - async->range_done = TRUE; - - if (info.success) { - async->data = bluesky_string_new_from_gstring(info.buf); - async->result = 0; - } else { - g_string_free(info.buf, TRUE); - } - - } else if (async->op == STORE_OP_PUT) { - struct put_info info; - info.success = 0; - info.val = async->data; - info.offset = 0; - - struct S3PutObjectHandler handler; - handler.responseHandler.propertiesCallback - = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.putObjectDataCallback = s3store_put_handler; - - S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL, - &handler, &info); - - if (info.success) { - async->result = 0; - } else { - g_warning("Error completing S3 put operation; client must retry!"); - } - } - - bluesky_store_async_mark_complete(async); - bluesky_store_async_unref(async); -} - -static S3Status s3store_list_handler(int isTruncated, - const char *nextMarker, - int contentsCount, - const S3ListBucketContent *contents, - int commonPrefixesCount, - const char **commonPrefixes, - void *callbackData) -{ - struct list_info *info = (struct list_info *)callbackData; - if (contentsCount > 0) { - g_free(info->last_entry); - info->last_entry = g_strdup(contents[contentsCount - 1].key); - } - info->truncated = isTruncated; - return S3StatusOK; -} - -static char *s3store_lookup_last(gpointer s, const char *prefix) -{ - S3Store *store = (S3Store *)s; - struct list_info info = {0, NULL, FALSE}; - - struct S3ListBucketHandler handler; - handler.responseHandler.propertiesCallback - = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.listBucketCallback = s3store_list_handler; - - char *marker = NULL; - - do { - S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL, - &handler, &info); - g_free(marker); - marker = g_strdup(info.last_entry); - g_print("Last key: %s\n", info.last_entry); - } while (info.truncated); - - g_free(marker); - - return info.last_entry; -} - -static gpointer s3store_new(const gchar *path) -{ - S3Store *store = g_new(S3Store, 1); - store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE, - NULL); - if (path == NULL || strlen(path) == 0) - store->bucket.bucketName = "mvrable-bluesky"; - else - store->bucket.bucketName = g_strdup(path); - store->bucket.protocol = S3ProtocolHTTP; - store->bucket.uriStyle = S3UriStylePath; - store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID"); - store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY"); - - const char *key = getenv("BLUESKY_KEY"); - if (key == NULL) { - g_error("Encryption key not defined; please set BLUESKY_KEY environment variable"); - exit(1); - } - - bluesky_crypt_hash_key(key, store->encryption_key); - - g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n", - store->bucket.bucketName, store->bucket.accessKeyId, key); - - return store; -} - -static void s3store_destroy(gpointer store) -{ - g_free(store); -} - -static void s3store_submit(gpointer s, BlueSkyStoreAsync *async) -{ - S3Store *store = (S3Store *)s; - g_return_if_fail(async->status == ASYNC_NEW); - g_return_if_fail(async->op != STORE_OP_NONE); - - switch (async->op) { - case STORE_OP_GET: - case STORE_OP_PUT: - async->status = ASYNC_PENDING; - bluesky_store_async_ref(async); - g_thread_pool_push(store->thread_pool, async, NULL); - break; - - default: - g_warning("Uknown operation type for S3Store: %d\n", async->op); - bluesky_store_async_mark_complete(async); - break; - } -} - -static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async) -{ - GString *buf = (GString *)async->store_private; - - if (buf != NULL) { - g_string_free(buf, TRUE); - async->store_private = NULL; - } -} - -static BlueSkyStoreImplementation store_impl = { - .create = s3store_new, - .destroy = s3store_destroy, - .submit = s3store_submit, - .cleanup = s3store_cleanup, - .lookup_last = s3store_lookup_last, -}; - -void bluesky_store_init_s3(void) -{ - S3_initialize(NULL, S3_INIT_ALL); - bluesky_store_register(&store_impl, "s3"); -} -#endif