Implement fetching of cloud log items via range requests.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 3 Nov 2010 21:35:07 +0000 (14:35 -0700)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 3 Nov 2010 21:35:07 +0000 (14:35 -0700)
This still needs a bit more work for stability, checking for leaks, etc.,
but implements the basic functionality needed to selectively retrieve just
the needed byte ranges out of cloud log segments to download individual log
items.

bluesky/bluesky-private.h
bluesky/bluesky.h
bluesky/cloudlog.c
bluesky/log.c
bluesky/util.c

index 0f9d29d..b064465 100644 (file)
@@ -395,14 +395,13 @@ void bluesky_log_finish_all(GList *log_items);
 BlueSkyCloudLog *bluesky_log_get_commit_point(BlueSkyFS *fs);
 void bluesky_log_write_commit_point(BlueSkyFS *fs, BlueSkyCloudLog *marker);
 
-BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir,
-                                     int log_seq, int log_offset, int log_size,
-                                     gboolean map_data);
+BlueSkyRCStr *bluesky_log_map_object(BlueSkyCloudLog *item, gboolean map_data);
 void bluesky_mmap_unref(BlueSkyCacheFile *mmap);
 void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile);
 
 BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
-                                           int clouddir, int log_seq);
+                                           int clouddir, int log_seq,
+                                           gboolean start_fetch);
 void bluesky_cachefile_gc(BlueSkyFS *fs);
 
 void bluesky_replay(BlueSkyFS *fs);
index 8fbcc0d..c27fb6c 100644 (file)
@@ -99,6 +99,7 @@ gboolean bluesky_rangeset_insert(BlueSkyRangeset *rangeset,
                                  gpointer data);
 const BlueSkyRangesetItem *bluesky_rangeset_lookup(BlueSkyRangeset *rangeset,
                                                    uint64_t offset);
+const BlueSkyRangesetItem *bluesky_rangeset_lookup_next(BlueSkyRangeset *rangeset, uint64_t offset);
 
 /* Storage interface.  This presents a key-value store abstraction, and can
  * have multiple implementations: in-memory, on-disk, in-cloud. */
index ebbdeeb..a37776a 100644 (file)
@@ -283,50 +283,15 @@ void bluesky_cloudlog_fetch(BlueSkyCloudLog *log)
      * Once that is done, we can fall through the case of remapping the data
      * itself. */
     if (log->type == LOGTYPE_UNKNOWN) {
-        bluesky_cloudlog_stats_update(log, -1);
-        BlueSkyRCStr *raw = NULL;
-        if ((log->location_flags | log->pending_write) & CLOUDLOG_JOURNAL) {
-            raw = bluesky_log_map_object(log->fs, -1, log->log_seq,
-                                         log->log_offset, log->log_size, FALSE);
-        }
-
-        if (raw == NULL && (log->location_flags & CLOUDLOG_CLOUD)) {
-            log->location_flags &= ~CLOUDLOG_JOURNAL;
-            raw = bluesky_log_map_object(log->fs,
-                                         log->location.directory,
-                                         log->location.sequence,
-                                         log->location.offset,
-                                         log->location.size,
-                                         FALSE);
-        }
-
+        BlueSkyRCStr *raw = bluesky_log_map_object(log, FALSE);
         g_assert(raw != NULL);
         bluesky_deserialize_cloudlog(log, raw->data, raw->len);
         bluesky_string_unref(raw);
-        bluesky_cloudlog_stats_update(log, 1);
     }
 
     /* At this point all metadata should be available and we need only remap
      * the object data. */
-
-    int offset;
-    if ((log->location_flags | log->pending_write) & CLOUDLOG_JOURNAL) {
-        bluesky_cloudlog_stats_update(log, -1);
-        offset = log->log_offset + sizeof(struct log_header);
-        log->data = bluesky_log_map_object(log->fs, -1, log->log_seq,
-                                           offset, log->data_size, TRUE);
-        bluesky_cloudlog_stats_update(log, 1);
-    }
-
-    if (log->data == NULL && (log->location_flags & CLOUDLOG_CLOUD)) {
-        log->location_flags &= ~CLOUDLOG_JOURNAL;
-        bluesky_cloudlog_stats_update(log, -1);
-        offset = log->location.offset + sizeof(struct cloudlog_header);
-        log->data = bluesky_log_map_object(log->fs, log->location.directory,
-                                           log->location.sequence,
-                                           offset, log->data_size, TRUE);
-        bluesky_cloudlog_stats_update(log, 1);
-    }
+    log->data = bluesky_log_map_object(log, TRUE);
 
     if (log->data == NULL) {
         g_error("Unable to fetch cloudlog entry!");
index 38606c1..88b95df 100644 (file)
@@ -110,7 +110,8 @@ static gboolean log_open(BlueSkyLog *log)
         }
     }
 
