Add code in the proxy cleaner component to iterate over new inodes.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Thu, 9 Dec 2010 06:32:17 +0000 (22:32 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Thu, 9 Dec 2010 06:32:17 +0000 (22:32 -0800)
For the moment we actually iterate over all inodes in the cleaned
checkpoint, but later can restrict to just those that are needed.  Actually
merging inode changes also still needs to be implemented.

bluesky/bluesky-private.h
bluesky/cleaner.c
bluesky/inode.c
bluesky/log.c

index adeb82e..b188ac2 100644 (file)
@@ -14,6 +14,7 @@
 #define _BLUESKY_PRIVATE_H
 
 #include "bluesky.h"
+#include <stdlib.h>
 
 #ifdef __cplusplus
 extern "C" {
@@ -417,6 +418,8 @@ void bluesky_log_finish_all(GList *log_items);
 BlueSkyCloudLog *bluesky_log_get_commit_point(BlueSkyFS *fs);
 void bluesky_log_write_commit_point(BlueSkyFS *fs, BlueSkyCloudLog *marker);
 
+BlueSkyRCStr *bluesky_cachefile_map_raw(BlueSkyCacheFile *cachefile,
+                                        off_t offset, size_t size);
 BlueSkyRCStr *bluesky_log_map_object(BlueSkyCloudLog *item, gboolean map_data);
 void bluesky_mmap_unref(BlueSkyCacheFile *mmap);
 void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile);
@@ -473,7 +476,7 @@ void bluesky_inode_map_minimize(BlueSkyFS *fs);
 gboolean bluesky_checkpoint_load(BlueSkyFS *fs);
 
 /* Merging of log state with the work of the cleaner. */
-void bluesky_cleaner_find_checkpoint(BlueSkyFS *fs);
+void bluesky_cleaner_merge(BlueSkyFS *fs);
 
 #ifdef __cplusplus
 }
index ca61d94..b13983e 100644 (file)
  * rewriting log segments where needed and deleting old segments, is handled by
  * the in-cloud cleaner component. */
 
+/* A specialized function for loading an object from the cloud, used just by
+ * the cleaner.  When the cleaner runs, it will produce new objects that have
+ * the same object ID as before, differing only in the outgoing link locations
+ * as a result of cleaning.  We can't use the normal functions for loading
+ * objects since those can only deal with one version of an object.  Instead,
+ * we load the cleaned object with this function, and then merge the state into
+ * the official object as needed.
+ *
+ * The bluesky_cleaner_deserialize function is like
+ * bluesky_deserialize_cloudlog, but does a more limited deserialization and
+ * keeps the returned item entirely separate from any in-memory BlueSkyCloudLog
+ * objects. */
+
+typedef struct {
+    BlueSkyCloudLogType type;
+    BlueSkyCloudID id;
+    BlueSkyCloudPointer location;
+    uint64_t inum;
+    GArray *links;
+} BlueSkyCleanerItem;
+
+typedef struct {
+    BlueSkyCloudID id;
+    BlueSkyCloudPointer location;
+} BlueSkyCleanerLink;
+
+static BlueSkyCleanerItem *bluesky_cleaner_deserialize(BlueSkyRCStr *raw)
+{
+    const char *data = raw->data;
+    size_t len = raw->len;
+    const char *data1, *data2, *data3;
+    size_t len1, len2, len3;
+
+    g_assert(len > 4);
+    if (len < sizeof(struct cloudlog_header)
+        || memcmp(data, CLOUDLOG_MAGIC, 4) != 0)
+    {
+        g_warning("Deserializing garbage cloud log item from cleaner!");
+        return NULL;
+    };
+
+    struct cloudlog_header *header = (struct cloudlog_header *)data;
+    len1 = GUINT32_FROM_LE(header->size1);
+    len2 = GUINT32_FROM_LE(header->size2);
+    len3 = GUINT32_FROM_LE(header->size3);
+    data1 = data + sizeof(struct cloudlog_header);
+    data2 = data1 + len1;
+    data3 = data2 + len2;
+    g_assert(data3 + len3 - data <= len);
+
+    BlueSkyCleanerItem *item = g_new0(BlueSkyCleanerItem, 1);
+    item->type = header->type - '0';
+    item->inum = GUINT64_FROM_LE(header->inum);
+    memcpy(&item->id, &header->id, sizeof(BlueSkyCloudID));
+
+    int link_count = len2 / sizeof(BlueSkyCloudID);
+    g_print("Outgoing links: %d\n", link_count);
+    item->links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCleanerLink));
+    for (int i = 0; i < link_count; i++) {
+        BlueSkyCleanerLink link;
+
+        g_assert(len2 >= sizeof(link.id));
+        memcpy(&link.id, data2, sizeof(link.id));
+        data2 += sizeof(link.id); len2 -= sizeof(link.id);
+
+        g_assert(len3 >= sizeof(link.location));
+        memcpy(&link.location, data3, sizeof(link.location));
+        data3 += sizeof(link.location); len3 -= sizeof(link.location);
+
+        g_array_append_val(item->links, link);
+    }
+
+    return item;
+}
+
+static void bluesky_cleaner_item_free(BlueSkyCleanerItem *item)
+{
+    if (item == NULL)
+        return;
+    g_array_unref(item->links);
+    g_free(item);
+}
+
 /* Check the cleaner's logs to find the a more recent checkpoint record.  This
  * should be called occasionally to see if the cleaner has done any work since
  * our last check. */
