From 0ba2a52ec1386c8928b397f775e4a4f97339fa2a Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Sat, 17 Jul 2010 21:35:54 -0700 Subject: [PATCH] Basic filesystem journaling. Infrastructure for writing log entries synchronously (though the log format is not yet finished and isn't yet useful), and a partial hook into the BlueSky filesystem. --- bluesky/CMakeLists.txt | 5 +- bluesky/bluesky-private.h | 22 ++++++ bluesky/bluesky.h | 4 + bluesky/inode.c | 8 ++ bluesky/log.c | 158 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 195 insertions(+), 2 deletions(-) create mode 100644 bluesky/log.c diff --git a/bluesky/CMakeLists.txt b/bluesky/CMakeLists.txt index f7efc46..3b88d16 100644 --- a/bluesky/CMakeLists.txt +++ b/bluesky/CMakeLists.txt @@ -3,8 +3,9 @@ include_directories("${LIBS3_BUILD_DIR}/include" ${KVSTORE_DIR}) link_directories("${LIBS3_BUILD_DIR}/lib" ${KVSTORE_DIR}) add_library(bluesky SHARED - cache.c crypto.c debug.c dir.c file.c init.c inode.c serialize.c - store.c store-bdb.c store-kv.cc store-multi.c store-s3.c util.c) + cache.c crypto.c debug.c dir.c file.c init.c inode.c log.c + serialize.c store.c store-bdb.c store-kv.cc store-multi.c + store-s3.c util.c) add_executable(bluesky-test main.c) set(CMAKE_C_FLAGS "-Wall -std=gnu99 ${CMAKE_C_FLAGS}") diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 5080e00..ddf1d34 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -149,6 +149,28 @@ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block, void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier); void bluesky_file_drop_cached(BlueSkyInode *inode); +/* Logging infrastructure for ensuring operations are persistently recorded to + * disk. 
*/ +struct _BlueSkyLog { + char *log_directory; /* Directory in which log segment files are created */ + GAsyncQueue *queue; /* BlueSkyLogItems waiting to be written by the log thread */ + int fd; /* Currently open log segment; negative when none is open */ + int seq_num; /* Sequence number used to name the log segment file */ +}; + +/* A single record to be logged. The log thread writes key followed by the + * contents of data, then sets committed and signals cond while holding lock. */ +typedef struct { + gboolean committed; /* Set by the log thread once the record has been written */ + GMutex *lock; /* Protects committed */ + GCond *cond; /* Signalled when committed becomes TRUE */ + char *key; /* NUL-terminated key; freed with g_free() along with the item */ + BlueSkyRCStr *data; /* Payload; one reference is dropped when the item is freed */ +} BlueSkyLogItem; + +/* Usage: bluesky_log_item_new(), fill in key and data, + * bluesky_log_item_submit(), then bluesky_log_item_finish() to wait for the + * record to be written and to free the item. */ +BlueSkyLog *bluesky_log_new(const char *log_directory); +BlueSkyLogItem *bluesky_log_item_new(); +void bluesky_log_item_submit(BlueSkyLogItem *item, BlueSkyLog *log); +void bluesky_log_item_finish(BlueSkyLogItem *item); + #ifdef __cplusplus } #endif diff --git a/bluesky/bluesky.h b/bluesky/bluesky.h index 29d9a62..beec075 100644 --- a/bluesky/bluesky.h +++ b/bluesky/bluesky.h @@ -93,6 +93,9 @@ BlueSkyRCStr *bluesky_crypt_decrypt(BlueSkyRCStr *in, const uint8_t *key); struct _BlueSkyStore; typedef struct _BlueSkyStore BlueSkyStore; +struct _BlueSkyLog; +typedef struct _BlueSkyLog BlueSkyLog; + void bluesky_store_init(); BlueSkyStore *bluesky_store_new(const gchar *type); void bluesky_store_free(BlueSkyStore *store); @@ -126,6 +129,7 @@ typedef struct { uint64_t next_inum; /* Next available inode for allocation */ BlueSkyStore *store; + BlueSkyLog *log; /* Accounting for memory used for caches. Space is measured in blocks, not * bytes. 
We track both total data in the caches and dirty data (total diff --git a/bluesky/inode.c b/bluesky/inode.c index 5977955..a0b3c6a 100644 --- a/bluesky/inode.c +++ b/bluesky/inode.c @@ -106,6 +106,7 @@ BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store) g_print("Initializing fresh filesystem\n"); BlueSkyFS *fs = bluesky_new_fs(name); fs->store = store; + fs->log = bluesky_log_new("journal"); BlueSkyInode *root = bluesky_new_inode(BLUESKY_ROOT_INUM, fs, BLUESKY_DIRECTORY); @@ -308,6 +309,13 @@ void bluesky_inode_start_sync(BlueSkyInode *inode, BlueSkyStoreAsync *barrier) char key[64]; sprintf(key, "inode-%016"PRIx64, inode->inum); + BlueSkyLogItem *log_item = bluesky_log_item_new(); + log_item->key = g_strdup(key); + log_item->data = data; + bluesky_string_ref(data); + bluesky_log_item_submit(log_item, fs->log); + bluesky_log_item_finish(log_item); + BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); async->op = STORE_OP_PUT; async->key = g_strdup(key); diff --git a/bluesky/log.c b/bluesky/log.c new file mode 100644 index 0000000..a4a8150 --- /dev/null +++ b/bluesky/log.c @@ -0,0 +1,158 @@ +/* Blue Sky: File Systems in the Cloud + * + * Copyright (C) 2010 The Regents of the University of California + * Written by Michael Vrable + * + * TODO: Licensing + */ + +#define _GNU_SOURCE +#define _ATFILE_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "bluesky-private.h" + +/* The logging layer for BlueSky. This is used to write filesystem changes + * durably to disk so that they can be recovered in the event of a system + * crash. */ + +/* The logging layer takes care of writing out a sequence of log records to + * disk. On disk, each record consists of a header, a data payload, and a + * footer. 
+ * The footer contains a checksum of the record, meant to help with
+ * identifying corrupt log records (we would assume because the log record was
+ * only incompletely written out before a crash, which should only happen for
+ * log records that were not considered committed). */
+
+// Rough size limit for a log segment.  This is not a firm limit and there are
+// no absolute guarantees on the size of a log segment.
+#define LOG_SEGMENT_SIZE (1 << 20)
+
+/* Write all len bytes of buf to fd.
+ *
+ * Short writes are continued from where they stopped, and writes interrupted
+ * by a signal (EINTR) are retried.  Any other write error is fatal: the
+ * process is terminated with an error message. */
+static void writebuf(int fd, const char *buf, size_t len)
+{
+    while (len > 0) {
+        ssize_t written = write(fd, buf, len);
+        if (written < 0) {
+            if (errno == EINTR)
+                continue;
+            /* Checked explicitly rather than with g_assert(): assertions are
+             * compiled out under G_DISABLE_ASSERT, and a -1 result would then
+             * move buf backwards and grow len. */
+            g_error("Error writing to log: %s", g_strerror(errno));
+        }
+        buf += written;
+        len -= written;
+    }
+}
+
+/* All log writes (at least for a single log) are made by one thread, so we
+ * don't need to worry about concurrent access to the log file.  Log items to
+ * write are pulled off a queue (and so may be posted by any thread).
+ * fdatasync() is used to ensure the log items are stable on disk.
+ *
+ * The log is broken up into separate files, roughly of size LOG_SEGMENT_SIZE
+ * each.  If a log segment is not currently open (log->fd is negative), a new
+ * one is created.  Log segment filenames are assigned sequentially.
+ *
+ * Log replay ought to be implemented later, and ought to set the initial
+ * sequence number appropriately.
+ */
+static gpointer log_thread(gpointer d)
+{
+    BlueSkyLog *log = (BlueSkyLog *)d;
+
+    int dirfd = open(log->log_directory, O_DIRECTORY);
+    if (dirfd < 0) {
+        fprintf(stderr, "Unable to open logging directory: %m\n");
+        return NULL;
+    }
+
+    while (TRUE) {
+        if (log->fd < 0) {
+            /* No segment is open: create the next one.  O_EXCL makes the
+             * open fail instead of reusing an existing file, in which case
+             * the next sequence number is tried. */
+            char logfile[64];
+            g_snprintf(logfile, sizeof(logfile), "log-%08d", log->seq_num);
+            log->fd = openat(dirfd, logfile, O_CREAT|O_WRONLY|O_EXCL, 0600);
+            if (log->fd < 0 && errno == EEXIST) {
+                fprintf(stderr, "Log file %s already exists...\n", logfile);
+                log->seq_num++;
+                continue;
+            } else if (log->fd < 0) {
+                fprintf(stderr, "Error opening logfile %s: %m\n", logfile);
+                /* The thread is giving up; do not leak the directory
+                 * descriptor.  (Closed after the message so that %m still
+                 * reports the openat() error.) */
+                close(dirfd);
+                return NULL;
+            }
+            /* Sync the new file and the directory that now contains it. */
+            fsync(log->fd);
+            fsync(dirfd);
+        }
+
+        /* Blocks until some thread submits an item. */
+        BlueSkyLogItem *item = (BlueSkyLogItem *)g_async_queue_pop(log->queue);
+        g_mutex_lock(item->lock);
+        writebuf(log->fd, item->key, strlen(item->key));
+        writebuf(log->fd, item->data->data, item->data->len);
+        /* Only report the item as committed when the sync succeeded; a
+         * failed sync terminates the process rather than signalling a commit
+         * that may not be on disk. */
+        if (fdatasync(log->fd) < 0)
+            g_error("Error syncing log to disk: %s", g_strerror(errno));
+        item->committed = TRUE;
+        g_cond_signal(item->cond);
+        g_mutex_unlock(item->lock);
+        /* The item must not be touched past this point: the thread waiting
+         * in bluesky_log_item_finish() frees it once it sees committed. */
+
+        /* Close the segment once it reaches the size limit (or its size
+         * cannot be determined); the next iteration opens a new one. */
+        off_t logsize = lseek(log->fd, 0, SEEK_CUR);
+        if (logsize < 0 || logsize >= LOG_SEGMENT_SIZE) {
+            close(log->fd);
+            log->fd = -1;
+            log->seq_num++;
+        }
+    }
+
+    return NULL;
+}
+
+/* Create a log that writes its segments into log_directory (the name is
+ * copied) and start the thread that services it.  The log starts with no
+ * segment open and with sequence number 0.  The thread is created
+ * non-joinable. */
+BlueSkyLog *bluesky_log_new(const char *log_directory)
+{
+    BlueSkyLog *log = g_new0(BlueSkyLog, 1);
+
+    log->log_directory = g_strdup(log_directory);
+    log->fd = -1;
+    log->seq_num = 0;
+    log->queue = g_async_queue_new();
+
+    g_thread_create(log_thread, log, FALSE, NULL);
+
+    return log;
+}
+
+/* Allocate a log item that is not yet committed, with its own mutex and
+ * condition variable.  key and data start out NULL; the caller fills them in
+ * before submitting the item. */
+BlueSkyLogItem *bluesky_log_item_new()
+{
+    BlueSkyLogItem *item = g_new(BlueSkyLogItem, 1);
+    item->committed = FALSE;
+    item->lock = g_mutex_new();
+    item->cond = g_cond_new();
+    item->key = NULL;
+    item->data = NULL;
+    return item;
+}
+
+/* Queue item for writing by the thread servicing log.  item->key and
+ * item->data must be set, since the log thread dereferences both. */
+void bluesky_log_item_submit(BlueSkyLogItem *item, BlueSkyLog *log)
+{
+    g_async_queue_push(log->queue, item);
+}
+
+/* Release everything owned by item: the key string, one reference on the
+ * data, the mutex and condition variable, and the item itself. */
+static void bluesky_log_item_free(BlueSkyLogItem *item)
+{
+    g_free(item->key);
+    bluesky_string_unref(item->data);
+    g_mutex_free(item->lock);
+    g_cond_free(item->cond);
+    g_free(item);
+}
+
+/* Block until the log thread has marked item as committed, then free it.
+ * The item must not be used after this call. */
+void bluesky_log_item_finish(BlueSkyLogItem *item)
+{
+    g_mutex_lock(item->lock);
+    /* Loop to cope with wakeups that arrive before committed is set. */
+    while (!item->committed)
+        g_cond_wait(item->cond, item->lock);
+    g_mutex_unlock(item->lock);
+    bluesky_log_item_free(item);
+}
-- 
2.20.1