void bluesky_block_fetch(BlueSkyFS *fs, BlueSkyBlock *block,
BlueSkyStoreAsync *barrier);
void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block,
- BlueSkyStoreAsync *barrier);
-void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier);
+ BlueSkyStoreAsync *barrier, GList **log_items);
+void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier,
+ GList **log_items);
void bluesky_file_drop_cached(BlueSkyInode *inode);
/* Logging infrastructure for ensuring operations are persistently recorded to
/* change_count is increased with every operation which modifies the inode,
* and can be used to determine if cached data is still valid.
* change_commit is the value of change_count when the inode was last
- * committed to stable storage. */
- uint64_t change_count, change_commit;
+ * committed to stable storage (the log).
+ * change_cloud tracks which version was last committed to cloud storage. */
+ uint64_t change_count, change_commit, change_cloud;
/* Timestamp for controlling when modified data is flushed to stable
* storage. When an inode is first modified from a clean state, this is
/* Write the given block to cloud-backed storage and mark it clean. */
void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block,
- BlueSkyStoreAsync *barrier)
+ BlueSkyStoreAsync *barrier,
+ GList **log_items)
{
if (block->type != BLUESKY_BLOCK_DIRTY)
return;
g_checksum_update(csum, (const guchar *)data->data, data->len);
gchar *name = g_strdup(g_checksum_get_string(csum));
+ /* Start commit to the local log. */
+ BlueSkyLogItem *log_item = bluesky_log_item_new();
+ log_item->key = g_strdup(name);
+ log_item->data = data;
+ bluesky_string_ref(data);
+ bluesky_log_item_submit(log_item, fs->log);
+ *log_items = g_list_prepend(*log_items, log_item);
+
/* Store the file data asynchronously, and don't bother waiting for a
* response. */
BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
}
/* Flush all blocks in a file to stable storage. */
-void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier)
+void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier,
+ GList **log_items) /* out: log items submitted by bluesky_block_flush */
{
g_return_if_fail(inode->type == BLUESKY_REGULAR);
for (int i = 0; i < inode->blocks->len; i++) {
BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i);
- bluesky_block_flush(inode->fs, b, barrier);
+ bluesky_block_flush(inode->fs, b, barrier, log_items); /* returns early unless block is dirty */
}
}
if (inode->change_time == 0)
inode->change_time = now;
+#if 0
if (bluesky_options.writethrough_cache)
bluesky_file_flush(inode, NULL);
+#endif
g_mutex_lock(inode->fs->lock);
bluesky_list_unlink(&inode->fs->dirty_list, inode->dirty_list);
BlueSkyFS *fs = bluesky_deserialize_superblock(data->data);
if (fs != NULL) {
fs->store = store;
+ fs->log = bluesky_log_new("journal");
g_print("Loaded filesystem superblock\n");
g_free(fs->name);
fs->name = g_strdup(name);
/* Start writeback of an inode and all associated data. */
void bluesky_inode_start_sync(BlueSkyInode *inode, BlueSkyStoreAsync *barrier)
{
+ GList *log_items = NULL;
BlueSkyFS *fs = inode->fs;
if (inode->type == BLUESKY_REGULAR)
- bluesky_file_flush(inode, barrier);
+ bluesky_file_flush(inode, barrier, &log_items);
GString *buf = g_string_new("");
bluesky_serialize_inode(buf, inode);
log_item->data = data;
bluesky_string_ref(data);
bluesky_log_item_submit(log_item, fs->log);
- bluesky_log_item_finish(log_item);
+ log_items = g_list_prepend(log_items, log_item);
+
+ /* Wait for all log items to be committed to disk. */
+ while (log_items != NULL) {
+ log_item = (BlueSkyLogItem *)log_items->data;
+ bluesky_log_item_finish(log_item);
+ log_items = g_list_delete_link(log_items, log_items);
+ }
BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
async->op = STORE_OP_PUT;
// Rough size limit for a log segment. This is not a firm limit and there are
// no absolute guarantees on the size of a log segment.
-#define LOG_SEGMENT_SIZE (1 << 20)
+#define LOG_SEGMENT_SIZE (1 << 23)
static void writebuf(int fd, const char *buf, size_t len)
{
{
BlueSkyLog *log = (BlueSkyLog *)d;
+ /* If there are multiple log items to write, we may write more than one
+ * before calling fsync(). The committed list is used to track all the
+ * items that should be marked as committed once that final fsync() is
+ * done. */
+ GSList *committed = NULL;
+
int dirfd = open(log->log_directory, O_DIRECTORY);
if (dirfd < 0) {
fprintf(stderr, "Unable to open logging directory: %m\n");
g_mutex_lock(item->lock);
writebuf(log->fd, item->key, strlen(item->key));
writebuf(log->fd, item->data->data, item->data->len);
- fdatasync(log->fd);
- item->committed = TRUE;
- g_cond_signal(item->cond);
- g_mutex_unlock(item->lock);
+ committed = g_slist_prepend(committed, item);
+ /* Force an fsync either if we will be closing this log segment and
+ * opening a new file, or if there are no other log items currently
+ * waiting to be written. */
off_t logsize = lseek(log->fd, 0, SEEK_CUR);
+ if (logsize >= LOG_SEGMENT_SIZE
+ || g_async_queue_length(log->queue) <= 0)
+ {
+ int batchsize = 0;
+ fdatasync(log->fd);
+ while (committed != NULL) {
+ item = (BlueSkyLogItem *)committed->data;
+ item->committed = TRUE;
+ g_cond_signal(item->cond);
+ g_mutex_unlock(item->lock);
+ committed = g_slist_delete_link(committed, committed);
+ batchsize++;
+ }
+ /* if (batchsize > 1)
+ g_print("Log batch size: %d\n", batchsize); */
+ }
+
if (logsize < 0 || logsize >= LOG_SEGMENT_SIZE) {
close(log->fd);
log->fd = -1;