-void bluesky_cleaner_find_checkpoint(BlueSkyFS *fs)
+static BlueSkyCleanerItem *bluesky_cleaner_find_checkpoint(BlueSkyFS *fs)
 {
     char *prefix = g_strdup_printf("log-%08d", BLUESKY_CLOUD_DIR_CLEANER);
     char *last_segment = bluesky_store_lookup_last(fs->store, prefix);
     g_free(prefix);
     if (last_segment == NULL)
-        return;
+        return NULL;
 
     g_print("Last cloud log segment: %s\n", last_segment);
     int seq = atoi(last_segment + 13);
     g_free(last_segment);
 
     if (seq <= fs->log_state->latest_cleaner_seq_seen)
-        return;
+        return NULL;
 
     g_print("New log segment appeared in cleaner directory: %d\n", seq);
 
@@ -59,10 +142,92 @@ void bluesky_cleaner_find_checkpoint(BlueSkyFS *fs)
         length = item->length;
     }
 
-    if (length > 0) {
-        g_print("Last object: %"PRIu64" + %"PRIu64"\n", offset, length);
-    }
+    if (length == 0)
+        return NULL;
+
+    g_print("Found a cleaner checkpoint record.\n");
+
+    BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, offset, length);
+    bluesky_cachefile_unref(cachefile);
+    g_mutex_unlock(cachefile->lock);
 
+    BlueSkyCleanerItem *checkpoint = bluesky_cleaner_deserialize(data);
+    bluesky_string_unref(data);
+
+    return checkpoint;
+}
+
+static BlueSkyCleanerItem *cleaner_load_item(BlueSkyFS *fs,
+                                             BlueSkyCloudPointer location)
+{
+    g_print("Loading item %d/%d/%d...\n", location.directory, location.sequence, location.offset);
+
+    BlueSkyCacheFile *cachefile;
+    cachefile = bluesky_cachefile_lookup(fs, location.directory,
+                                         location.sequence, TRUE);
+    while (!cachefile->complete)
+        g_cond_wait(cachefile->cond, cachefile->lock);
+
+    /* TODO: Ought to check that we are loading an item which validated? */
+    BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, location.offset,
+                                                   location.size);
     bluesky_cachefile_unref(cachefile);
     g_mutex_unlock(cachefile->lock);
+
+    BlueSkyCleanerItem *item = bluesky_cleaner_deserialize(data);
+    bluesky_string_unref(data);
+
+    return item;
+}
+
+/* Does the item at the given cloud location from the cleaner need merging?  An
+ * item in the primary log does not need to be merged, as by definition we
+ * already know about it.  Similarly, old items in the cleaner's log--those
+ * that we have already seen from a previous merge--do not need to be mergd
+ * again. */
+gboolean needs_merging(BlueSkyFS *fs, BlueSkyCloudPointer location)
+{
+    if (location.directory == BLUESKY_CLOUD_DIR_PRIMARY)
+        return FALSE;
+
+    return TRUE;
+}
+
+void bluesky_cleaner_merge(BlueSkyFS *fs)
+{
+    BlueSkyCleanerItem *checkpoint = bluesky_cleaner_find_checkpoint(fs);
+
+    if (checkpoint == NULL) {
+        g_warning("Unable to load cleaner checkpoint record!");
+        return;
+    }
+
+    /* Iterate over each of the inode map sections in the checkpoint */
+    for (int i = 0; i < checkpoint->links->len; i++) {
+        BlueSkyCleanerLink *link = &g_array_index(checkpoint->links,
+                                                  BlueSkyCleanerLink, i);
+        /*if (!needs_merging(fs, link->location))
+            continue; */
+
+        BlueSkyCleanerItem *imap = cleaner_load_item(fs, link->location);
+        if (imap == NULL) {
+            g_warning("Unable to load cleaner inode map");
+            continue;
+        }
+
+        /* Iterate over all inodes found in the inode map section */
+        for (int j = 0; j < imap->links->len; j++) {
+            BlueSkyCleanerLink *link = &g_array_index(imap->links,
+                                                      BlueSkyCleanerLink, j);
+            BlueSkyCleanerItem *inode = cleaner_load_item(fs, link->location);
+            if (inode != NULL) {
+                g_print("Got inode %"PRIu64"\n", inode->inum);
+            }
+            bluesky_cleaner_item_free(inode);
+        }
+
+        bluesky_cleaner_item_free(imap);
+    }
+
+    bluesky_cleaner_item_free(checkpoint);
 }
