/* TODO: We should mark the objects as committed on the cloud until the
* data is flushed and acknowledged. */
- log->location_flags |= CLOUDLOG_CLOUD;
+ log->pending_write |= CLOUDLOG_CLOUD;
bluesky_cloudlog_stats_update(log, 1);
- if (log->dirty_journal != NULL) {
- g_atomic_int_add(&log->dirty_journal->dirty_refs, -1);
- log->dirty_journal = NULL;
- }
+ state->writeback_list = g_slist_prepend(state->writeback_list, log);
+ bluesky_cloudlog_ref(log);
g_mutex_unlock(log->lock);
if (state->data->len > CLOUDLOG_SEGMENT_SIZE)
return log->location;
}
+typedef struct {
+ BlueSkyRCStr *data;
+ GSList *items;
+} SerializedRecord;
+
+static void cloudlog_flush_complete(BlueSkyStoreAsync *async,
+ SerializedRecord *record)
+{
+ g_print("Write of %s to cloud complete, status = %d\n",
+ async->key, async->result);
+
+ if (async->result >= 0) {
+ while (record->items != NULL) {
+ BlueSkyCloudLog *item = (BlueSkyCloudLog *)record->items->data;
+ g_mutex_lock(item->lock);
+ bluesky_cloudlog_stats_update(item, -1);
+ item->pending_write &= ~CLOUDLOG_CLOUD;
+ item->location_flags |= CLOUDLOG_CLOUD;
+ bluesky_cloudlog_stats_update(item, 1);
+ if (item->dirty_journal != NULL) {
+ g_atomic_int_add(&item->dirty_journal->dirty_refs, -1);
+ item->dirty_journal = NULL;
+ }
+ g_mutex_unlock(item->lock);
+ bluesky_cloudlog_unref(item);
+
+ record->items = g_slist_delete_link(record->items, record->items);
+ }
+
+ bluesky_string_unref(record->data);
+ g_slist_free(record->items);
+ g_free(record);
+ } else {
+ g_print("Write should be resubmitted...\n");
+
+ BlueSkyStoreAsync *async2 = bluesky_store_async_new(async->store);
+ async2->op = STORE_OP_PUT;
+ async2->key = g_strdup(async->key);
+ async2->data = record->data;
+ bluesky_string_ref(record->data);
+ bluesky_store_async_submit(async2);
+ bluesky_store_async_add_notifier(async2,
+ (GFunc)cloudlog_flush_complete,
+ record);
+ bluesky_store_async_unref(async2);
+ }
+}
+
/* Finish up a partially-written cloud log segment and flush it to storage. */
void bluesky_cloudlog_flush(BlueSkyFS *fs)
{
/* TODO: Append some type of commit record to the log segment? */
g_print("Serializing %zd bytes of data to cloud\n", state->data->len);
+ SerializedRecord *record = g_new0(SerializedRecord, 1);
+ record->data = bluesky_string_new_from_gstring(state->data);
+ record->items = state->writeback_list;
+ state->writeback_list = NULL;
BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
async->op = STORE_OP_PUT;
async->key = g_strdup_printf("log-%08d-%08d",
state->location.directory,
state->location.sequence);
- async->data = bluesky_string_new_from_gstring(state->data);
+ async->data = record->data;
+ bluesky_string_ref(record->data);
bluesky_store_async_submit(async);
- //bluesky_store_async_wait(async);
+ bluesky_store_async_add_notifier(async,
+ (GFunc)cloudlog_flush_complete,
+ record);
bluesky_store_async_unref(async);
state->location.sequence++;
g_atomic_int_add(&cachefile->refcount, -1);
}
+static void cloudlog_fetch_complete(BlueSkyStoreAsync *async,
+ BlueSkyCacheFile *cachefile);
+
+static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile)
+{
+ g_atomic_int_inc(&cachefile->refcount);
+ cachefile->fetching = TRUE;
+ g_print("Starting fetch of %s from cloud\n", cachefile->filename);
+ BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store);
+ async->op = STORE_OP_GET;
+ async->key = g_strdup(cachefile->filename);
+ bluesky_store_async_add_notifier(async,
+ (GFunc)cloudlog_fetch_complete,
+ cachefile);
+ bluesky_store_async_submit(async);
+ bluesky_store_async_unref(async);
+}
+
static void cloudlog_fetch_complete(BlueSkyStoreAsync *async,
BlueSkyCacheFile *cachefile)
{
g_print("Fetch of %s from cloud complete, status = %d\n",
async->key, async->result);
- if (async->result < 0)
- return;
-
g_mutex_lock(cachefile->lock);
- char *pathname = g_strdup_printf("%s/%s", cachefile->log->log_directory,
- cachefile->filename);
- if (!g_file_set_contents(pathname, async->data->data, async->data->len,
- NULL))
- {
- g_print("Error writing out fetched file to cache!\n");
+ if (async->result >= 0) {
+ char *pathname = g_strdup_printf("%s/%s",
+ cachefile->log->log_directory,
+ cachefile->filename);
+ if (!g_file_set_contents(pathname, async->data->data, async->data->len,
+ NULL))
+ g_print("Error writing out fetched file to cache!\n");
+ g_free(pathname);
+
+ cachefile->fetching = FALSE;
+ cachefile->ready = TRUE;
+ } else {
+ g_print("Error fetching from cloud, retrying...\n");
+ cloudlog_fetch_start(cachefile);
}
- g_free(pathname);
- cachefile->fetching = FALSE;
- cachefile->ready = TRUE;
+
bluesky_cachefile_unref(cachefile);
g_cond_broadcast(cachefile->cond);
g_mutex_unlock(cachefile->lock);
g_print("Adding cache file %s\n", logname);
map = g_new0(BlueSkyCacheFile, 1);
+ map->fs = fs;
map->type = type;
map->lock = g_mutex_new();
map->type = type;
g_hash_table_insert(log->mmap_cache, map->filename, map);
// If the log file is stored in the cloud, we may need to fetch it
- if (clouddir >= 0) {
- g_atomic_int_inc(&map->refcount);
- map->fetching = TRUE;
- g_print("Starting fetch of %s from cloud\n", logname);
- BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
- async->op = STORE_OP_GET;
- async->key = g_strdup(logname);
- bluesky_store_async_add_notifier(async,
- (GFunc)cloudlog_fetch_complete,
- map);
- bluesky_store_async_submit(async);
- bluesky_store_async_unref(async);
- }
+ if (clouddir >= 0)
+ cloudlog_fetch_start(map);
} else {
g_mutex_lock(map->lock);
}