+static void cloudlog_partial_fetch_complete(BlueSkyStoreAsync *async,
+ BlueSkyCacheFile *cachefile);
+
+static void cloudlog_partial_fetch_start(BlueSkyCacheFile *cachefile,
+ size_t offset, size_t length)
+{
+ g_atomic_int_inc(&cachefile->refcount);
+ if (bluesky_verbose)
+ g_print("Starting partial fetch of %s from cloud (%zd + %zd)\n",
+ cachefile->filename, offset, length);
+ BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store);
+ async->op = STORE_OP_GET;
+ async->key = g_strdup(cachefile->filename);
+ async->start = offset;
+ async->len = length;
+ async->profile = bluesky_profile_get();
+ bluesky_store_async_add_notifier(async,
+ (GFunc)cloudlog_partial_fetch_complete,
+ cachefile);
+ bluesky_store_async_submit(async);
+ bluesky_store_async_unref(async);
+}
+
/* Completion callback for a (possibly partial) fetch of a log segment from
 * cloud storage.  On success, decrypts the returned bytes, merges any newly
 * validated items into the local cache file, and updates disk-usage
 * accounting; on failure, schedules a retry of the same range.  In all
 * cases it drops the cachefile reference taken when the fetch was started
 * and wakes any threads waiting on cachefile->cond. */
static void cloudlog_partial_fetch_complete(BlueSkyStoreAsync *async,
                                            BlueSkyCacheFile *cachefile)
{
    if (bluesky_verbose || async->result != 0)
        g_print("Fetch of %s from cloud complete, status = %d\n",
                async->key, async->result);

    g_mutex_lock(cachefile->lock);
    if (async->result >= 0) {
        if (async->len == 0) {
            /* A request with len == 0 fetched the whole object, so the
             * cache file now holds the complete segment. */
            if (bluesky_verbose)
                g_print("Complete object was fetched.\n");
            cachefile->complete = TRUE;
        }

        /* Decrypt items fetched and write valid items out to the local log,
         * but only if they do not overlap existing objects. This will protect
         * against an attack by the cloud provider where one valid object is
         * moved to another offset and used to overwrite data that we already
         * have fetched. */
        BlueSkyRangeset *items = bluesky_rangeset_new();
        int fd = openat(cachefile->log->dirfd, cachefile->filename, O_WRONLY);
        if (fd >= 0) {
            gboolean allow_unauth;
            /* Take a private copy of the fetched bytes before decrypting. */
            async->data = bluesky_string_dup(async->data);
            /* Unauthenticated items are only tolerated for data written by
             * the cleaner; everything else must authenticate. */
            allow_unauth = cachefile->log_dir == BLUESKY_CLOUD_DIR_CLEANER;
            bluesky_cloudlog_decrypt(async->data->data, async->data->len,
                                     cachefile->fs->keys, items, allow_unauth);
            uint64_t item_offset = 0;
            /* Walk the validated items in offset order; each one that does
             * not overlap data already cached is written into the file at
             * its absolute offset (async->start + item->start). */
            while (TRUE) {
                const BlueSkyRangesetItem *item;
                item = bluesky_rangeset_lookup_next(items, item_offset);
                if (item == NULL)
                    break;
                if (bluesky_verbose) {
                    g_print(" item offset from range request: %d\n",
                            (int)(item->start + async->start));
                }
                if (bluesky_rangeset_insert(cachefile->items,
                                            async->start + item->start,
                                            item->length, item->data))
                {
                    robust_pwrite(fd, async->data->data + item->start,
                                  item->length, async->start + item->start);
                } else {
                    /* Overlap with an existing range: reject the item to
                     * prevent a malicious store from overwriting data we
                     * have already validated. */
                    g_print(" item overlaps existing data!\n");
                }
                item_offset = item->start + 1;
            }
            /* TODO: Iterate over items and merge into cached file. */
            close(fd);
        } else {
            g_warning("Unable to open and write to cache file %s: %m",
                      cachefile->filename);
        }

        bluesky_rangeset_free(items);
    } else {
        /* The store request failed; re-issue the same range.  The retry
         * takes its own cachefile reference, so the unref below is safe. */
        g_print("Error fetching %s from cloud, retrying...\n", async->key);
        cloudlog_partial_fetch_start(cachefile, async->start, async->len);
    }

    /* Update disk-space usage statistics, since the writes above may have
     * consumed more space. */
    g_atomic_int_add(&cachefile->log->disk_used, -cachefile->disk_used);
    struct stat statbuf;
    if (fstatat(cachefile->log->dirfd, cachefile->filename, &statbuf, 0) >= 0) {
        /* Convert from 512-byte blocks to 1-kB units */
        cachefile->disk_used = (statbuf.st_blocks + 1) / 2;
    }
    g_atomic_int_add(&cachefile->log->disk_used, cachefile->disk_used);

    /* NOTE(review): cachefile->cond and cachefile->lock are used after the
     * unref; presumably the lock holder (or a pending retry's reference)
     * keeps the structure alive — confirm against bluesky_cachefile_unref. */
    bluesky_cachefile_unref(cachefile);
    g_cond_broadcast(cachefile->cond);
    g_mutex_unlock(cachefile->lock);
}
+
+static void cloudlog_fetch_start(BlueSkyCacheFile *cachefile)
+{
+ g_atomic_int_inc(&cachefile->refcount);
+ cachefile->fetching = TRUE;
+ if (bluesky_verbose)
+ g_print("Starting fetch of %s from cloud\n", cachefile->filename);
+ BlueSkyStoreAsync *async = bluesky_store_async_new(cachefile->fs->store);
+ async->op = STORE_OP_GET;
+ async->key = g_strdup(cachefile->filename);
+ async->profile = bluesky_profile_get();
+ bluesky_store_async_add_notifier(async,
+ (GFunc)cloudlog_partial_fetch_complete,
+ cachefile);
+ bluesky_store_async_submit(async);
+ bluesky_store_async_unref(async);
+}