*/
#include <stdio.h>
+#include <stdlib.h>
#include <stdint.h>
#include <inttypes.h>
#include <glib.h>
#include "bluesky-private.h"
+/* Magic number at the start of the checkpoint record, to check for version
+ * mismatches. */
+#define CHECKPOINT_MAGIC 0x7ad7dafb42a498b4ULL
+
/* Inode maps. There is both an in-memory representation as well as the
* serialized form in the cloud.
*
entry->inum = inum;
g_sequence_insert_sorted(range->map_entries, entry, compare, NULL);
- g_print("Created inode map entry for %"PRIu64"\n", inum);
+ if (bluesky_verbose)
+ g_print("Created inode map entry for %"PRIu64"\n", inum);
}
if (action != 0) {
- bluesky_cloudlog_unref(range->serialized);
+ bluesky_cloudlog_unref_delayed(range->serialized);
range->serialized = NULL;
}
InodeMapEntry *entry = (InodeMapEntry *)g_sequence_get(i);
uint64_t inum = GUINT64_TO_LE(entry->inum);
g_string_append_len(buf, (const char *)&inum, sizeof(inum));
+ bluesky_cloudlog_ref(entry->item);
g_array_append_val(log->links, entry->item);
i = g_sequence_iter_next(i);
}
log->data = bluesky_string_new_from_gstring(buf);
bluesky_cloudlog_unref(range->serialized);
range->serialized = log;
+ bluesky_cloudlog_stats_update(log, 1);
}
BlueSkyCloudLog *bluesky_inode_map_serialize(BlueSkyFS *fs)
{
+ gboolean updated = FALSE;
GString *buf = g_string_new("");
BlueSkyCloudLog *log = bluesky_cloudlog_new(fs, NULL);
log->type = LOGTYPE_CHECKPOINT;
log->inum = 0;
+ /* The checkpoint record starts with a magic number, followed by the
+ * version vector which lists the latest sequence number of all other logs
+ * (currently, only the cleaner) which have been seen.
+ *
+ * Every field is written little-endian: the 64-bit magic, then a 32-bit
+ * entry count (1 when latest_cleaner_seq_seen >= 0, otherwise 0), then
+ * for each entry a 32-bit log directory followed by a 32-bit sequence
+ * number.
+ *
+ * The loop that follows appends per-range data (range->end is visible
+ * here) and links each range's serialized section, re-serializing any
+ * range whose cached section is NULL.  The new checkpoint is returned
+ * only if at least one range was re-serialized; otherwise it is
+ * unreferenced and NULL is returned. */
+ uint64_t magic = GUINT64_TO_LE(CHECKPOINT_MAGIC);
+ g_string_append_len(buf, (const char *)&magic, sizeof(magic));
+ uint32_t versions;
+ versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen >= 0);
+ g_string_append_len(buf, (const char *)&versions, sizeof(versions));
+ if (fs->log_state->latest_cleaner_seq_seen >= 0) {
+ versions = GUINT32_TO_LE(BLUESKY_CLOUD_DIR_CLEANER);
+ g_string_append_len(buf, (const char *)&versions, sizeof(versions));
+ versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen);
+ g_string_append_len(buf, (const char *)&versions, sizeof(versions));
+ }
+
GSequenceIter *i = g_sequence_get_begin_iter(fs->inode_map);
while (!g_sequence_iter_is_end(i)) {
InodeMapRange *range = (InodeMapRange *)g_sequence_get(i);
inum = GUINT64_TO_LE(range->end);
g_string_append_len(buf, (const char *)&inum, sizeof(inum));
- if (range->serialized == NULL)
+ if (range->serialized == NULL) {
bluesky_inode_map_serialize_section(fs, range);
+ updated = TRUE;
+ }
bluesky_cloudlog_ref(range->serialized);
g_array_append_val(log->links, range->serialized);
i = g_sequence_iter_next(i);
}
log->data = bluesky_string_new_from_gstring(buf);
- return log;
+ bluesky_cloudlog_stats_update(log, 1);
+
+ if (updated) {
+ return log;
+ } else {
+ bluesky_cloudlog_unref(log);
+ return NULL;
+ }
+}
+
+/* Shrink the memory footprint of the inode map.
+ *
+ * Call this only after an updated inode map has been serialized to the
+ * cloud.  Cloud log objects held by the map are replaced with skeletal
+ * versions that merely reference the data's location in the cloud, rather
+ * than keeping all object data pinned in memory.
+ *
+ * For every range, the cached serialized section (if any) is erased.  For
+ * every entry in the range, the entry's log item is erased only when its
+ * reference count is exactly 1; a NULL item is reported with a warning. */
+void bluesky_inode_map_minimize(BlueSkyFS *fs)
+{
+    for (GSequenceIter *range_iter = g_sequence_get_begin_iter(fs->inode_map);
+         !g_sequence_iter_is_end(range_iter);
+         range_iter = g_sequence_iter_next(range_iter))
+    {
+        InodeMapRange *range = (InodeMapRange *)g_sequence_get(range_iter);
+
+        /* NOTE(review): the serialized section is erased without taking its
+         * lock, unlike the per-entry items below -- confirm that this is
+         * intentional. */
+        if (range->serialized != NULL)
+            bluesky_cloudlog_erase(range->serialized);
+
+        GSequenceIter *entry_iter
+            = g_sequence_get_begin_iter(range->map_entries);
+        while (!g_sequence_iter_is_end(entry_iter)) {
+            InodeMapEntry *entry
+                = (InodeMapEntry *)g_sequence_get(entry_iter);
+            BlueSkyCloudLog *item = entry->item;
+
+            if (item == NULL) {
+                g_warning("Null item for inode map entry %"PRIu64"!",
+                          entry->inum);
+            } else {
+                /* The reference count is inspected, and the item erased,
+                 * while holding the item's own lock. */
+                g_mutex_lock(item->lock);
+                if (g_atomic_int_get(&item->refcount) == 1)
+                    bluesky_cloudlog_erase(item);
+                g_mutex_unlock(item->lock);
+            }
+
+            entry_iter = g_sequence_iter_next(entry_iter);
+        }
+    }
+}
+
+/* Reconstruct the inode map from data stored in the cloud.
+ *
+ * imap is the checkpoint record.  Its payload, as validated below, is laid
+ * out little-endian as:
+ *   - a 64-bit magic number (CHECKPOINT_MAGIC),
+ *   - a 32-bit count of version vector entries (at most 2),
+ *   - for each vector entry, a 32-bit log directory and 32-bit sequence,
+ *   - 16 bytes for each linked inode map section.
+ * The 16-byte per-section values are not parsed here; the map is rebuilt
+ * purely from the linked sections.  Each section's payload is read as one
+ * little-endian 64-bit inode number per link, and link j of the section is
+ * the log item for inode number j.
+ *
+ * Side effects: updates fs->log_state->latest_cleaner_seq_seen when the
+ * version vector has an entry for the cleaner directory, replaces the item
+ * of every inode map entry found, and raises fs->next_inum above the
+ * largest inode number seen.  Malformed data trips a g_assert(). */
+static void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap)
+{
+    g_mutex_lock(imap->lock);
+    bluesky_cloudlog_fetch(imap);
+    g_assert(imap->data != NULL);
+    g_assert(imap->data->len >= 12);
+
+    uint64_t magic;
+    uint32_t vector_data;
+    memcpy(&magic, imap->data->data, sizeof(magic));
+    g_assert(GUINT64_FROM_LE(magic) == CHECKPOINT_MAGIC);
+    memcpy(&vector_data, imap->data->data + 8, sizeof(vector_data));
+    g_assert(GUINT32_FROM_LE(vector_data) <= 2);
+
+    guint vector_size = GUINT32_FROM_LE(vector_data);
+    g_assert(imap->data->len == 16 * imap->links->len + 12 + 8 * vector_size);
+
+    for (guint i = 0; i < vector_size; i++) {
+        const char *pair = imap->data->data + 12 + 8 * i;
+        uint32_t directory;
+        uint32_t sequence;
+        memcpy(&directory, pair, sizeof(directory));
+        memcpy(&sequence, pair + 4, sizeof(sequence));
+
+        /* Match on the same constant the serializer writes, rather than a
+         * bare literal, so reader and writer cannot drift apart. */
+        if (GUINT32_FROM_LE(directory) == BLUESKY_CLOUD_DIR_CLEANER) {
+            fs->log_state->latest_cleaner_seq_seen
+                = GUINT32_FROM_LE(sequence);
+            g_print("Deserializing checkpoint: last cleaner sequence is %d\n",
+                    GUINT32_FROM_LE(sequence));
+        }
+    }
+
+    for (guint i = 0; i < imap->links->len; i++) {
+        BlueSkyCloudLog *section = g_array_index(imap->links,
+                                                 BlueSkyCloudLog *, i);
+        g_mutex_lock(section->lock);
+        bluesky_cloudlog_fetch(section);
+
+        /* Validate the section the same way the checkpoint record is
+         * validated: one 64-bit inode number must be present per link. */
+        g_assert(section->data != NULL);
+        g_assert(section->data->len
+                 >= sizeof(uint64_t) * section->links->len);
+        g_print("Loaded cloudlog item (%zd bytes)\n", section->data->len);
+
+        const char *cursor = section->data->data;
+        for (guint j = 0; j < section->links->len; j++) {
+            /* Copy out of the buffer instead of dereferencing a cast
+             * pointer: the payload has no alignment guarantee. */
+            uint64_t raw_inum;
+            memcpy(&raw_inum, cursor, sizeof(raw_inum));
+            cursor += sizeof(raw_inum);
+
+            /* Convert to host byte order before the lookup so that the
+             * entry is keyed by the real inode number on every host. */
+            uint64_t inum = GUINT64_FROM_LE(raw_inum);
+
+            InodeMapEntry *entry;
+            entry = bluesky_inode_map_lookup(fs->inode_map, inum, 1);
+            entry->inum = inum;
+            bluesky_cloudlog_unref_delayed(entry->item);
+            entry->item = g_array_index(section->links,
+                                        BlueSkyCloudLog *, j);
+            bluesky_cloudlog_ref(entry->item);
+            fs->next_inum = MAX(fs->next_inum, entry->inum + 1);
+        }
+        g_mutex_unlock(section->lock);
+    }
+    g_mutex_unlock(imap->lock);
+}
+
+/* Find the most recent checkpoint record in the cloud and reload inode map
+ * data from it to initialize the filesystem.
+ *
+ * Returns TRUE once a checkpoint has been located and the inode map has
+ * been rebuilt from it.  Returns FALSE when no log segment exists for this
+ * filesystem's log directory, when the name of the last segment is too
+ * short to contain a sequence number, or when the last segment cannot be
+ * fetched.  If the last segment is fetched but contains no checkpoint
+ * record, g_error() is raised, which is fatal in GLib.
+ *
+ * Side effect: fs->log_state->location.sequence is advanced to one past the
+ * sequence number of the last segment found. */
+gboolean bluesky_checkpoint_load(BlueSkyFS *fs)
+{
+    g_print("Claiming cloud log directory: %d\n",
+            fs->log_state->location.directory);
+    char *prefix = g_strdup_printf("log-%08d",
+                                   fs->log_state->location.directory);
+    char *last_segment = bluesky_store_lookup_last(fs->store, prefix);
+    g_free(prefix);
+    if (last_segment == NULL)
+        return FALSE;
+
+    g_print("Last cloud log segment: %s\n", last_segment);
+
+    /* The sequence number is parsed starting at offset 13 of the segment
+     * name; make sure the name is actually that long before indexing. */
+    if (strlen(last_segment) <= 13) {
+        g_warning("Malformed cloud log segment name: %s", last_segment);
+        g_free(last_segment);
+        return FALSE;
+    }
+    int seq = atoi(last_segment + 13);
+    fs->log_state->location.sequence = seq + 1;
+
+    BlueSkyRCStr *last = bluesky_store_get(fs->store, last_segment);
+    g_free(last_segment);
+    if (last == NULL) {
+        g_warning("Unable to fetch last log segment from cloud!");
+        return FALSE;
+    }
+
+    /* NOTE(review): the string obtained here is never released in this
+     * function -- confirm whether a matching unref is needed before
+     * returning. */
+    last = bluesky_string_dup(last);
+    bluesky_cloudlog_decrypt(last->data, last->len, fs->keys, NULL, FALSE);
+
+    /* Scan through the contents of the last log segment to find a checkpoint
+     * record.  We need to do a linear scan since at this point we don't have
+     * a direct pointer; once we have the last commit record then all other
+     * data can be loaded by directly following pointers.  The last
+     * checkpoint record encountered wins. */
+    const char *buf = last->data;
+    size_t len = last->len;
+    const char *checkpoint = NULL;
+    size_t checkpoint_size = 0;
+    while (len > sizeof(struct cloudlog_header)) {
+        const struct cloudlog_header *header
+            = (const struct cloudlog_header *)buf;
+        if (memcmp(header->magic, CLOUDLOG_MAGIC, 4) != 0) {
+            g_warning("Could not parse cloudlog entry!");
+            break;
+        }
+
+        /* Sum the record size in 64 bits.  The three size fields come from
+         * stored data; adding them in an int could wrap around to zero
+         * (never advancing the scan) or to a small bogus value. */
+        uint64_t size = sizeof(struct cloudlog_header);
+        size += GUINT32_FROM_LE(header->size1);
+        size += GUINT32_FROM_LE(header->size2);
+        size += GUINT32_FROM_LE(header->size3);
+        if (size > len) {
+            g_warning("Cloudlog entry is malformed (size too large)!");
+            break;
+        }
+
+        if (header->type - '0' == LOGTYPE_CHECKPOINT) {
+            checkpoint = buf;
+            checkpoint_size = (size_t)size;
+        }
+
+        /* size <= len here, so both adjustments stay within the buffer. */
+        buf += size;
+        len -= (size_t)size;
+    }
+
+    if (checkpoint_size == 0) {
+        g_error("Unable to locate checkpoint record!\n");
+    }
+
+    g_print("Found checkpoint record at %zd (size %zd)\n",
+            checkpoint - last->data, checkpoint_size);
+
+    /* Bootstrap the loading process by manually setting the location of this
+     * log item.
+     * NOTE(review): the directory is set to 0 rather than
+     * fs->log_state->location.directory, which was used to find the
+     * segment -- confirm that this is intended. */
+    BlueSkyCloudLog *commit;
+    commit = bluesky_cloudlog_get(fs,
+                                  ((struct cloudlog_header *)checkpoint)->id);
+    g_mutex_lock(commit->lock);
+    commit->location_flags |= CLOUDLOG_CLOUD;
+    commit->location.directory = 0;
+    commit->location.sequence = seq;
+    commit->location.offset = checkpoint - last->data;
+    commit->location.size = checkpoint_size;
+    g_mutex_unlock(commit->lock);
+    bluesky_cloudlog_stats_update(commit, 1);
+
+    bluesky_inode_map_deserialize(fs, commit);
+    bluesky_cloudlog_unref(commit);
+
+    return TRUE;
}