Rework cache flushing logic--this version should work much better.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Mon, 29 Mar 2010 19:48:03 +0000 (12:48 -0700)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Mon, 29 Mar 2010 19:48:03 +0000 (12:48 -0700)
bluesky/bluesky-private.h
bluesky/bluesky.h
bluesky/cache.c
bluesky/init.c
bluesky/inode.c
kvstore/kvservice.cc
nfs3/nfs3.c

index 8d2e3fc..ba9df02 100644 (file)
@@ -24,6 +24,10 @@ extern int bluesky_watermark_low_dirty;
 extern int bluesky_watermark_medium_dirty;
 extern int bluesky_watermark_high_dirty;
 
+extern int bluesky_watermark_low_total;
+extern int bluesky_watermark_medium_total;
+extern int bluesky_watermark_high_total;
+
 /* TODO: Make this go away entirely. */
 BlueSkyFS *bluesky_new_fs(gchar *name);
 
index 59a33c5..714ec55 100644 (file)
@@ -129,6 +129,9 @@ typedef struct {
      * is held for list editing purposes.  Items at the head of the list are
      * most recently accessed/modified. */
     GList dirty_list, accessed_list;
+
+    /* Mutex for the flush daemon, to prevent concurrent execution. */
+    GMutex *flushd_lock;
 } BlueSkyFS;
 
 /* Inode number of the root directory. */
@@ -283,6 +286,7 @@ gint bluesky_dirent_compare(gconstpointer a, gconstpointer b,
                             gpointer unused);
 
 void bluesky_flushd_invoke(BlueSkyFS *fs);
+void bluesky_flushd_invoke_conditional(BlueSkyFS *fs);
 void bluesky_inode_do_sync(BlueSkyInode *inode);
 
 void bluesky_debug_dump(BlueSkyFS *fs);
index 211c2b9..5f16ab0 100644 (file)
@@ -13,8 +13,7 @@
 
 #include "bluesky-private.h"
 
-#define WRITEBACK_DELAY (5 * 1000000)
-#define CACHE_CLEAN_DELAY (30 * 1000000)
+#define WRITEBACK_DELAY (20 * 1000000)
 
 /* Filesystem caching and cache coherency. */
 
@@ -41,13 +40,7 @@ static void writeback_complete(gpointer a, gpointer i)
     g_mutex_unlock(inode->lock);
 }
 
-/* Drop cached data for a given inode, if it is clean.  inode must be locked. */
-static void drop_caches(BlueSkyInode *inode)
-{
-    if (inode->type == BLUESKY_REGULAR)
-        bluesky_file_drop_cached(inode);
-}
-
+#if 0
 static void flushd_inode(gpointer value, gpointer user_data)
 {
     BlueSkyFS *fs = (BlueSkyFS *)user_data;
@@ -125,33 +118,162 @@ static void flushd_inode(gpointer value, gpointer user_data)
     g_mutex_unlock(inode->lock);
     bluesky_inode_unref(inode);
 }
+#endif
 
-/* Scan through the cache for dirty data and start flushing it to stable
- * storage.  This does not guarantee that data is committed when it returns.
- * Instead, this can be called occasionally to ensure that dirty data is
- * gradually flushed.
- *
- * We do not want to hold the filesystem lock while flushing individual inodes,
- * a that could lead to deadlock.  So first scan through the inode table to get
- * a reference to all inodes, then process that queue of inodes after dropping
- * the filesystem lock. */
-static void gather_inodes(gpointer key, gpointer value, gpointer user_data)
+static void flushd_dirty_inode(BlueSkyInode *inode)
 {
-    GSList **list = (GSList **)user_data;
-    *list = g_slist_prepend(*list, value);
-    bluesky_inode_ref((BlueSkyInode *)value);
+    BlueSkyFS *fs = inode->fs;
+
+    /* Inode is clean; nothing to do. */
+    if (inode->change_count == inode->change_commit)
+        return;
+
+    /* Inode writeback is in progress; put back on the dirty list. */
+    if (inode->change_pending) {
+        /* Waiting for an earlier writeback to finish, so don't start a new
+         * writeback yet. */
+        g_mutex_lock(fs->lock);
+        inode->change_time = bluesky_get_current_time();
+        bluesky_list_unlink(&fs->dirty_list, inode->dirty_list);
+        inode->dirty_list = bluesky_list_prepend(&fs->dirty_list, inode);
+        g_mutex_unlock(fs->lock);
+        return;
+    }
+
+    g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
+          "Starting flush of inode %"PRIu64, inode->inum);
+    inode->change_pending = inode->change_count;
+
+    /* Create a store barrier.  All operations part of the writeback will be
+     * added to this barrier, so when the barrier completes we know that the
+     * writeback is finished. */
+    BlueSkyStoreAsync *barrier = bluesky_store_async_new(fs->store);
+    barrier->op = STORE_OP_BARRIER;
+
+    bluesky_inode_start_sync(inode, barrier);
+
+    bluesky_store_async_add_notifier(barrier, writeback_complete, inode);
+    bluesky_store_async_submit(barrier);
+    bluesky_store_async_unref(barrier);
 }
 
