From 337e1b04c921c92697b74aed630343c86fabfcbd Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Tue, 31 Aug 2010 14:06:53 -0700 Subject: [PATCH] Implement basic full log replay. This still needs some checking over for bugs and minor fixes. It replays the entire journal from the start to rebuild filesystem state. Still needed: partial joural replay, starting from a checkpoint in the cloud. --- bluesky/bluesky-private.h | 7 ++- bluesky/inode.c | 7 +-- bluesky/log.c | 118 ++++++++++++++++++++++++++++++++++---- bluesky/serialize.c | 40 +++++++++++-- 4 files changed, 149 insertions(+), 23 deletions(-) diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 01d8b88..fcdba98 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -45,7 +45,12 @@ BlueSkyInode *bluesky_list_tail(GList *head); void bluesky_serialize_superblock(GString *out, BlueSkyFS *fs); BlueSkyFS *bluesky_deserialize_superblock(const gchar *buf); BlueSkyCloudLog *bluesky_serialize_inode(BlueSkyInode *inode); -gboolean bluesky_deserialize_inode(BlueSkyInode *inode, const gchar *buf); +gboolean bluesky_deserialize_inode(BlueSkyInode *inode, BlueSkyCloudLog *item); + +void bluesky_serialize_cloudlog(BlueSkyCloudLog *log, + GString *encrypted, + GString *authenticated, + GString *writable); /* Storage layer. Requests can be performed asynchronously, so these objects * help keep track of operations in progress. */ diff --git a/bluesky/inode.c b/bluesky/inode.c index b8ab4e8..9878559 100644 --- a/bluesky/inode.c +++ b/bluesky/inode.c @@ -127,11 +127,11 @@ BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store) bluesky_insert_inode(fs, root); bluesky_inode_update_ctime(root, TRUE); + bluesky_replay(fs); + bluesky_inode_do_sync(root); bluesky_superblock_flush(fs); - bluesky_replay(fs); - return fs; } @@ -350,8 +350,7 @@ static void complete_inode_fetch(BlueSkyStoreAsync *async, BlueSkyInode *inode) "Completing fetch of inode %"PRIu64"...", inode->inum); } - if (async->result != 0 - || !bluesky_deserialize_inode(inode, async->data->data)) + if (async->result != 0 || FALSE) { if (bluesky_verbose) { g_log("bluesky/inode", G_LOG_LEVEL_DEBUG, diff --git a/bluesky/log.c b/bluesky/log.c index a3acf10..f8062cf 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -45,7 +45,9 @@ struct log_header { uint32_t magic; // HEADER_MAGIC uint8_t type; // Object type + '0' uint32_t offset; // Starting byte offset of the log header - uint32_t size; // Size of the data item (bytes) + uint32_t size1; // Size of the data item (bytes) + uint32_t size2; // + uint32_t size3; // uint64_t inum; // Inode which owns this data, if any BlueSkyCloudID id; // Object identifier } __attribute__((packed)); @@ -175,9 +177,15 @@ static gpointer log_thread(gpointer d) item->pending_write |= CLOUDLOG_JOURNAL; bluesky_cloudlog_stats_update(item, 1); + GString *data1 = g_string_new(""); + GString *data2 = g_string_new(""); + GString *data3 = g_string_new(""); + bluesky_serialize_cloudlog(item, data1, data2, data3); + struct log_header header; struct log_footer footer; - size_t size = sizeof(header) + sizeof(footer) + item->data->len; + size_t size = sizeof(header) + sizeof(footer); + size += data1->len + data2->len + data3->len; off_t offset = 0; if (log->fd >= 0) offset = lseek(log->fd, 0, SEEK_CUR); @@ -192,7 +200,9 @@ static gpointer log_thread(gpointer d) header.magic = GUINT32_TO_LE(HEADER_MAGIC); header.offset = GUINT32_TO_LE(offset); - header.size = GUINT32_TO_LE(item->data->len); + header.size1 = GUINT32_TO_LE(data1->len); + header.size2 = GUINT32_TO_LE(data2->len); + header.size3 = GUINT32_TO_LE(data3->len); header.type = item->type + '0'; header.id = item->id; header.inum = GUINT64_TO_LE(item->inum); @@ -203,8 +213,12 @@ static gpointer log_thread(gpointer d) writebuf(log->fd, (const char *)&header, sizeof(header)); crc = crc32c(crc, (const char *)&header, sizeof(header)); - writebuf(log->fd, item->data->data, item->data->len); - crc = crc32c(crc, item->data->data, item->data->len); + writebuf(log->fd, data1->str, data1->len); + crc = crc32c(crc, data1->str, data1->len); + writebuf(log->fd, data2->str, data2->len); + crc = crc32c(crc, data2->str, data2->len); + writebuf(log->fd, data3->str, data3->len); + crc = crc32c(crc, data3->str, data3->len); crc = crc32c(crc, (const char *)&footer, sizeof(footer) - sizeof(uint32_t)); @@ -217,6 +231,10 @@ static gpointer log_thread(gpointer d) offset += sizeof(header) + sizeof(footer) + item->data->len; + g_string_free(data1, TRUE); + g_string_free(data2, TRUE); + g_string_free(data3, TRUE); + /* Replace the log item's string data with a memory-mapped copy of the * data, now that it has been written to the log file. (Even if it * isn't yet on disk, it should at least be in the page cache and so @@ -603,7 +621,9 @@ static gboolean validate_journal_item(const char *buf, size_t len, off_t offset) return FALSE; if (GUINT32_FROM_LE(header->offset) != offset) return FALSE; - size_t size = GUINT32_FROM_LE(header->size); + size_t size = GUINT32_FROM_LE(header->size1) + + GUINT32_FROM_LE(header->size2) + + GUINT32_FROM_LE(header->size3); off_t footer_offset = offset + sizeof(struct log_header) + size; if (footer_offset + sizeof(struct log_footer) > len) @@ -634,11 +654,56 @@ static void bluesky_replay_scan_journal(const char *buf, size_t len) while (validate_journal_item(buf, len, offset)) { header = (const struct log_header *)(buf + offset); - size_t size = GUINT32_FROM_LE(header->size); + size_t size = GUINT32_FROM_LE(header->size1) + + GUINT32_FROM_LE(header->size2) + + GUINT32_FROM_LE(header->size3); offset += sizeof(struct log_header) + size + sizeof(struct log_footer); } } +static void reload_item(BlueSkyCloudLog *log_item, + const char *data, + size_t len1, size_t len2, size_t len3) +{ + BlueSkyFS *fs = log_item->fs; + /*const char *data1 = data;*/ + const BlueSkyCloudID *data2 + = (const BlueSkyCloudID *)(data + len1); + /*const BlueSkyCloudPointer *data3 + = (const BlueSkyCloudPointer *)(data + len1 + len2);*/ + + bluesky_string_unref(log_item->data); + log_item->data = NULL; + log_item->location_flags = CLOUDLOG_JOURNAL; + + BlueSkyCloudID id0; + memset(&id0, 0, sizeof(id0)); + + int link_count = len2 / sizeof(BlueSkyCloudID); + GArray *new_links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCloudLog *)); + for (int i = 0; i < link_count; i++) { + BlueSkyCloudID id = data2[i]; + BlueSkyCloudLog *ref = NULL; + if (memcmp(&id, &id0, sizeof(BlueSkyCloudID)) != 0) { + g_mutex_lock(fs->lock); + ref = g_hash_table_lookup(fs->locations, &id); + if (ref != NULL) { + bluesky_cloudlog_ref(ref); + } + g_mutex_unlock(fs->lock); + } + g_array_append_val(new_links, ref); + } + + for (int i = 0; i < log_item->links->len; i++) { + BlueSkyCloudLog *c = g_array_index(log_item->links, + BlueSkyCloudLog *, i); + bluesky_cloudlog_unref(c); + } + g_array_unref(log_item->links); + log_item->links = new_links; +} + static void bluesky_replay_scan_journal2(BlueSkyFS *fs, GList **objects, int log_seq, const char *buf, size_t len) @@ -649,7 +714,9 @@ static void bluesky_replay_scan_journal2(BlueSkyFS *fs, GList **objects, while (validate_journal_item(buf, len, offset)) { header = (const struct log_header *)(buf + offset); g_print("In replay found valid item at offset %zd\n", offset); - size_t size = GUINT32_FROM_LE(header->size); + size_t size = GUINT32_FROM_LE(header->size1) + + GUINT32_FROM_LE(header->size2) + + GUINT32_FROM_LE(header->size3); g_mutex_lock(fs->lock); BlueSkyCloudLog *log_item; @@ -665,12 +732,39 @@ static void bluesky_replay_scan_journal2(BlueSkyFS *fs, GList **objects, g_mutex_unlock(fs->lock); *objects = g_list_prepend(*objects, log_item); - bluesky_string_unref(log_item->data); - log_item->location_flags = CLOUDLOG_JOURNAL; - log_item->data = NULL; + log_item->inum = GUINT64_FROM_LE(header->inum); + reload_item(log_item, buf + offset + sizeof(struct log_header), + GUINT32_FROM_LE(header->size1), + GUINT32_FROM_LE(header->size2), + GUINT32_FROM_LE(header->size3)); log_item->log_seq = log_seq; log_item->log_offset = offset + sizeof(struct log_header); - log_item->log_size = header->size; + log_item->log_size = header->size1; + + bluesky_string_unref(log_item->data); + log_item->data = bluesky_string_new(g_memdup(buf + offset + sizeof(struct log_header), GUINT32_FROM_LE(header->size1)), GUINT32_FROM_LE(header->size1)); + + /* For any inodes which were read from the journal, deserialize the + * inode information, overwriting any old inode data. */ + if (header->type - '0' == LOGTYPE_INODE) { + uint64_t inum = GUINT64_FROM_LE(header->inum); + BlueSkyInode *inode; + g_mutex_lock(fs->lock); + inode = (BlueSkyInode *)g_hash_table_lookup(fs->inodes, &inum); + if (inode == NULL) { + inode = bluesky_new_inode(inum, fs, BLUESKY_PENDING); + inode->change_count = 0; + bluesky_insert_inode(fs, inode); + } + g_mutex_lock(inode->lock); + if (!bluesky_deserialize_inode(inode, log_item)) + g_print("Error deserializing inode %"PRIu64"\n", inum); + fs->next_inum = MAX(fs->next_inum, inum + 1); + g_mutex_unlock(inode->lock); + g_mutex_unlock(fs->lock); + } + bluesky_string_unref(log_item->data); + log_item->data = NULL; g_mutex_unlock(log_item->lock); offset += sizeof(struct log_header) + size + sizeof(struct log_footer); diff --git a/bluesky/serialize.c b/bluesky/serialize.c index f90c63a..792b01a 100644 --- a/bluesky/serialize.c +++ b/bluesky/serialize.c @@ -152,8 +152,11 @@ BlueSkyCloudLog *bluesky_serialize_inode(BlueSkyInode *inode) /* Deserialize an inode into an in-memory representation. Returns a boolean * indicating whether the deserialization was successful. */ -gboolean bluesky_deserialize_inode(BlueSkyInode *inode, const gchar *buf) +gboolean bluesky_deserialize_inode(BlueSkyInode *inode, BlueSkyCloudLog *item) { + g_assert(item->data != NULL); + const char *buf = item->data->data; + if (bluesky_verbose) { g_log("bluesky/serialize", G_LOG_LEVEL_DEBUG, "Deserializing inode %lld...", (long long)inode->inum); @@ -190,15 +193,14 @@ gboolean bluesky_deserialize_inode(BlueSkyInode *inode, const gchar *buf) g_array_set_size(inode->blocks, (inode->size + BLUESKY_BLOCK_SIZE - 1) / BLUESKY_BLOCK_SIZE); - // TODO -#if 0 + g_assert(inode->blocks->len == item->links->len); for (int i = 0; i < inode->blocks->len; i++) { BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i); b->type = BLUESKY_BLOCK_REF; - b->ref = g_strdup(buf); - buf += strlen(b->ref) + 1; + b->ref = g_array_index(item->links, BlueSkyCloudLog *, i); + bluesky_cloudlog_ref(b->ref); + b->dirty = NULL; } -#endif break; case BLUESKY_DIRECTORY: @@ -242,3 +244,29 @@ gboolean bluesky_deserialize_inode(BlueSkyInode *inode, const gchar *buf) return TRUE; } + +/* Convert an in-memory cloud log item to a more serialized form, suitable + * either for writing to the local journal or the the cloud. */ +void bluesky_serialize_cloudlog(BlueSkyCloudLog *log, + GString *encrypted, // Raw data payload + GString *authenticated, // Block links + GString *writable) // Writable block links +{ + g_string_append_len(encrypted, log->data->data, log->data->len); + for (int i = 0; i < log->links->len; i++) { + BlueSkyCloudLog *ref = g_array_index(log->links, BlueSkyCloudLog *, i); + if (ref != NULL) { + g_string_append_len(authenticated, + (const char *)&ref->id, + sizeof(BlueSkyCloudID)); + // TODO: Fix endianness of output + g_string_append_len(writable, + (const char *)&ref->location, + sizeof(ref->location)); + } else { + BlueSkyCloudID id; + memset(&id, 0, sizeof(id)); + g_string_append_len(authenticated, (const char *)&id, sizeof(id)); + } + } +} -- 2.20.1