From 870ff381b99c54615457d1cea92e710bc68e194b Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Mon, 16 Aug 2010 12:04:03 -0700 Subject: [PATCH] First attempt at supporting reading data back from cloud log segments. There are still some bugs, hacks, race conditions, etc., but this seems to be doing mostly the right thing and so is a good start. --- bluesky/bluesky-private.h | 23 ++++--- bluesky/bluesky.h | 8 +-- bluesky/cache.c | 3 + bluesky/cloudlog.c | 31 ++++++--- bluesky/file.c | 3 + bluesky/log.c | 141 +++++++++++++++++++++++++++++--------- bluesky/util.c | 2 +- 7 files changed, 155 insertions(+), 56 deletions(-) diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 643092b..636b664 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -259,23 +259,30 @@ struct _BlueSkyLog { GHashTable *mmap_cache; }; -/* Reference-counted blocks of memory, used for passing data in and out of - * storage backends and in other places. This may also refer to read-only - * mmaped data. */ -struct _BlueSkyMmap { - gint refcount; +/* An object for tracking log files which are stored locally--either the + * journal for filesystem consistency or log segments which have been fetched + * back from cloud storage. 
*/ +struct _BlueSkyCacheFile { + GMutex *lock; + GCond *cond; + int type; // Only one of CLOUDLOG_{JOURNAL,CLOUD} + int log_dir; int log_seq; - const char *addr; + char *filename; // Local filename, relative to log directory + gint refcount; // References to the mmaped data + const char *addr; // May be null if data is not mapped in memory size_t len; + BlueSkyFS *fs; BlueSkyLog *log; + gboolean fetching, ready; }; BlueSkyLog *bluesky_log_new(const char *log_directory); void bluesky_log_item_submit(BlueSkyCloudLog *item, BlueSkyLog *log); void bluesky_log_finish_all(GList *log_items); -BlueSkyRCStr *bluesky_log_map_object(BlueSkyLog *log, int log_seq, +BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir, int log_seq, int log_offset, int log_size); -void bluesky_mmap_unref(BlueSkyMmap *mmap); +void bluesky_mmap_unref(BlueSkyCacheFile *mmap); #ifdef __cplusplus } diff --git a/bluesky/bluesky.h b/bluesky/bluesky.h index 820fa12..60a3842 100644 --- a/bluesky/bluesky.h +++ b/bluesky/bluesky.h @@ -63,19 +63,19 @@ void bluesky_init(void); gchar *bluesky_lowercase(const gchar *s); -struct _BlueSkyMmap; -typedef struct _BlueSkyMmap BlueSkyMmap; +struct _BlueSkyCacheFile; +typedef struct _BlueSkyCacheFile BlueSkyCacheFile; typedef struct { gint refcount; - BlueSkyMmap *mmap; + BlueSkyCacheFile *mmap; gchar *data; gsize len; } BlueSkyRCStr; BlueSkyRCStr *bluesky_string_new(gpointer data, gsize len); BlueSkyRCStr *bluesky_string_new_from_gstring(GString *s); -BlueSkyRCStr *bluesky_string_new_from_mmap(BlueSkyMmap *mmap, +BlueSkyRCStr *bluesky_string_new_from_mmap(BlueSkyCacheFile *mmap, int offset, gsize len); void bluesky_string_ref(BlueSkyRCStr *string); void bluesky_string_unref(BlueSkyRCStr *string); diff --git a/bluesky/cache.c b/bluesky/cache.c index a4b9a0c..f897773 100644 --- a/bluesky/cache.c +++ b/bluesky/cache.c @@ -176,6 +176,9 @@ static void drop_caches(BlueSkyInode *inode) log->data = NULL; bluesky_cloudlog_stats_update(log, 1); } + if 
(log->location_flags & CLOUDLOG_CLOUD) { + log->location_flags &= ~CLOUDLOG_JOURNAL; + } g_mutex_unlock(log->lock); } } diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c index 6e57d05..0c3fb02 100644 --- a/bluesky/cloudlog.c +++ b/bluesky/cloudlog.c @@ -202,12 +202,21 @@ void bluesky_cloudlog_fetch(BlueSkyCloudLog *log) if (log->data != NULL) return; - g_assert((log->location_flags | log->pending_write) & CLOUDLOG_JOURNAL); - - bluesky_cloudlog_stats_update(log, -1); - log->data = bluesky_log_map_object(log->fs->log, log->log_seq, - log->log_offset, log->log_size); - bluesky_cloudlog_stats_update(log, 1); + if ((log->location_flags | log->pending_write) & CLOUDLOG_JOURNAL) { + bluesky_cloudlog_stats_update(log, -1); + log->data = bluesky_log_map_object(log->fs, -1, log->log_seq, + log->log_offset, log->log_size); + bluesky_cloudlog_stats_update(log, 1); + } else if (log->location_flags & CLOUDLOG_CLOUD) { + bluesky_cloudlog_stats_update(log, -1); + log->data = bluesky_log_map_object(log->fs, log->location.directory, + log->location.sequence, + log->location.offset, + log->location.size); + bluesky_cloudlog_stats_update(log, 1); + } else { + g_error("Unable to fetch cloudlog entry!"); + } g_cond_broadcast(log->cond); } @@ -234,11 +243,13 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log, bluesky_cloudlog_stats_update(log, -1); + /* TODO: Right now offset/size are set to the raw data, but we should add + * header parsing to the code which loads objects back in. 
*/ log->location = state->location; - log->location.offset = state->data->len; - log->location.size - = sizeof(struct log_header) + sizeof(BlueSkyCloudID) * 0 - + log->data->len; + log->location.offset = state->data->len + sizeof(struct log_header); + log->location.size = log->data->len; + /* = sizeof(struct log_header) + sizeof(BlueSkyCloudID) * 0 + + log->data->len; */ struct log_header header; memcpy(header.magic, "AgI ", 4); diff --git a/bluesky/file.c b/bluesky/file.c index 94677f6..0c92e69 100644 --- a/bluesky/file.c +++ b/bluesky/file.c @@ -287,6 +287,9 @@ void bluesky_file_drop_cached(BlueSkyInode *inode) b->ref->data = NULL; bluesky_cloudlog_stats_update(b->ref, 1); } + if (b->ref->location_flags & CLOUDLOG_CLOUD) { + b->ref->location_flags &= ~CLOUDLOG_JOURNAL; + } g_mutex_unlock(b->ref->lock); } } diff --git a/bluesky/log.c b/bluesky/log.c index 99b5ae5..48f662b 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -273,74 +273,149 @@ void bluesky_log_finish_all(GList *log_items) * to it. */ static int page_size = 0; -BlueSkyRCStr *bluesky_log_map_object(BlueSkyLog *log, - int log_seq, int log_offset, int log_size) +static void cloudlog_fetch_complete(BlueSkyStoreAsync *async, + BlueSkyCacheFile *cachefile) +{ + g_print("Fetch of %s from cloud complete, status = %d\n", + async->key, async->result); + + if (async->result < 0) + return; + + g_mutex_lock(cachefile->lock); + char *pathname = g_strdup_printf("%s/%s", cachefile->log->log_directory, + cachefile->filename); + if (!g_file_set_contents(pathname, async->data->data, async->data->len, + NULL)) + { + g_print("Error writing out fetched file to cache!\n"); + } + g_free(pathname); + cachefile->fetching = FALSE; + cachefile->ready = TRUE; + g_cond_broadcast(cachefile->cond); + g_mutex_unlock(cachefile->lock); +} + +/* Find the BlueSkyCacheFile object for the given journal or cloud log segment. + * Returns the object in the locked state. 
*/ +BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs, + int clouddir, int log_seq) { if (page_size == 0) { page_size = getpagesize(); } - BlueSkyMmap *map; + BlueSkyLog *log = fs->log; + + BlueSkyCacheFile *map; g_mutex_lock(log->mmap_lock); map = g_hash_table_lookup(log->mmap_cache, GINT_TO_POINTER(log_seq)); if (map == NULL) { - char logname[64]; - g_snprintf(logname, sizeof(logname), "log-%08d", log_seq); - int fd = openat(log->dirfd, logname, O_RDONLY); + char *logname; + + // A request for a local log file + if (clouddir < 0) { + logname = g_strdup_printf("log-%08d", log_seq); + } else { + logname = g_strdup_printf("log-%08d-%08d", clouddir, log_seq); + } + + /* TODO: stat() call */ + map = g_new0(BlueSkyCacheFile, 1); + map->type = CLOUDLOG_JOURNAL; + map->lock = g_mutex_new(); + g_mutex_lock(map->lock); + map->cond = g_cond_new(); + map->filename = logname; + map->log_seq = log_seq; + map->log = log; + g_atomic_int_set(&map->refcount, 0); + + g_hash_table_insert(log->mmap_cache, GINT_TO_POINTER(log_seq), map); + + // If the log file is stored in the cloud, we may need to fetch it + if (clouddir >= 0) { + map->fetching = TRUE; + g_print("Starting fetch of %s from cloud\n", logname); + BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); + async->op = STORE_OP_GET; + async->key = g_strdup(logname); + bluesky_store_async_add_notifier(async, + (GFunc)cloudlog_fetch_complete, + map); + bluesky_store_async_submit(async); + bluesky_store_async_unref(async); + } + } else { + g_mutex_lock(map->lock); + } + + g_mutex_unlock(log->mmap_lock); + return map; +} + +BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir, + int log_seq, int log_offset, int log_size) +{ + if (page_size == 0) { + page_size = getpagesize(); + } + + BlueSkyLog *log = fs->log; + BlueSkyCacheFile *map = bluesky_cachefile_lookup(fs, log_dir, log_seq); + + if (map == NULL) { + return NULL; + } + + if (map->addr == NULL) { + while (!map->ready && map->fetching) { + 
g_print("Waiting for log segment to be fetched from cloud...\n"); + g_cond_wait(map->cond, map->lock); + } + + int fd = openat(log->dirfd, map->filename, O_RDONLY); if (fd < 0) { - fprintf(stderr, "Error opening logfile %s: %m\n", logname); - g_mutex_unlock(log->mmap_lock); + fprintf(stderr, "Error opening logfile %s: %m\n", map->filename); + g_mutex_unlock(map->lock); return NULL; } - map = g_new0(BlueSkyMmap, 1); - off_t length = lseek(fd, 0, SEEK_END); - map->log_seq = log_seq; map->addr = (const char *)mmap(NULL, length, PROT_READ, MAP_SHARED, fd, 0); map->len = length; - map->log = log; - g_atomic_int_set(&map->refcount, 0); - g_hash_table_insert(log->mmap_cache, GINT_TO_POINTER(log_seq), map); - - g_print("Mapped log segment %d...\n", log_seq); + g_print("Re-mapped log segment %d...\n", log_seq); close(fd); } g_mutex_unlock(log->mmap_lock); - return bluesky_string_new_from_mmap(map, log_offset, log_size); + BlueSkyRCStr *str; + str = bluesky_string_new_from_mmap(map, log_offset, log_size); + g_mutex_unlock(map->lock); + return str; } -void bluesky_mmap_unref(BlueSkyMmap *mmap) +void bluesky_mmap_unref(BlueSkyCacheFile *mmap) { if (mmap == NULL) return; if (g_atomic_int_dec_and_test(&mmap->refcount)) { - /* There is a potential race condition here: the BlueSkyLog contains a - * hash table of currently-existing BlueSkyMmap objects, which does not - * hold a reference. Some other thread might grab a new reference to - * this object after reading it from the hash table. So, before - * destruction we need to grab the lock for the hash table, then check - * the reference count again. If it is still zero, we can proceed with - * object destruction. 
*/ - BlueSkyLog *log = mmap->log; - g_mutex_lock(log->mmap_lock); + g_mutex_lock(mmap->lock); if (g_atomic_int_get(&mmap->refcount) > 0) { - g_mutex_unlock(log->mmap_lock); + g_print("Unmapped log segment %d...\n", mmap->log_seq); + munmap((void *)mmap->addr, mmap->len); + mmap->addr = NULL; return; } - - g_hash_table_remove(log->mmap_cache, GINT_TO_POINTER(mmap->log_seq)); - munmap((void *)mmap->addr, mmap->len); - g_free(mmap); - g_mutex_unlock(log->mmap_lock); + g_mutex_unlock(mmap->lock); } } diff --git a/bluesky/util.c b/bluesky/util.c index f899420..0d24328 100644 --- a/bluesky/util.c +++ b/bluesky/util.c @@ -75,7 +75,7 @@ BlueSkyRCStr *bluesky_string_new_from_gstring(GString *s) } /* Create a new BlueSkyRCStr from a memory-mapped buffer. */ -BlueSkyRCStr *bluesky_string_new_from_mmap(BlueSkyMmap *mmap, +BlueSkyRCStr *bluesky_string_new_from_mmap(BlueSkyCacheFile *mmap, int offset, gsize len) { g_assert(offset + len < mmap->len); -- 2.20.1