-void bluesky_flushd_invoke(BlueSkyFS *fs)
+/* Try to flush dirty data to disk, either due to memory pressure or due to
+ * timeouts. */
+static void flushd_dirty(BlueSkyFS *fs)
 {
-    GSList *list = NULL;
+    int64_t start_time = bluesky_get_current_time();
+    g_mutex_lock(fs->lock);
+
+    while (1) {
+        BlueSkyInode *inode;
+        if (fs->dirty_list.prev == NULL)
+            break;
+        inode = fs->dirty_list.prev->data;
+
+        g_print("Considering flushing inode %"PRIu64"\n", inode->inum);
+
+        /* Stop processing dirty inodes if we both have enough memory available
+         * and the oldest inode is sufficiently new that it need not be flushed
+         * out. */
+        uint64_t elapsed = bluesky_get_current_time() - inode->change_time;
+        if (g_atomic_int_get(&fs->cache_dirty) < bluesky_watermark_low_dirty
+                && elapsed < WRITEBACK_DELAY)
+            break;
+        if (inode->change_time > start_time)
+            break;
 
+        bluesky_inode_ref(inode);
+
+        bluesky_list_unlink(&fs->dirty_list, fs->dirty_list.prev);
+        inode->dirty_list = NULL;
+
+        g_mutex_unlock(fs->lock);
+
+        g_mutex_lock(inode->lock);
+        flushd_dirty_inode(inode);
+        g_mutex_unlock(inode->lock);
+        bluesky_inode_unref(inode);
+
+        g_mutex_lock(fs->lock);
+    }
+
+    g_mutex_unlock(fs->lock);
+}
+
+/* Drop cached data for a given inode, if it is clean.  inode must be locked. */
+static void drop_caches(BlueSkyInode *inode)
+{
+    if (inode->type == BLUESKY_REGULAR)
+        bluesky_file_drop_cached(inode);
+}
+
+/* Drop clean data fromt the cache if needed due to memory pressure. */
+static void flushd_clean(BlueSkyFS *fs)
+{
     g_mutex_lock(fs->lock);
-    g_hash_table_foreach(fs->inodes, gather_inodes, &list);
+
+    size_t inode_count = g_hash_table_size(fs->inodes);
+    if (!inode_count)
+        inode_count = 1;
+
+    while (inode_count-- > 0) {
+        if (g_atomic_int_get(&fs->cache_total) < bluesky_watermark_medium_total)
+            break;
+
+        BlueSkyInode *inode;
+        if (fs->accessed_list.prev == NULL)
+            break;
+        inode = fs->accessed_list.prev->data;
+
+        g_print("Considering dropping cached data for inode %"PRIu64"\n",
+                inode->inum);
+
+        bluesky_inode_ref(inode);
+
+        bluesky_list_unlink(&fs->accessed_list, fs->accessed_list.prev);
+        inode->accessed_list = bluesky_list_prepend(&fs->accessed_list, inode);
+
+        g_mutex_unlock(fs->lock);
+
+        g_mutex_lock(inode->lock);
+        drop_caches(inode);
+        g_mutex_unlock(inode->lock);
+        bluesky_inode_unref(inode);
+
+        g_mutex_lock(fs->lock);
+    }
+
     g_mutex_unlock(fs->lock);
+}
+
+/* Run the flush daemon for a single iteration, though if it is already
+ * executing returns immediately. */
+static gpointer flushd_task(BlueSkyFS *fs)
+{
+    if (!g_mutex_trylock(fs->flushd_lock))
+        return NULL;
+    flushd_dirty(fs);
+    flushd_clean(fs);
+    g_mutex_unlock(fs->flushd_lock);
 
-    list = g_slist_reverse(list);
-    g_slist_foreach(list, flushd_inode, fs);
+    return NULL;
+}
+
+void bluesky_flushd_invoke(BlueSkyFS *fs)
+{
+    g_thread_create((GThreadFunc)flushd_task, fs, FALSE, NULL);
+}
+
+void bluesky_flushd_invoke_conditional(BlueSkyFS *fs)
+{
+    if (g_atomic_int_get(&fs->cache_dirty) < bluesky_watermark_high_dirty
+        && g_atomic_int_get(&fs->cache_total) < bluesky_watermark_high_total)
+        return;
+
+    g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
+          "Too much data; invoking flushd: dirty=%d total=%d",
+          g_atomic_int_get(&fs->cache_dirty),
+          g_atomic_int_get(&fs->cache_total));
 