-    log->current_log = bluesky_cachefile_lookup(log->fs, -1, log->seq_num);
+    log->current_log = bluesky_cachefile_lookup(log->fs, -1, log->seq_num,
+                                                FALSE);
     g_assert(log->current_log != NULL);
     g_mutex_unlock(log->current_log->lock);
 
@@ -354,59 +355,13 @@ void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile)
     g_atomic_int_add(&cachefile->refcount, -1);
 }
 
-static void cloudlog_fetch_complete(BlueSkyStoreAsync *async,
-                                    BlueSkyCacheFile *cachefile);
-
-static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile)
-{
-    g_atomic_int_inc(&cachefile->refcount);
-    cachefile->fetching = TRUE;
-    g_print("Starting fetch of %s from cloud\n", cachefile->filename);
-    BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store);
-    async->op = STORE_OP_GET;
-    async->key = g_strdup(cachefile->filename);
-    bluesky_store_async_add_notifier(async,
-                                     (GFunc)cloudlog_fetch_complete,
-                                     cachefile);
-    bluesky_store_async_submit(async);
-    bluesky_store_async_unref(async);
-}
-
-static void cloudlog_fetch_complete(BlueSkyStoreAsync *async,
-                                    BlueSkyCacheFile *cachefile)
-{
-    g_print("Fetch of %s from cloud complete, status = %d\n",
-            async->key, async->result);
-
-    g_mutex_lock(cachefile->lock);
-    if (async->result >= 0) {
-        char *pathname = g_strdup_printf("%s/%s",
-                                         cachefile->log->log_directory,
-                                         cachefile->filename);
-        async->data = bluesky_string_dup(async->data);
-        bluesky_cloudlog_decrypt(async->data->data, async->data->len,
-                                 cachefile->fs->keys, cachefile->items);
-        if (!g_file_set_contents(pathname, async->data->data, async->data->len,
-                                 NULL))
-            g_print("Error writing out fetched file to cache!\n");
-        g_free(pathname);
-
-        cachefile->fetching = FALSE;
-        cachefile->ready = TRUE;
-    } else {
-        g_print("Error fetching from cloud, retrying...\n");
-        cloudlog_fetch_start(cachefile);
-    }
-
-    bluesky_cachefile_unref(cachefile);
-    g_cond_broadcast(cachefile->cond);
-    g_mutex_unlock(cachefile->lock);
-}
+static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile);
 
 /* Find the BlueSkyCacheFile object for the given journal or cloud log segment.
  * Returns the object in the locked state and with a reference taken. */
 BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
