/* Prototype Windows Azure backend for BlueSky. This is intended to be
* minimally functional, but could use additional work for production use. */
+#define MAX_IDLE_CONNECTIONS 8
+
typedef struct {
GThreadPool *thread_pool;
char *account, *container;
uint8_t *key;
size_t key_len;
+
+ /* A pool of available idle connections that could be used. */
+ GQueue *curl_pool;
+ GMutex *curl_pool_lock;
} AzureStore;
+static CURL *get_connection(AzureStore *store)
+{
+ CURL *curl = NULL;
+
+ g_mutex_lock(store->curl_pool_lock);
+ if (!g_queue_is_empty(store->curl_pool)) {
+ curl = (CURL *)(g_queue_pop_head(store->curl_pool));
+ }
+ g_mutex_unlock(store->curl_pool_lock);
+
+ if (curl == NULL)
+ curl = curl_easy_init();
+
+ return curl;
+}
+
+static void put_connection(AzureStore *store, CURL *curl)
+{
+ g_mutex_lock(store->curl_pool_lock);
+ g_queue_push_head(store->curl_pool, curl);
+ while (g_queue_get_length(store->curl_pool) > MAX_IDLE_CONNECTIONS) {
+ curl = (CURL *)(g_queue_pop_tail(store->curl_pool));
+ curl_easy_cleanup(curl);
+ }
+ g_mutex_unlock(store->curl_pool_lock);
+}
+
static void get_extra_headers(gchar *key, gchar *value, GList **headers)
{
key = g_ascii_strdown(key, strlen(key));
* sent. This will compute an Azure authentication signature before sending
* the request. */
static BlueSkyRCStr *submit_request(AzureStore *store,
+ CURL *curl,
const char *method,
const char *path,
GHashTable *headers,
azure_compute_signature(store, headers, method, path);
CURLcode status;
- CURL *curl = curl_easy_init();
#define curl_easy_setopt_safe(opt, val) \
if ((status = curl_easy_setopt(curl, (opt), (val))) != CURLE_OK) { \
cleanup:
if (result != NULL && result_body != NULL)
g_string_free(result_body, TRUE);
- curl_easy_cleanup(curl);
+ curl_easy_reset(curl);
curl_slist_free_all(curl_headers);
g_free(uri);
g_free, g_free);
BlueSkyRCStr *result = NULL;
+ CURL *curl = get_connection(store);
if (async->op == STORE_OP_GET) {
/* FIXME: We ought to check that the response returned the requested
if (async->start != 0 && async->len != 0) {
g_hash_table_insert(headers,
g_strdup("Range"),
- g_strdup_printf("%zd-%zd", async->start,
+ g_strdup_printf("bytes=%zd-%zd", async->start,
async->start + async->len));
async->range_done = TRUE;
} else if (async->start != 0) {
g_hash_table_insert(headers,
g_strdup("Range"),
- g_strdup_printf("%zd-", async->start));
+ g_strdup_printf("bytes=%zd-", async->start));
async->range_done = TRUE;
}
- result = submit_request(store, "GET", async->key, headers, NULL);
+ result = submit_request(store, curl, "GET", async->key, headers, NULL);
if (result != NULL) {
async->data = result;
async->result = 0;
g_hash_table_insert(headers,
g_strdup("Transfer-Encoding"),
g_strdup(""));
- result = submit_request(store, "PUT", async->key,
+ result = submit_request(store, curl, "PUT", async->key,
headers, async->data);
if (result != NULL) {
async->result = 0;
bluesky_store_async_mark_complete(async);
bluesky_store_async_unref(async);
g_hash_table_unref(headers);
+ put_connection(store, curl);
}
static gpointer azurestore_new(const gchar *path)
g_print("Initializing Azure with account %s, container %s\n",
store->account, store->container);
+ store->curl_pool = g_queue_new();
+ store->curl_pool_lock = g_mutex_new();
+
return store;
}
{
bluesky_store_register(&store_impl, "azure");
}
-
-#if 0
-typedef struct {
- enum { S3_GET, S3_PUT } op;
- gchar *key;
- BlueSkyRCStr *data;
-} S3Op;
-
-struct get_info {
- int success;
- GString *buf;
-};
-
-struct put_info {
- int success;
- BlueSkyRCStr *val;
- gint offset;
-};
-
-struct list_info {
- int success;
- char *last_entry;
- gboolean truncated;
-};
-
-static S3Status s3store_get_handler(int bufferSize, const char *buffer,
- void *callbackData)
-{
- struct get_info *info = (struct get_info *)callbackData;
- g_string_append_len(info->buf, buffer, bufferSize);
- return S3StatusOK;
-}
-
-static int s3store_put_handler(int bufferSize, char *buffer,
- void *callbackData)
-{
- struct put_info *info = (struct put_info *)callbackData;
- gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
- memcpy(buffer, (char *)info->val->data + info->offset, bytes);
- info->offset += bytes;
- return bytes;
-}
-
-static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
- void *callbackData)
-{
- return S3StatusOK;
-}
-
-static void s3store_response_callback(S3Status status,
- const S3ErrorDetails *errorDetails,
- void *callbackData)
-{
- struct get_info *info = (struct get_info *)callbackData;
-
- if (status == 0) {
- info->success = 1;
- }
-
- if (errorDetails != NULL && errorDetails->message != NULL) {
- g_print(" Error message: %s\n", errorDetails->message);
- }
-}
-
-static void s3store_task(gpointer a, gpointer s)
-{
- BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
- S3Store *store = (S3Store *)s;
-
- async->status = ASYNC_RUNNING;
- async->exec_time = bluesky_now_hires();
-
- if (async->op == STORE_OP_GET) {
- struct get_info info;
- info.buf = g_string_new("");
- info.success = 0;
-
- struct S3GetObjectHandler handler;
- handler.responseHandler.propertiesCallback = s3store_properties_callback;
- handler.responseHandler.completeCallback = s3store_response_callback;
- handler.getObjectDataCallback = s3store_get_handler;
-
- S3_get_object(&store->bucket, async->key, NULL,
- async->start, async->len, NULL, &handler, &info);
- async->range_done = TRUE;
-
- if (info.success) {
- async->data = bluesky_string_new_from_gstring(info.buf);
- async->result = 0;
- } else {
- g_string_free(info.buf, TRUE);
- }
-
- } else if (async->op == STORE_OP_PUT) {
- struct put_info info;
- info.success = 0;
- info.val = async->data;
- info.offset = 0;
-
- struct S3PutObjectHandler handler;
- handler.responseHandler.propertiesCallback
- = s3store_properties_callback;
- handler.responseHandler.completeCallback = s3store_response_callback;
- handler.putObjectDataCallback = s3store_put_handler;
-
- S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL,
- &handler, &info);
-
- if (info.success) {
- async->result = 0;
- } else {
- g_warning("Error completing S3 put operation; client must retry!");
- }
- }
-
- bluesky_store_async_mark_complete(async);
- bluesky_store_async_unref(async);
-}
-
-static S3Status s3store_list_handler(int isTruncated,
- const char *nextMarker,
- int contentsCount,
- const S3ListBucketContent *contents,
- int commonPrefixesCount,
- const char **commonPrefixes,
- void *callbackData)
-{
- struct list_info *info = (struct list_info *)callbackData;
- if (contentsCount > 0) {
- g_free(info->last_entry);
- info->last_entry = g_strdup(contents[contentsCount - 1].key);
- }
- info->truncated = isTruncated;
- return S3StatusOK;
-}
-
-static char *s3store_lookup_last(gpointer s, const char *prefix)
-{
- S3Store *store = (S3Store *)s;
- struct list_info info = {0, NULL, FALSE};
-
- struct S3ListBucketHandler handler;
- handler.responseHandler.propertiesCallback
- = s3store_properties_callback;
- handler.responseHandler.completeCallback = s3store_response_callback;
- handler.listBucketCallback = s3store_list_handler;
-
- char *marker = NULL;
-
- do {
- S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
- &handler, &info);
- g_free(marker);
- marker = g_strdup(info.last_entry);
- g_print("Last key: %s\n", info.last_entry);
- } while (info.truncated);
-
- g_free(marker);
-
- return info.last_entry;
-}
-
-static gpointer s3store_new(const gchar *path)
-{
- S3Store *store = g_new(S3Store, 1);
- store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
- NULL);
- if (path == NULL || strlen(path) == 0)
- store->bucket.bucketName = "mvrable-bluesky";
- else
- store->bucket.bucketName = g_strdup(path);
- store->bucket.protocol = S3ProtocolHTTP;
- store->bucket.uriStyle = S3UriStylePath;
- store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
- store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
-
- const char *key = getenv("BLUESKY_KEY");
- if (key == NULL) {
- g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
- exit(1);
- }
-
- bluesky_crypt_hash_key(key, store->encryption_key);
-
- g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
- store->bucket.bucketName, store->bucket.accessKeyId, key);
-
- return store;
-}
-
-static void s3store_destroy(gpointer store)
-{
- g_free(store);
-}
-
-static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
-{
- S3Store *store = (S3Store *)s;
- g_return_if_fail(async->status == ASYNC_NEW);
- g_return_if_fail(async->op != STORE_OP_NONE);
-
- switch (async->op) {
- case STORE_OP_GET:
- case STORE_OP_PUT:
- async->status = ASYNC_PENDING;
- bluesky_store_async_ref(async);
- g_thread_pool_push(store->thread_pool, async, NULL);
- break;
-
- default:
- g_warning("Uknown operation type for S3Store: %d\n", async->op);
- bluesky_store_async_mark_complete(async);
- break;
- }
-}
-
-static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
-{
- GString *buf = (GString *)async->store_private;
-
- if (buf != NULL) {
- g_string_free(buf, TRUE);
- async->store_private = NULL;
- }
-}
-
-static BlueSkyStoreImplementation store_impl = {
- .create = s3store_new,
- .destroy = s3store_destroy,
- .submit = s3store_submit,
- .cleanup = s3store_cleanup,
- .lookup_last = s3store_lookup_last,
-};
-
-void bluesky_store_init_s3(void)
-{
- S3_initialize(NULL, S3_INIT_ALL);
- bluesky_store_register(&store_impl, "s3");
-}
-#endif