Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / bluesky / cloudlog.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 #include <stdio.h>
32 #include <stdint.h>
33 #include <glib.h>
34 #include <string.h>
35
36 #include "bluesky-private.h"
37
38 // Rough size limit for a log segment.  This is not a firm limit and there are
39 // no absolute guarantees on the size of a log segment.
40 #define CLOUDLOG_SEGMENT_SIZE (4 << 20)
41
42 // Maximum number of segments to attempt to upload concurrently
43 int cloudlog_concurrent_uploads = 32;
44
45 BlueSkyCloudID bluesky_cloudlog_new_id()
46 {
47     BlueSkyCloudID id;
48     bluesky_crypt_random_bytes((uint8_t *)&id.bytes, sizeof(id));
49     return id;
50 }
51
52 gchar *bluesky_cloudlog_id_to_string(BlueSkyCloudID id)
53 {
54     char buf[sizeof(BlueSkyCloudID) * 2 + 1];
55     buf[0] = '\0';
56
57     for (int i = 0; i < sizeof(BlueSkyCloudID); i++) {
58         sprintf(&buf[2*i], "%02x", (uint8_t)(id.bytes[i]));
59     }
60
61     return g_strdup(buf);
62 }
63
64 BlueSkyCloudID bluesky_cloudlog_id_from_string(const gchar *idstr)
65 {
66     BlueSkyCloudID id;
67     memset(&id, 0, sizeof(id));
68     for (int i = 0; i < 2*sizeof(BlueSkyCloudID); i++) {
69         char c = idstr[i];
70         if (c == '\0') {
71             g_warning("Short cloud id: %s\n", idstr);
72             break;
73         }
74         int val = 0;
75         if (c >= '0' && c <= '9')
76             val = c - '0';
77         else if (c >= 'a' && c <= 'f')
78             val = c - 'a' + 10;
79         else
80             g_warning("Bad character in cloud id: %s\n", idstr);
81         id.bytes[i / 2] += val << (i % 2 ? 0 : 4);
82     }
83     return id;
84 }
85
86 gboolean bluesky_cloudlog_equal(gconstpointer a, gconstpointer b)
87 {
88     BlueSkyCloudID *id1 = (BlueSkyCloudID *)a, *id2 = (BlueSkyCloudID *)b;
89
90     return memcmp(id1, id2, sizeof(BlueSkyCloudID)) == 0;
91 }
92
93 guint bluesky_cloudlog_hash(gconstpointer a)
94 {
95     BlueSkyCloudID *id = (BlueSkyCloudID *)a;
96
97     // Assume that bits in the ID are randomly chosen so that any subset of the
98     // bits can be used as a hash key.
99     return *(guint *)(&id->bytes);
100 }
101
102 /* Formatting of cloud log segments.  This handles grouping items together
103  * before writing a batch to the cloud, handling indirection through items like
104  * the inode map, etc. */
105
106 BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs, const BlueSkyCloudID *id)
107 {
108     BlueSkyCloudLog *log = g_new0(BlueSkyCloudLog, 1);
109
110     log->lock = g_mutex_new();
111     log->cond = g_cond_new();
112     log->fs = fs;
113     log->type = LOGTYPE_UNKNOWN;
114     if (id != NULL)
115         memcpy(&log->id, id, sizeof(BlueSkyCloudID));
116     else
117         log->id = bluesky_cloudlog_new_id();
118     log->links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCloudLog *));
119     g_atomic_int_set(&log->refcount, 1);
120
121     return log;
122 }
123
124 /* Helper function for updating memory usage statistics for a filesystem (the
125  * cache_log_* variables).  This will increment (type=1) or decrement (type=-1)
126  * the counter associated with the current state of the cloud log item.  The
127  * item should be locked or otherwise protected from concurrent access. */
128 void bluesky_cloudlog_stats_update(BlueSkyCloudLog *log, int type)
129 {
130     BlueSkyFS *fs = log->fs;
131
132     if (log->location_flags & CLOUDLOG_CLOUD) {
133         g_atomic_int_add(&fs->cache_log_cloud, type);
134     } else if (log->location_flags & CLOUDLOG_JOURNAL) {
135         g_atomic_int_add(&fs->cache_log_journal, type);
136     } else if (log->pending_write & CLOUDLOG_JOURNAL) {
137         g_atomic_int_add(&fs->cache_log_journal, type);
138     } else if (log->data != NULL) {
139         g_atomic_int_add(&fs->cache_log_dirty, type);
140     }
141 }
142
143 /* The reference held by the hash table does not count towards the reference
144  * count.  When a new object is created, it initially has a reference count of
145  * 1 for the creator, and similarly fetching an item from the hash table will
146  * also create a reference.  If the reference count drops to zero,
147  * bluesky_cloudlog_unref attempts to remove the object from the hash
148  * table--but there is a potential race since another thread might read the
149  * object from the hash table at the same time.  So an object with a reference
150  * count of zero may still be resurrected, in which case we need to abort the
151  * destruction.  Once the object is gone from the hash table, and if the
152  * reference count is still zero, it can actually be deleted. */
153 void bluesky_cloudlog_ref(BlueSkyCloudLog *log)
154 {
155     if (log == NULL)
156         return;
157
158     g_atomic_int_inc(&log->refcount);
159 }
160
161 void bluesky_cloudlog_unref(BlueSkyCloudLog *log)
162 {
163     if (log == NULL)
164         return;
165
166     if (g_atomic_int_dec_and_test(&log->refcount)) {
167         BlueSkyFS *fs = log->fs;
168
169         g_mutex_lock(fs->lock);
170         if (g_atomic_int_get(&log->refcount) > 0) {
171             g_mutex_unlock(fs->lock);
172             return;
173         }
174
175         if (!g_hash_table_remove(fs->locations, &log->id)) {
176             if (bluesky_verbose)
177                 g_warning("Could not find and remove cloud log item from hash table!");
178         }
179         g_mutex_unlock(fs->lock);
180
181         bluesky_cloudlog_stats_update(log, -1);
182         log->type = LOGTYPE_INVALID;
183         g_mutex_free(log->lock);
184         g_cond_free(log->cond);
185         for (int i = 0; i < log->links->len; i++) {
186             BlueSkyCloudLog *c = g_array_index(log->links,
187                                                BlueSkyCloudLog *, i);
188             bluesky_cloudlog_unref(c);
189         }
190         g_array_unref(log->links);
191         bluesky_string_unref(log->data);
192         g_free(log);
193     }
194 }
195
196 /* For locking reasons cloudlog unrefs may sometimes need to be performed in
197  * the future.  We launch a thread for handling these delayed unreference
198  * requests. */
199 static gpointer cloudlog_unref_thread(gpointer q)
200 {
201     GAsyncQueue *queue = (GAsyncQueue *)q;
202
203     while (TRUE) {
204         BlueSkyCloudLog *item = (BlueSkyCloudLog *)g_async_queue_pop(queue);
205         bluesky_cloudlog_unref(item);
206     }
207
208     return NULL;
209 }
210
211 void bluesky_cloudlog_unref_delayed(BlueSkyCloudLog *log)
212 {
213     if (log != NULL)
214         g_async_queue_push(log->fs->unref_queue, log);
215 }
216
217 /* Erase the information contained within the in-memory cloud log
218  * representation.  This does not free up the item itself, but frees the data
219  * and references to other log items and resets the type back to unknown.  If
220  * the object was written out to persistent storage, all state about it can be
221  * recovered by loading the object back in.  The object must be locked before
222  * calling this function. */
223 void bluesky_cloudlog_erase(BlueSkyCloudLog *log)
224 {
225     g_assert(log->data_lock_count == 0);
226
227     if (log->type == LOGTYPE_UNKNOWN)
228         return;
229
230     log->type = LOGTYPE_UNKNOWN;
231     log->data_size = 0;
232     bluesky_string_unref(log->data);
233     log->data = NULL;
234     log->data_lock_count = 0;
235
236     for (int i = 0; i < log->links->len; i++) {
237         BlueSkyCloudLog *c = g_array_index(log->links,
238                                            BlueSkyCloudLog *, i);
239         bluesky_cloudlog_unref(c);
240     }
241     g_array_unref(log->links);
242     log->links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCloudLog *));
243 }
244
245 /* Start a write of the object to the local log. */
246 void bluesky_cloudlog_sync(BlueSkyCloudLog *log)
247 {
248     bluesky_log_item_submit(log, log->fs->log);
249 }
250
251 /* Add the given entry to the global hash table containing cloud log entries.
252  * Takes ownership of the caller's reference. */
253 void bluesky_cloudlog_insert_locked(BlueSkyCloudLog *log)
254 {
255     g_hash_table_insert(log->fs->locations, &log->id, log);
256 }
257
258 void bluesky_cloudlog_insert(BlueSkyCloudLog *log)
259 {
260     g_mutex_lock(log->fs->lock);
261     bluesky_cloudlog_insert_locked(log);
262     g_mutex_unlock(log->fs->lock);
263 }
264
265 /* Look up the cloud log entry for the given ID.  If create is TRUE and the
266  * item does not exist, create a special pending entry that can later be filled
267  * in when the real item is loaded.  The returned item has a reference held.
268  * As a special case, if a null ID is provided then NULL is returned. */
269 BlueSkyCloudLog *bluesky_cloudlog_get(BlueSkyFS *fs, BlueSkyCloudID id)
270 {
271     static BlueSkyCloudID id0 = {{0}};
272
273     if (memcmp(&id, &id0, sizeof(BlueSkyCloudID)) == 0)
274         return NULL;
275
276     g_mutex_lock(fs->lock);
277     BlueSkyCloudLog *item;
278     item = g_hash_table_lookup(fs->locations, &id);
279     if (item == NULL) {
280         item = bluesky_cloudlog_new(fs, &id);
281         bluesky_cloudlog_stats_update(item, 1);
282         bluesky_cloudlog_insert_locked(item);
283     } else {
284         bluesky_cloudlog_ref(item);
285     }
286     g_mutex_unlock(fs->lock);
287     return item;
288 }
289
290 /* Work to fetch a cloudlog item in a background thread.  The item will be
291  * locked while the fetch is in progress and unlocked when it completes. */
292 static GThreadPool *fetch_pool;
293
294 static void background_fetch_task(gpointer p, gpointer unused)
295 {
296     BlueSkyCloudLog *item = (BlueSkyCloudLog *)p;
297
298     g_mutex_lock(item->lock);
299     g_mutex_unlock(item->lock);
300     bluesky_cloudlog_unref(item);
301 }
302
303 void bluesky_cloudlog_background_fetch(BlueSkyCloudLog *item)
304 {
305     bluesky_cloudlog_ref(item);
306     g_thread_pool_push(fetch_pool, item, NULL);
307 }
308
309 /* Attempt to prefetch a cloud log item.  This does not guarantee that it will
310  * be made available, but does make it more likely that a future call to
311  * bluesky_cloudlog_fetch will complete quickly.  Item must be locked? */
312 void bluesky_cloudlog_prefetch(BlueSkyCloudLog *item)
313 {
314     if (item->data != NULL)
315         return;
316
317     /* When operating in a non log-structured mode, simply start a background
318      * fetch immediately when asked to prefetch. */
319     if (bluesky_options.disable_aggregation
320         || bluesky_options.disable_read_aggregation) {
321         bluesky_cloudlog_background_fetch(item);
322         return;
323     }
324
325     /* TODO: Some of the code here is duplicated with bluesky_log_map_object.
326      * Refactor to fix that. */
327     BlueSkyFS *fs = item->fs;
328     BlueSkyCacheFile *map = NULL;
329
330     /* First, check to see if the journal still contains a copy of the item and
331      * if so update the atime on the journal so it is likely to be kept around
332      * until we need it. */
333     if ((item->location_flags | item->pending_write) & CLOUDLOG_JOURNAL) {
334         map = bluesky_cachefile_lookup(fs, -1, item->log_seq, TRUE);
335         if (map != NULL) {
336             map->atime = bluesky_get_current_time();
337             bluesky_cachefile_unref(map);
338             g_mutex_unlock(map->lock);
339             return;
340         }
341     }
342
343     item->location_flags &= ~CLOUDLOG_JOURNAL;
344     if (!(item->location_flags & CLOUDLOG_CLOUD))
345         return;
346
347     map = bluesky_cachefile_lookup(fs,
348                                    item->location.directory,
349                                    item->location.sequence,
350                                    FALSE);
351     if (map == NULL)
352         return;
353
354     /* At this point, we have information about the log segment containing the
355      * item we need.  If our item is already fetched, we have nothing to do
356      * except update the atime.  If not, queue up a fetch of our object. */
357     const BlueSkyRangesetItem *rangeitem;
358     rangeitem = bluesky_rangeset_lookup(map->items,
359                                         item->location.offset);
360     if (rangeitem == NULL) {
361         if (map->prefetches == NULL)
362             map->prefetches = bluesky_rangeset_new();
363
364         gchar *id = bluesky_cloudlog_id_to_string(item->id);
365         if (bluesky_verbose)
366             g_print("Need to prefetch %s\n", id);
367         g_free(id);
368
369         bluesky_rangeset_insert(map->prefetches,
370                                 item->location.offset,
371                                 item->location.size, NULL);
372
373         uint64_t start, length;
374         bluesky_rangeset_get_extents(map->prefetches, &start, &length);
375         if (bluesky_verbose)
376             g_print("Range to prefetch: %"PRIu64" + %"PRIu64"\n",
377                     start, length);
378     }
379
380     bluesky_cachefile_unref(map);
381     g_mutex_unlock(map->lock);
382 }
383
384 /* Ensure that a cloud log item is loaded in memory, and if not read it in.
385  * TODO: Make asynchronous, and make this also fetch from the cloud.  Right now
386  * we only read from the log.  Log item must be locked. */
387 void bluesky_cloudlog_fetch(BlueSkyCloudLog *log)
388 {
389     if (log->data != NULL)
390         return;
391
392     BlueSkyProfile *profile = bluesky_profile_get();
393     if (profile != NULL)
394         bluesky_profile_add_event(profile, g_strdup_printf("Fetch log entry"));
395
396     /* There are actually two cases: a full deserialization if we have not ever
397      * read the object before, and a partial deserialization where the metadata
398      * is already in memory and we just need to remap the data.  If the object
399      * type has not yet been set, we'll need to read and parse the metadata.
400      * Once that is done, we can fall through the case of remapping the data
401      * itself. */
402     if (log->type == LOGTYPE_UNKNOWN) {
403         BlueSkyRCStr *raw = bluesky_log_map_object(log, FALSE);
404         g_assert(raw != NULL);
405         bluesky_deserialize_cloudlog(log, raw->data, raw->len);
406         bluesky_string_unref(raw);
407     }
408
409     /* At this point all metadata should be available and we need only remap
410      * the object data. */
411     log->data = bluesky_log_map_object(log, TRUE);
412
413     if (log->data == NULL) {
414         g_error("Unable to fetch cloudlog entry!");
415     }
416
417     if (profile != NULL)
418         bluesky_profile_add_event(profile, g_strdup_printf("Fetch complete"));
419     g_cond_broadcast(log->cond);
420 }
421
422 BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log,
423                                                BlueSkyFS *fs)
424 {
425     BlueSkyCloudLogState *state = fs->log_state;
426
427     if ((log->location_flags | log->pending_write) & CLOUDLOG_CLOUD) {
428         return log->location;
429     }
430
431     for (int i = 0; i < log->links->len; i++) {
432         BlueSkyCloudLog *ref = g_array_index(log->links,
433                                              BlueSkyCloudLog *, i);
434         if (ref != NULL)
435             bluesky_cloudlog_serialize(ref, fs);
436     }
437
438     /* FIXME: Ought lock to be taken earlier? */
439     g_mutex_lock(log->lock);
440     bluesky_cloudlog_fetch(log);
441     g_assert(log->data != NULL);
442
443     bluesky_cloudlog_stats_update(log, -1);
444
445     GString *data1 = g_string_new("");
446     GString *data2 = g_string_new("");
447     GString *data3 = g_string_new("");
448     bluesky_serialize_cloudlog(log, data1, data2, data3);
449
450     log->location = state->location;
451     log->location.offset = state->data->len;
452     log->data_size = data1->len;
453
454     struct cloudlog_header header;
455     memcpy(header.magic, CLOUDLOG_MAGIC, 4);
456     memset(header.crypt_auth, sizeof(header.crypt_auth), 0);
457     memset(header.crypt_iv, sizeof(header.crypt_iv), 0);
458     header.type = log->type + '0';
459     header.size1 = GUINT32_TO_LE(data1->len);
460     header.size2 = GUINT32_TO_LE(data2->len);
461     header.size3 = GUINT32_TO_LE(data3->len);
462     header.id = log->id;
463     header.inum = GUINT64_TO_LE(log->inum);
464
465     g_string_append_len(state->data, (const char *)&header, sizeof(header));
466     g_string_append_len(state->data, data1->str, data1->len);
467     g_string_append_len(state->data, data2->str, data2->len);
468     g_string_append_len(state->data, data3->str, data3->len);
469
470     log->location.size = state->data->len - log->location.offset;
471
472     g_string_free(data1, TRUE);
473     g_string_free(data2, TRUE);
474     g_string_free(data3, TRUE);
475
476     /* If the object we flushed was an inode, update the inode map. */
477     if (log->type == LOGTYPE_INODE) {
478         g_mutex_lock(fs->lock);
479         InodeMapEntry *entry = bluesky_inode_map_lookup(fs->inode_map,
480                                                         log->inum, 1);
481         bluesky_cloudlog_unref_delayed(entry->item);
482         entry->item = log;
483         bluesky_cloudlog_ref(entry->item);
484         g_mutex_unlock(fs->lock);
485     }
486
487     /* TODO: We should mark the objects as committed on the cloud until the
488      * data is flushed and acknowledged. */
489     log->pending_write |= CLOUDLOG_CLOUD;
490     bluesky_cloudlog_stats_update(log, 1);
491     state->writeback_list = g_slist_prepend(state->writeback_list, log);
492     bluesky_cloudlog_ref(log);
493     g_mutex_unlock(log->lock);
494
495     if (state->data->len > CLOUDLOG_SEGMENT_SIZE
496         || bluesky_options.disable_aggregation)
497     {
498         bluesky_cloudlog_flush(fs);
499     }
500
501     return log->location;
502 }
503
504 static void cloudlog_flush_complete(BlueSkyStoreAsync *async,
505                                     SerializedRecord *record)
506 {
507     g_print("Write of %s to cloud complete, status = %d\n",
508             async->key, async->result);
509
510     g_mutex_lock(record->lock);
511     if (async->result >= 0) {
512         while (record->items != NULL) {
513             BlueSkyCloudLog *item = (BlueSkyCloudLog *)record->items->data;
514             g_mutex_lock(item->lock);
515             bluesky_cloudlog_stats_update(item, -1);
516             item->pending_write &= ~CLOUDLOG_CLOUD;
517             item->location_flags |= CLOUDLOG_CLOUD;
518             bluesky_cloudlog_stats_update(item, 1);
519             g_mutex_unlock(item->lock);
520             bluesky_cloudlog_unref(item);
521
522             record->items = g_slist_delete_link(record->items, record->items);
523         }
524
525         bluesky_string_unref(record->data);
526         record->data = NULL;
527         g_slist_free(record->items);
528         record->items = NULL;
529         record->complete = TRUE;
530
531         BlueSkyCloudLogState *state = record->fs->log_state;
532         g_mutex_lock(state->uploads_pending_lock);
533         state->uploads_pending--;
534         g_cond_broadcast(state->uploads_pending_cond);
535         g_mutex_unlock(state->uploads_pending_lock);
536
537         g_cond_broadcast(record->cond);
538     } else {
539         g_print("Write should be resubmitted...\n");
540
541         BlueSkyStoreAsync *async2 = bluesky_store_async_new(async->store);
542         async2->op = STORE_OP_PUT;
543         async2->key = g_strdup(async->key);
544         async2->data = record->data;
545         async2->profile = async->profile;
546         bluesky_string_ref(record->data);
547         bluesky_store_async_submit(async2);
548         bluesky_store_async_add_notifier(async2,
549                                          (GFunc)cloudlog_flush_complete,
550                                          record);
551         bluesky_store_async_unref(async2);
552     }
553     g_mutex_unlock(record->lock);
554 }
555
556 /* Finish up a partially-written cloud log segment and flush it to storage. */
557 static void cloud_flush_background(SerializedRecord *record)
558 {
559     bluesky_cloudlog_encrypt(record->raw_data, record->fs->keys);
560     record->data = bluesky_string_new_from_gstring(record->raw_data);
561     record->raw_data = NULL;
562
563     BlueSkyStoreAsync *async = bluesky_store_async_new(record->fs->store);
564     async->op = STORE_OP_PUT;
565     async->key = record->key;
566     async->data = record->data;
567     bluesky_string_ref(record->data);
568     bluesky_store_async_submit(async);
569     bluesky_store_async_add_notifier(async,
570                                      (GFunc)cloudlog_flush_complete,
571                                      record);
572     bluesky_store_async_unref(async);
573 }
574
575 void bluesky_cloudlog_flush(BlueSkyFS *fs)
576 {
577     BlueSkyCloudLogState *state = fs->log_state;
578     if (state->data == NULL || state->data->len == 0)
579         return;
580
581     g_mutex_lock(state->uploads_pending_lock);
582     while (state->uploads_pending > cloudlog_concurrent_uploads)
583         g_cond_wait(state->uploads_pending_cond, state->uploads_pending_lock);
584     state->uploads_pending++;
585     g_mutex_unlock(state->uploads_pending_lock);
586
587     /* TODO: Append some type of commit record to the log segment? */
588
589     g_print("Serializing %zd bytes of data to cloud\n", state->data->len);
590     SerializedRecord *record = g_new0(SerializedRecord, 1);
591     record->fs = fs;
592     record->raw_data = state->data;
593     record->data = NULL;
594     record->items = state->writeback_list;
595     record->lock = g_mutex_new();
596     record->cond = g_cond_new();
597     state->writeback_list = NULL;
598
599     record->key = g_strdup_printf("log-%08d-%08d",
600                                   state->location.directory,
601                                   state->location.sequence);
602
603     state->pending_segments = g_list_prepend(state->pending_segments, record);
604
605     /* Encryption of data and upload happen in the background, for additional
606      * parallelism when uploading large amounts of data. */
607     g_thread_create((GThreadFunc)cloud_flush_background, record, FALSE, NULL);
608
609     state->location.sequence++;
610     state->location.offset = 0;
611     state->data = g_string_new("");
612 }
613
614 /* Make an encryption pass over a cloud log segment to encrypt private data in
615  * it. */
616 void bluesky_cloudlog_encrypt(GString *segment, BlueSkyCryptKeys *keys)
617 {
618     char *data = segment->str;
619     size_t remaining_size = segment->len;
620
621     while (remaining_size >= sizeof(struct cloudlog_header)) {
622         struct cloudlog_header *header = (struct cloudlog_header *)data;
623         size_t item_size = sizeof(struct cloudlog_header)
624                            + GUINT32_FROM_LE(header->size1)
625                            + GUINT32_FROM_LE(header->size2)
626                            + GUINT32_FROM_LE(header->size3);
627         if (item_size > remaining_size)
628             break;
629         bluesky_crypt_block_encrypt(data, item_size, keys);
630
631         data += item_size;
632         remaining_size -= item_size;
633     }
634 }
635
636 /* Make an decryption pass over a cloud log segment to decrypt items which were
637  * encrypted.  Also computes a list of all offsets which at which valid
638  * cloud log items are found and adds those offsets to items (if non-NULL).
639  *
640  * If allow_unauth is set to true, then allow a limited set of unauthenticated
641  * items that may have been rewritten by a file system cleaner.  These include
642  * the checkpoint and inode map records only; other items must still pass
643  * authentication. */
644 void bluesky_cloudlog_decrypt(char *segment, size_t len,
645                               BlueSkyCryptKeys *keys,
646                               BlueSkyRangeset *items,
647                               gboolean allow_unauth)
648 {
649     char *data = segment;
650     size_t remaining_size = len;
651     size_t offset = 0;
652
653     while (remaining_size >= sizeof(struct cloudlog_header)) {
654         struct cloudlog_header *header = (struct cloudlog_header *)data;
655         size_t item_size = sizeof(struct cloudlog_header)
656                            + GUINT32_FROM_LE(header->size1)
657                            + GUINT32_FROM_LE(header->size2)
658                            + GUINT32_FROM_LE(header->size3);
659         if (item_size > remaining_size)
660             break;
661         if (bluesky_crypt_block_decrypt(data, item_size, keys, allow_unauth)) {
662             if (items != NULL) {
663                 if (bluesky_verbose)
664                     g_print("  data item at %zx\n", offset);
665                 bluesky_rangeset_insert(items, offset, item_size,
666                                         GINT_TO_POINTER(TRUE));
667             }
668         } else {
669             g_warning("Unauthenticated data at offset %zd", offset);
670             if (items != NULL) {
671                 bluesky_rangeset_insert(items, offset, item_size,
672                                         GINT_TO_POINTER(TRUE));
673             }
674         }
675
676         data += item_size;
677         offset += item_size;
678         remaining_size -= item_size;
679     }
680 }
681
682 void bluesky_cloudlog_threads_init(BlueSkyFS *fs)
683 {
684     fs->unref_queue = g_async_queue_new();
685     g_thread_create(cloudlog_unref_thread, fs->unref_queue, FALSE, NULL);
686     fetch_pool = g_thread_pool_new(background_fetch_task, NULL, 40, FALSE,
687                                    NULL);
688 }