d7e29ebcffbe6066a2a1f1c0883cff20f9328483
[bluesky.git] / bluesky / cleaner.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2010  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <stdint.h>
12 #include <inttypes.h>
13 #include <glib.h>
14 #include <string.h>
15
16 #include "bluesky-private.h"
17
18 /* Proxy component of the file system cleaner.  This consists effectively of
19  * code for merging multiple versions of the file system logs: that generated
20  * by us and that generated by the in-cloud cleaner.  Other logic, such as for
21  * rewriting log segments where needed and deleting old segments, is handled by
22  * the in-cloud cleaner component. */
23
24 /* A specialized function for loading an object from the cloud, used just by
25  * the cleaner.  When the cleaner runs, it will produce new objects that have
26  * the same object ID as before, differing only in the outgoing link locations
27  * as a result of cleaning.  We can't use the normal functions for loading
28  * objects since those can only deal with one version of an object.  Instead,
29  * we load the cleaned object with this function, and then merge the state into
30  * the official object as needed.
31  *
32  * The bluesky_cleaner_deserialize function is like
33  * bluesky_deserialize_cloudlog, but does a more limited deserialization and
34  * keeps the returned item entirely separate from any in-memory BlueSkyCloudLog
35  * objects. */
36
37 typedef struct {
38     BlueSkyCloudLogType type;
39     BlueSkyCloudID id;
40     BlueSkyCloudPointer location;
41     uint64_t inum;
42     GArray *links;
43 } BlueSkyCleanerItem;
44
45 typedef struct {
46     BlueSkyCloudID id;
47     BlueSkyCloudPointer location;
48 } BlueSkyCleanerLink;
49
50 static BlueSkyCleanerItem *bluesky_cleaner_deserialize(BlueSkyRCStr *raw)
51 {
52     const char *data = raw->data;
53     size_t len = raw->len;
54     const char *data1, *data2, *data3;
55     size_t len1, len2, len3;
56
57     g_assert(len > 4);
58     if (len < sizeof(struct cloudlog_header)
59         || memcmp(data, CLOUDLOG_MAGIC, 4) != 0)
60     {
61         g_warning("Deserializing garbage cloud log item from cleaner!");
62         return NULL;
63     };
64
65     struct cloudlog_header *header = (struct cloudlog_header *)data;
66     len1 = GUINT32_FROM_LE(header->size1);
67     len2 = GUINT32_FROM_LE(header->size2);
68     len3 = GUINT32_FROM_LE(header->size3);
69     data1 = data + sizeof(struct cloudlog_header);
70     data2 = data1 + len1;
71     data3 = data2 + len2;
72     g_assert(data3 + len3 - data <= len);
73
74     BlueSkyCleanerItem *item = g_new0(BlueSkyCleanerItem, 1);
75     item->type = header->type - '0';
76     item->inum = GUINT64_FROM_LE(header->inum);
77     memcpy(&item->id, &header->id, sizeof(BlueSkyCloudID));
78
79     int link_count = len2 / sizeof(BlueSkyCloudID);
80     g_print("Outgoing links: %d\n", link_count);
81     item->links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCleanerLink));
82     for (int i = 0; i < link_count; i++) {
83         BlueSkyCleanerLink link;
84
85         g_assert(len2 >= sizeof(link.id));
86         memcpy(&link.id, data2, sizeof(link.id));
87         data2 += sizeof(link.id); len2 -= sizeof(link.id);
88
89         g_assert(len3 >= sizeof(link.location));
90         memcpy(&link.location, data3, sizeof(link.location));
91         data3 += sizeof(link.location); len3 -= sizeof(link.location);
92
93         g_array_append_val(item->links, link);
94     }
95
96     return item;
97 }
98
99 static void bluesky_cleaner_item_free(BlueSkyCleanerItem *item)
100 {
101     if (item == NULL)
102         return;
103     g_array_unref(item->links);
104     g_free(item);
105 }
106
107 /* Check the cleaner's logs to find the a more recent checkpoint record.  This
108  * should be called occasionally to see if the cleaner has done any work since
109  * our last check. */
110 static BlueSkyCleanerItem *bluesky_cleaner_find_checkpoint(BlueSkyFS *fs)
111 {
112     char *prefix = g_strdup_printf("log-%08d", BLUESKY_CLOUD_DIR_CLEANER);
113     char *last_segment = bluesky_store_lookup_last(fs->store, prefix);
114     g_free(prefix);
115     if (last_segment == NULL)
116         return NULL;
117
118     g_print("Last cloud log segment: %s\n", last_segment);
119     int seq = atoi(last_segment + 13);
120     g_free(last_segment);
121
122     if (seq <= fs->log_state->latest_cleaner_seq_seen)
123         return NULL;
124
125     g_print("New log segment appeared in cleaner directory: %d\n", seq);
126
127     BlueSkyCacheFile *cachefile;
128     cachefile = bluesky_cachefile_lookup(fs, BLUESKY_CLOUD_DIR_CLEANER, seq,
129                                          TRUE);
130     while (!cachefile->complete)
131         g_cond_wait(cachefile->cond, cachefile->lock);
132
133     g_print("Downloaded latest cleaner segment.\n");
134
135     int64_t offset = -1, length = 0;
136     while (TRUE) {
137         const BlueSkyRangesetItem *item;
138         item = bluesky_rangeset_lookup_next(cachefile->items, offset + 1);
139         if (item == NULL)
140             break;
141         offset = item->start;
142         length = item->length;
143     }
144
145     if (length == 0) {
146         bluesky_cachefile_unref(cachefile);
147         g_mutex_unlock(cachefile->lock);
148         return NULL;
149     }
150
151     g_print("Found a cleaner checkpoint record.\n");
152
153     BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, offset, length);
154     bluesky_cachefile_unref(cachefile);
155     g_mutex_unlock(cachefile->lock);
156
157     BlueSkyCleanerItem *checkpoint = bluesky_cleaner_deserialize(data);
158     bluesky_string_unref(data);
159
160     return checkpoint;
161 }
162
163 static BlueSkyCleanerItem *cleaner_load_item(BlueSkyFS *fs,
164                                              BlueSkyCloudPointer location)
165 {
166     g_print("Loading item %d/%d/%d...\n", location.directory, location.sequence, location.offset);
167
168     BlueSkyCacheFile *cachefile;
169     cachefile = bluesky_cachefile_lookup(fs, location.directory,
170                                          location.sequence, TRUE);
171     while (!cachefile->complete)
172         g_cond_wait(cachefile->cond, cachefile->lock);
173
174     /* TODO: Ought to check that we are loading an item which validated? */
175     BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, location.offset,
176                                                    location.size);
177     bluesky_cachefile_unref(cachefile);
178     g_mutex_unlock(cachefile->lock);
179
180     BlueSkyCleanerItem *item = bluesky_cleaner_deserialize(data);
181     bluesky_string_unref(data);
182
183     return item;
184 }
185
186 /* Does the item at the given cloud location from the cleaner need merging?  An
187  * item in the primary log does not need to be merged, as by definition we
188  * already know about it.  Similarly, old items in the cleaner's log--those
189  * that we have already seen from a previous merge--do not need to be mergd
190  * again. */
191 gboolean needs_merging(BlueSkyFS *fs, BlueSkyCloudPointer location)
192 {
193     if (location.directory == BLUESKY_CLOUD_DIR_PRIMARY)
194         return FALSE;
195
196     if (location.directory == BLUESKY_CLOUD_DIR_CLEANER
197         && location.sequence <= fs->log_state->latest_cleaner_seq_seen)
198         return FALSE;
199
200     return TRUE;
201 }
202
203 static void merge_inode(BlueSkyFS *fs, BlueSkyCleanerItem *cleaner_inode)
204 {
205     /* There are two versions we are concerned with: cleaner_ is for the data
206      * stored in the cleaner's log, and proxy_ is for our most recent version,
207      * which the cleaner might or might not know about. */
208     uint64_t inum = cleaner_inode->inum;
209
210     g_print("Merging inode %"PRIu64" from cleaner\n", inum);
211
212     g_mutex_lock(fs->lock);
213     InodeMapEntry *entry = bluesky_inode_map_lookup(fs->inode_map, inum, 0);
214     if (entry == NULL) {
215         /* Inode doesn't exist: it was probably deleted so keep it that way. */
216         g_mutex_unlock(fs->lock);
217         return;
218     }
219
220     BlueSkyCloudLog *proxy_item = entry->item;
221     g_mutex_lock(proxy_item->lock);
222     BlueSkyCloudPointer proxy_location = entry->item->location;
223     BlueSkyCloudID proxy_id = entry->item->id;
224     g_mutex_unlock(proxy_item->lock);
225     g_mutex_unlock(fs->lock);
226
227     /* If the cleaner and the proxy have the same ID, and if the proxy's
228      * in-memory copy is unmodified, then we can simply use the cleaner's
229      * version of the inode. */
230     /* TODO */
231
232     /* Merge file data together for a regular file.  Iterate over the file
233      * blocks in the proxy's copy of the inode.  If the block ID is unchanged
234      * in the cleaner but the location was updated, then update the location in
235      * the cleaner because the block was relocated.  Otherwise ignore the
236      * cleaner's version for that block because the proxy's information is more
237      * recent. */
238     BlueSkyInode *proxy_inode = bluesky_get_inode(fs, inum);
239     g_mutex_lock(proxy_inode->lock);
240     if (proxy_inode->type == BLUESKY_REGULAR) {
241         for (int i = 0; i < proxy_inode->blocks->len; i++) {
242             BlueSkyBlock *b = &g_array_index(proxy_inode->blocks,
243                                              BlueSkyBlock, i);
244             if (b->type != BLUESKY_BLOCK_REF)
245                 continue;
246             if (i >= cleaner_inode->links->len)
247                 continue;
248             BlueSkyCleanerLink *cb = &g_array_index(cleaner_inode->links,
249                                                     BlueSkyCleanerLink, i);
250             if (memcmp(&b->ref->id, &cb->id, sizeof(BlueSkyCloudPointer)) != 0)
251                 continue;
252
253             g_print("  Updating block %d pointer\n", i);
254             b->ref->location = cb->location;
255         }
256     }
257     g_mutex_unlock(proxy_inode->lock);
258
259     /* Mark the inode as modified so it will get written back to the cloud.  We
260      * don't actually need to force a synchronous write to our local journal
261      * since there have been no logical modifications. */
262 #if 0
263     g_mutex_lock(inode->fs->lock);
264     bluesky_list_unlink(&inode->fs->unlogged_list, inode->unlogged_list);
265     inode->unlogged_list = bluesky_list_prepend(&inode->fs->unlogged_list, inode);
266     g_mutex_unlock(inode->fs->lock);
267 #endif
268 }
269
270 void bluesky_cleaner_merge(BlueSkyFS *fs)
271 {
272     BlueSkyCleanerItem *checkpoint = bluesky_cleaner_find_checkpoint(fs);
273
274     if (checkpoint == NULL) {
275         g_warning("Unable to load cleaner checkpoint record!");
276         return;
277     }
278
279     /* Iterate over each of the inode map sections in the checkpoint */
280     for (int i = 0; i < checkpoint->links->len; i++) {
281         BlueSkyCleanerLink *link = &g_array_index(checkpoint->links,
282                                                   BlueSkyCleanerLink, i);
283         if (!needs_merging(fs, link->location))
284             continue;
285
286         BlueSkyCleanerItem *imap = cleaner_load_item(fs, link->location);
287         if (imap == NULL) {
288             g_warning("Unable to load cleaner inode map");
289             continue;
290         }
291
292         /* Iterate over all inodes found in the inode map section */
293         for (int j = 0; j < imap->links->len; j++) {
294             BlueSkyCleanerLink *link = &g_array_index(imap->links,
295                                                       BlueSkyCleanerLink, j);
296             if (!needs_merging(fs, link->location))
297                 continue;
298             BlueSkyCleanerItem *inode = cleaner_load_item(fs, link->location);
299             if (inode != NULL) {
300                 merge_inode(fs, inode);
301             }
302             bluesky_cleaner_item_free(inode);
303         }
304
305         bluesky_cleaner_item_free(imap);
306     }
307
308     bluesky_cleaner_item_free(checkpoint);
309 }