From: Michael Vrable Date: Tue, 31 Aug 2010 21:06:53 +0000 (-0700) Subject: Implement basic full log replay. X-Git-Url: http://git.vrable.net/?p=bluesky.git;a=commitdiff_plain;h=337e1b04c921c92697b74aed630343c86fabfcbd Implement basic full log replay. This still needs some checking over for bugs and minor fixes. It replays the entire journal from the start to rebuild filesystem state. Still needed: partial journal replay, starting from a checkpoint in the cloud. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 01d8b88..fcdba98 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -45,7 +45,12 @@ BlueSkyInode *bluesky_list_tail(GList *head); void bluesky_serialize_superblock(GString *out, BlueSkyFS *fs); BlueSkyFS *bluesky_deserialize_superblock(const gchar *buf); BlueSkyCloudLog *bluesky_serialize_inode(BlueSkyInode *inode); -gboolean bluesky_deserialize_inode(BlueSkyInode *inode, const gchar *buf); +gboolean bluesky_deserialize_inode(BlueSkyInode *inode, BlueSkyCloudLog *item); + +void bluesky_serialize_cloudlog(BlueSkyCloudLog *log, + GString *encrypted, + GString *authenticated, + GString *writable); /* Storage layer. Requests can be performed asynchronously, so these objects * help keep track of operations in progress. 
*/ diff --git a/bluesky/inode.c b/bluesky/inode.c index b8ab4e8..9878559 100644 --- a/bluesky/inode.c +++ b/bluesky/inode.c @@ -127,11 +127,11 @@ BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store) bluesky_insert_inode(fs, root); bluesky_inode_update_ctime(root, TRUE); + bluesky_replay(fs); + bluesky_inode_do_sync(root); bluesky_superblock_flush(fs); - bluesky_replay(fs); - return fs; } @@ -350,8 +350,7 @@ static void complete_inode_fetch(BlueSkyStoreAsync *async, BlueSkyInode *inode) "Completing fetch of inode %"PRIu64"...", inode->inum); } - if (async->result != 0 - || !bluesky_deserialize_inode(inode, async->data->data)) + if (async->result != 0 || FALSE) { if (bluesky_verbose) { g_log("bluesky/inode", G_LOG_LEVEL_DEBUG, diff --git a/bluesky/log.c b/bluesky/log.c index a3acf10..f8062cf 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -45,7 +45,9 @@ struct log_header { uint32_t magic; // HEADER_MAGIC uint8_t type; // Object type + '0' uint32_t offset; // Starting byte offset of the log header - uint32_t size; // Size of the data item (bytes) + uint32_t size1; // Size of the data item (bytes) + uint32_t size2; // + uint32_t size3; // uint64_t inum; // Inode which owns this data, if any BlueSkyCloudID id; // Object identifier } __attribute__((packed)); @@ -175,9 +177,15 @@ static gpointer log_thread(gpointer d) item->pending_write |= CLOUDLOG_JOURNAL; bluesky_cloudlog_stats_update(item, 1); + GString *data1 = g_string_new(""); + GString *data2 = g_string_new(""); + GString *data3 = g_string_new(""); + bluesky_serialize_cloudlog(item, data1, data2, data3); + struct log_header header; struct log_footer footer; - size_t size = sizeof(header) + sizeof(footer) + item->data->len; + size_t size = sizeof(header) + sizeof(footer); + size += data1->len + data2->len + data3->len; off_t offset = 0; if (log->fd >= 0) offset = lseek(log->fd, 0, SEEK_CUR); @@ -192,7 +200,9 @@ static gpointer log_thread(gpointer d) header.magic = GUINT32_TO_LE(HEADER_MAGIC); 
header.offset = GUINT32_TO_LE(offset); - header.size = GUINT32_TO_LE(item->data->len); + header.size1 = GUINT32_TO_LE(data1->len); + header.size2 = GUINT32_TO_LE(data2->len); + header.size3 = GUINT32_TO_LE(data3->len); header.type = item->type + '0'; header.id = item->id; header.inum = GUINT64_TO_LE(item->inum); @@ -203,8 +213,12 @@ static gpointer log_thread(gpointer d) writebuf(log->fd, (const char *)&header, sizeof(header)); crc = crc32c(crc, (const char *)&header, sizeof(header)); - writebuf(log->fd, item->data->data, item->data->len); - crc = crc32c(crc, item->data->data, item->data->len); + writebuf(log->fd, data1->str, data1->len); + crc = crc32c(crc, data1->str, data1->len); + writebuf(log->fd, data2->str, data2->len); + crc = crc32c(crc, data2->str, data2->len); + writebuf(log->fd, data3->str, data3->len); + crc = crc32c(crc, data3->str, data3->len); crc = crc32c(crc, (const char *)&footer, sizeof(footer) - sizeof(uint32_t)); @@ -217,6 +231,10 @@ static gpointer log_thread(gpointer d) offset += sizeof(header) + sizeof(footer) + item->data->len; + g_string_free(data1, TRUE); + g_string_free(data2, TRUE); + g_string_free(data3, TRUE); + /* Replace the log item's string data with a memory-mapped copy of the * data, now that it has been written to the log file. 
(Even if it * isn't yet on disk, it should at least be in the page cache and so @@ -603,7 +621,9 @@ static gboolean validate_journal_item(const char *buf, size_t len, off_t offset) return FALSE; if (GUINT32_FROM_LE(header->offset) != offset) return FALSE; - size_t size = GUINT32_FROM_LE(header->size); + size_t size = GUINT32_FROM_LE(header->size1) + + GUINT32_FROM_LE(header->size2) + + GUINT32_FROM_LE(header->size3); off_t footer_offset = offset + sizeof(struct log_header) + size; if (footer_offset + sizeof(struct log_footer) > len) @@ -634,11 +654,56 @@ static void bluesky_replay_scan_journal(const char *buf, size_t len) while (validate_journal_item(buf, len, offset)) { header = (const struct log_header *)(buf + offset); - size_t size = GUINT32_FROM_LE(header->size); + size_t size = GUINT32_FROM_LE(header->size1) + + GUINT32_FROM_LE(header->size2) + + GUINT32_FROM_LE(header->size3); offset += sizeof(struct log_header) + size + sizeof(struct log_footer); } } +static void reload_item(BlueSkyCloudLog *log_item, + const char *data, + size_t len1, size_t len2, size_t len3) +{ + BlueSkyFS *fs = log_item->fs; + /*const char *data1 = data;*/ + const BlueSkyCloudID *data2 + = (const BlueSkyCloudID *)(data + len1); + /*const BlueSkyCloudPointer *data3 + = (const BlueSkyCloudPointer *)(data + len1 + len2);*/ + + bluesky_string_unref(log_item->data); + log_item->data = NULL; + log_item->location_flags = CLOUDLOG_JOURNAL; + + BlueSkyCloudID id0; + memset(&id0, 0, sizeof(id0)); + + int link_count = len2 / sizeof(BlueSkyCloudID); + GArray *new_links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCloudLog *)); + for (int i = 0; i < link_count; i++) { + BlueSkyCloudID id = data2[i]; + BlueSkyCloudLog *ref = NULL; + if (memcmp(&id, &id0, sizeof(BlueSkyCloudID)) != 0) { + g_mutex_lock(fs->lock); + ref = g_hash_table_lookup(fs->locations, &id); + if (ref != NULL) { + bluesky_cloudlog_ref(ref); + } + g_mutex_unlock(fs->lock); + } + g_array_append_val(new_links, ref); + } + + for (int i = 0; 
i < log_item->links->len; i++) { + BlueSkyCloudLog *c = g_array_index(log_item->links, + BlueSkyCloudLog *, i); + bluesky_cloudlog_unref(c); + } + g_array_unref(log_item->links); + log_item->links = new_links; +} + static void bluesky_replay_scan_journal2(BlueSkyFS *fs, GList **objects, int log_seq, const char *buf, size_t len) @@ -649,7 +714,9 @@ static void bluesky_replay_scan_journal2(BlueSkyFS *fs, GList **objects, while (validate_journal_item(buf, len, offset)) { header = (const struct log_header *)(buf + offset); g_print("In replay found valid item at offset %zd\n", offset); - size_t size = GUINT32_FROM_LE(header->size); + size_t size = GUINT32_FROM_LE(header->size1) + + GUINT32_FROM_LE(header->size2) + + GUINT32_FROM_LE(header->size3); g_mutex_lock(fs->lock); BlueSkyCloudLog *log_item; @@ -665,12 +732,39 @@ static void bluesky_replay_scan_journal2(BlueSkyFS *fs, GList **objects, g_mutex_unlock(fs->lock); *objects = g_list_prepend(*objects, log_item); - bluesky_string_unref(log_item->data); - log_item->location_flags = CLOUDLOG_JOURNAL; - log_item->data = NULL; + log_item->inum = GUINT64_FROM_LE(header->inum); + reload_item(log_item, buf + offset + sizeof(struct log_header), + GUINT32_FROM_LE(header->size1), + GUINT32_FROM_LE(header->size2), + GUINT32_FROM_LE(header->size3)); log_item->log_seq = log_seq; log_item->log_offset = offset + sizeof(struct log_header); - log_item->log_size = header->size; + log_item->log_size = header->size1; + + bluesky_string_unref(log_item->data); + log_item->data = bluesky_string_new(g_memdup(buf + offset + sizeof(struct log_header), GUINT32_FROM_LE(header->size1)), GUINT32_FROM_LE(header->size1)); + + /* For any inodes which were read from the journal, deserialize the + * inode information, overwriting any old inode data. 
*/ + if (header->type - '0' == LOGTYPE_INODE) { + uint64_t inum = GUINT64_FROM_LE(header->inum); + BlueSkyInode *inode; + g_mutex_lock(fs->lock); + inode = (BlueSkyInode *)g_hash_table_lookup(fs->inodes, &inum); + if (inode == NULL) { + inode = bluesky_new_inode(inum, fs, BLUESKY_PENDING); + inode->change_count = 0; + bluesky_insert_inode(fs, inode); + } + g_mutex_lock(inode->lock); + if (!bluesky_deserialize_inode(inode, log_item)) + g_print("Error deserializing inode %"PRIu64"\n", inum); + fs->next_inum = MAX(fs->next_inum, inum + 1); + g_mutex_unlock(inode->lock); + g_mutex_unlock(fs->lock); + } + bluesky_string_unref(log_item->data); + log_item->data = NULL; g_mutex_unlock(log_item->lock); offset += sizeof(struct log_header) + size + sizeof(struct log_footer); diff --git a/bluesky/serialize.c b/bluesky/serialize.c index f90c63a..792b01a 100644 --- a/bluesky/serialize.c +++ b/bluesky/serialize.c @@ -152,8 +152,11 @@ BlueSkyCloudLog *bluesky_serialize_inode(BlueSkyInode *inode) /* Deserialize an inode into an in-memory representation. Returns a boolean * indicating whether the deserialization was successful. 
*/ -gboolean bluesky_deserialize_inode(BlueSkyInode *inode, const gchar *buf) +gboolean bluesky_deserialize_inode(BlueSkyInode *inode, BlueSkyCloudLog *item) { + g_assert(item->data != NULL); + const char *buf = item->data->data; + if (bluesky_verbose) { g_log("bluesky/serialize", G_LOG_LEVEL_DEBUG, "Deserializing inode %lld...", (long long)inode->inum); @@ -190,15 +193,14 @@ gboolean bluesky_deserialize_inode(BlueSkyInode *inode, const gchar *buf) g_array_set_size(inode->blocks, (inode->size + BLUESKY_BLOCK_SIZE - 1) / BLUESKY_BLOCK_SIZE); - // TODO -#if 0 + g_assert(inode->blocks->len == item->links->len); for (int i = 0; i < inode->blocks->len; i++) { BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i); b->type = BLUESKY_BLOCK_REF; - b->ref = g_strdup(buf); - buf += strlen(b->ref) + 1; + b->ref = g_array_index(item->links, BlueSkyCloudLog *, i); + bluesky_cloudlog_ref(b->ref); + b->dirty = NULL; } -#endif break; case BLUESKY_DIRECTORY: @@ -242,3 +244,29 @@ gboolean bluesky_deserialize_inode(BlueSkyInode *inode, const gchar *buf) return TRUE; } + +/* Convert an in-memory cloud log item to a more serialized form, suitable + * either for writing to the local journal or the cloud. */ +void bluesky_serialize_cloudlog(BlueSkyCloudLog *log, + GString *encrypted, // Raw data payload + GString *authenticated, // Block links + GString *writable) // Writable block links +{ + g_string_append_len(encrypted, log->data->data, log->data->len); + for (int i = 0; i < log->links->len; i++) { + BlueSkyCloudLog *ref = g_array_index(log->links, BlueSkyCloudLog *, i); + if (ref != NULL) { + g_string_append_len(authenticated, + (const char *)&ref->id, + sizeof(BlueSkyCloudID)); + // TODO: Fix endianness of output + g_string_append_len(writable, + (const char *)&ref->location, + sizeof(ref->location)); + } else { + BlueSkyCloudID id; + memset(&id, 0, sizeof(id)); + g_string_append_len(authenticated, (const char *)&id, sizeof(id)); + } + } +}