Basic filesystem journaling.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Sun, 18 Jul 2010 04:35:54 +0000 (21:35 -0700)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Sun, 18 Jul 2010 04:35:54 +0000 (21:35 -0700)
Infrastructure for writing log entries synchronously (though the log format
is not yet finished and isn't yet useful), and a partial hook into the
BlueSky filesystem.

bluesky/CMakeLists.txt
bluesky/bluesky-private.h
bluesky/bluesky.h
bluesky/inode.c
bluesky/log.c [new file with mode: 0644]

index f7efc46..3b88d16 100644 (file)
@@ -3,8 +3,9 @@ include_directories("${LIBS3_BUILD_DIR}/include" ${KVSTORE_DIR})
 link_directories("${LIBS3_BUILD_DIR}/lib" ${KVSTORE_DIR})
 
 add_library(bluesky SHARED
-            cache.c crypto.c debug.c dir.c file.c init.c inode.c serialize.c
-            store.c store-bdb.c store-kv.cc store-multi.c store-s3.c util.c)
+            cache.c crypto.c debug.c dir.c file.c init.c inode.c log.c
+            serialize.c store.c store-bdb.c store-kv.cc store-multi.c
+            store-s3.c util.c)
 add_executable(bluesky-test main.c)
 
 set(CMAKE_C_FLAGS "-Wall -std=gnu99 ${CMAKE_C_FLAGS}")
index 5080e00..ddf1d34 100644 (file)
@@ -149,6 +149,28 @@ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block,
 void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier);
 void bluesky_file_drop_cached(BlueSkyInode *inode);
 
+/* Logging infrastructure for ensuring operations are persistently recorded to
+ * disk. */
+struct _BlueSkyLog {
+    char *log_directory;
+    GAsyncQueue *queue;
+    int fd;
+    int seq_num;
+};
+
+typedef struct {
+    gboolean committed;
+    GMutex *lock;
+    GCond *cond;
+    char *key;
+    BlueSkyRCStr *data;
+} BlueSkyLogItem;
+
+BlueSkyLog *bluesky_log_new(const char *log_directory);
+BlueSkyLogItem *bluesky_log_item_new();
+void bluesky_log_item_submit(BlueSkyLogItem *item, BlueSkyLog *log);
+void bluesky_log_item_finish(BlueSkyLogItem *item);
+
 #ifdef __cplusplus
 }
 #endif
index 29d9a62..beec075 100644 (file)
@@ -93,6 +93,9 @@ BlueSkyRCStr *bluesky_crypt_decrypt(BlueSkyRCStr *in, const uint8_t *key);
 struct _BlueSkyStore;
 typedef struct _BlueSkyStore BlueSkyStore;
 
+struct _BlueSkyLog;
+typedef struct _BlueSkyLog BlueSkyLog;
+
 void bluesky_store_init();
 BlueSkyStore *bluesky_store_new(const gchar *type);
 void bluesky_store_free(BlueSkyStore *store);
