Improve journal/cloud cache locking and add access time tracking.
[bluesky.git] / bluesky / log.c
index 48f662b..9ba75b4 100644 (file)
@@ -104,7 +104,7 @@ static gboolean log_open(BlueSkyLog *log)
     }
 
     while (log->fd < 0) {
-        g_snprintf(logname, sizeof(logname), "log-%08d", log->seq_num);
+        g_snprintf(logname, sizeof(logname), "journal-%08d", log->seq_num);
         log->fd = openat(log->dirfd, logname, O_CREAT|O_WRONLY|O_EXCL, 0600);
         if (log->fd < 0 && errno == EEXIST) {
             fprintf(stderr, "Log file %s already exists...\n", logname);
@@ -273,6 +273,11 @@ void bluesky_log_finish_all(GList *log_items)
  * to it. */
 static int page_size = 0;
 
+void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile)
+{
+    g_atomic_int_add(&cachefile->refcount, -1);
+}
+
 static void cloudlog_fetch_complete(BlueSkyStoreAsync *async,
                                     BlueSkyCacheFile *cachefile)
 {
@@ -293,12 +298,13 @@ static void cloudlog_fetch_complete(BlueSkyStoreAsync *async,
     g_free(pathname);
     cachefile->fetching = FALSE;
     cachefile->ready = TRUE;
+    bluesky_cachefile_unref(cachefile);
     g_cond_broadcast(cachefile->cond);
     g_mutex_unlock(cachefile->lock);
 }
 
 /* Find the BlueSkyCacheFile object for the given journal or cloud log segment.
- * Returns the object in the locked state. */
+ * Returns the object in the locked state and with a reference taken. */
 BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
                                          int clouddir, int log_seq)
 {
@@ -317,7 +323,7 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
 
         // A request for a local log file
         if (clouddir < 0) {
-            logname = g_strdup_printf("log-%08d", log_seq);
+            logname = g_strdup_printf("journal-%08d", log_seq);
         } else {
             logname = g_strdup_printf("log-%08d-%08d", clouddir, log_seq);
         }
@@ -331,12 +337,14 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
         map->filename = logname;
         map->log_seq = log_seq;
         map->log = log;
+        g_atomic_int_set(&map->mapcount, 0);
         g_atomic_int_set(&map->refcount, 0);
 
         g_hash_table_insert(log->mmap_cache, GINT_TO_POINTER(log_seq), map);
 
         // If the log file is stored in the cloud, we may need to fetch it
         if (clouddir >= 0) {
+            g_atomic_int_inc(&map->refcount);
             map->fetching = TRUE;
             g_print("Starting fetch of %s from cloud\n", logname);
             BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
@@ -353,6 +361,7 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
     }
 
     g_mutex_unlock(log->mmap_lock);
+    g_atomic_int_inc(&map->refcount);
     return map;
 }
 
@@ -380,6 +389,7 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir,
 
         if (fd < 0) {
             fprintf(stderr, "Error opening logfile %s: %m\n", map->filename);
+            bluesky_cachefile_unref(map);
             g_mutex_unlock(map->lock);
             return NULL;
         }
@@ -390,6 +400,7 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir,
         map->len = length;
 
         g_print("Re-mapped log segment %d...\n", log_seq);
+        g_atomic_int_inc(&map->refcount);
 
         close(fd);
     }
@@ -397,7 +408,9 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir,
     g_mutex_unlock(log->mmap_lock);
 
     BlueSkyRCStr *str;
+    map->atime = bluesky_get_current_time();
     str = bluesky_string_new_from_mmap(map, log_offset, log_size);
+    bluesky_cachefile_unref(map);
     g_mutex_unlock(map->lock);
     return str;
 }
@@ -407,15 +420,14 @@ void bluesky_mmap_unref(BlueSkyCacheFile *mmap)
     if (mmap == NULL)
         return;
 
-    if (g_atomic_int_dec_and_test(&mmap->refcount)) {
+    if (g_atomic_int_dec_and_test(&mmap->mapcount)) {
         g_mutex_lock(mmap->lock);
-        if (g_atomic_int_get(&mmap->refcount) > 0) {
+        if (g_atomic_int_get(&mmap->mapcount) == 0) {
             g_print("Unmapped log segment %d...\n", mmap->log_seq);
             munmap((void *)mmap->addr, mmap->len);
             mmap->addr = NULL;
-            return;
+            g_atomic_int_add(&mmap->refcount, -1);
         }
         g_mutex_unlock(mmap->lock);
     }
 }
-