From: Michael Vrable Date: Fri, 20 Aug 2010 00:16:53 +0000 (-0700) Subject: Make cloud storage more robust. X-Git-Url: http://git.vrable.net/?p=bluesky.git;a=commitdiff_plain;h=ef355dcfecf0dad2d95ff5fb3d847f1bf8b9ebe5 Make cloud storage more robust. - Do not consider data committed until we get a reply from the cloud. - Add retries on write and on read. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index c066fd3..c527ee6 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -231,6 +231,7 @@ struct _BlueSkyCloudLogState { GString *data; BlueSkyCloudPointer location; GList *inode_list; + GSList *writeback_list; // Items which are being serialized right now }; gboolean bluesky_cloudlog_equal(gconstpointer a, gconstpointer b); diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c index 36895fe..8b88f64 100644 --- a/bluesky/cloudlog.c +++ b/bluesky/cloudlog.c @@ -269,12 +269,10 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log, /* TODO: We should mark the objects as committed on the cloud until the * data is flushed and acknowledged. */ - log->location_flags |= CLOUDLOG_CLOUD; + log->pending_write |= CLOUDLOG_CLOUD; bluesky_cloudlog_stats_update(log, 1); - if (log->dirty_journal != NULL) { - g_atomic_int_add(&log->dirty_journal->dirty_refs, -1); - log->dirty_journal = NULL; - } + state->writeback_list = g_slist_prepend(state->writeback_list, log); + bluesky_cloudlog_ref(log); g_mutex_unlock(log->lock); if (state->data->len > CLOUDLOG_SEGMENT_SIZE) @@ -283,6 +281,54 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log, return log->location; } +typedef struct { + BlueSkyRCStr *data; + GSList *items; +} SerializedRecord; + +static void cloudlog_flush_complete(BlueSkyStoreAsync *async, + SerializedRecord *record) +{ + g_print("Write of %s to cloud complete, status = %d\n", + async->key, async->result); + + if (async->result >= 0) { + while (record->items != NULL) { + BlueSkyCloudLog *item = (BlueSkyCloudLog *)record->items->data; + g_mutex_lock(item->lock); + bluesky_cloudlog_stats_update(item, -1); + item->pending_write &= ~CLOUDLOG_CLOUD; + item->location_flags |= CLOUDLOG_CLOUD; + bluesky_cloudlog_stats_update(item, 1); + if (item->dirty_journal != NULL) { + g_atomic_int_add(&item->dirty_journal->dirty_refs, -1); + item->dirty_journal = NULL; + } + g_mutex_unlock(item->lock); + bluesky_cloudlog_unref(item); + + record->items = g_slist_delete_link(record->items, record->items); + } + + bluesky_string_unref(record->data); + g_slist_free(record->items); + g_free(record); + } else { + g_print("Write should be resubmitted...\n"); + + BlueSkyStoreAsync *async2 = bluesky_store_async_new(async->store); + async2->op = STORE_OP_PUT; + async2->key = g_strdup(async->key); + async2->data = record->data; + bluesky_string_ref(record->data); + bluesky_store_async_submit(async2); + bluesky_store_async_add_notifier(async2, + (GFunc)cloudlog_flush_complete, + record); + bluesky_store_async_unref(async2); + } +} + /* Finish up a partially-written cloud log segment and flush it to storage. */ void bluesky_cloudlog_flush(BlueSkyFS *fs) { @@ -293,15 +339,22 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs) /* TODO: Append some type of commit record to the log segment? */ g_print("Serializing %zd bytes of data to cloud\n", state->data->len); + SerializedRecord *record = g_new0(SerializedRecord, 1); + record->data = bluesky_string_new_from_gstring(state->data); + record->items = state->writeback_list; + state->writeback_list = NULL; BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); async->op = STORE_OP_PUT; async->key = g_strdup_printf("log-%08d-%08d", state->location.directory, state->location.sequence); - async->data = bluesky_string_new_from_gstring(state->data); + async->data = record->data; + bluesky_string_ref(record->data); bluesky_store_async_submit(async); - //bluesky_store_async_wait(async); + bluesky_store_async_add_notifier(async, + (GFunc)cloudlog_flush_complete, + record); bluesky_store_async_unref(async); state->location.sequence++; diff --git a/bluesky/log.c b/bluesky/log.c index 7146506..a904fe1 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -300,26 +300,47 @@ void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile) g_atomic_int_add(&cachefile->refcount, -1); } +static void cloudlog_fetch_complete(BlueSkyStoreAsync *async, + BlueSkyCacheFile *cachefile); + +static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile) +{ + g_atomic_int_inc(&cachefile->refcount); + cachefile->fetching = TRUE; + g_print("Starting fetch of %s from cloud\n", cachefile->filename); + BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store); + async->op = STORE_OP_GET; + async->key = g_strdup(cachefile->filename); + bluesky_store_async_add_notifier(async, + (GFunc)cloudlog_fetch_complete, + cachefile); + bluesky_store_async_submit(async); + bluesky_store_async_unref(async); +} + static void cloudlog_fetch_complete(BlueSkyStoreAsync *async, BlueSkyCacheFile *cachefile) { g_print("Fetch of %s from cloud complete, status = %d\n", async->key, async->result); - if (async->result < 0) - return; - g_mutex_lock(cachefile->lock); - char *pathname = g_strdup_printf("%s/%s", cachefile->log->log_directory, - cachefile->filename); - if (!g_file_set_contents(pathname, async->data->data, async->data->len, - NULL)) - { - g_print("Error writing out fetched file to cache!\n"); + if (async->result >= 0) { + char *pathname = g_strdup_printf("%s/%s", + cachefile->log->log_directory, + cachefile->filename); + if (!g_file_set_contents(pathname, async->data->data, async->data->len, + NULL)) + g_print("Error writing out fetched file to cache!\n"); + g_free(pathname); + + cachefile->fetching = FALSE; + cachefile->ready = TRUE; + } else { + g_print("Error fetching from cloud, retrying...\n"); + cloudlog_fetch_start(cachefile); } - g_free(pathname); - cachefile->fetching = FALSE; - cachefile->ready = TRUE; + bluesky_cachefile_unref(cachefile); g_cond_broadcast(cachefile->cond); g_mutex_unlock(cachefile->lock); @@ -362,6 +383,7 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs, g_print("Adding cache file %s\n", logname); map = g_new0(BlueSkyCacheFile, 1); + map->fs = fs; map->type = type; map->lock = g_mutex_new(); map->type = type; @@ -376,19 +398,8 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs, g_hash_table_insert(log->mmap_cache, map->filename, map); // If the log file is stored in the cloud, we may need to fetch it - if (clouddir >= 0) { - g_atomic_int_inc(&map->refcount); - map->fetching = TRUE; - g_print("Starting fetch of %s from cloud\n", logname); - BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); - async->op = STORE_OP_GET; - async->key = g_strdup(logname); - bluesky_store_async_add_notifier(async, - (GFunc)cloudlog_fetch_complete, - map); - bluesky_store_async_submit(async); - bluesky_store_async_unref(async); - } + if (clouddir >= 0) + cloudlog_fetch_start(map); } else { g_mutex_lock(map->lock); }