GMutex *lock;
GCond *completion_cond; /* Used to wait for operation to complete. */
+ gint refcount; /* Reference count for destruction. */
+
BlueSkyAsyncStatus status;
BlueSkyStoreOp op;
const gchar *name);
BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store);
+void bluesky_store_async_ref(BlueSkyStoreAsync *async);
+void bluesky_store_async_unref(BlueSkyStoreAsync *async);
void bluesky_store_async_wait(BlueSkyStoreAsync *async);
void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async);
+void bluesky_store_async_submit(BlueSkyStoreAsync *async);
#ifdef __cplusplus
}
g_checksum_update(csum, data->data, data->len);
gchar *name = g_strdup(g_checksum_get_string(csum));
- bluesky_store_put(fs->store, name, data);
+ /* Store the file data asynchronously, and don't bother waiting for a
+ * response. */
+ BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
+ async->op = STORE_OP_PUT;
+ async->key = g_strdup(name);
+ bluesky_string_ref(data);
+ async->data = data;
+ bluesky_store_async_submit(async);
+ bluesky_store_async_unref(async);
+
g_free(block->ref);
block->ref = name;
async->result = 0;
}
- // TODO: Deallocate resources
-
g_print("Finish task...\n");
bluesky_store_async_mark_complete(async);
+ bluesky_store_async_unref(async);
}
static gpointer s3store_new()
case STORE_OP_GET:
case STORE_OP_PUT:
async->status = ASYNC_PENDING;
+ bluesky_store_async_ref(async);
g_thread_pool_push(store->thread_pool, async, NULL);
break;
async->store = store;
async->lock = g_mutex_new();
async->completion_cond = g_cond_new();
+ async->refcount = 1;
async->status = ASYNC_NEW;
async->op = STORE_OP_NONE;
async->key = NULL;
return async;
}
+void bluesky_store_async_ref(BlueSkyStoreAsync *async)
+{
+ if (async == NULL)
+ return;
+
+ g_atomic_int_inc(&async->refcount);
+}
+
+void bluesky_store_async_unref(BlueSkyStoreAsync *async)
+{
+ if (async == NULL)
+ return;
+
+ if (g_atomic_int_dec_and_test(&async->refcount)) {
+ async->store->impl->cleanup(async->store->handle, async);
+ g_mutex_free(async->lock);
+ g_cond_free(async->completion_cond);
+ g_free(async->key);
+ bluesky_string_unref(async->data);
+ g_free(async);
+ g_print("Freed async\n");
+ }
+}
+
/* Block until the given operation has completed. */
void bluesky_store_async_wait(BlueSkyStoreAsync *async)
{
while (async->status != ASYNC_COMPLETE) {
g_cond_wait(async->completion_cond, async->lock);
}
+
+ g_mutex_unlock(async->lock);
}
/* Mark an asynchronous operation as complete. This should only be called by
g_cond_broadcast(async->completion_cond);
}
+void bluesky_store_async_submit(BlueSkyStoreAsync *async)
+{
+ BlueSkyStore *store = async->store;
+
+ store->impl->submit(store->handle, async);
+}
+
/* Convenience wrappers that perform a single operation synchronously. */
BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key)
{
BlueSkyRCStr *data = async->data;
bluesky_string_ref(data);
+ bluesky_store_async_unref(async);
return data;
}
store->impl->submit(store->handle, async);
bluesky_store_async_wait(async);
+ bluesky_store_async_unref(async);
}
#if 0