index 34af7de..dd20cc9 100644 (file)
@@ -132,7 +132,7 @@ BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store,
         bluesky_inode_do_sync(root);
     }
 
-    bluesky_cleaner_find_checkpoint(fs);
+    bluesky_cleaner_merge(fs);
 
     return fs;
 }
index 3fb9ef0..2e5c920 100644 (file)
 #define HEADER_MAGIC 0x676f4c0a
 #define FOOTER_MAGIC 0x2e435243
 
+static size_t readbuf(int fd, char *buf, size_t len)
+{
+    size_t total_bytes = 0;
+    while (len > 0) {
+        ssize_t bytes = read(fd, buf, len);
+        if (bytes < 0 && errno == EINTR)
+            continue;
+        g_assert(bytes >= 0);
+        if (bytes == 0)
+            break;
+        buf += bytes;
+        len -= bytes;
+    }
+    return total_bytes;
+}
+
 static void writebuf(int fd, const char *buf, size_t len)
 {
     while (len > 0) {
@@ -567,6 +583,57 @@ static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile)
     bluesky_store_async_unref(async);
 }
 
+/* Map and return a read-only version of a byte range from a cached file.  The
+ * CacheFile object must be locked. */
+BlueSkyRCStr *bluesky_cachefile_map_raw(BlueSkyCacheFile *cachefile,
+                                        off_t offset, size_t size)
+{
+    cachefile->atime = bluesky_get_current_time();
+
+    /* Easy case: the needed data is already in memory */
+    if (cachefile->addr != NULL && offset + size <= cachefile->len)
+        return bluesky_string_new_from_mmap(cachefile, offset, size);
+
+    int fd = openat(cachefile->log->dirfd, cachefile->filename, O_RDONLY);
+    if (fd < 0) {
+        fprintf(stderr, "Error opening logfile %s: %m\n",
+                cachefile->filename);
+        return NULL;
+    }
+
+    off_t length = lseek(fd, 0, SEEK_END);
+    if (offset + size > length) {
+        close(fd);
+        return NULL;
+    }
+
+    /* File is not mapped in memory.  Map the entire file in, then return a
+     * pointer to just the required data. */
+    if (cachefile->addr == NULL) {
+        cachefile->addr = (const char *)mmap(NULL, length, PROT_READ,
+                                             MAP_SHARED, fd, 0);
+        cachefile->len = length;
+        g_atomic_int_inc(&cachefile->refcount);
+
+        close(fd);
+        return bluesky_string_new_from_mmap(cachefile, offset, size);
+    }
+
+    /* Otherwise, the file was mapped in but doesn't cover the data we need.
+     * This shouldn't happen much, if at all, but if it does just read the data
+     * we need directly from the file.  We lose memory-management benefits of
+     * using mmapped data, but otherwise this works. */
+    char *buf = g_malloc(size);
+    size_t actual_size = readbuf(fd, buf, size);
+    close(fd);
+    if (actual_size != size) {
+        g_free(buf);
+        return NULL;
+    } else {
+        return bluesky_string_new(buf, size);
+    }
+}
+
 /* The arguments are mostly straightforward.  log_dir is -1 for access from the
  * journal, and non-negative for access to a cloud log segment.  map_data
  * should be TRUE for the case that are mapping just the data of an item where
@@ -575,7 +642,6 @@ static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile)
 BlueSkyRCStr *bluesky_log_map_object(BlueSkyCloudLog *item, gboolean map_data)
 {
     BlueSkyFS *fs = item->fs;
-    BlueSkyLog *log = fs->log;
     BlueSkyCacheFile *map = NULL;
     BlueSkyRCStr *str = NULL;
     int location = 0;
@@ -655,24 +721,6 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyCloudLog *item, gboolean map_data)
         }
     }
 
-    if (map->addr == NULL) {
-        int fd = openat(log->dirfd, map->filename, O_RDONLY);
-
-        if (fd < 0) {
-            fprintf(stderr, "Error opening logfile %s: %m\n", map->filename);
-            goto exit2;
-        }
-
-        off_t length = lseek(fd, 0, SEEK_END);
-        map->addr = (const char *)mmap(NULL, length, PROT_READ, MAP_SHARED,
-                                       fd, 0);
-        map->len = length;
-
-        g_atomic_int_inc(&map->refcount);
-
-        close(fd);
-    }
-
     if (map_data) {
         if (location == CLOUDLOG_JOURNAL)
             file_offset += sizeof(struct log_header);
@@ -681,8 +729,7 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyCloudLog *item, gboolean map_data)
 
         file_size = item->data_size;
     }
-    str = bluesky_string_new_from_mmap(map, file_offset, file_size);
-    map->atime = bluesky_get_current_time();
+    str = bluesky_cachefile_map_raw(map, file_offset, file_size);
 
 exit2:
     bluesky_cachefile_unref(map);