@@ -126,6 +129,7 @@ typedef struct {
     uint64_t next_inum;         /* Next available inode for allocation */
 
     BlueSkyStore *store;
+    BlueSkyLog *log;
 
     /* Accounting for memory used for caches.  Space is measured in blocks, not
      * bytes.  We track both total data in the caches and dirty data (total
index 5977955..a0b3c6a 100644 (file)
@@ -106,6 +106,7 @@ BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store)
     g_print("Initializing fresh filesystem\n");
     BlueSkyFS *fs = bluesky_new_fs(name);
     fs->store = store;
+    fs->log = bluesky_log_new("journal");
 
     BlueSkyInode *root = bluesky_new_inode(BLUESKY_ROOT_INUM, fs,
                                            BLUESKY_DIRECTORY);
@@ -308,6 +309,13 @@ void bluesky_inode_start_sync(BlueSkyInode *inode, BlueSkyStoreAsync *barrier)
     char key[64];
     sprintf(key, "inode-%016"PRIx64, inode->inum);
 
+    BlueSkyLogItem *log_item = bluesky_log_item_new();
+    log_item->key = g_strdup(key);
+    log_item->data = data;
+    bluesky_string_ref(data);
+    bluesky_log_item_submit(log_item, fs->log);
+    bluesky_log_item_finish(log_item);
+
     BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
     async->op = STORE_OP_PUT;
     async->key = g_strdup(key);
diff --git a/bluesky/log.c b/bluesky/log.c
new file mode 100644 (file)
index 0000000..a4a8150
--- /dev/null
@@ -0,0 +1,158 @@
+/* Blue Sky: File Systems in the Cloud
+ *
+ * Copyright (C) 2010  The Regents of the University of California
+ * Written by Michael Vrable <mvrable@cs.ucsd.edu>
+ *
+ * TODO: Licensing
+ */
+
+#define _GNU_SOURCE
+#define _ATFILE_SOURCE
+
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <glib.h>
+#include <string.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "bluesky-private.h"
+
+/* The logging layer for BlueSky.  This is used to write filesystem changes
+ * durably to disk so that they can be recovered in the event of a system
+ * crash. */
+
+/* The logging layer takes care out writing out a sequence of log records to
+ * disk.  On disk, each record consists of a header, a data payload, and a
+ * footer.  The footer contains a checksum of the record, meant to help with
+ * identifying corrupt log records (we would assume because the log record was
+ * only incompletely written out before a crash, which should only happen for
+ * log records that were not considered committed). */
+
+// Rough size limit for a log segment.  This is not a firm limit and there are
+// no absolute guarantees on the size of a log segment.
+#define LOG_SEGMENT_SIZE (1 << 20)
+
+static void writebuf(int fd, const char *buf, size_t len)
+{
+    while (len > 0) {
+        ssize_t written;
+        written = write(fd, buf, len);
+        if (written < 0 && errno == EINTR)
+            continue;
+        g_assert(written >= 0);
+        buf += written;
+        len -= written;
+    }
+}
+
+/* All log writes (at least for a single log) are made by one thread, so we
+ * don't need to worry about concurrent access to the log file.  Log items to
+ * write are pulled off a queue (and so may be posted by any thread).
+ * fdatasync() is used to ensure the log items are stable on disk.
+ *
+ * The log is broken up into separate files, roughly of size LOG_SEGMENT_SIZE
+ * each.  If a log segment is not currently open (log->fd is negative), a new
+ * one is created.  Log segment filenames are assigned sequentially.
+ *
+ * Log replay ought to be implemented later, and ought to set the initial
+ * sequence number appropriately.
+ */
+static gpointer log_thread(gpointer d)
+{
+    BlueSkyLog *log = (BlueSkyLog *)d;
+
+    int dirfd = open(log->log_directory, O_DIRECTORY);
+    if (dirfd < 0) {
+        fprintf(stderr, "Unable to open logging directory: %m\n");
+        return NULL;
+    }
+
+    while (TRUE) {
+        if (log->fd < 0) {
+            char logfile[64];
+            g_snprintf(logfile, sizeof(logfile), "log-%08d", log->seq_num);
+            log->fd = openat(dirfd, logfile, O_CREAT|O_WRONLY|O_EXCL, 0600);
+            if (log->fd < 0 && errno == EEXIST) {
+                fprintf(stderr, "Log file %s already exists...\n", logfile);
+                log->seq_num++;
+                continue;
+            } else if (log->fd < 0) {
+                fprintf(stderr, "Error opening logfile %s: %m\n", logfile);
+                return NULL;
+            }
+            fsync(log->fd);
+            fsync(dirfd);
+        }
+
+        BlueSkyLogItem *item = (BlueSkyLogItem *)g_async_queue_pop(log->queue);
+        g_mutex_lock(item->lock);
+        writebuf(log->fd, item->key, strlen(item->key));
+        writebuf(log->fd, item->data->data, item->data->len);
+        fdatasync(log->fd);
+        item->committed = TRUE;
+        g_cond_signal(item->cond);
+        g_mutex_unlock(item->lock);
+
+        off_t logsize = lseek(log->fd, 0, SEEK_CUR);
+        if (logsize < 0 || logsize >= LOG_SEGMENT_SIZE) {
+            close(log->fd);
+            log->fd = -1;
+            log->seq_num++;
+        }
+    }
+
+    return NULL;
+}
+
+BlueSkyLog *bluesky_log_new(const char *log_directory)
+{
+    BlueSkyLog *log = g_new0(BlueSkyLog, 1);
+
+    log->log_directory = g_strdup(log_directory);
+    log->fd = -1;
+    log->seq_num = 0;
+    log->queue = g_async_queue_new();
+
+    g_thread_create(log_thread, log, FALSE, NULL);
+
+    return log;
+}
+
+BlueSkyLogItem *bluesky_log_item_new()
+{
+    BlueSkyLogItem *item = g_new(BlueSkyLogItem, 1);
+    item->committed = FALSE;
+    item->lock = g_mutex_new();
+    item->cond = g_cond_new();
+    item->key = NULL;
+    item->data = NULL;
+    return item;
+}
+
+void bluesky_log_item_submit(BlueSkyLogItem *item, BlueSkyLog *log)
+{
+    g_async_queue_push(log->queue, item);
+}
+
+static void bluesky_log_item_free(BlueSkyLogItem *item)
+{
+    g_free(item->key);
+    bluesky_string_unref(item->data);
+    g_mutex_free(item->lock);
+    g_cond_free(item->cond);
+    g_free(item);
+}
+
+void bluesky_log_item_finish(BlueSkyLogItem *item)
+{
+    g_mutex_lock(item->lock);
+    while (!item->committed)
+        g_cond_wait(item->cond, item->lock);
+    g_mutex_unlock(item->lock);
+    bluesky_log_item_free(item);
+}