BlueSkyCloudLog *bluesky_serialize_inode(BlueSkyInode *inode);
gboolean bluesky_deserialize_inode(BlueSkyInode *inode, BlueSkyCloudLog *item);
+void bluesky_deserialize_cloudlog(BlueSkyCloudLog *item,
+ const char *data,
+ size_t len);
+
void bluesky_serialize_cloudlog(BlueSkyCloudLog *log,
GString *encrypted,
GString *authenticated,
/* Clean up any implementation-private data in a BlueSkyStoreAsync. */
void (*cleanup)(gpointer store, BlueSkyStoreAsync *async);
+
+ /* Find the lexicographically-largest file starting with the specified
+ * prefix. */
+ char * (*lookup_last)(gpointer store, const gchar *prefix);
} BlueSkyStoreImplementation;
void bluesky_store_register(const BlueSkyStoreImplementation *impl,
const gchar *name);
+char *bluesky_store_lookup_last(BlueSkyStore *store, const char *prefix);
BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store);
gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async);
void bluesky_store_async_ref(BlueSkyStoreAsync *async);
LOGTYPE_INODE = 2,
LOGTYPE_INODE_MAP = 3,
LOGTYPE_CHECKPOINT = 4,
- LOGTYPE_CHECKPOINT_PTR = 5,
} BlueSkyCloudLogType;
/* Headers that go on items in local log segments and cloud log segments. */
int action);
BlueSkyCloudLog *bluesky_inode_map_serialize(BlueSkyFS *fs);
+gboolean bluesky_checkpoint_load(BlueSkyFS *fs);
+
#ifdef __cplusplus
}
#endif
if (log->data != NULL)
return;
- int offset;
+ /* There are actually two cases: a full deserialization if we have not ever
+ * read the object before, and a partial deserialization where the metadata
+ * is already in memory and we just need to remap the data. If the object
+ * type has not yet been set, we'll need to read and parse the metadata.
+ * Once that is done, we can fall through the case of remapping the data
+ * itself. */
+ if (log->type == LOGTYPE_UNKNOWN) {
+ BlueSkyRCStr *raw = NULL;
+ if ((log->location_flags | log->pending_write) & CLOUDLOG_JOURNAL) {
+ raw = bluesky_log_map_object(log->fs, -1, log->log_seq,
+ log->log_offset, log->log_size);
+ }
+
+ if (raw == NULL && (log->location_flags & CLOUDLOG_CLOUD)) {
+ log->location_flags &= ~CLOUDLOG_JOURNAL;
+ raw = bluesky_log_map_object(log->fs,
+ log->location.directory,
+ log->location.sequence,
+ log->location.offset,
+ log->location.size);
+ }
+ g_assert(raw != NULL);
+ bluesky_deserialize_cloudlog(log, raw->data, raw->len);
+ bluesky_string_unref(raw);
+ }
+
+ /* At this point all metadata should be available and we need only remap
+ * the object data. */
+
+ int offset;
if ((log->location_flags | log->pending_write) & CLOUDLOG_JOURNAL) {
bluesky_cloudlog_stats_update(log, -1);
offset = log->log_offset + sizeof(struct log_header);
*/
#include <stdio.h>
+#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <glib.h>
}
/* Reconstruct the inode map from data stored in the cloud. */
-void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap)
+static void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap)
{
    g_mutex_lock(imap->lock);
    bluesky_cloudlog_fetch(imap);
    g_assert(imap->data != NULL);
+    /* The inode map item stores one 16-byte (start, end) inumber range per
+     * linked section; the ranges are not needed to rebuild the map, only
+     * the per-section links, so we merely sanity-check the sizes agree. */
+    g_assert(imap->data->len == 16 * imap->links->len);
+    for (guint i = 0; i < imap->links->len; i++) {
+        BlueSkyCloudLog *section = g_array_index(imap->links,
+                                                 BlueSkyCloudLog *, i);
+        g_mutex_lock(section->lock);
+        bluesky_cloudlog_fetch(section);
+        g_print("Loaded cloudlog item (%zd bytes)\n", section->data->len);
+
+        /* Each section stores one little-endian inode number per link; walk
+         * the number array and the link array in lockstep to rebuild the
+         * in-memory inode map. */
+        uint64_t *inum = (uint64_t *)section->data->data;
+        for (guint j = 0; j < section->links->len; j++) {
+            InodeMapEntry *entry;
+            /* Byte-swap on big-endian hosts, matching how header->inum is
+             * decoded elsewhere in the deserialization code. */
+            entry = bluesky_inode_map_lookup(fs->inode_map,
+                                             GUINT64_FROM_LE(*inum), 1);
+            entry->inum = GUINT64_FROM_LE(*inum);
+            /* Bug fix: index the links array with the inner counter (j);
+             * the original used the outer section counter (i), making every
+             * map entry in a section point at the same log item. */
+            entry->item = g_array_index(section->links,
+                                        BlueSkyCloudLog *, j);
+            bluesky_cloudlog_ref(entry->item);
+            entry->id = entry->item->id;
+            entry->location = entry->item->location;
+            inum++;
+        }
+        g_mutex_unlock(section->lock);
+    }
    g_mutex_unlock(imap->lock);
}
+
+/* Find the most recent checkpoint record in the cloud and reload inode map
+ * data from it to initialize the filesystem.  Returns a boolean indicating
+ * whether a checkpoint was found and loaded or not. */
+gboolean bluesky_checkpoint_load(BlueSkyFS *fs)
+{
+    char *last_segment = bluesky_store_lookup_last(fs->store, "log-");
+    if (last_segment == NULL)
+        return FALSE;
+
+    g_print("Last cloud log segment: %s\n", last_segment);
+    /* Assumes segment names look like "log-<dir>-<seq>" with an 8-digit
+     * directory number, so the sequence digits start at offset 13.
+     * TODO(review): confirm against the segment-naming code. */
+    int seq = atoi(last_segment + 13);
+
+    BlueSkyRCStr *last = bluesky_store_get(fs->store, last_segment);
+    g_free(last_segment);
+    if (last == NULL) {
+        g_warning("Unable to fetch last log segment from cloud!");
+        return FALSE;
+    }
+
+    /* Scan through the contents of the last log segment to find a checkpoint
+     * record.  We need to do a linear scan since at this point we don't have a
+     * direct pointer; once we have the last commit record then all other data
+     * can be loaded by directly following pointers. */
+    const char *buf = last->data;
+    size_t len = last->len;
+    const char *checkpoint = NULL;
+    size_t checkpoint_size = 0;
+    while (len > sizeof(struct cloudlog_header)) {
+        struct cloudlog_header *header = (struct cloudlog_header *)buf;
+        if (memcmp(header->magic, CLOUDLOG_MAGIC, 4) != 0) {
+            g_warning("Could not parse cloudlog entry!");
+            break;
+        }
+        /* Use size_t: summing three untrusted 32-bit sizes into an int could
+         * overflow (UB) and defeat the bounds check below. */
+        size_t size = sizeof(struct cloudlog_header);
+        size += GUINT32_FROM_LE(header->size1);
+        size += GUINT32_FROM_LE(header->size2);
+        size += GUINT32_FROM_LE(header->size3);
+        if (size > len) {
+            g_warning("Cloudlog entry is malformed (size too large)!");
+            break;
+        }
+        if (header->type - '0' == LOGTYPE_CHECKPOINT) {
+            checkpoint = buf;
+            checkpoint_size = size;
+        }
+        buf += size;
+        len -= size;
+    }
+
+    /* Bug fix: the segment might contain no checkpoint record (or the scan
+     * may have aborted early); bail out instead of dereferencing NULL. */
+    if (checkpoint == NULL) {
+        g_warning("No checkpoint record found in last log segment!");
+        bluesky_string_unref(last);
+        return FALSE;
+    }
+
+    g_print("Found checkpoint record at %td (size %zu)\n",
+            checkpoint - last->data, checkpoint_size);
+
+    /* Bootstrap the loading process by manually setting the location of this
+     * log item. */
+    BlueSkyCloudLog *commit;
+    commit = bluesky_cloudlog_get(fs,
+                                  ((struct cloudlog_header *)checkpoint)->id);
+    g_mutex_lock(commit->lock);
+    commit->location_flags |= CLOUDLOG_CLOUD;
+    commit->location.directory = 0;
+    commit->location.sequence = seq;
+    commit->location.offset = checkpoint - last->data;
+    commit->location.size = checkpoint_size;
+    g_mutex_unlock(commit->lock);
+
+    /* Bug fix: drop our reference on the segment data now that the location
+     * has been recorded; the original leaked it. */
+    bluesky_string_unref(last);
+
+    bluesky_inode_map_deserialize(fs, commit);
+    bluesky_cloudlog_unref(commit);
+
+    return TRUE;
+}
BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store)
{
- g_print("Initializing fresh filesystem\n");
+ g_print("Initializing filesystem\n");
BlueSkyFS *fs = bluesky_new_fs(name);
fs->store = store;
fs->log = bluesky_log_new("journal");
fs->log->fs = fs;
+ bluesky_checkpoint_load(fs);
+ exit(0);
+
BlueSkyInode *root = bluesky_new_inode(BLUESKY_ROOT_INUM, fs,
BLUESKY_DIRECTORY);
root->nlink = 1;
item->data = NULL;
bluesky_cloudlog_fetch(item);
- log->committed = g_slist_prepend(log->committed, item);
+ log->committed = g_slist_prepend(log->committed, item);
g_atomic_int_add(&item->data_lock_count, -1);
g_mutex_unlock(item->lock);
data1 = data + sizeof(struct log_header);
data2 = data1 + len1;
data3 = data2 + len2;
- g_assert(data3 + len3 - data < len);
+ g_assert(data3 + len3 - data <= len);
+ item->type = header->type - '0';
+ item->inum = GUINT64_FROM_LE(header->inum);
} else if (memcmp(data, CLOUDLOG_MAGIC, 4) == 0) {
g_assert(len >= sizeof(struct cloudlog_header));
struct cloudlog_header *header = (struct cloudlog_header *)data;
data1 = data + sizeof(struct cloudlog_header);
data2 = data1 + len1;
data3 = data2 + len2;
- g_assert(data3 + len3 - data < len);
+ g_assert(data3 + len3 - data <= len);
+ item->type = header->type - '0';
+ //item->inum = GUINT64_FROM_LE(header->inum);
}
BlueSkyFS *fs = item->fs;
bluesky_string_unref(item->data);
item->data = NULL;
- //item->location_flags = CLOUDLOG_JOURNAL;
+ item->data_size = len1;
int link_count = len2 / sizeof(BlueSkyCloudID);
GArray *new_links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCloudLog *));
g_mutex_lock(ref->lock);
g_assert(len3 >= sizeof(ref->location));
memcpy(&ref->location, data3, sizeof(ref->location));
+ ref->location_flags |= CLOUDLOG_CLOUD;
data3 += sizeof(ref->location); len3 -= sizeof(ref->location);
g_mutex_unlock(ref->lock);
}
g_free(store);
}
+/* Return the key of the lexicographically-last object in the store whose name
+ * begins with the given prefix, or NULL if there is none.  The caller must
+ * g_free() the returned string.
+ *
+ * Bug fix: lookup_last is an optional member of BlueSkyStoreImplementation
+ * (only the file store provides it); statically-initialized backends leave it
+ * NULL, so guard against calling through a NULL function pointer. */
+char *bluesky_store_lookup_last(BlueSkyStore *store, const char *prefix)
+{
+    if (store->impl->lookup_last == NULL)
+        return NULL;
+    return store->impl->lookup_last(store->handle, prefix);
+}
+
BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store)
{
BlueSkyStoreAsync *async;
{
}
+/* Scan the store directory for the lexicographically-largest file name that
+ * begins with the given prefix.  Returns a newly-allocated string (caller
+ * frees with g_free) or NULL if nothing matches or the directory cannot be
+ * read.
+ *
+ * NOTE(review): this lists "." rather than a path derived from the store
+ * handle; it assumes the process working directory is the store directory —
+ * confirm against filestore_create. */
+static char *filestore_lookup_last(gpointer store, const char *prefix)
+{
+    GDir *dir = g_dir_open(".", 0, NULL);
+    if (dir == NULL) {
+        g_warning("Unable to open directory for listing");
+        return NULL;
+    }
+
+    size_t prefix_len = strlen(prefix);    /* hoisted out of the scan loop */
+    char *last = NULL;
+    const gchar *file;
+    while ((file = g_dir_read_name(dir)) != NULL) {
+        if (strncmp(file, prefix, prefix_len) == 0
+            && (last == NULL || strcmp(file, last) > 0)) {
+            g_free(last);
+            last = g_strdup(file);
+        }
+    }
+    g_dir_close(dir);
+
+    return last;
+}
+
+
static BlueSkyStoreImplementation filestore_impl = {
.create = filestore_create,
.destroy = filestore_destroy,
.submit = filestore_submit,
.cleanup = filestore_cleanup,
+ .lookup_last = filestore_lookup_last,
};
/* A store implementation which simply discards all data, for testing. */