X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Flog.c;h=1fe61769b92870b95022fcc8933b70d3a9e689d3;hb=a82b60b3b683840a7074110831bcbaa16a40f0eb;hp=976e32eb47a9bbb6b4420ce8e6759754f5453391;hpb=c0d52390142a72a89aa632172e03a3ec909890ed;p=bluesky.git diff --git a/bluesky/log.c b/bluesky/log.c index 976e32e..1fe6176 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -36,7 +36,7 @@ // Rough size limit for a log segment. This is not a firm limit and there are // no absolute guarantees on the size of a log segment. -#define LOG_SEGMENT_SIZE (1 << 24) +#define LOG_SEGMENT_SIZE (1 << 22) #define HEADER_MAGIC 0x676f4c0a #define FOOTER_MAGIC 0x2e435243 @@ -103,6 +103,11 @@ static gboolean log_open(BlueSkyLog *log) log->fd = -1; } + if (log->current_log != NULL) { + bluesky_cachefile_unref(log->current_log); + log->current_log = NULL; + } + while (log->fd < 0) { g_snprintf(logname, sizeof(logname), "journal-%08d", log->seq_num); log->fd = openat(log->dirfd, logname, O_CREAT|O_WRONLY|O_EXCL, 0600); @@ -116,6 +121,10 @@ static gboolean log_open(BlueSkyLog *log) } } + log->current_log = bluesky_cachefile_lookup(log->fs, -1, log->seq_num); + g_assert(log->current_log != NULL); + g_mutex_unlock(log->current_log->lock); + if (ftruncate(log->fd, LOG_SEGMENT_SIZE) < 0) { fprintf(stderr, "Unable to truncate logfile %s: %m\n", logname); } @@ -278,26 +287,47 @@ void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile) g_atomic_int_add(&cachefile->refcount, -1); } +static void cloudlog_fetch_complete(BlueSkyStoreAsync *async, + BlueSkyCacheFile *cachefile); + +static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile) +{ + g_atomic_int_inc(&cachefile->refcount); + cachefile->fetching = TRUE; + g_print("Starting fetch of %s from cloud\n", cachefile->filename); + BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store); + async->op = STORE_OP_GET; + async->key = g_strdup(cachefile->filename); + bluesky_store_async_add_notifier(async, + (GFunc)cloudlog_fetch_complete, + cachefile); + bluesky_store_async_submit(async); + bluesky_store_async_unref(async); +} + static void cloudlog_fetch_complete(BlueSkyStoreAsync *async, BlueSkyCacheFile *cachefile) { g_print("Fetch of %s from cloud complete, status = %d\n", async->key, async->result); - if (async->result < 0) - return; - g_mutex_lock(cachefile->lock); - char *pathname = g_strdup_printf("%s/%s", cachefile->log->log_directory, - cachefile->filename); - if (!g_file_set_contents(pathname, async->data->data, async->data->len, - NULL)) - { - g_print("Error writing out fetched file to cache!\n"); + if (async->result >= 0) { + char *pathname = g_strdup_printf("%s/%s", + cachefile->log->log_directory, + cachefile->filename); + if (!g_file_set_contents(pathname, async->data->data, async->data->len, + NULL)) + g_print("Error writing out fetched file to cache!\n"); + g_free(pathname); + + cachefile->fetching = FALSE; + cachefile->ready = TRUE; + } else { + g_print("Error fetching from cloud, retrying...\n"); + cloudlog_fetch_start(cachefile); } - g_free(pathname); - cachefile->fetching = FALSE; - cachefile->ready = TRUE; + bluesky_cachefile_unref(cachefile); g_cond_broadcast(cachefile->cond); g_mutex_unlock(cachefile->lock); @@ -314,6 +344,7 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs, BlueSkyLog *log = fs->log; + struct stat statbuf; char logname[64]; int type; @@ -330,10 +361,17 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs, g_mutex_lock(log->mmap_lock); map = g_hash_table_lookup(log->mmap_cache, logname); - if (map == NULL) { - /* TODO: stat() call */ + if (map == NULL + && type == CLOUDLOG_JOURNAL + && fstatat(log->dirfd, logname, &statbuf, 0) < 0) { + /* A stale reference to a journal file which doesn't exist any longer + * because it was reclaimed. Return NULL. */ + } else if (map == NULL) { + g_print("Adding cache file %s\n", logname); + map = g_new0(BlueSkyCacheFile, 1); - map->type = CLOUDLOG_JOURNAL; + map->fs = fs; + map->type = type; map->lock = g_mutex_new(); map->type = type; g_mutex_lock(map->lock); @@ -347,25 +385,15 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs, g_hash_table_insert(log->mmap_cache, map->filename, map); // If the log file is stored in the cloud, we may need to fetch it - if (clouddir >= 0) { - g_atomic_int_inc(&map->refcount); - map->fetching = TRUE; - g_print("Starting fetch of %s from cloud\n", logname); - BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); - async->op = STORE_OP_GET; - async->key = g_strdup(logname); - bluesky_store_async_add_notifier(async, - (GFunc)cloudlog_fetch_complete, - map); - bluesky_store_async_submit(async); - bluesky_store_async_unref(async); - } + if (clouddir >= 0) + cloudlog_fetch_start(map); } else { g_mutex_lock(map->lock); } g_mutex_unlock(log->mmap_lock); - g_atomic_int_inc(&map->refcount); + if (map != NULL) + g_atomic_int_inc(&map->refcount); return map; } @@ -488,16 +516,31 @@ void bluesky_cachefile_gc(BlueSkyFS *fs) g_print(" (fetching)"); g_print("\n"); - if (g_atomic_int_get(&cachefile->refcount) == 0 - && g_atomic_int_get(&cachefile->mapcount) == 0 - && cachefile->type == CLOUDLOG_CLOUD /* FIXME: journals too */) + gboolean deletion_candidate = FALSE; + if (g_atomic_int_get(&fs->log->disk_used) + > bluesky_options.cache_size + && g_atomic_int_get(&cachefile->refcount) == 0 + && g_atomic_int_get(&cachefile->mapcount) == 0) { + deletion_candidate = TRUE; + } + + /* Don't allow journal files to be reclaimed until all data is + * known to be durably stored in the cloud. */ + if (cachefile->type == CLOUDLOG_JOURNAL + && cachefile->log_seq >= fs->log->journal_watermark) + { + deletion_candidate = FALSE; + } + + if (deletion_candidate) { g_print(" ...deleting\n"); if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) { fprintf(stderr, "Unable to unlink journal %s: %m\n", cachefile->filename); } + g_atomic_int_add(&fs->log->disk_used, -(cachefile->len / 1024)); g_hash_table_remove(fs->log->mmap_cache, cachefile->filename); g_mutex_unlock(cachefile->lock); g_mutex_free(cachefile->lock);