void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier,
BlueSkyStoreAsync *async);
-void bluesky_inode_start_sync(BlueSkyInode *inode, BlueSkyStoreAsync *barrier);
+void bluesky_inode_start_sync(BlueSkyInode *inode);
void bluesky_block_touch(BlueSkyInode *inode, uint64_t i);
void bluesky_block_fetch(BlueSkyFS *fs, BlueSkyBlock *block,
BlueSkyStoreAsync *barrier);
void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block,
- BlueSkyStoreAsync *barrier, GList **log_items);
-void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier,
- GList **log_items);
+ GList **log_items);
+void bluesky_file_flush(BlueSkyInode *inode, GList **log_items);
void bluesky_file_drop_cached(BlueSkyInode *inode);
-/* Logging infrastructure for ensuring operations are persistently recorded to
- * disk. */
-#define BLUESKY_CRC32C_SEED (~(uint32_t)0)
-uint32_t crc32c(uint32_t crc, const char *buf, unsigned int length);
-uint32_t crc32c_finalize(uint32_t crc);
-
-struct _BlueSkyLog {
- char *log_directory;
- GAsyncQueue *queue;
- int fd;
- int seq_num;
-};
-
-typedef struct {
- gboolean committed;
- GMutex *lock;
- GCond *cond;
- char *key;
- BlueSkyRCStr *data;
-} BlueSkyLogItem;
-
-BlueSkyLog *bluesky_log_new(const char *log_directory);
-BlueSkyLogItem *bluesky_log_item_new();
-void bluesky_log_item_submit(BlueSkyLogItem *item, BlueSkyLog *log);
-void bluesky_log_item_finish(BlueSkyLogItem *item);
-
/* Writing of data to the cloud in log segments and tracking the location of
* various pieces of data (both where in the cloud and where cached locally).
* */
#define CLOUDLOG_CLOUD 0x04
typedef struct {
gint refcount;
+ GMutex *lock;
+ GCond *cond;
BlueSkyFS *fs;
// Bitmask of CLOUDLOG_* flags indicating where the object exists.
int location_flags;
+ int pending_read, pending_write;
// A stable identifier for the object (only changes when authenticated data
// is written out, but stays the same when the in-cloud cleaner relocates
BlueSkyCloudPointer location;
// TODO: Location in journal/cache
+ int log_seq, log_offset, log_size;
// Pointers to other objects
GArray *pointers;
BlueSkyCloudID bluesky_cloudlog_id_from_string(const gchar *idstr);
void bluesky_cloudlog_ref(BlueSkyCloudLog *log);
void bluesky_cloudlog_unref(BlueSkyCloudLog *log);
-BlueSkyLogItem *bluesky_cloudlog_sync(BlueSkyCloudLog *log);
+void bluesky_cloudlog_sync(BlueSkyCloudLog *log);
void bluesky_cloudlog_insert(BlueSkyCloudLog *log);
void bluesky_cloudlog_write_log(BlueSkyFS *fs);
+/* Logging infrastructure for ensuring operations are persistently recorded to
+ * disk. */
+#define BLUESKY_CRC32C_SEED (~(uint32_t)0)
+uint32_t crc32c(uint32_t crc, const char *buf, unsigned int length);
+uint32_t crc32c_finalize(uint32_t crc);
+
+struct _BlueSkyLog {
+ char *log_directory;
+ GAsyncQueue *queue;
+ int fd;
+ int seq_num;
+};
+
+BlueSkyLog *bluesky_log_new(const char *log_directory);
+void bluesky_log_item_submit(BlueSkyCloudLog *item, BlueSkyLog *log);
+void bluesky_log_finish_all(GList *log_items);
+
#ifdef __cplusplus
}
#endif
}
inode->change_pending = inode->change_count;
- /* Create a store barrier. All operations part of the writeback will be
- * added to this barrier, so when the barrier completes we know that the
- * writeback is finished. */
- BlueSkyStoreAsync *barrier = bluesky_store_async_new(fs->store);
- barrier->op = STORE_OP_BARRIER;
-
- bluesky_inode_start_sync(inode, barrier);
-
- bluesky_store_async_add_notifier(barrier, writeback_complete, inode);
- bluesky_store_async_submit(barrier);
- bluesky_store_async_unref(barrier);
+ bluesky_inode_start_sync(inode);
}
/* Try to flush dirty data to disk, either due to memory pressure or due to
{
BlueSkyCloudLog *log = g_new0(BlueSkyCloudLog, 1);
+ log->lock = g_mutex_new();
+ log->cond = g_cond_new();
log->fs = fs;
log->type = LOGTYPE_UNKNOWN;
log->id = bluesky_cloudlog_new_id();
}
/* Start a write of the object to the local log. */
-BlueSkyLogItem *bluesky_cloudlog_sync(BlueSkyCloudLog *log)
+void bluesky_cloudlog_sync(BlueSkyCloudLog *log)
{
- BlueSkyLogItem *log_item = bluesky_log_item_new();
- log_item->key = bluesky_cloudlog_id_to_string(log->id);
- log_item->data = log->data;
- bluesky_string_ref(log->data);
- bluesky_log_item_submit(log_item, log->fs->log);
- return log_item;
+ bluesky_log_item_submit(log, log->fs->log);
}
/* Add the given entry to the global hash table containing cloud log entries.
for (int i = 0; i < sizeof(BlueSkyCloudID); i++) {
g_print("%02x", (uint8_t)(log->id.bytes[i]));
}
- g_print(": ty=%d inode=%"PRIu64" locs=%x\n",
- log->type, log->inum, log->location_flags);
+ g_print(": ty=%d inode=%"PRIu64" locs=%x log@(%d,%d)\n",
+ log->type, log->inum, log->location_flags,
+ log->log_seq, log->log_offset);
}
/* Dump a summary of filesystem state as it is cached in memory. */
/* Write the given block to cloud-backed storage and mark it clean. */
void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block,
- BlueSkyStoreAsync *barrier,
GList **log_items)
{
if (block->type != BLUESKY_BLOCK_DIRTY)
cloudlog->inum = 0; //FIXME
cloudlog->data = data;
bluesky_string_ref(data);
- *log_items = g_list_prepend(*log_items, bluesky_cloudlog_sync(cloudlog));
+ bluesky_cloudlog_sync(cloudlog);
+ *log_items = g_list_prepend(*log_items, cloudlog);
bluesky_cloudlog_insert(cloudlog);
- /* Store the file data asynchronously, and don't bother waiting for a
- * response. */
- BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
- async->op = STORE_OP_PUT;
- async->key = g_strdup(name);
- bluesky_string_ref(data);
- async->data = data;
- bluesky_store_async_submit(async);
- if (barrier != NULL)
- bluesky_store_add_barrier(barrier, async);
- bluesky_store_async_unref(async);
-
g_free(block->ref);
block->ref = name;
}
/* Flush all blocks in a file to stable storage. */
-void bluesky_file_flush(BlueSkyInode *inode, BlueSkyStoreAsync *barrier,
- GList **log_items)
+void bluesky_file_flush(BlueSkyInode *inode, GList **log_items)
{
g_return_if_fail(inode->type == BLUESKY_REGULAR);
for (int i = 0; i < inode->blocks->len; i++) {
BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i);
- bluesky_block_flush(inode->fs, b, barrier, log_items);
+ bluesky_block_flush(inode->fs, b, log_items);
}
}
}
/* Start writeback of an inode and all associated data. */
-void bluesky_inode_start_sync(BlueSkyInode *inode, BlueSkyStoreAsync *barrier)
+void bluesky_inode_start_sync(BlueSkyInode *inode)
{
GList *log_items = NULL;
BlueSkyFS *fs = inode->fs;
if (inode->type == BLUESKY_REGULAR)
- bluesky_file_flush(inode, barrier, &log_items);
+ bluesky_file_flush(inode, &log_items);
GString *buf = g_string_new("");
bluesky_serialize_inode(buf, inode);
}
}
- log_items = g_list_prepend(log_items, bluesky_cloudlog_sync(cloudlog));
-
+ bluesky_cloudlog_sync(cloudlog);
+ log_items = g_list_prepend(log_items, cloudlog);
bluesky_cloudlog_insert(cloudlog);
/* Wait for all log items to be committed to disk. */
- while (log_items != NULL) {
- BlueSkyLogItem *log_item = (BlueSkyLogItem *)log_items->data;
- bluesky_log_item_finish(log_item);
- log_items = g_list_delete_link(log_items, log_items);
- }
-
- BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
- async->op = STORE_OP_PUT;
- async->key = g_strdup(key);
- async->data = data;
- bluesky_store_async_submit(async);
- if (barrier != NULL)
- bluesky_store_add_barrier(barrier, async);
- bluesky_store_async_unref(async);
+ bluesky_log_finish_all(log_items);
}
/* Write back an inode and all associated data and wait for completion. Inode
* should already be locked. */
void bluesky_inode_do_sync(BlueSkyInode *inode)
{
- BlueSkyStoreAsync *barrier = bluesky_store_async_new(inode->fs->store);
- barrier->op = STORE_OP_BARRIER;
-
if (bluesky_verbose) {
g_log("bluesky/inode", G_LOG_LEVEL_DEBUG,
"Synchronous writeback for inode %"PRIu64"...", inode->inum);
}
- bluesky_inode_start_sync(inode, barrier);
- bluesky_store_async_submit(barrier);
- bluesky_store_async_wait(barrier);
- bluesky_store_async_unref(barrier);
+ bluesky_inode_start_sync(inode);
if (bluesky_verbose) {
g_log("bluesky/inode", G_LOG_LEVEL_DEBUG,
"Writeback for inode %"PRIu64" complete", inode->inum);
struct log_header {
uint32_t magic; // HEADER_MAGIC
uint64_t offset; // Starting byte offset of the log header
- uint32_t keysize; // Size of the log key (bytes)
uint32_t size; // Size of the data item (bytes)
+ BlueSkyCloudID id; // Object identifier
} __attribute__((packed));
struct log_footer {
fsync(dirfd);
}
- BlueSkyLogItem *item = (BlueSkyLogItem *)g_async_queue_pop(log->queue);
+ BlueSkyCloudLog *item
+ = (BlueSkyCloudLog *)g_async_queue_pop(log->queue);
g_mutex_lock(item->lock);
+ g_assert(item->data != NULL);
+
+ if ((item->location_flags | item->pending_write) & CLOUDLOG_JOURNAL) {
+ g_mutex_unlock(item->lock);
+ bluesky_cloudlog_unref(item);
+ continue;
+ }
+
+ item->pending_write |= CLOUDLOG_JOURNAL;
off_t logsize = lseek(log->fd, 0, SEEK_CUR);
struct log_header header;
header.magic = GUINT32_TO_LE(HEADER_MAGIC);
header.offset = GUINT64_TO_LE(logsize);
- header.keysize = GUINT32_TO_LE(strlen(item->key));
header.size = GUINT32_TO_LE(item->data->len);
+ header.id = item->id;
footer.magic = GUINT32_TO_LE(FOOTER_MAGIC);
uint32_t crc = BLUESKY_CRC32C_SEED;
writebuf(log->fd, (const char *)&header, sizeof(header));
crc = crc32c(crc, (const char *)&header, sizeof(header));
- writebuf(log->fd, item->key, strlen(item->key));
- crc = crc32c(crc, item->key, strlen(item->key));
-
writebuf(log->fd, item->data->data, item->data->len);
crc = crc32c(crc, item->data->data, item->data->len);
footer.crc = crc32c_finalize(crc);
writebuf(log->fd, (const char *)&footer, sizeof(footer));
- logsize += sizeof(header) + sizeof(footer);
- logsize += strlen(item->key) + item->data->len;
+ item->log_seq = log->seq_num;
+ item->log_offset = logsize + sizeof(header);
+ item->log_size = item->data->len;
+
+ logsize += sizeof(header) + sizeof(footer) + item->data->len;
committed = g_slist_prepend(committed, item);
+ g_mutex_unlock(item->lock);
/* Force an fsync either if we will be closing this log segment and
* opening a new file, or if there are no other log items currently
int batchsize = 0;
fdatasync(log->fd);
while (committed != NULL) {
- item = (BlueSkyLogItem *)committed->data;
- item->committed = TRUE;
+ item = (BlueSkyCloudLog *)committed->data;
+ g_mutex_lock(item->lock);
+ item->pending_write &= ~CLOUDLOG_JOURNAL;
+ item->location_flags |= CLOUDLOG_JOURNAL;
g_cond_signal(item->cond);
g_mutex_unlock(item->lock);
committed = g_slist_delete_link(committed, committed);
return log;
}
-BlueSkyLogItem *bluesky_log_item_new()
-{
- BlueSkyLogItem *item = g_new(BlueSkyLogItem, 1);
- item->committed = FALSE;
- item->lock = g_mutex_new();
- item->cond = g_cond_new();
- item->key = NULL;
- item->data = NULL;
- return item;
-}
-
-void bluesky_log_item_submit(BlueSkyLogItem *item, BlueSkyLog *log)
+void bluesky_log_item_submit(BlueSkyCloudLog *item, BlueSkyLog *log)
{
+ bluesky_cloudlog_ref(item);
g_async_queue_push(log->queue, item);
}
-static void bluesky_log_item_free(BlueSkyLogItem *item)
+void bluesky_log_finish_all(GList *log_items)
{
- g_free(item->key);
- bluesky_string_unref(item->data);
- g_mutex_free(item->lock);
- g_cond_free(item->cond);
- g_free(item);
-}
+ while (log_items != NULL) {
+ BlueSkyCloudLog *item = (BlueSkyCloudLog *)log_items->data;
-void bluesky_log_item_finish(BlueSkyLogItem *item)
-{
- g_mutex_lock(item->lock);
- while (!item->committed)
- g_cond_wait(item->cond, item->lock);
- g_mutex_unlock(item->lock);
- bluesky_log_item_free(item);
+ g_mutex_lock(item->lock);
+ while ((item->pending_write & CLOUDLOG_JOURNAL))
+ g_cond_wait(item->cond, item->lock);
+ g_mutex_unlock(item->lock);
+ bluesky_cloudlog_unref(item);
+
+ log_items = g_list_delete_link(log_items, log_items);
+ }
}