From: Michael Vrable Date: Fri, 27 Aug 2010 23:16:17 +0000 (-0700) Subject: Add in some support for journal replay. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=6732e9143e2c773a8c94947713bc8fc22613f702;p=bluesky.git Add in some support for journal replay. This isn't all functional yet, but making it fully functional will require updating the data fields that are written to the journal first... --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 8bb9e0c..837c6e5 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -230,7 +230,7 @@ struct _BlueSkyCloudLogState { gboolean bluesky_cloudlog_equal(gconstpointer a, gconstpointer b); guint bluesky_cloudlog_hash(gconstpointer a); -BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs); +BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs, const BlueSkyCloudID *id); gchar *bluesky_cloudlog_id_to_string(BlueSkyCloudID id); BlueSkyCloudID bluesky_cloudlog_id_from_string(const gchar *idstr); void bluesky_cloudlog_ref(BlueSkyCloudLog *log); diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c index 40067fd..bf7d22e 100644 --- a/bluesky/cloudlog.c +++ b/bluesky/cloudlog.c @@ -78,7 +78,7 @@ guint bluesky_cloudlog_hash(gconstpointer a) * before writing a batch to the cloud, handling indirection through items like * the inode map, etc. */ -BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs) +BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs, const BlueSkyCloudID *id) { BlueSkyCloudLog *log = g_new0(BlueSkyCloudLog, 1); @@ -86,7 +86,10 @@ BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs) log->cond = g_cond_new(); log->fs = fs; log->type = LOGTYPE_UNKNOWN; - log->id = bluesky_cloudlog_new_id(); + if (id != NULL) + memcpy(&log->id, id, sizeof(BlueSkyCloudID)); + else + log->id = bluesky_cloudlog_new_id(); log->links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCloudLog *)); g_atomic_int_set(&log->refcount, 1); diff --git a/bluesky/file.c b/bluesky/file.c index a6f6a15..7b11e70 100644 --- a/bluesky/file.c +++ b/bluesky/file.c @@ -241,7 +241,7 @@ void bluesky_block_flush(BlueSkyInode *inode, BlueSkyBlock *block, g_assert(block->ref == NULL); - BlueSkyCloudLog *cloudlog = bluesky_cloudlog_new(fs); + BlueSkyCloudLog *cloudlog = bluesky_cloudlog_new(fs, NULL); cloudlog->type = LOGTYPE_DATA; cloudlog->inum = inode->inum; cloudlog->data = block->dirty; // String ownership is transferred diff --git a/bluesky/log.c b/bluesky/log.c index 1459ee4..e79b21b 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -569,6 +569,26 @@ void bluesky_cachefile_gc(BlueSkyFS *fs) * also been committed to the cloud. Then, we read in all data in the log past * that point. */ +static GList *directory_contents(const char *dirname) +{ + GList *contents = NULL; + GDir *dir = g_dir_open(dirname, 0, NULL); + if (dir == NULL) { + g_warning("Unable to open journal directory: %s", dirname); + return NULL; + } + + const gchar *file; + while ((file = g_dir_read_name(dir)) != NULL) { + if (strncmp(file, "journal-", 8) == 0) + contents = g_list_prepend(contents, g_strdup(file)); + } + g_dir_close(dir); + + contents = g_list_sort(contents, (GCompareFunc)strcmp); + + return contents; +} static gboolean validate_journal_item(const char *buf, size_t len, off_t offset) { @@ -611,10 +631,45 @@ static void bluesky_replay_scan_journal(const char *buf, size_t len) while (validate_journal_item(buf, len, offset)) { header = (const struct log_header *)(buf + offset); - g_print("In replay found valid item at offset %zd\n", offset); + size_t size = GUINT32_FROM_LE(header->size); + offset += sizeof(struct log_header) + size + sizeof(struct log_footer); + } +} +static void bluesky_replay_scan_journal2(BlueSkyFS *fs, GList **objects, + int log_seq, + const char *buf, size_t len) +{ + const struct log_header *header; + off_t offset = 0; + + while (validate_journal_item(buf, len, offset)) { + header = (const struct log_header *)(buf + offset); + g_print("In replay found valid item at offset %zd\n", offset); size_t size = GUINT32_FROM_LE(header->size); + g_mutex_lock(fs->lock); + BlueSkyCloudLog *log_item; + log_item = g_hash_table_lookup(fs->locations, &header->id); + if (log_item == NULL) { + log_item = bluesky_cloudlog_new(fs, &header->id); + g_hash_table_insert(fs->locations, &log_item->id, log_item); + g_mutex_lock(log_item->lock); + } else { + bluesky_cloudlog_ref(log_item); + g_mutex_lock(log_item->lock); + } + g_mutex_unlock(fs->lock); + *objects = g_list_prepend(*objects, log_item); + + bluesky_string_unref(log_item->data); + log_item->location_flags = CLOUDLOG_JOURNAL; + log_item->data = NULL; + log_item->log_seq = log_seq; + log_item->log_offset = offset + sizeof(struct log_header); + log_item->log_size = header->size; + g_mutex_unlock(log_item->lock); + offset += sizeof(struct log_header) + size + sizeof(struct log_footer); } } @@ -622,21 +677,58 @@ static void bluesky_replay_scan_journal(const char *buf, size_t len) void bluesky_replay(BlueSkyFS *fs) { BlueSkyLog *log = fs->log; - char logname[64]; - int seq_num = 0; - int fd; - - g_snprintf(logname, sizeof(logname), "journal-%08d", seq_num); - fd = openat(log->dirfd, logname, O_RDONLY); - if (fd < 0) - return; + GList *logfiles = directory_contents(log->log_directory); + + /* Scan through log files in reverse order to find the most recent commit + * record. */ + logfiles = g_list_reverse(logfiles); + while (logfiles != NULL) { + char *filename = g_strdup_printf("%s/%s", log->log_directory, + (char *)logfiles->data); + g_print("Scanning file %s\n", filename); + GMappedFile *map = g_mapped_file_new(filename, FALSE, NULL); + if (map == NULL) { + g_warning("Mapping logfile %s failed!\n", filename); + } else { + bluesky_replay_scan_journal(g_mapped_file_get_contents(map), + g_mapped_file_get_length(map)); + g_mapped_file_unref(map); + } + g_free(filename); - off_t length = lseek(fd, 0, SEEK_END); - void *addr = mmap(NULL, length, PROT_READ, MAP_SHARED, fd, 0); - if (addr == NULL) - return; + g_free(logfiles->data); + logfiles = g_list_delete_link(logfiles, logfiles); + } + g_list_foreach(logfiles, (GFunc)g_free, NULL); + g_list_free(logfiles); + + /* Now, scan forward starting from the given point in the log to + * reconstruct all filesystem state. As we reload objects we hold a + * reference to each loaded object. At the end we free all these + * references, so that any objects which were not linked into persistent + * filesystem data structures are freed. */ + GList *objects = NULL; + int seq_num = 0; + while (TRUE) { + char *filename = g_strdup_printf("%s/journal-%08d", + log->log_directory, seq_num); + g_print("Replaying file %s\n", filename); + GMappedFile *map = g_mapped_file_new(filename, FALSE, NULL); + g_free(filename); + if (map == NULL) { + g_warning("Mapping logfile failed, assuming end of journal\n"); + break; + } - bluesky_replay_scan_journal((const char *)addr, length); + bluesky_replay_scan_journal2(fs, &objects, seq_num, + g_mapped_file_get_contents(map), + g_mapped_file_get_length(map)); + g_mapped_file_unref(map); + seq_num++; + } - munmap(addr, length); + while (objects != NULL) { + bluesky_cloudlog_unref((BlueSkyCloudLog *)objects->data); + objects = g_list_delete_link(objects, objects); + } } diff --git a/bluesky/serialize.c b/bluesky/serialize.c index bee9122..f90c63a 100644 --- a/bluesky/serialize.c +++ b/bluesky/serialize.c @@ -73,7 +73,7 @@ BlueSkyCloudLog *bluesky_serialize_inode(BlueSkyInode *inode) GString *out = g_string_new(""); struct serialized_inode buf; - BlueSkyCloudLog *cloudlog = bluesky_cloudlog_new(fs); + BlueSkyCloudLog *cloudlog = bluesky_cloudlog_new(fs, NULL); cloudlog->type = LOGTYPE_INODE; cloudlog->inum = inode->inum;