/* Write exactly COUNT bytes from BUF to FD at file offset OFFSET.
 *
 * pwrite() may write fewer bytes than requested or be interrupted by a
 * signal; this wrapper retries until everything has been written.  On a
 * write error other than EINTR the failure is logged via g_warning and the
 * remaining (unwritten) data is silently dropped. */
static void robust_pwrite(int fd, const char *buf, ssize_t count, off_t offset)
{
    const char *p = buf;
    ssize_t remaining = count;
    off_t pos = offset;

    for (;;) {
        if (remaining <= 0)
            return;

        ssize_t n = pwrite(fd, p, remaining, pos);
        if (n < 0) {
            if (errno == EINTR)
                continue;           /* interrupted by signal: retry */
            g_warning("pwrite failure: %m");
            return;
        }

        p += n;
        remaining -= n;
        pos += n;
    }
}
+
+static void cloudlog_partial_fetch_complete(BlueSkyStoreAsync *async,
+ BlueSkyCacheFile *cachefile);
+
+static void cloudlog_partial_fetch_start(BlueSkyCacheFile *cachefile,
+ size_t offset, size_t length)
+{
+ g_atomic_int_inc(&cachefile->refcount);
+ g_print("Starting fetch of %s from cloud\n", cachefile->filename);
+ BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store);
+ async->op = STORE_OP_GET;
+ async->key = g_strdup(cachefile->filename);
+ async->start = offset;
+ async->len = length;
+ bluesky_store_async_add_notifier(async,
+ (GFunc)cloudlog_partial_fetch_complete,
+ cachefile);
+ bluesky_store_async_submit(async);
+ bluesky_store_async_unref(async);
+}
+
/* Completion notifier for an asynchronous (possibly partial) fetch of a log
 * segment from the cloud.  On success, decrypts the fetched bytes, validates
 * the contained items, and writes non-overlapping items into the local cache
 * file; on failure, retries the same byte range.  Acquires cachefile->lock
 * for the duration and releases the reference on cachefile that was taken
 * when the fetch was started. */
static void cloudlog_partial_fetch_complete(BlueSkyStoreAsync *async,
                                            BlueSkyCacheFile *cachefile)
{
    g_print("Partial fetch of %s from cloud complete, status = %d\n",
            async->key, async->result);

    g_mutex_lock(cachefile->lock);
    if (async->result >= 0) {
        /* Decrypt items fetched and write valid items out to the local log,
         * but only if they do not overlap existing objects. This will protect
         * against an attack by the cloud provider where one valid object is
         * moved to another offset and used to overwrite data that we already
         * have fetched. */
        BlueSkyRangeset *items = bluesky_rangeset_new();
        /* NOTE(review): items does not appear to be freed on any path in this
         * function -- possible leak; confirm whether ownership passes to
         * bluesky_cloudlog_decrypt or a rangeset destructor call is missing. */
        int fd = openat(cachefile->log->dirfd, cachefile->filename, O_WRONLY);
        if (fd >= 0) {
            /* Take a private copy so decryption can work on the buffer
             * without disturbing other holders of async->data. */
            async->data = bluesky_string_dup(async->data);
            bluesky_cloudlog_decrypt(async->data->data, async->data->len,
                                     cachefile->fs->keys, items);
            uint64_t item_offset = 0;
            /* Walk the ranges of successfully-decrypted items in ascending
             * offset order. */
            while (TRUE) {
                const BlueSkyRangesetItem *item;
                item = bluesky_rangeset_lookup_next(items, item_offset);
                if (item == NULL)
                    break;
                g_print(" item offset from range request: %d\n",
                        (int)(item->start + async->start));
                /* Insert succeeds only when the item does not overlap data
                 * already recorded for this cache file -- the defense against
                 * the relocation attack described above. */
                if (bluesky_rangeset_insert(cachefile->items,
                                            async->start + item->start,
                                            item->length, item->data))
                {
                    robust_pwrite(fd, async->data->data + item->start,
                                  item->length, async->start + item->start);
                } else {
                    g_print(" item overlaps existing data!\n");
                }
                item_offset = item->start + 1;
            }
            /* TODO: Iterate over items and merge into cached file. */
            close(fd);
        } else {
            g_warning("Unable to open and write to cache file %s: %m",
                      cachefile->filename);
        }
    } else {
        /* Fetch failed: reissue the same byte range.  NOTE(review): this
         * retries unconditionally with no backoff or retry cap -- confirm a
         * persistently failing store cannot cause an endless retry loop. */
        g_print("Error fetching from cloud, retrying...\n");
        cloudlog_partial_fetch_start(cachefile, async->start, async->len);
    }

    /* Drop the reference taken at fetch start, then wake any waiters.
     * NOTE(review): cachefile->cond/lock are used after the unref -- safe
     * only if this cannot be the final reference while the lock is held;
     * verify against bluesky_cachefile_unref's semantics. */
    bluesky_cachefile_unref(cachefile);
    g_cond_broadcast(cachefile->cond);
    g_mutex_unlock(cachefile->lock);
}
+
+static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile)
+{
+ g_atomic_int_inc(&cachefile->refcount);
+ cachefile->fetching = TRUE;
+ g_print("Starting fetch of %s from cloud\n", cachefile->filename);
+ BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store);
+ async->op = STORE_OP_GET;
+ async->key = g_strdup(cachefile->filename);
+ bluesky_store_async_add_notifier(async,
+ (GFunc)cloudlog_partial_fetch_complete,
+ cachefile);
+ bluesky_store_async_submit(async);
+ bluesky_store_async_unref(async);
+}
+