-                                           int clouddir, int log_seq)
+                                           int clouddir, int log_seq,
+                                           gboolean start_fetch)
 {
     if (page_size == 0) {
         page_size = getpagesize();
@@ -455,8 +410,14 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
 
         g_hash_table_insert(log->mmap_cache, map->filename, map);
 
+        int fd = openat(log->dirfd, logname, O_WRONLY | O_CREAT, 0600);
+        if (fd >= 0) {
+            ftruncate(fd, 5 << 20);     // FIXME
+            close(fd);
+        }
+
         // If the log file is stored in the cloud, we may need to fetch it
-        if (clouddir >= 0)
+        if (clouddir >= 0 && start_fetch)
             cloudlog_fetch_start(map);
     } else {
         g_mutex_lock(map->lock);
@@ -468,55 +429,164 @@ BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
     return map;
 }
 
+static void robust_pwrite(int fd, const char *buf, ssize_t count, off_t offset)
+{
+    while (count > 0) {
+        ssize_t written = pwrite(fd, buf, count, offset);
+        if (written < 0) {
+            if (errno == EINTR)
+                continue;
+            g_warning("pwrite failure: %m");
+            return;
+        }
+        buf += written;
+        count -= written;
+        offset += written;
+    }
+}
+
+static void cloudlog_partial_fetch_complete(BlueSkyStoreAsync *async,
+                                            BlueSkyCacheFile *cachefile);
+
+static void cloudlog_partial_fetch_start(BlueSkyCacheFile *cachefile,
+                                         size_t offset, size_t length)
+{
+    g_atomic_int_inc(&cachefile->refcount);
+    g_print("Starting fetch of %s from cloud\n", cachefile->filename);
+    BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store);
+    async->op = STORE_OP_GET;
+    async->key = g_strdup(cachefile->filename);
+    async->start = offset;
+    async->len = length;
+    bluesky_store_async_add_notifier(async,
+                                     (GFunc)cloudlog_partial_fetch_complete,
+                                     cachefile);
+    bluesky_store_async_submit(async);
+    bluesky_store_async_unref(async);
+}
+
+static void cloudlog_partial_fetch_complete(BlueSkyStoreAsync *async,
+                                            BlueSkyCacheFile *cachefile)
+{
+    g_print("Partial fetch of %s from cloud complete, status = %d\n",
+            async->key, async->result);
+
+    g_mutex_lock(cachefile->lock);
+    if (async->result >= 0) {
+        /* Descrypt items fetched and write valid items out to the local log,
+         * but only if they do not overlap existing objects.  This will protect
+         * against an attack by the cloud provider where one valid object is
+         * moved to another offset and used to overwrite data that we already
+         * have fetched. */
+        BlueSkyRangeset *items = bluesky_rangeset_new();
+        int fd = openat(cachefile->log->dirfd, cachefile->filename, O_WRONLY);
+        if (fd >= 0) {
+            async->data = bluesky_string_dup(async->data);
+            bluesky_cloudlog_decrypt(async->data->data, async->data->len,
+                                     cachefile->fs->keys, items);
+            uint64_t item_offset = 0;
+            while (TRUE) {
+                const BlueSkyRangesetItem *item;
+                item = bluesky_rangeset_lookup_next(items, item_offset);
+                if (item == NULL)
+                    break;
+                g_print("  item offset from range request: %d\n",
+                        (int)(item->start + async->start));
+                if (bluesky_rangeset_insert(cachefile->items,
+                                            async->start + item->start,
+                                            item->length, item->data))
+                {
+                    robust_pwrite(fd, async->data->data + item->start,
+                                  item->length, async->start + item->start);
+                } else {
+                    g_print("    item overlaps existing data!\n");
+                }
+                item_offset = item->start + 1;
+            }
+            /* TODO: Iterate over items and merge into cached file. */
+            close(fd);
+        } else {
+            g_warning("Unable to open and write to cache file %s: %m",
+                      cachefile->filename);
+        }
+    } else {
+        g_print("Error fetching from cloud, retrying...\n");
+        cloudlog_partial_fetch_start(cachefile, async->start, async->len);
+    }
+
+    bluesky_cachefile_unref(cachefile);
+    g_cond_broadcast(cachefile->cond);
+    g_mutex_unlock(cachefile->lock);
+}
+
+static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile)
+{
+    g_atomic_int_inc(&cachefile->refcount);
+    cachefile->fetching = TRUE;
+    g_print("Starting fetch of %s from cloud\n", cachefile->filename);
+    BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store);
+    async->op = STORE_OP_GET;
+    async->key = g_strdup(cachefile->filename);
+    bluesky_store_async_add_notifier(async,
+                                     (GFunc)cloudlog_partial_fetch_complete,
+                                     cachefile);
+    bluesky_store_async_submit(async);
+    bluesky_store_async_unref(async);
+}
+
 /* The arguments are mostly straightforward.  log_dir is -1 for access from the
  * journal, and non-negative for access to a cloud log segment.  map_data
  * should be TRUE for the case that are mapping just the data of an item where
  * we have already parsed the item headers; this surpresses the error when the
  * access is not to the first bytes of the item. */
-BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir,
-                                     int log_seq, int log_offset, int log_size,
-                                     gboolean map_data)
+BlueSkyRCStr *bluesky_log_map_object(BlueSkyCloudLog *item, gboolean map_data)
 {
+    BlueSkyFS *fs = item->fs;
+    BlueSkyLog *log = fs->log;
+    BlueSkyCacheFile *map = NULL;
+    BlueSkyRCStr *str = NULL;
+    int location = 0;
+    size_t file_offset = 0, file_size = 0;
+    gboolean range_request = TRUE;
+
     if (page_size == 0) {
         page_size = getpagesize();
     }
 
-    BlueSkyLog *log = fs->log;
-    BlueSkyCacheFile *map = bluesky_cachefile_lookup(fs, log_dir, log_seq);
+    bluesky_cloudlog_stats_update(item, -1);
 
-    if (map == NULL) {
-        return NULL;
+    /* First, check to see if the journal still contains a copy of the item and
+     * if so use that. */
+    if ((item->location_flags | item->pending_write) & CLOUDLOG_JOURNAL) {
+        map = bluesky_cachefile_lookup(fs, -1, item->log_seq, TRUE);
+        if (map != NULL) {
+            location = CLOUDLOG_JOURNAL;
+            file_offset = item->log_offset;
+            file_size = item->log_size;
+        }
     }
 
-    /* Log segments fetched from the cloud might only be partially-fetched.
-     * Check whether the object we are interested in is available. */
-    if (log_dir >= 0) {
-        const BlueSkyRangesetItem *rangeitem;
-        rangeitem = bluesky_rangeset_lookup(map->items, log_offset);
-        if (rangeitem == NULL || rangeitem->start != log_offset) {
-            g_warning("log-%d: Item at offset %d does not seem to be available\n", log_seq, log_offset);
-        }
-        if (map_data && rangeitem != NULL
-            && log_offset > rangeitem->start
-            && log_size <= rangeitem->length - (log_offset - rangeitem->start))
-        {
-            g_warning("  ...allowing access to middle of log item");
+    if (location == 0 && (item->location_flags & CLOUDLOG_CLOUD)) {
+        item->location_flags &= ~CLOUDLOG_JOURNAL;
+        map = bluesky_cachefile_lookup(fs,
+                                       item->location.directory,
+                                       item->location.sequence,
+                                       !range_request);
+        if (map == NULL) {
+            g_warning("Unable to remap cloud log segment!");
+            goto exit1;
         }
+        location = CLOUDLOG_CLOUD;
+        file_offset = item->location.offset;
+        file_size = item->location.size;
     }
 
     if (map->addr == NULL) {
-        while (!map->ready && map->fetching) {
-            g_print("Waiting for log segment to be fetched from cloud...\n");
-            g_cond_wait(map->cond, map->lock);
-        }
-
         int fd = openat(log->dirfd, map->filename, O_RDONLY);
 
         if (fd < 0) {
             fprintf(stderr, "Error opening logfile %s: %m\n", map->filename);
-            bluesky_cachefile_unref(map);
-            g_mutex_unlock(map->lock);
-            return NULL;
+            goto exit2;
         }
 
         off_t length = lseek(fd, 0, SEEK_END);
@@ -526,17 +596,55 @@ BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir,
         map->len = length;
         g_atomic_int_add(&log->disk_used, map->len / 1024);
 
-        g_print("Re-mapped log segment %d...\n", log_seq);
         g_atomic_int_inc(&map->refcount);
 
         close(fd);
     }
 
-    BlueSkyRCStr *str;
+    /* Log segments fetched from the cloud might only be partially-fetched.
+     * Check whether the object we are interested in is available. */
+    if (location == CLOUDLOG_CLOUD) {
+        while (TRUE) {
+            const BlueSkyRangesetItem *rangeitem;
+            rangeitem = bluesky_rangeset_lookup(map->items, file_offset);
+            if (rangeitem != NULL && (rangeitem->start != file_offset
+                                      || rangeitem->length != file_size)) {
+                g_warning("log-%d: Item offset %zd seems to be invalid!",
+                          (int)item->location.sequence, file_offset);
+                goto exit2;
+            }
+            if (rangeitem == NULL) {
+                g_print("Item at offset 0x%zx not available, need to fetch.\n",
+                        file_offset);
+                if (range_request)
+                    cloudlog_partial_fetch_start(map, file_offset, file_size);
+                g_cond_wait(map->cond, map->lock);
+            } else if (rangeitem->start == file_offset
+                       && rangeitem->length == file_size) {
+                g_print("Item now available.\n");
+                break;
+            }
+        }
+    }
+
+    if (map_data) {
+        if (location == CLOUDLOG_JOURNAL)
+            file_offset += sizeof(struct log_header);
+        else
+            file_offset += sizeof(struct cloudlog_header);
+
+        file_size = item->data_size;
+    }
+    str = bluesky_string_new_from_mmap(map, file_offset, file_size);
     map->atime = bluesky_get_current_time();
-    str = bluesky_string_new_from_mmap(map, log_offset, log_size);
+
+    g_print("Returning item at offset 0x%zx.\n", file_offset);
+
+exit2:
     bluesky_cachefile_unref(map);
     g_mutex_unlock(map->lock);
+exit1:
+    bluesky_cloudlog_stats_update(item, 1);
     return str;
 }
 
index 4e8b011..e7514fc 100644 (file)
@@ -292,3 +292,23 @@ const BlueSkyRangesetItem *bluesky_rangeset_lookup(BlueSkyRangeset *rangeset,
     else
         return NULL;
 }
+
+/* Look up the first rangeset item starting at or following the given address.
+ * Can be used to iterate through a rangeset. */
+const BlueSkyRangesetItem *bluesky_rangeset_lookup_next(BlueSkyRangeset *rangeset, uint64_t offset)
+{
+    GSequenceIter *i;
+    i = g_sequence_search(rangeset->seq, &offset, compare64, NULL);
+    i = g_sequence_iter_prev(i);
+    if (g_sequence_iter_is_end(i))
+        return NULL;
+    BlueSkyRangesetItem *item = (BlueSkyRangesetItem *)g_sequence_get(i);
+    if (item->start < offset) {
+        i = g_sequence_iter_next(i);
+        if (g_sequence_iter_is_end(i))
+            item = NULL;
+        else
+            item = (BlueSkyRangesetItem *)g_sequence_get(i);
+    }
+    return item;
+}