From: Michael Vrable Date: Sun, 22 Aug 2010 05:42:35 +0000 (-0700) Subject: Implement new scheme for retaining needed journal segments. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=774148ec71a99b6acf1023fd990ea975a6b92780;p=bluesky.git Implement new scheme for retaining needed journal segments. Write full filesystem snapshots to the cloud, and keep track of the journal position before the snapshot process starts. When it finishes, the journal segments before that mark can be reclaimed (if needed). This could be improved but should at least be safe. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 8601228..7652cff 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -225,6 +225,7 @@ struct _BlueSkyCloudLogState { BlueSkyCloudPointer location; GList *inode_list; GSList *writeback_list; // Items which are being serialized right now + GList *pending_segments; // Segments which are being uploaded now }; gboolean bluesky_cloudlog_equal(gconstpointer a, gconstpointer b); @@ -266,6 +267,10 @@ struct _BlueSkyLog { /* A count of the disk space consumed (in 1024-byte units) by all files * tracked by mmap_cache (whether mapped or not, actually). */ gint disk_used; + + /* The smallest journal sequence number which may still contain data that + * must be preserved (since it it not yet in the cloud). */ + int journal_watermark; }; /* An object for tracking log files which are stored locally--either the @@ -300,6 +305,15 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs, int clouddir, int log_seq); void bluesky_cachefile_gc(BlueSkyFS *fs); +/* Used to track log segments that are being written to the cloud. */ +typedef struct { + BlueSkyRCStr *data; + GSList *items; + GMutex *lock; + GCond *cond; + gboolean complete; +} SerializedRecord; + #ifdef __cplusplus } #endif diff --git a/bluesky/cache.c b/bluesky/cache.c index 50c0f36..a9fda5b 100644 --- a/bluesky/cache.c +++ b/bluesky/cache.c @@ -101,15 +101,21 @@ static void flushd_dirty(BlueSkyFS *fs) g_mutex_unlock(fs->lock); } -/* Try to flush dirty data to the cloud. - * TODO: Rewrite this to work on cloud log items rather than inodes, so we can - * better track which logs are fully synchronized to the cloud and can be - * garbage collected if needed? */ +/* Try to flush dirty data to the cloud. This will take a snapshot of the + * entire filesystem (though only point-in-time consistent for isolated inodes + * and not the filesystem as a whole) and ensure all data is written to the + * cloud. When the write completes, we will allow old journal segments (those + * that were fully written _before_ the snapshot process started) to be garbage + * collected. Newer journal segments can't be collected yet since they may + * still contain data which has not been written persistently to the cloud. */ static void flushd_cloud(BlueSkyFS *fs) { - int64_t start_time = bluesky_get_current_time(); g_mutex_lock(fs->lock); + /* TODO: Locking? Since we're reading a single variable this is probably + * atomic but a lock could be safer. */ + int journal_seq_start = fs->log->seq_num; + while (1) { BlueSkyInode *inode; if (fs->dirty_list.prev == NULL) @@ -121,16 +127,6 @@ static void flushd_cloud(BlueSkyFS *fs) "Flushing inode %"PRIu64" to cloud", inode->inum); } - /* Stop processing dirty inodes if we both have enough memory available - * and the oldest inode is sufficiently new that it need not be flushed - * out. */ - uint64_t elapsed = bluesky_get_current_time() - inode->change_time; - if (g_atomic_int_get(&fs->cache_dirty) < bluesky_watermark_low_dirty - && elapsed < WRITEBACK_DELAY) - break; - if (inode->change_time > start_time) - break; - bluesky_inode_ref(inode); g_mutex_unlock(fs->lock); @@ -156,6 +152,30 @@ static void flushd_cloud(BlueSkyFS *fs) g_mutex_unlock(fs->lock); bluesky_cloudlog_flush(fs); + + /* Wait until all segments have been written to the cloud, so that it + * becomes safe to free up journal segments. */ + while (fs->log_state->pending_segments != NULL) { + SerializedRecord *segment + = (SerializedRecord *)fs->log_state->pending_segments->data; + g_mutex_lock(segment->lock); + while (!segment->complete) + g_cond_wait(segment->cond, segment->lock); + g_mutex_unlock(segment->lock); + + g_mutex_free(segment->lock); + g_cond_free(segment->cond); + g_free(segment); + + fs->log_state->pending_segments + = g_list_delete_link(fs->log_state->pending_segments, + fs->log_state->pending_segments); + } + + g_print("All segments have been flushed, journal < %d is clean\n", + journal_seq_start); + + fs->log->journal_watermark = journal_seq_start; } /* Drop cached data for a given inode, if it is clean. inode must be locked. */ diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c index e842fbf..a47f5b0 100644 --- a/bluesky/cloudlog.c +++ b/bluesky/cloudlog.c @@ -279,17 +279,13 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log, return log->location; } -typedef struct { - BlueSkyRCStr *data; - GSList *items; -} SerializedRecord; - static void cloudlog_flush_complete(BlueSkyStoreAsync *async, SerializedRecord *record) { g_print("Write of %s to cloud complete, status = %d\n", async->key, async->result); + g_mutex_lock(record->lock); if (async->result >= 0) { while (record->items != NULL) { BlueSkyCloudLog *item = (BlueSkyCloudLog *)record->items->data; @@ -305,8 +301,11 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async, } bluesky_string_unref(record->data); + record->data = NULL; g_slist_free(record->items); - g_free(record); + record->items = NULL; + record->complete = TRUE; + g_cond_broadcast(record->cond); } else { g_print("Write should be resubmitted...\n"); @@ -321,6 +320,7 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async, record); bluesky_store_async_unref(async2); } + g_mutex_unlock(record->lock); } /* Finish up a partially-written cloud log segment and flush it to storage. */ @@ -336,6 +336,8 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs) SerializedRecord *record = g_new0(SerializedRecord, 1); record->data = bluesky_string_new_from_gstring(state->data); record->items = state->writeback_list; + record->lock = g_mutex_new(); + record->cond = g_cond_new(); state->writeback_list = NULL; BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); @@ -351,6 +353,8 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs) record); bluesky_store_async_unref(async); + state->pending_segments = g_list_prepend(state->pending_segments, record); + state->location.sequence++; state->location.offset = 0; state->data = g_string_new(""); diff --git a/bluesky/log.c b/bluesky/log.c index 5fd6e28..c8641b7 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -520,10 +520,23 @@ void bluesky_cachefile_gc(BlueSkyFS *fs) g_print(" (fetching)"); g_print("\n"); + gboolean deletion_candidate = FALSE; if (g_atomic_int_get(&fs->log->disk_used) > DISK_CACHE_SIZE_TARGET && g_atomic_int_get(&cachefile->refcount) == 0 && g_atomic_int_get(&cachefile->mapcount) == 0) { + deletion_candidate = TRUE; + } + + /* Don't allow journal files to be reclaimed until all data is + * known to be durably stored in the cloud. */ + if (cachefile->type == CLOUDLOG_JOURNAL + && cachefile->log_seq >= fs->log->journal_watermark) + { + deletion_candidate = FALSE; + } + + if (deletion_candidate) { g_print(" ...deleting\n"); if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) { fprintf(stderr, "Unable to unlink journal %s: %m\n",