X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fcloudlog.c;h=40067fd3661f91d4ec9e45a9019a73d9043c8bd6;hb=a82b60b3b683840a7074110831bcbaa16a40f0eb;hp=8b88f64c44f9dccaf880dbf947886b650e7203b5;hpb=ef355dcfecf0dad2d95ff5fb3d847f1bf8b9ebe5;p=bluesky.git diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c index 8b88f64..40067fd 100644 --- a/bluesky/cloudlog.c +++ b/bluesky/cloudlog.c @@ -158,8 +158,6 @@ void bluesky_cloudlog_unref(BlueSkyCloudLog *log) } g_array_unref(log->links); bluesky_string_unref(log->data); - if (log->dirty_journal != NULL) - g_atomic_int_add(&log->dirty_journal->dirty_refs, -1); g_free(log); } } @@ -233,7 +231,7 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log, { BlueSkyCloudLogState *state = fs->log_state; - if (log->location_flags & CLOUDLOG_CLOUD) { + if ((log->location_flags | log->pending_write) & CLOUDLOG_CLOUD) { return log->location; } @@ -267,6 +265,16 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log, g_string_append_len(state->data, (const char *)&header, sizeof(header)); g_string_append_len(state->data, log->data->data, log->data->len); + /* If the object we flushed was an inode, update the inode map. */ + if (log->type == LOGTYPE_INODE) { + g_mutex_lock(fs->lock); + InodeMapEntry *entry = bluesky_inode_map_lookup(fs->inode_map, + log->inum, 1); + entry->id = log->id; + entry->location = log->location; + g_mutex_unlock(fs->lock); + } + /* TODO: We should mark the objects as committed on the cloud until the * data is flushed and acknowledged. */ log->pending_write |= CLOUDLOG_CLOUD; @@ -281,17 +289,13 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log, return log->location; } -typedef struct { - BlueSkyRCStr *data; - GSList *items; -} SerializedRecord; - static void cloudlog_flush_complete(BlueSkyStoreAsync *async, SerializedRecord *record) { g_print("Write of %s to cloud complete, status = %d\n", async->key, async->result); + g_mutex_lock(record->lock); if (async->result >= 0) { while (record->items != NULL) { BlueSkyCloudLog *item = (BlueSkyCloudLog *)record->items->data; @@ -300,10 +304,6 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async, item->pending_write &= ~CLOUDLOG_CLOUD; item->location_flags |= CLOUDLOG_CLOUD; bluesky_cloudlog_stats_update(item, 1); - if (item->dirty_journal != NULL) { - g_atomic_int_add(&item->dirty_journal->dirty_refs, -1); - item->dirty_journal = NULL; - } g_mutex_unlock(item->lock); bluesky_cloudlog_unref(item); @@ -311,8 +311,11 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async, } bluesky_string_unref(record->data); + record->data = NULL; g_slist_free(record->items); - g_free(record); + record->items = NULL; + record->complete = TRUE; + g_cond_broadcast(record->cond); } else { g_print("Write should be resubmitted...\n"); @@ -327,6 +330,7 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async, record); bluesky_store_async_unref(async2); } + g_mutex_unlock(record->lock); } /* Finish up a partially-written cloud log segment and flush it to storage. */ @@ -342,6 +346,8 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs) SerializedRecord *record = g_new0(SerializedRecord, 1); record->data = bluesky_string_new_from_gstring(state->data); record->items = state->writeback_list; + record->lock = g_mutex_new(); + record->cond = g_cond_new(); state->writeback_list = NULL; BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); @@ -357,6 +363,8 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs) record); bluesky_store_async_unref(async); + state->pending_segments = g_list_prepend(state->pending_segments, record); + state->location.sequence++; state->location.offset = 0; state->data = g_string_new("");