X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Flog.c;h=cec46cb9a9aa90cc3a48f72cc8526ba4187f6817;hb=8ae6b69c5d727704d4414e370c165dd71668031c;hp=9b4b62bd416fb7edaa6bd5fa61cf1ff558e779f8;hpb=e6135fbeca4bbedd24b5f8c55fb765a97bdc78ad;p=bluesky.git diff --git a/bluesky/log.c b/bluesky/log.c index 9b4b62b..cec46cb 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -82,6 +82,7 @@ static void log_commit(BlueSkyLog *log) g_cond_signal(item->cond); g_mutex_unlock(item->lock); log->committed = g_slist_delete_link(log->committed, log->committed); + bluesky_cloudlog_unref(item); batchsize++; } @@ -153,6 +154,7 @@ static gpointer log_thread(gpointer d) if ((item->location_flags | item->pending_write) & CLOUDLOG_JOURNAL) { g_mutex_unlock(item->lock); bluesky_cloudlog_unref(item); + g_atomic_int_add(&item->data_lock_count, -1); continue; } @@ -199,6 +201,7 @@ static gpointer log_thread(gpointer d) offset += sizeof(header) + sizeof(footer) + item->data->len; log->committed = g_slist_prepend(log->committed, item); + g_atomic_int_add(&item->data_lock_count, -1); g_mutex_unlock(item->lock); /* Force an if there are no other log items currently waiting to be @@ -235,6 +238,7 @@ BlueSkyLog *bluesky_log_new(const char *log_directory) void bluesky_log_item_submit(BlueSkyCloudLog *item, BlueSkyLog *log) { bluesky_cloudlog_ref(item); + g_atomic_int_add(&item->data_lock_count, 1); g_async_queue_push(log->queue, item); } @@ -282,10 +286,12 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyLog *log, map = g_new0(BlueSkyMmap, 1); off_t length = lseek(fd, 0, SEEK_END); + map->log_seq = log_seq; map->addr = (const char *)mmap(NULL, length, PROT_READ, MAP_SHARED, fd, 0); map->len = length; - g_atomic_int_set(&map->refcount, 1); + map->log = log; + g_atomic_int_set(&map->refcount, 0); g_hash_table_insert(log->mmap_cache, GINT_TO_POINTER(log_seq), map); @@ -296,3 +302,31 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyLog *log, return bluesky_string_new_from_mmap(map, log_offset, log_size); } + +void bluesky_mmap_unref(BlueSkyMmap *mmap) +{ + if (mmap == NULL) + return; + + if (g_atomic_int_dec_and_test(&mmap->refcount)) { + /* There is a potential race condition here: the BlueSkyLog contains a + * hash table of currently-existing BlueSkyMmap objects, which does not + * hold a reference. Some other thread might grab a new reference to + * this object after reading it from the hash table. So, before + * destruction we need to grab the lock for the hash table, then check + * the reference count again. If it is still zero, we can proceed with + * object destruction. */ + BlueSkyLog *log = mmap->log; + g_mutex_lock(log->mmap_lock); + if (g_atomic_int_get(&mmap->refcount) > 0) { + g_mutex_unlock(log->mmap_lock); + return; + } + + g_hash_table_remove(log->mmap_cache, GINT_TO_POINTER(mmap->log_seq)); + munmap((void *)mmap->addr, mmap->len); + g_free(mmap); + g_mutex_unlock(log->mmap_lock); + } +} +