Add some proxy statistics.
[bluesky.git] / bluesky / imap.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <stdint.h>
12 #include <inttypes.h>
13 #include <glib.h>
14 #include <string.h>
15
16 #include "bluesky-private.h"
17
18 /* Magic number at the start of the checkpoint record, to check for version
19  * mismatches. */
20 #define CHECKPOINT_MAGIC 0x7ad7dafb42a498b4ULL
21
22 /* Inode maps.  There is both an in-memory representation as well as the
23  * serialized form in the cloud.
24  *
25  * The inode map is broken up into pieces by partitioning in the inode number
26  * space.  The checkpoint region will contain pointers to each of these map
27  * ranges. */
28
29 /* Roughly how many inodes to try to put in each inode map range. */
30 static int target_imap_size = 4096;
31
32 /* Comparison function for lookuping up inode map entries.  Reads a 64-bit
33  * value from the pointed-to address to do the comparison, so we require that
34  * the InodeMapEntry and InodeMapRange structures start with the 64-bit value
35  * they are sorted by. */
36 static gint compare(gconstpointer a, gconstpointer b, gpointer user_data)
37 {
38     uint64_t x = *(uint64_t *)a;
39     uint64_t y = *(uint64_t *)b;
40
41     if (x < y)
42         return -1;
43     else if (x > y)
44         return 1;
45     else
46         return 0;
47 }
48
49 /* Look up the inode map entry for the given inode number.  If action is +1,
50  * create a new entry if it does not already exist; if it is -1 then delete the
51  * entry instead.  If 0, return the entry if it is already present.  A non-zero
52  * action value will mark the inode map range as dirty. */
53 InodeMapEntry *bluesky_inode_map_lookup(GSequence *inode_map, uint64_t inum,
54                                         int action)
55 {
56     GSequenceIter *i;
57
58     /* First, scan through to find the inode map section that covers the
59      * appropriate range.  Create one if it does not exist but we were asked to
60      * create an entry. */
61     InodeMapRange *range = NULL;
62
63     i = g_sequence_search(inode_map, &inum, compare, NULL);
64     i = g_sequence_iter_prev(i);
65     if (!g_sequence_iter_is_end(i))
66         range = (InodeMapRange *)g_sequence_get(i);
67     if (range == NULL || inum < range->start || inum > range->end) {
68         if (action <= 0)
69             return NULL;
70
71         /* Create a new range.  First, determine bounds on the range enpoints
72          * based on neighboring ranges.  Then, shrink the size of the range to
73          * a reasonable size that covers the needed inode number. */
74         range = g_new0(InodeMapRange, 1);
75         range->start = 0;
76         range->end = G_MAXUINT64;
77         range->map_entries = g_sequence_new(NULL);
78         range->serialized = NULL;
79
80         g_print("Creating inode map range, 1: start=%"PRIu64" end=%"PRIu64"\n",
81                 range->start, range->end);
82
83         if (!g_sequence_iter_is_begin(i) && !g_sequence_iter_is_end(i))
84             range->start = ((InodeMapRange *)g_sequence_get(i))->end + 1;
85         i = g_sequence_iter_next(i);
86         if (!g_sequence_iter_is_end(i))
87             range->end = ((InodeMapRange *)g_sequence_get(i))->start - 1;
88
89         g_print("Creating inode map range, 2: start=%"PRIu64" end=%"PRIu64"\n",
90                 range->start, range->end);
91         g_assert(inum >= range->start && inum <= range->end);
92
93         range->start = MAX(range->start,
94                            inum & ~(uint64_t)(target_imap_size - 1));
95         range->end = MIN(range->end, range->start + target_imap_size - 1);
96
97         g_print("Creating inode map range, 3: start=%"PRIu64" end=%"PRIu64"\n",
98                 range->start, range->end);
99
100         g_sequence_insert_sorted(inode_map, range, compare, NULL);
101     }
102
103     /* Next, try to find the entry within this range of the inode map. */
104     InodeMapEntry *entry = NULL;
105     i = g_sequence_search(range->map_entries, &inum, compare, NULL);
106     i = g_sequence_iter_prev(i);
107     if (!g_sequence_iter_is_end(i))
108         entry = (InodeMapEntry *)g_sequence_get(i);
109     if (entry == NULL || inum != entry->inum) {
110         if (action <= 0)
111             return NULL;
112
113         entry = g_new0(InodeMapEntry, 1);
114         entry->inum = inum;
115         g_sequence_insert_sorted(range->map_entries, entry, compare, NULL);
116
117         if (bluesky_verbose)
118             g_print("Created inode map entry for %"PRIu64"\n", inum);
119     }
120
121     if (action != 0) {
122         bluesky_cloudlog_unref_delayed(range->serialized);
123         range->serialized = NULL;
124     }
125
126     /* Were we requested to delete the item? */
127     if (action < 0) {
128         g_sequence_remove(i);
129         entry = NULL;
130     }
131
132     return entry;
133 }
134
135 /* Convert a section of the inode map to serialized form, in preparation for
136  * writing it out to the cloud. */
137 static void bluesky_inode_map_serialize_section(BlueSkyFS *fs,
138                                                 InodeMapRange *range)
139 {
140     if (range->serialized != NULL)
141         return;
142
143     GString *buf = g_string_new("");
144     BlueSkyCloudLog *log = bluesky_cloudlog_new(fs, NULL);
145     log->type = LOGTYPE_INODE_MAP;
146     log->inum = 0;
147
148     GSequenceIter *i = g_sequence_get_begin_iter(range->map_entries);
149     while (!g_sequence_iter_is_end(i)) {
150         InodeMapEntry *entry = (InodeMapEntry *)g_sequence_get(i);
151         uint64_t inum = GUINT64_TO_LE(entry->inum);
152         g_string_append_len(buf, (const char *)&inum, sizeof(inum));
153         bluesky_cloudlog_ref(entry->item);
154         g_array_append_val(log->links, entry->item);
155         i = g_sequence_iter_next(i);
156     }
157
158     log->data = bluesky_string_new_from_gstring(buf);
159     bluesky_cloudlog_unref(range->serialized);
160     range->serialized = log;
161     bluesky_cloudlog_stats_update(log, 1);
162 }
163
164 BlueSkyCloudLog *bluesky_inode_map_serialize(BlueSkyFS *fs)
165 {
166     gboolean updated = FALSE;
167     GString *buf = g_string_new("");
168     BlueSkyCloudLog *log = bluesky_cloudlog_new(fs, NULL);
169     log->type = LOGTYPE_CHECKPOINT;
170     log->inum = 0;
171
172     /* The checkpoint record starts with a magic number, followed by the
173      * version vector which lists the latest sequence number of all other logs
174      * (currently, only the cleaner) which have been seen. */
175     uint64_t magic = GUINT64_TO_LE(CHECKPOINT_MAGIC);
176     g_string_append_len(buf, (const char *)&magic, sizeof(magic));
177     uint32_t versions;
178     versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen >= 0);
179     g_string_append_len(buf, (const char *)&versions, sizeof(versions));
180     if (fs->log_state->latest_cleaner_seq_seen >= 0) {
181         versions = GUINT32_TO_LE(BLUESKY_CLOUD_DIR_CLEANER);
182         g_string_append_len(buf, (const char *)&versions, sizeof(versions));
183         versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen);
184         g_string_append_len(buf, (const char *)&versions, sizeof(versions));
185     }
186
187     GSequenceIter *i = g_sequence_get_begin_iter(fs->inode_map);
188     while (!g_sequence_iter_is_end(i)) {
189         InodeMapRange *range = (InodeMapRange *)g_sequence_get(i);
190         uint64_t inum = GUINT64_TO_LE(range->start);
191         g_string_append_len(buf, (const char *)&inum, sizeof(inum));
192         inum = GUINT64_TO_LE(range->end);
193         g_string_append_len(buf, (const char *)&inum, sizeof(inum));
194
195         if (range->serialized == NULL) {
196             bluesky_inode_map_serialize_section(fs, range);
197             updated = TRUE;
198         }
199         bluesky_cloudlog_ref(range->serialized);
200         g_array_append_val(log->links, range->serialized);
201         i = g_sequence_iter_next(i);
202     }
203
204     log->data = bluesky_string_new_from_gstring(buf);
205     bluesky_cloudlog_stats_update(log, 1);
206
207     if (updated) {
208         return log;
209     } else {
210         bluesky_cloudlog_unref(log);
211         return NULL;
212     }
213 }
214
215 /* Minimize resources consumed the inode map.  This should only be called once
216  * an updated inode map has been serialized to the cloud, and will replace
217  * cloud log objects with skeletal versions that just reference the data
218  * location in the cloud (rather than pinning all object data in memory). */
219 void bluesky_inode_map_minimize(BlueSkyFS *fs)
220 {
221     GSequenceIter *i = g_sequence_get_begin_iter(fs->inode_map);
222     while (!g_sequence_iter_is_end(i)) {
223         InodeMapRange *range = (InodeMapRange *)g_sequence_get(i);
224
225         if (range->serialized != NULL)
226             bluesky_cloudlog_erase(range->serialized);
227
228         GSequenceIter *j;
229         for (j = g_sequence_get_begin_iter(range->map_entries);
230              !g_sequence_iter_is_end(j); j = g_sequence_iter_next(j))
231         {
232             InodeMapEntry *entry = (InodeMapEntry *)g_sequence_get(j);
233             BlueSkyCloudLog *item = entry->item;
234             if (item != NULL) {
235                 g_mutex_lock(item->lock);
236                 if (g_atomic_int_get(&item->refcount) == 1) {
237                     bluesky_cloudlog_erase(item);
238                 }
239                 g_mutex_unlock(item->lock);
240             } else {
241                 g_warning("Null item for inode map entry %"PRIu64"!",
242                           entry->inum);
243             }
244         }
245
246         i = g_sequence_iter_next(i);
247     }
248 }
249
250 /* Reconstruct the inode map from data stored in the cloud. */
251 static void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap)
252 {
253     g_mutex_lock(imap->lock);
254     bluesky_cloudlog_fetch(imap);
255     g_assert(imap->data != NULL);
256     g_assert(imap->data->len >= 12);
257     uint64_t magic;
258     uint32_t vector_data;
259     memcpy((char *)&magic, imap->data->data, sizeof(magic));
260     g_assert(GUINT64_FROM_LE(magic) == CHECKPOINT_MAGIC);
261     memcpy((char *)&vector_data, imap->data->data + 8, sizeof(vector_data));
262     g_assert(GUINT32_FROM_LE(vector_data) <= 2);
263
264     int vector_size = GUINT32_FROM_LE(vector_data);
265     g_assert(imap->data->len == 16 * imap->links->len + 12 + 8 * vector_size);
266
267     for (int i = 0; i < vector_size; i++) {
268         memcpy((char *)&vector_data, imap->data->data + 12 + 8*i,
269                sizeof(vector_data));
270         if (GUINT32_FROM_LE(vector_data) == 1) {
271             memcpy((char *)&vector_data, imap->data->data + 16 + 8*i,
272                    sizeof(vector_data));
273             fs->log_state->latest_cleaner_seq_seen
274                 = GUINT32_FROM_LE(vector_data);
275             g_print("Deserializing checkpoint: last cleaner sequence is %d\n",
276                     GUINT32_FROM_LE(vector_data));
277         }
278     }
279
280     //uint64_t *inum_range = (uint64_t *)imap->data->data;
281     for (int i = 0; i < imap->links->len; i++) {
282         //int64_t start = GUINT64_FROM_LE(*inum_range++);
283         //int64_t end = GUINT64_FROM_LE(*inum_range++);
284         BlueSkyCloudLog *section = g_array_index(imap->links,
285                                                  BlueSkyCloudLog *, i);
286         g_mutex_lock(section->lock);
287         bluesky_cloudlog_fetch(section);
288         g_print("Loaded cloudlog item (%zd bytes)\n", section->data->len);
289
290         uint64_t *inum = (uint64_t *)section->data->data;
291         for (int j = 0; j < section->links->len; j++) {
292             InodeMapEntry *entry;
293             entry = bluesky_inode_map_lookup(fs->inode_map, *inum, 1);
294             entry->inum = GUINT64_FROM_LE(*inum);
295             bluesky_cloudlog_unref_delayed(entry->item);
296             entry->item = g_array_index(section->links,
297                                         BlueSkyCloudLog *, j);
298             bluesky_cloudlog_ref(entry->item);
299             fs->next_inum = MAX(fs->next_inum, entry->inum + 1);
300             inum++;
301         }
302         g_mutex_unlock(section->lock);
303     }
304     g_mutex_unlock(imap->lock);
305 }
306
307 /* Find the most recent checkpoint record in the cloud and reload inode map
308  * data from it to initialize the filesystem.  Returns a boolean indicating
309  * whether a checkpoint was found and loaded or not. */
310 gboolean bluesky_checkpoint_load(BlueSkyFS *fs)
311 {
312     g_print("Claiming cloud log directory: %d\n",
313             fs->log_state->location.directory);
314     char *prefix = g_strdup_printf("log-%08d",
315                                    fs->log_state->location.directory);
316     char *last_segment = bluesky_store_lookup_last(fs->store, prefix);
317     g_free(prefix);
318     if (last_segment == NULL)
319         return FALSE;
320
321     g_print("Last cloud log segment: %s\n", last_segment);
322     int seq = atoi(last_segment + 13);
323     fs->log_state->location.sequence = seq + 1;
324
325     BlueSkyRCStr *last = bluesky_store_get(fs->store, last_segment);
326     g_free(last_segment);
327     if (last == NULL) {
328         g_warning("Unable to fetch last log segment from cloud!");
329         return FALSE;
330     }
331
332     last = bluesky_string_dup(last);
333     bluesky_cloudlog_decrypt(last->data, last->len, fs->keys, NULL, FALSE);
334
335     /* Scan through the contents of the last log segment to find a checkpoint
336      * record.  We need to do a linear scan since at this point we don't have a
337      * direct pointer; once we have the last commit record then all other data
338      * can be loaded by directly following pointers. */
339     const char *buf = last->data;
340     size_t len = last->len;
341     const char *checkpoint = NULL;
342     size_t checkpoint_size = 0;
343     while (len > sizeof(struct cloudlog_header)) {
344         struct cloudlog_header *header = (struct cloudlog_header *)buf;
345         if (memcmp(header->magic, CLOUDLOG_MAGIC, 4) != 0) {
346             g_warning("Could not parse cloudlog entry!");
347             break;
348         }
349         int size = sizeof(struct cloudlog_header);
350         size += GUINT32_FROM_LE(header->size1);
351         size += GUINT32_FROM_LE(header->size2);
352         size += GUINT32_FROM_LE(header->size3);
353         if (size > len) {
354             g_warning("Cloudlog entry is malformed (size too large)!");
355             break;
356         }
357         if (header->type - '0' == LOGTYPE_CHECKPOINT) {
358             checkpoint = buf;
359             checkpoint_size = size;
360         }
361         buf += size;
362         len -= size;
363     }
364
365     if (checkpoint_size == 0) {
366         g_error("Unable to locate checkpoint record!\n");
367     }
368
369     g_print("Found checkpoint record at %zd (size %zd)\n",
370             checkpoint - last->data, checkpoint_size);
371
372     /* Bootstrap the loading process by manually setting the location of this
373      * log item. */
374     BlueSkyCloudLog *commit;
375     commit = bluesky_cloudlog_get(fs,
376                                   ((struct cloudlog_header *)checkpoint)->id);
377     g_mutex_lock(commit->lock);
378     commit->location_flags |= CLOUDLOG_CLOUD;
379     commit->location.directory = 0;
380     commit->location.sequence = seq;
381     commit->location.offset = checkpoint - last->data;
382     commit->location.size = checkpoint_size;
383     g_mutex_unlock(commit->lock);
384     bluesky_cloudlog_stats_update(commit, 1);
385
386     bluesky_inode_map_deserialize(fs, commit);
387     bluesky_cloudlog_unref(commit);
388
389     return TRUE;
390 }