}
while (log->fd < 0) {
- g_snprintf(logname, sizeof(logname), "log-%08d", log->seq_num);
+ g_snprintf(logname, sizeof(logname), "journal-%08d", log->seq_num);
log->fd = openat(log->dirfd, logname, O_CREAT|O_WRONLY|O_EXCL, 0600);
if (log->fd < 0 && errno == EEXIST) {
fprintf(stderr, "Log file %s already exists...\n", logname);
* to it. */
static int page_size = 0;
+void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile)
+{
+ g_atomic_int_add(&cachefile->refcount, -1);
+}
+
static void cloudlog_fetch_complete(BlueSkyStoreAsync *async,
BlueSkyCacheFile *cachefile)
{
g_free(pathname);
cachefile->fetching = FALSE;
cachefile->ready = TRUE;
+ bluesky_cachefile_unref(cachefile);
g_cond_broadcast(cachefile->cond);
g_mutex_unlock(cachefile->lock);
}
/* Find the BlueSkyCacheFile object for the given journal or cloud log segment.
- * Returns the object in the locked state. */
+ * Returns the object in the locked state and with a reference taken. */
BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
int clouddir, int log_seq)
{
// A request for a local log file
if (clouddir < 0) {
- logname = g_strdup_printf("log-%08d", log_seq);
+ logname = g_strdup_printf("journal-%08d", log_seq);
} else {
logname = g_strdup_printf("log-%08d-%08d", clouddir, log_seq);
}
map->filename = logname;
map->log_seq = log_seq;
map->log = log;
+ g_atomic_int_set(&map->mapcount, 0);
g_atomic_int_set(&map->refcount, 0);
g_hash_table_insert(log->mmap_cache, GINT_TO_POINTER(log_seq), map);
// If the log file is stored in the cloud, we may need to fetch it
if (clouddir >= 0) {
+ g_atomic_int_inc(&map->refcount);
map->fetching = TRUE;
g_print("Starting fetch of %s from cloud\n", logname);
BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
}
g_mutex_unlock(log->mmap_lock);
+ g_atomic_int_inc(&map->refcount);
return map;
}
if (fd < 0) {
fprintf(stderr, "Error opening logfile %s: %m\n", map->filename);
+ bluesky_cachefile_unref(map);
g_mutex_unlock(map->lock);
return NULL;
}
map->len = length;
g_print("Re-mapped log segment %d...\n", log_seq);
+ g_atomic_int_inc(&map->refcount);
close(fd);
}
g_mutex_unlock(log->mmap_lock);
BlueSkyRCStr *str;
+ map->atime = bluesky_get_current_time();
str = bluesky_string_new_from_mmap(map, log_offset, log_size);
+ bluesky_cachefile_unref(map);
g_mutex_unlock(map->lock);
return str;
}
if (mmap == NULL)
return;
- if (g_atomic_int_dec_and_test(&mmap->refcount)) {
+ if (g_atomic_int_dec_and_test(&mmap->mapcount)) {
g_mutex_lock(mmap->lock);
- if (g_atomic_int_get(&mmap->refcount) > 0) {
+ if (g_atomic_int_get(&mmap->mapcount) == 0) {
g_print("Unmapped log segment %d...\n", mmap->log_seq);
munmap((void *)mmap->addr, mmap->len);
mmap->addr = NULL;
- return;
+ g_atomic_int_add(&mmap->refcount, -1);
}
g_mutex_unlock(mmap->lock);
}
}
-