-    g_slist_free(list);
+    bluesky_flushd_invoke(fs);
 }
index eb2515c..520ed57 100644 (file)
@@ -32,6 +32,10 @@ int bluesky_watermark_low_dirty    = (64 << 20) / BLUESKY_BLOCK_SIZE;
 int bluesky_watermark_medium_dirty = (96 << 20) / BLUESKY_BLOCK_SIZE;
 int bluesky_watermark_high_dirty   = (192 << 20) / BLUESKY_BLOCK_SIZE;
 
+int bluesky_watermark_low_total    = (64 << 20) / BLUESKY_BLOCK_SIZE;
+int bluesky_watermark_medium_total = (128 << 20) / BLUESKY_BLOCK_SIZE;
+int bluesky_watermark_high_total   = (256 << 20) / BLUESKY_BLOCK_SIZE;
+
 /* Environment variables that can be used to initialize settings. */
 static struct {
     const char *env;
index 39feff8..40531cd 100644 (file)
@@ -83,6 +83,7 @@ BlueSkyFS *bluesky_new_fs(gchar *name)
                                   bluesky_fs_key_equal_func);
     fs->next_inum = BLUESKY_ROOT_INUM + 1;
     fs->store = bluesky_store_new("file");
+    fs->flushd_lock = g_mutex_new();
 
     return fs;
 }
index 59e194a..b620f86 100644 (file)
@@ -68,7 +68,7 @@ void KeyValueRpcService::PutValue(
         response->set_result(kvrpc::FAILURE);
     }
 
-    minimum_delay(&start, 1000000);
+    //minimum_delay(&start, 1000000);
     done->Run();
 }
 
@@ -92,7 +92,7 @@ void KeyValueRpcService::GetValue(
         response->set_result(kvrpc::FAILURE);
     }
 
-    minimum_delay(&start, 1000000);
+    //minimum_delay(&start, 1000000);
     done->Run();
 }
 
index b479107..ec5c736 100644 (file)
@@ -328,6 +328,8 @@ void nfsproc3_read_3_svc(read3args *argp, RPCRequest *req)
     memset(&result, 0, sizeof(result));
     char buf[NFS_MAXSIZE];
 
+    bluesky_flushd_invoke_conditional(fs);
+
     BlueSkyInode *inode = lookup_fh(req, &argp->file);
     if (inode == NULL) {
         result.status = NFS3ERR_STALE;
@@ -372,6 +374,8 @@ void nfsproc3_write_3_svc(write3args *argp, RPCRequest *req)
     struct wcc_data wcc;
     memset(&wcc, 0, sizeof(wcc));
 
+    bluesky_flushd_invoke_conditional(fs);
+
     BlueSkyInode *inode = lookup_fh(req, &argp->file);
     if (inode == NULL) {
         result.status = NFS3ERR_STALE;
@@ -380,6 +384,7 @@ void nfsproc3_write_3_svc(write3args *argp, RPCRequest *req)
         return;
     }
 
+#if 0
     /* FIXME: Hack to throttle writes when there is too much dirty data still
      * to be written out. */
     while (g_atomic_int_get(&fs->cache_dirty) > 4096
@@ -392,6 +397,7 @@ void nfsproc3_write_3_svc(write3args *argp, RPCRequest *req)
         delay.tv_nsec = 0;
         nanosleep(&delay, NULL);
     }
+#endif
 
     g_mutex_lock(inode->lock);