X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Flog.c;h=9dfaa5bfd61437505ca52ed7b339296a8804a6f0;hb=ddaec40a37a5e65e53546b14632b1b0b35613264;hp=9b4b62bd416fb7edaa6bd5fa61cf1ff558e779f8;hpb=e6135fbeca4bbedd24b5f8c55fb765a97bdc78ad;p=bluesky.git diff --git a/bluesky/log.c b/bluesky/log.c index 9b4b62b..9dfaa5b 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -36,7 +36,7 @@ // Rough size limit for a log segment. This is not a firm limit and there are // no absolute guarantees on the size of a log segment. -#define LOG_SEGMENT_SIZE (1 << 23) +#define LOG_SEGMENT_SIZE (1 << 24) #define HEADER_MAGIC 0x676f4c0a #define FOOTER_MAGIC 0x2e435243 @@ -82,6 +82,7 @@ static void log_commit(BlueSkyLog *log) g_cond_signal(item->cond); g_mutex_unlock(item->lock); log->committed = g_slist_delete_link(log->committed, log->committed); + bluesky_cloudlog_unref(item); batchsize++; } @@ -153,6 +154,7 @@ static gpointer log_thread(gpointer d) if ((item->location_flags | item->pending_write) & CLOUDLOG_JOURNAL) { g_mutex_unlock(item->lock); bluesky_cloudlog_unref(item); + g_atomic_int_add(&item->data_lock_count, -1); continue; } @@ -199,6 +201,7 @@ static gpointer log_thread(gpointer d) offset += sizeof(header) + sizeof(footer) + item->data->len; log->committed = g_slist_prepend(log->committed, item); + g_atomic_int_add(&item->data_lock_count, -1); g_mutex_unlock(item->lock); /* Force an if there are no other log items currently waiting to be @@ -235,6 +238,7 @@ BlueSkyLog *bluesky_log_new(const char *log_directory) void bluesky_log_item_submit(BlueSkyCloudLog *item, BlueSkyLog *log) { bluesky_cloudlog_ref(item); + g_atomic_int_add(&item->data_lock_count, 1); g_async_queue_push(log->queue, item); } @@ -282,13 +286,17 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyLog *log, map = g_new0(BlueSkyMmap, 1); off_t length = lseek(fd, 0, SEEK_END); + map->log_seq = log_seq; map->addr = (const char *)mmap(NULL, length, PROT_READ, MAP_SHARED, fd, 0); map->len = length; - g_atomic_int_set(&map->refcount, 1); + map->log = log; + g_atomic_int_set(&map->refcount, 0); g_hash_table_insert(log->mmap_cache, GINT_TO_POINTER(log_seq), map); + g_print("Mapped log segment %d...\n", log_seq); + close(fd); } @@ -296,3 +304,31 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyLog *log, return bluesky_string_new_from_mmap(map, log_offset, log_size); } + +void bluesky_mmap_unref(BlueSkyMmap *mmap) +{ + if (mmap == NULL) + return; + + if (g_atomic_int_dec_and_test(&mmap->refcount)) { + /* There is a potential race condition here: the BlueSkyLog contains a + * hash table of currently-existing BlueSkyMmap objects, which does not + * hold a reference. Some other thread might grab a new reference to + * this object after reading it from the hash table. So, before + * destruction we need to grab the lock for the hash table, then check + * the reference count again. If it is still zero, we can proceed with + * object destruction. */ + BlueSkyLog *log = mmap->log; + g_mutex_lock(log->mmap_lock); + if (g_atomic_int_get(&mmap->refcount) > 0) { + g_mutex_unlock(log->mmap_lock); + return; + } + + g_hash_table_remove(log->mmap_cache, GINT_TO_POINTER(mmap->log_seq)); + munmap((void *)mmap->addr, mmap->len); + g_free(mmap); + g_mutex_unlock(log->mmap_lock); + } +} +