Fix Amazon S3 store_lookup_last implementation.
[bluesky.git] / bluesky / imap.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <stdint.h>
12 #include <inttypes.h>
13 #include <glib.h>
14 #include <string.h>
15
16 #include "bluesky-private.h"
17
18 /* Inode maps.  There is both an in-memory representation as well as the
19  * serialized form in the cloud.
20  *
21  * The inode map is broken up into pieces by partitioning in the inode number
22  * space.  The checkpoint region will contain pointers to each of these map
23  * ranges. */
24
25 /* Roughly how many inodes to try to put in each inode map range. */
26 static int target_imap_size = 4096;
27
28 /* Comparison function for lookuping up inode map entries.  Reads a 64-bit
29  * value from the pointed-to address to do the comparison, so we require that
30  * the InodeMapEntry and InodeMapRange structures start with the 64-bit value
31  * they are sorted by. */
32 static gint compare(gconstpointer a, gconstpointer b, gpointer user_data)
33 {
34     uint64_t x = *(uint64_t *)a;
35     uint64_t y = *(uint64_t *)b;
36
37     if (x < y)
38         return -1;
39     else if (x > y)
40         return 1;
41     else
42         return 0;
43 }
44
45 /* Look up the inode map entry for the given inode number.  If action is +1,
46  * create a new entry if it does not already exist; if it is -1 then delete the
47  * entry instead.  If 0, return the entry if it is already present.  A non-zero
48  * action value will mark the inode map range as dirty. */
49 InodeMapEntry *bluesky_inode_map_lookup(GSequence *inode_map, uint64_t inum,
50                                         int action)
51 {
52     GSequenceIter *i;
53
54     /* First, scan through to find the inode map section that covers the
55      * appropriate range.  Create one if it does not exist but we were asked to
56      * create an entry. */
57     InodeMapRange *range = NULL;
58
59     i = g_sequence_search(inode_map, &inum, compare, NULL);
60     i = g_sequence_iter_prev(i);
61     if (!g_sequence_iter_is_end(i))
62         range = (InodeMapRange *)g_sequence_get(i);
63     if (range == NULL || inum < range->start || inum > range->end) {
64         if (action <= 0)
65             return NULL;
66
67         /* Create a new range.  First, determine bounds on the range enpoints
68          * based on neighboring ranges.  Then, shrink the size of the range to
69          * a reasonable size that covers the needed inode number. */
70         range = g_new0(InodeMapRange, 1);
71         range->start = 0;
72         range->end = G_MAXUINT64;
73         range->map_entries = g_sequence_new(NULL);
74         range->serialized = NULL;
75
76         g_print("Creating inode map range, 1: start=%"PRIu64" end=%"PRIu64"\n",
77                 range->start, range->end);
78
79         if (!g_sequence_iter_is_begin(i) && !g_sequence_iter_is_end(i))
80             range->start = ((InodeMapRange *)g_sequence_get(i))->end + 1;
81         i = g_sequence_iter_next(i);
82         if (!g_sequence_iter_is_end(i))
83             range->end = ((InodeMapRange *)g_sequence_get(i))->start - 1;
84
85         g_print("Creating inode map range, 2: start=%"PRIu64" end=%"PRIu64"\n",
86                 range->start, range->end);
87         g_assert(inum >= range->start && inum <= range->end);
88
89         range->start = MAX(range->start,
90                            inum & ~(uint64_t)(target_imap_size - 1));
91         range->end = MIN(range->end, range->start + target_imap_size - 1);
92
93         g_print("Creating inode map range, 3: start=%"PRIu64" end=%"PRIu64"\n",
94                 range->start, range->end);
95
96         g_sequence_insert_sorted(inode_map, range, compare, NULL);
97     }
98
99     /* Next, try to find the entry within this range of the inode map. */
100     InodeMapEntry *entry = NULL;
101     i = g_sequence_search(range->map_entries, &inum, compare, NULL);
102     i = g_sequence_iter_prev(i);
103     if (!g_sequence_iter_is_end(i))
104         entry = (InodeMapEntry *)g_sequence_get(i);
105     if (entry == NULL || inum != entry->inum) {
106         if (action <= 0)
107             return NULL;
108
109         entry = g_new0(InodeMapEntry, 1);
110         entry->inum = inum;
111         g_sequence_insert_sorted(range->map_entries, entry, compare, NULL);
112
113         if (bluesky_verbose)
114             g_print("Created inode map entry for %"PRIu64"\n", inum);
115     }
116
117     if (action != 0) {
118         bluesky_cloudlog_unref_delayed(range->serialized);
119         range->serialized = NULL;
120     }
121
122     /* Were we requested to delete the item? */
123     if (action < 0) {
124         g_sequence_remove(i);
125         entry = NULL;
126     }
127
128     return entry;
129 }
130
131 /* Convert a section of the inode map to serialized form, in preparation for
132  * writing it out to the cloud. */
133 static void bluesky_inode_map_serialize_section(BlueSkyFS *fs,
134                                                 InodeMapRange *range)
135 {
136     if (range->serialized != NULL)
137         return;
138
139     GString *buf = g_string_new("");
140     BlueSkyCloudLog *log = bluesky_cloudlog_new(fs, NULL);
141     log->type = LOGTYPE_INODE_MAP;
142     log->inum = 0;
143
144     GSequenceIter *i = g_sequence_get_begin_iter(range->map_entries);
145     while (!g_sequence_iter_is_end(i)) {
146         InodeMapEntry *entry = (InodeMapEntry *)g_sequence_get(i);
147         uint64_t inum = GUINT64_TO_LE(entry->inum);
148         g_string_append_len(buf, (const char *)&inum, sizeof(inum));
149         bluesky_cloudlog_ref(entry->item);
150         g_array_append_val(log->links, entry->item);
151         i = g_sequence_iter_next(i);
152     }
153
154     log->data = bluesky_string_new_from_gstring(buf);
155     bluesky_cloudlog_unref(range->serialized);
156     range->serialized = log;
157     bluesky_cloudlog_stats_update(log, 1);
158 }
159
160 BlueSkyCloudLog *bluesky_inode_map_serialize(BlueSkyFS *fs)
161 {
162     gboolean updated = FALSE;
163     GString *buf = g_string_new("");
164     BlueSkyCloudLog *log = bluesky_cloudlog_new(fs, NULL);
165     log->type = LOGTYPE_CHECKPOINT;
166     log->inum = 0;
167
168     GSequenceIter *i = g_sequence_get_begin_iter(fs->inode_map);
169     while (!g_sequence_iter_is_end(i)) {
170         InodeMapRange *range = (InodeMapRange *)g_sequence_get(i);
171         uint64_t inum = GUINT64_TO_LE(range->start);
172         g_string_append_len(buf, (const char *)&inum, sizeof(inum));
173         inum = GUINT64_TO_LE(range->end);
174         g_string_append_len(buf, (const char *)&inum, sizeof(inum));
175
176         if (range->serialized == NULL) {
177             bluesky_inode_map_serialize_section(fs, range);
178             updated = TRUE;
179         }
180         bluesky_cloudlog_ref(range->serialized);
181         g_array_append_val(log->links, range->serialized);
182         i = g_sequence_iter_next(i);
183     }
184
185     log->data = bluesky_string_new_from_gstring(buf);
186     bluesky_cloudlog_stats_update(log, 1);
187
188     if (updated) {
189         return log;
190     } else {
191         bluesky_cloudlog_unref(log);
192         return NULL;
193     }
194 }
195
196 /* Minimize resources consumed the inode map.  This should only be called once
197  * an updated inode map has been serialized to the cloud, and will replace
198  * cloud log objects with skeletal versions that just reference the data
199  * location in the cloud (rather than pinning all object data in memory). */
200 void bluesky_inode_map_minimize(BlueSkyFS *fs)
201 {
202     GSequenceIter *i = g_sequence_get_begin_iter(fs->inode_map);
203     while (!g_sequence_iter_is_end(i)) {
204         InodeMapRange *range = (InodeMapRange *)g_sequence_get(i);
205
206         if (range->serialized != NULL)
207             bluesky_cloudlog_erase(range->serialized);
208
209         GSequenceIter *j;
210         for (j = g_sequence_get_begin_iter(range->map_entries);
211              !g_sequence_iter_is_end(j); j = g_sequence_iter_next(j))
212         {
213             InodeMapEntry *entry = (InodeMapEntry *)g_sequence_get(j);
214             BlueSkyCloudLog *item = entry->item;
215             if (item != NULL) {
216                 g_mutex_lock(item->lock);
217                 if (g_atomic_int_get(&item->refcount) == 1) {
218                     bluesky_cloudlog_erase(item);
219                 }
220                 g_mutex_unlock(item->lock);
221             } else {
222                 g_warning("Null item for inode map entry %"PRIu64"!",
223                           entry->inum);
224             }
225         }
226
227         i = g_sequence_iter_next(i);
228     }
229 }
230
231 /* Reconstruct the inode map from data stored in the cloud. */
232 static void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap)
233 {
234     g_mutex_lock(imap->lock);
235     bluesky_cloudlog_fetch(imap);
236     g_assert(imap->data != NULL);
237     g_assert(imap->data->len == 16 * imap->links->len);
238     //uint64_t *inum_range = (uint64_t *)imap->data->data;
239     for (int i = 0; i < imap->links->len; i++) {
240         //int64_t start = GUINT64_FROM_LE(*inum_range++);
241         //int64_t end = GUINT64_FROM_LE(*inum_range++);
242         BlueSkyCloudLog *section = g_array_index(imap->links,
243                                                  BlueSkyCloudLog *, i);
244         g_mutex_lock(section->lock);
245         bluesky_cloudlog_fetch(section);
246         g_print("Loaded cloudlog item (%zd bytes)\n", section->data->len);
247
248         uint64_t *inum = (uint64_t *)section->data->data;
249         for (int j = 0; j < section->links->len; j++) {
250             InodeMapEntry *entry;
251             entry = bluesky_inode_map_lookup(fs->inode_map, *inum, 1);
252             entry->inum = GUINT64_FROM_LE(*inum);
253             bluesky_cloudlog_unref_delayed(entry->item);
254             entry->item = g_array_index(section->links,
255                                         BlueSkyCloudLog *, j);
256             bluesky_cloudlog_ref(entry->item);
257             fs->next_inum = MAX(fs->next_inum, entry->inum + 1);
258             inum++;
259         }
260         g_mutex_unlock(section->lock);
261     }
262     g_mutex_unlock(imap->lock);
263 }
264
265 /* Find the most recent checkpoint record in the cloud and reload inode map
266  * data from it to initialize the filesystem.  Returns a boolean indicating
267  * whether a checkpoint was found and loaded or not. */
268 gboolean bluesky_checkpoint_load(BlueSkyFS *fs)
269 {
270     char *last_segment = bluesky_store_lookup_last(fs->store, "log-");
271     if (last_segment == NULL)
272         return FALSE;
273
274     g_print("Last cloud log segment: %s\n", last_segment);
275     int seq = atoi(last_segment + 13);
276     fs->log_state->location.sequence = seq + 1;
277
278     BlueSkyRCStr *last = bluesky_store_get(fs->store, last_segment);
279     g_free(last_segment);
280     if (last == NULL) {
281         g_warning("Unable to fetch last log segment from cloud!");
282         return FALSE;
283     }
284
285     last = bluesky_string_dup(last);
286     bluesky_cloudlog_decrypt(last->data, last->len, fs->keys);
287
288     /* Scan through the contents of the last log segment to find a checkpoint
289      * record.  We need to do a linear scan since at this point we don't have a
290      * direct pointer; once we have the last commit record then all other data
291      * can be loaded by directly following pointers. */
292     const char *buf = last->data;
293     size_t len = last->len;
294     const char *checkpoint = NULL;
295     size_t checkpoint_size = 0;
296     while (len > sizeof(struct cloudlog_header)) {
297         struct cloudlog_header *header = (struct cloudlog_header *)buf;
298         if (memcmp(header->magic, CLOUDLOG_MAGIC, 4) != 0) {
299             g_warning("Could not parse cloudlog entry!");
300             break;
301         }
302         int size = sizeof(struct cloudlog_header);
303         size += GUINT32_FROM_LE(header->size1);
304         size += GUINT32_FROM_LE(header->size2);
305         size += GUINT32_FROM_LE(header->size3);
306         if (size > len) {
307             g_warning("Cloudlog entry is malformed (size too large)!");
308             break;
309         }
310         if (header->type - '0' == LOGTYPE_CHECKPOINT) {
311             checkpoint = buf;
312             checkpoint_size = size;
313         }
314         buf += size;
315         len -= size;
316     }
317
318     if (checkpoint_size == 0) {
319         g_error("Unable to locate checkpoint record!\n");
320     }
321
322     g_print("Found checkpoint record at %zd (size %zd)\n",
323             checkpoint - last->data, checkpoint_size);
324
325     /* Bootstrap the loading process by manually setting the location of this
326      * log item. */
327     BlueSkyCloudLog *commit;
328     commit = bluesky_cloudlog_get(fs,
329                                   ((struct cloudlog_header *)checkpoint)->id);
330     g_mutex_lock(commit->lock);
331     commit->location_flags |= CLOUDLOG_CLOUD;
332     commit->location.directory = 0;
333     commit->location.sequence = seq;
334     commit->location.offset = checkpoint - last->data;
335     commit->location.size = checkpoint_size;
336     g_mutex_unlock(commit->lock);
337     bluesky_cloudlog_stats_update(commit, 1);
338
339     bluesky_inode_map_deserialize(fs, commit);
340     bluesky_cloudlog_unref(commit);
341
342     return TRUE;
343 }