From: Michael Vrable Date: Thu, 29 Jul 2010 22:05:37 +0000 (-0700) Subject: (Mostly) merge local and cloud logging together. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=e9af23b41ae0c11245fbc6886a794e1dfb04a6bd;p=bluesky.git (Mostly) merge local and cloud logging together. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 3cabcd1..a265796 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -139,43 +139,16 @@ void bluesky_store_sync(BlueSkyStore *store); void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier, BlueSkyStoreAsync *async); -void bluesky_inode_start_sync(BlueSkyInode *inode, BlueSkyStoreAsync *barrier); +void bluesky_inode_start_sync(BlueSkyInode *inode); void bluesky_block_touch(BlueSkyInode *inode, uint64_t i); void bluesky_block_fetch(BlueSkyFS *fs, BlueSkyBlock *block, BlueSkyStoreAsync *barrier); void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block, - BlueSkyStoreAsync *barrier, GList **log_items); -void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier, - GList **log_items); + GList **log_items); +void bluesky_file_flush(BlueSkyInode *inode, GList **log_items); void bluesky_file_drop_cached(BlueSkyInode *inode); -/* Logging infrastructure for ensuring operations are persistently recorded to - * disk. */ -#define BLUESKY_CRC32C_SEED (~(uint32_t)0) -uint32_t crc32c(uint32_t crc, const char *buf, unsigned int length); -uint32_t crc32c_finalize(uint32_t crc); - -struct _BlueSkyLog { - char *log_directory; - GAsyncQueue *queue; - int fd; - int seq_num; -}; - -typedef struct { - gboolean committed; - GMutex *lock; - GCond *cond; - char *key; - BlueSkyRCStr *data; -} BlueSkyLogItem; - -BlueSkyLog *bluesky_log_new(const char *log_directory); -BlueSkyLogItem *bluesky_log_item_new(); -void bluesky_log_item_submit(BlueSkyLogItem *item, BlueSkyLog *log); -void bluesky_log_item_finish(BlueSkyLogItem *item); - /* Writing of data to the cloud in log segments and tracking the location of * various pieces of data (both where in the cloud and where cached locally). * */ @@ -207,6 +180,8 @@ typedef enum { #define CLOUDLOG_CLOUD 0x04 typedef struct { gint refcount; + GMutex *lock; + GCond *cond; BlueSkyFS *fs; @@ -214,6 +189,7 @@ typedef struct { // Bitmask of CLOUDLOG_* flags indicating where the object exists. int location_flags; + int pending_read, pending_write; // A stable identifier for the object (only changes when authenticated data // is written out, but stays the same when the in-cloud cleaner relocates @@ -228,6 +204,7 @@ typedef struct { BlueSkyCloudPointer location; // TODO: Location in journal/cache + int log_seq, log_offset, log_size; // Pointers to other objects GArray *pointers; @@ -250,10 +227,27 @@ gchar *bluesky_cloudlog_id_to_string(BlueSkyCloudID id); BlueSkyCloudID bluesky_cloudlog_id_from_string(const gchar *idstr); void bluesky_cloudlog_ref(BlueSkyCloudLog *log); void bluesky_cloudlog_unref(BlueSkyCloudLog *log); -BlueSkyLogItem *bluesky_cloudlog_sync(BlueSkyCloudLog *log); +void bluesky_cloudlog_sync(BlueSkyCloudLog *log); void bluesky_cloudlog_insert(BlueSkyCloudLog *log); void bluesky_cloudlog_write_log(BlueSkyFS *fs); +/* Logging infrastructure for ensuring operations are persistently recorded to + * disk. */ +#define BLUESKY_CRC32C_SEED (~(uint32_t)0) +uint32_t crc32c(uint32_t crc, const char *buf, unsigned int length); +uint32_t crc32c_finalize(uint32_t crc); + +struct _BlueSkyLog { + char *log_directory; + GAsyncQueue *queue; + int fd; + int seq_num; +}; + +BlueSkyLog *bluesky_log_new(const char *log_directory); +void bluesky_log_item_submit(BlueSkyCloudLog *item, BlueSkyLog *log); +void bluesky_log_finish_all(GList *log_items); + #ifdef __cplusplus } #endif diff --git a/bluesky/cache.c b/bluesky/cache.c index 441e6c4..587f98b 100644 --- a/bluesky/cache.c +++ b/bluesky/cache.c @@ -78,17 +78,7 @@ static void flushd_dirty_inode(BlueSkyInode *inode) } inode->change_pending = inode->change_count; - /* Create a store barrier. All operations part of the writeback will be - * added to this barrier, so when the barrier completes we know that the - * writeback is finished. */ - BlueSkyStoreAsync *barrier = bluesky_store_async_new(fs->store); - barrier->op = STORE_OP_BARRIER; - - bluesky_inode_start_sync(inode, barrier); - - bluesky_store_async_add_notifier(barrier, writeback_complete, inode); - bluesky_store_async_submit(barrier); - bluesky_store_async_unref(barrier); + bluesky_inode_start_sync(inode); } /* Try to flush dirty data to disk, either due to memory pressure or due to diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c index e1fd44a..82f0f65 100644 --- a/bluesky/cloudlog.c +++ b/bluesky/cloudlog.c @@ -86,6 +86,8 @@ BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs) { BlueSkyCloudLog *log = g_new0(BlueSkyCloudLog, 1); + log->lock = g_mutex_new(); + log->cond = g_cond_new(); log->fs = fs; log->type = LOGTYPE_UNKNOWN; log->id = bluesky_cloudlog_new_id(); @@ -108,14 +110,9 @@ void bluesky_cloudlog_unref(BlueSkyCloudLog *log) } /* Start a write of the object to the local log. */ -BlueSkyLogItem *bluesky_cloudlog_sync(BlueSkyCloudLog *log) +void bluesky_cloudlog_sync(BlueSkyCloudLog *log) { - BlueSkyLogItem *log_item = bluesky_log_item_new(); - log_item->key = bluesky_cloudlog_id_to_string(log->id); - log_item->data = log->data; - bluesky_string_ref(log->data); - bluesky_log_item_submit(log_item, log->fs->log); - return log_item; + bluesky_log_item_submit(log, log->fs->log); } /* Add the given entry to the global hash table containing cloud log entries. diff --git a/bluesky/debug.c b/bluesky/debug.c index ca09868..8d4d8d2 100644 --- a/bluesky/debug.c +++ b/bluesky/debug.c @@ -42,8 +42,9 @@ static void cloudlog_dump(gpointer key, gpointer value, gpointer user_data) for (int i = 0; i < sizeof(BlueSkyCloudID); i++) { g_print("%02x", (uint8_t)(log->id.bytes[i])); } - g_print(": ty=%d inode=%"PRIu64" locs=%x\n", - log->type, log->inum, log->location_flags); + g_print(": ty=%d inode=%"PRIu64" locs=%x log@(%d,%d)\n", + log->type, log->inum, log->location_flags, + log->log_seq, log->log_offset); } /* Dump a summary of filesystem state as it is cached in memory. */ diff --git a/bluesky/file.c b/bluesky/file.c index 8faae00..2e4313e 100644 --- a/bluesky/file.c +++ b/bluesky/file.c @@ -258,7 +258,6 @@ void bluesky_block_fetch(BlueSkyFS *fs, BlueSkyBlock *block, /* Write the given block to cloud-backed storage and mark it clean. */ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block, - BlueSkyStoreAsync *barrier, GList **log_items) { if (block->type != BLUESKY_BLOCK_DIRTY) @@ -272,21 +271,10 @@ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block, cloudlog->inum = 0; //FIXME cloudlog->data = data; bluesky_string_ref(data); - *log_items = g_list_prepend(*log_items, bluesky_cloudlog_sync(cloudlog)); + bluesky_cloudlog_sync(cloudlog); + *log_items = g_list_prepend(*log_items, cloudlog); bluesky_cloudlog_insert(cloudlog); - /* Store the file data asynchronously, and don't bother waiting for a - * response. */ - BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); - async->op = STORE_OP_PUT; - async->key = g_strdup(name); - bluesky_string_ref(data); - async->data = data; - bluesky_store_async_submit(async); - if (barrier != NULL) - bluesky_store_add_barrier(barrier, async); - bluesky_store_async_unref(async); - g_free(block->ref); block->ref = name; @@ -295,14 +283,13 @@ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block, } /* Flush all blocks in a file to stable storage. */ -void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier, - GList **log_items) +void bluesky_file_flush(BlueSkyInode *inode, GList **log_items) { g_return_if_fail(inode->type == BLUESKY_REGULAR); for (int i = 0; i < inode->blocks->len; i++) { BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i); - bluesky_block_flush(inode->fs, b, barrier, log_items); + bluesky_block_flush(inode->fs, b, log_items); } } diff --git a/bluesky/inode.c b/bluesky/inode.c index 745d81b..0844f79 100644 --- a/bluesky/inode.c +++ b/bluesky/inode.c @@ -307,13 +307,13 @@ void bluesky_inode_flush(BlueSkyFS *fs, BlueSkyInode *inode) } /* Start writeback of an inode and all associated data. */ -void bluesky_inode_start_sync(BlueSkyInode *inode, BlueSkyStoreAsync *barrier) +void bluesky_inode_start_sync(BlueSkyInode *inode) { GList *log_items = NULL; BlueSkyFS *fs = inode->fs; if (inode->type == BLUESKY_REGULAR) - bluesky_file_flush(inode, barrier, &log_items); + bluesky_file_flush(inode, &log_items); GString *buf = g_string_new(""); bluesky_serialize_inode(buf, inode); @@ -340,42 +340,23 @@ void bluesky_inode_start_sync(BlueSkyInode *inode, BlueSkyStoreAsync *barrier) } } - log_items = g_list_prepend(log_items, bluesky_cloudlog_sync(cloudlog)); - + bluesky_cloudlog_sync(cloudlog); + log_items = g_list_prepend(log_items, cloudlog); bluesky_cloudlog_insert(cloudlog); /* Wait for all log items to be committed to disk. */ - while (log_items != NULL) { - BlueSkyLogItem *log_item = (BlueSkyLogItem *)log_items->data; - bluesky_log_item_finish(log_item); - log_items = g_list_delete_link(log_items, log_items); - } - - BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); - async->op = STORE_OP_PUT; - async->key = g_strdup(key); - async->data = data; - bluesky_store_async_submit(async); - if (barrier != NULL) - bluesky_store_add_barrier(barrier, async); - bluesky_store_async_unref(async); + bluesky_log_finish_all(log_items); } /* Write back an inode and all associated data and wait for completion. Inode * should already be locked. */ void bluesky_inode_do_sync(BlueSkyInode *inode) { - BlueSkyStoreAsync *barrier = bluesky_store_async_new(inode->fs->store); - barrier->op = STORE_OP_BARRIER; - if (bluesky_verbose) { g_log("bluesky/inode", G_LOG_LEVEL_DEBUG, "Synchronous writeback for inode %"PRIu64"...", inode->inum); } - bluesky_inode_start_sync(inode, barrier); - bluesky_store_async_submit(barrier); - bluesky_store_async_wait(barrier); - bluesky_store_async_unref(barrier); + bluesky_inode_start_sync(inode); if (bluesky_verbose) { g_log("bluesky/inode", G_LOG_LEVEL_DEBUG, "Writeback for inode %"PRIu64" complete", inode->inum); diff --git a/bluesky/log.c b/bluesky/log.c index 4c522b8..e9af52a 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -43,8 +43,8 @@ struct log_header { uint32_t magic; // HEADER_MAGIC uint64_t offset; // Starting byte offset of the log header - uint32_t keysize; // Size of the log key (bytes) uint32_t size; // Size of the data item (bytes) + BlueSkyCloudID id; // Object identifier } __attribute__((packed)); struct log_footer { @@ -110,8 +110,18 @@ static gpointer log_thread(gpointer d) fsync(dirfd); } - BlueSkyLogItem *item = (BlueSkyLogItem *)g_async_queue_pop(log->queue); + BlueSkyCloudLog *item + = (BlueSkyCloudLog *)g_async_queue_pop(log->queue); g_mutex_lock(item->lock); + g_assert(item->data != NULL); + + if ((item->location_flags | item->pending_write) & CLOUDLOG_JOURNAL) { + g_mutex_unlock(item->lock); + bluesky_cloudlog_unref(item); + continue; + } + + item->pending_write |= CLOUDLOG_JOURNAL; off_t logsize = lseek(log->fd, 0, SEEK_CUR); struct log_header header; @@ -119,8 +129,8 @@ static gpointer log_thread(gpointer d) header.magic = GUINT32_TO_LE(HEADER_MAGIC); header.offset = GUINT64_TO_LE(logsize); - header.keysize = GUINT32_TO_LE(strlen(item->key)); header.size = GUINT32_TO_LE(item->data->len); + header.id = item->id; footer.magic = GUINT32_TO_LE(FOOTER_MAGIC); uint32_t crc = BLUESKY_CRC32C_SEED; @@ -128,9 +138,6 @@ static gpointer log_thread(gpointer d) writebuf(log->fd, (const char *)&header, sizeof(header)); crc = crc32c(crc, (const char *)&header, sizeof(header)); - writebuf(log->fd, item->key, strlen(item->key)); - crc = crc32c(crc, item->key, strlen(item->key)); - writebuf(log->fd, item->data->data, item->data->len); crc = crc32c(crc, item->data->data, item->data->len); @@ -139,10 +146,14 @@ static gpointer log_thread(gpointer d) footer.crc = crc32c_finalize(crc); writebuf(log->fd, (const char *)&footer, sizeof(footer)); - logsize += sizeof(header) + sizeof(footer); - logsize += strlen(item->key) + item->data->len; + item->log_seq = log->seq_num; + item->log_offset = logsize + sizeof(header); + item->log_size = item->data->len; + + logsize += sizeof(header) + sizeof(footer) + item->data->len; committed = g_slist_prepend(committed, item); + g_mutex_unlock(item->lock); /* Force an fsync either if we will be closing this log segment and * opening a new file, or if there are no other log items currently @@ -154,8 +165,10 @@ static gpointer log_thread(gpointer d) int batchsize = 0; fdatasync(log->fd); while (committed != NULL) { - item = (BlueSkyLogItem *)committed->data; - item->committed = TRUE; + item = (BlueSkyCloudLog *)committed->data; + g_mutex_lock(item->lock); + item->pending_write &= ~CLOUDLOG_JOURNAL; + item->location_flags |= CLOUDLOG_JOURNAL; g_cond_signal(item->cond); g_mutex_unlock(item->lock); committed = g_slist_delete_link(committed, committed); @@ -189,36 +202,23 @@ BlueSkyLog *bluesky_log_new(const char *log_directory) return log; } -BlueSkyLogItem *bluesky_log_item_new() -{ - BlueSkyLogItem *item = g_new(BlueSkyLogItem, 1); - item->committed = FALSE; - item->lock = g_mutex_new(); - item->cond = g_cond_new(); - item->key = NULL; - item->data = NULL; - return item; -} - -void bluesky_log_item_submit(BlueSkyLogItem *item, BlueSkyLog *log) +void bluesky_log_item_submit(BlueSkyCloudLog *item, BlueSkyLog *log) { + bluesky_cloudlog_ref(item); g_async_queue_push(log->queue, item); } -static void bluesky_log_item_free(BlueSkyLogItem *item) +void bluesky_log_finish_all(GList *log_items) { - g_free(item->key); - bluesky_string_unref(item->data); - g_mutex_free(item->lock); - g_cond_free(item->cond); - g_free(item); -} + while (log_items != NULL) { + BlueSkyCloudLog *item = (BlueSkyCloudLog *)log_items->data; -void bluesky_log_item_finish(BlueSkyLogItem *item) -{ - g_mutex_lock(item->lock); - while (!item->committed) - g_cond_wait(item->cond, item->lock); - g_mutex_unlock(item->lock); - bluesky_log_item_free(item); + g_mutex_lock(item->lock); + while ((item->pending_write & CLOUDLOG_JOURNAL)) + g_cond_wait(item->cond, item->lock); + g_mutex_unlock(item->lock); + bluesky_cloudlog_unref(item); + + log_items = g_list_delete_link(log_items, log_items); + } }