int location_flags;
int pending_read, pending_write;
+ // If the object is not yet flushed to cloud storage but is written to a
+ // journal file locally, a reference to that journal file so that we can
+ // keep the dirty_refs count updated. When the object is deleted or
+ // becomes clean, decrement the dirty_refs counter of the journal file and
+ // set this pointer to NULL.
+ BlueSkyCacheFile *dirty_journal;
+
// A stable identifier for the object (only changes when authenticated data
// is written out, but stays the same when the in-cloud cleaner relocates
// the object).
uint32_t crc32c_finalize(uint32_t crc);
struct _BlueSkyLog {
+ BlueSkyFS *fs;
char *log_directory;
GAsyncQueue *queue;
int fd, dirfd;
int seq_num;
GSList *committed;
+ /* The currently-open log file. */
+ BlueSkyCacheFile *current_log;
+
/* Cache of log segments which have been memory-mapped. */
GMutex *mmap_lock;
GHashTable *mmap_cache;
size_t len;
BlueSkyFS *fs;
BlueSkyLog *log;
- gboolean fetching, ready;
+ gboolean fetching, ready; // Cloud data: downloading or ready for use
int64_t atime; // Access time, for cache management
+
+ /* Journal: Count of objects which are not yet committed to cloud storage
+ * but need to be; a non-zero value prevents the journal file from being
+ * deleted. */
+ gint dirty_refs;
};
BlueSkyLog *bluesky_log_new(const char *log_directory);
BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir, int log_seq,
int log_offset, int log_size);
void bluesky_mmap_unref(BlueSkyCacheFile *mmap);
+void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile);
+BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
+ int clouddir, int log_seq);
void bluesky_cachefile_gc(BlueSkyFS *fs);
#ifdef __cplusplus
}
g_array_unref(log->links);
bluesky_string_unref(log->data);
+ if (log->dirty_journal != NULL)
+ g_atomic_int_add(&log->dirty_journal->dirty_refs, -1);
g_free(log);
}
}
g_string_append_len(state->data, (const char *)&header, sizeof(header));
g_string_append_len(state->data, log->data->data, log->data->len);
+ /* TODO: We should mark the objects as committed on the cloud until the
+ * data is flushed and acknowledged. */
log->location_flags |= CLOUDLOG_CLOUD;
bluesky_cloudlog_stats_update(log, 1);
+ if (log->dirty_journal != NULL) {
+ g_atomic_int_add(&log->dirty_journal->dirty_refs, -1);
+ log->dirty_journal = NULL;
+ }
g_mutex_unlock(log->lock);
if (state->data->len > CLOUDLOG_SEGMENT_SIZE)
BlueSkyCacheFile *cache = (BlueSkyCacheFile *)value;
int64_t age = bluesky_get_current_time() - cache->atime;
- g_print("%s addr=%p mapcount=%d refcount=%d atime_age=%f",
+ g_print("%s addr=%p mapcount=%d refcount=%d dirty=%d atime_age=%f",
cache->filename, cache->addr, cache->mapcount, cache->refcount,
- age / 1e6);
+ cache->dirty_refs, age / 1e6);
if (cache->fetching)
g_print(" (fetching)");
g_print("\n");
if (fs != NULL) {
fs->store = store;
fs->log = bluesky_log_new("journal");
+ fs->log->fs = fs;
g_print("Loaded filesystem superblock\n");
g_free(fs->name);
fs->name = g_strdup(name);
BlueSkyFS *fs = bluesky_new_fs(name);
fs->store = store;
fs->log = bluesky_log_new("journal");
+ fs->log->fs = fs;
BlueSkyInode *root = bluesky_new_inode(BLUESKY_ROOT_INUM, fs,
BLUESKY_DIRECTORY);
log->fd = -1;
}
+ if (log->current_log != NULL) {
+ bluesky_cachefile_unref(log->current_log);
+ log->current_log = NULL;
+ }
+
while (log->fd < 0) {
g_snprintf(logname, sizeof(logname), "journal-%08d", log->seq_num);
log->fd = openat(log->dirfd, logname, O_CREAT|O_WRONLY|O_EXCL, 0600);
}
}
+ log->current_log = bluesky_cachefile_lookup(log->fs, -1, log->seq_num);
+ g_assert(log->current_log != NULL);
+ g_mutex_unlock(log->current_log->lock);
+
if (ftruncate(log->fd, LOG_SEGMENT_SIZE) < 0) {
fprintf(stderr, "Unable to truncate logfile %s: %m\n", logname);
}
offset += sizeof(header) + sizeof(footer) + item->data->len;
+ /* Since we have just written a new dirty object to the journal,
+ * increment the count of live dirty objects in that journal file. The
+ * count will be decremented when objects are deleted or written to the
+ * cloud. */
+ g_atomic_int_add(&log->current_log->dirty_refs, 1);
+ item->dirty_journal = log->current_log;
+
/* Replace the log item's string data with a memory-mapped copy of the
* data, now that it has been written to the log file. (Even if it
* isn't yet on disk, it should at least be in the page cache and so
* then we'll just skip the file on this pass. */
if (g_mutex_trylock(cachefile->lock)) {
int64_t age = bluesky_get_current_time() - cachefile->atime;
- g_print("%s addr=%p mapcount=%d refcount=%d atime_age=%f",
+ g_print("%s addr=%p mapcount=%d refcount=%d dirty=%d atime_age=%f",
cachefile->filename, cachefile->addr, cachefile->mapcount,
- cachefile->refcount, age / 1e6);
+ cachefile->refcount, cachefile->dirty_refs, age / 1e6);
if (cachefile->fetching)
g_print(" (fetching)");
g_print("\n");
if (g_atomic_int_get(&cachefile->refcount) == 0
&& g_atomic_int_get(&cachefile->mapcount) == 0
- && cachefile->type == CLOUDLOG_CLOUD /* FIXME: journals too */)
+ && g_atomic_int_get(&cachefile->dirty_refs) == 0)
{
g_print(" ...deleting\n");
if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) {