From: Michael Vrable Date: Wed, 18 Aug 2010 01:46:58 +0000 (-0700) Subject: Implement a (dumb) cache garbage collector. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=c0d52390142a72a89aa632172e03a3ec909890ed;p=bluesky.git Implement a (dumb) cache garbage collector. This is a proof of concept; it doesn't delete journal files and deletes cache files nearly as soon as they are unused, so it needs better algorithms for choosing when to delete files. But it does seem to work. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 65f356b..73f38e1 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -257,6 +257,10 @@ struct _BlueSkyLog { /* Cache of log segments which have been memory-mapped. */ GMutex *mmap_lock; GHashTable *mmap_cache; + + /* A count of the disk space consumed (in 1024-byte units) by all files + * tracked by mmap_cache (whether mapped or not, actually). */ + gint disk_used; }; /* An object for tracking log files which are stored locally--either the @@ -286,6 +290,8 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir, int log_seq, int log_offset, int log_size); void bluesky_mmap_unref(BlueSkyCacheFile *mmap); +void bluesky_cachefile_gc(BlueSkyFS *fs); + #ifdef __cplusplus } #endif diff --git a/bluesky/cache.c b/bluesky/cache.c index f897773..50c0f36 100644 --- a/bluesky/cache.c +++ b/bluesky/cache.c @@ -243,6 +243,7 @@ static gpointer flushd_task(BlueSkyFS *fs) flushd_dirty(fs); flushd_cloud(fs); flushd_clean(fs); + bluesky_cachefile_gc(fs); g_mutex_unlock(fs->flushd_lock); return NULL; diff --git a/bluesky/log.c b/bluesky/log.c index 9ba75b4..976e32e 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -234,7 +234,7 @@ BlueSkyLog *bluesky_log_new(const char *log_directory) log->seq_num = 0; log->queue = g_async_queue_new(); log->mmap_lock = g_mutex_new(); - log->mmap_cache = g_hash_table_new(NULL, NULL); + log->mmap_cache = g_hash_table_new(g_str_hash, g_str_equal); log->dirfd = open(log->log_directory, O_DIRECTORY); if (log->dirfd < 0) { @@ -306,7 +306,7 @@ static void cloudlog_fetch_complete(BlueSkyStoreAsync *async, /* Find the BlueSkyCacheFile object for the given journal or cloud log segment. * Returns the object in the locked state and with a reference taken. */ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs, - int clouddir, int log_seq) + int clouddir, int log_seq) { if (page_size == 0) { page_size = getpagesize(); @@ -314,33 +314,37 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs, BlueSkyLog *log = fs->log; + char logname[64]; + int type; + + // A request for a local log file + if (clouddir < 0) { + sprintf(logname, "journal-%08d", log_seq); + type = CLOUDLOG_JOURNAL; + } else { + sprintf(logname, "log-%08d-%08d", clouddir, log_seq); + type = CLOUDLOG_CLOUD; + } + BlueSkyCacheFile *map; g_mutex_lock(log->mmap_lock); - map = g_hash_table_lookup(log->mmap_cache, GINT_TO_POINTER(log_seq)); + map = g_hash_table_lookup(log->mmap_cache, logname); if (map == NULL) { - char *logname; - - // A request for a local log file - if (clouddir < 0) { - logname = g_strdup_printf("journal-%08d", log_seq); - } else { - logname = g_strdup_printf("log-%08d-%08d", clouddir, log_seq); - } - /* TODO: stat() call */ map = g_new0(BlueSkyCacheFile, 1); map->type = CLOUDLOG_JOURNAL; map->lock = g_mutex_new(); + map->type = type; g_mutex_lock(map->lock); map->cond = g_cond_new(); - map->filename = logname; + map->filename = g_strdup(logname); map->log_seq = log_seq; map->log = log; g_atomic_int_set(&map->mapcount, 0); g_atomic_int_set(&map->refcount, 0); - g_hash_table_insert(log->mmap_cache, GINT_TO_POINTER(log_seq), map); + g_hash_table_insert(log->mmap_cache, map->filename, map); // If the log file is stored in the cloud, we may need to fetch it if (clouddir >= 0) { @@ -397,7 +401,9 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir, off_t length = lseek(fd, 0, SEEK_END); map->addr = (const char *)mmap(NULL, length, PROT_READ, MAP_SHARED, fd, 0); + g_atomic_int_add(&log->disk_used, -(map->len / 1024)); map->len = length; + g_atomic_int_add(&log->disk_used, map->len / 1024); g_print("Re-mapped log segment %d...\n", log_seq); g_atomic_int_inc(&map->refcount); @@ -431,3 +437,80 @@ void bluesky_mmap_unref(BlueSkyCacheFile *mmap) g_mutex_unlock(mmap->lock); } } + +/* Scan through all currently-stored files in the journal/cache and garbage + * collect old unused ones, if needed. */ +static void gather_cachefiles(gpointer key, gpointer value, gpointer user_data) +{ + GList **files = (GList **)user_data; + *files = g_list_prepend(*files, value); +} + +static gint compare_cachefiles(gconstpointer a, gconstpointer b) +{ + int64_t ta, tb; + + ta = ((BlueSkyCacheFile *)a)->atime; + tb = ((BlueSkyCacheFile *)b)->atime; + if (ta < tb) + return -1; + else if (ta > tb) + return 1; + else + return 0; +} + +void bluesky_cachefile_gc(BlueSkyFS *fs) +{ + GList *files = NULL; + + g_mutex_lock(fs->log->mmap_lock); + g_hash_table_foreach(fs->log->mmap_cache, gather_cachefiles, &files); + + /* Sort based on atime. The atime should be stable since it shouln't be + * updated except by threads which can grab the mmap_lock, which we already + * hold. */ + files = g_list_sort(files, compare_cachefiles); + + /* Walk the list of files, starting with the oldest, deleting files if + * possible until enough space has been reclaimed. */ + g_print("\nScanning cache: (total size = %d kB)\n", fs->log->disk_used); + while (files != NULL) { + BlueSkyCacheFile *cachefile = (BlueSkyCacheFile *)files->data; + /* Try to lock the structure, but if the lock is held by another thread + * then we'll just skip the file on this pass. */ + if (g_mutex_trylock(cachefile->lock)) { + int64_t age = bluesky_get_current_time() - cachefile->atime; + g_print("%s addr=%p mapcount=%d refcount=%d atime_age=%f", + cachefile->filename, cachefile->addr, cachefile->mapcount, + cachefile->refcount, age / 1e6); + if (cachefile->fetching) + g_print(" (fetching)"); + g_print("\n"); + + if (g_atomic_int_get(&cachefile->refcount) == 0 + && g_atomic_int_get(&cachefile->mapcount) == 0 + && cachefile->type == CLOUDLOG_CLOUD /* FIXME: journals too */) + { + g_print(" ...deleting\n"); + if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) { + fprintf(stderr, "Unable to unlink journal %s: %m\n", + cachefile->filename); + } + + g_hash_table_remove(fs->log->mmap_cache, cachefile->filename); + g_mutex_unlock(cachefile->lock); + g_mutex_free(cachefile->lock); + g_cond_free(cachefile->cond); + g_free(cachefile->filename); + g_free(cachefile); + } else { + g_mutex_unlock(cachefile->lock); + } + } + files = g_list_delete_link(files, files); + } + g_list_free(files); + + g_mutex_unlock(fs->log->mmap_lock); +}