5f241d4505c18fdaea6f9156f39f5d9d78f55810
[bluesky.git] / bluesky / cleaner.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2010  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <stdint.h>
12 #include <inttypes.h>
13 #include <glib.h>
14 #include <string.h>
15
16 #include "bluesky-private.h"
17
18 /* Proxy component of the file system cleaner.  This consists effectively of
19  * code for merging multiple versions of the file system logs: that generated
20  * by us and that generated by the in-cloud cleaner.  Other logic, such as for
21  * rewriting log segments where needed and deleting old segments, is handled by
22  * the in-cloud cleaner component. */
23
24 /* A specialized function for loading an object from the cloud, used just by
25  * the cleaner.  When the cleaner runs, it will produce new objects that have
26  * the same object ID as before, differing only in the outgoing link locations
27  * as a result of cleaning.  We can't use the normal functions for loading
28  * objects since those can only deal with one version of an object.  Instead,
29  * we load the cleaned object with this function, and then merge the state into
30  * the official object as needed.
31  *
32  * The bluesky_cleaner_deserialize function is like
33  * bluesky_deserialize_cloudlog, but does a more limited deserialization and
34  * keeps the returned item entirely separate from any in-memory BlueSkyCloudLog
35  * objects. */
36
37 typedef struct {
38     BlueSkyCloudLogType type;
39     BlueSkyCloudID id;
40     BlueSkyCloudPointer location;
41     uint64_t inum;
42     GArray *links;
43 } BlueSkyCleanerItem;
44
45 typedef struct {
46     BlueSkyCloudID id;
47     BlueSkyCloudPointer location;
48 } BlueSkyCleanerLink;
49
50 static BlueSkyCleanerItem *bluesky_cleaner_deserialize(BlueSkyRCStr *raw)
51 {
52     const char *data = raw->data;
53     size_t len = raw->len;
54     const char *data1, *data2, *data3;
55     size_t len1, len2, len3;
56
57     g_assert(len > 4);
58     if (len < sizeof(struct cloudlog_header)
59         || memcmp(data, CLOUDLOG_MAGIC, 4) != 0)
60     {
61         g_warning("Deserializing garbage cloud log item from cleaner!");
62         return NULL;
63     };
64
65     struct cloudlog_header *header = (struct cloudlog_header *)data;
66     len1 = GUINT32_FROM_LE(header->size1);
67     len2 = GUINT32_FROM_LE(header->size2);
68     len3 = GUINT32_FROM_LE(header->size3);
69     data1 = data + sizeof(struct cloudlog_header);
70     data2 = data1 + len1;
71     data3 = data2 + len2;
72     g_assert(data3 + len3 - data <= len);
73
74     BlueSkyCleanerItem *item = g_new0(BlueSkyCleanerItem, 1);
75     item->type = header->type - '0';
76     item->inum = GUINT64_FROM_LE(header->inum);
77     memcpy(&item->id, &header->id, sizeof(BlueSkyCloudID));
78
79     int link_count = len2 / sizeof(BlueSkyCloudID);
80     g_print("Outgoing links: %d\n", link_count);
81     item->links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCleanerLink));
82     for (int i = 0; i < link_count; i++) {
83         BlueSkyCleanerLink link;
84
85         g_assert(len2 >= sizeof(link.id));
86         memcpy(&link.id, data2, sizeof(link.id));
87         data2 += sizeof(link.id); len2 -= sizeof(link.id);
88
89         g_assert(len3 >= sizeof(link.location));
90         memcpy(&link.location, data3, sizeof(link.location));
91         data3 += sizeof(link.location); len3 -= sizeof(link.location);
92
93         g_array_append_val(item->links, link);
94     }
95
96     return item;
97 }
98
99 static void bluesky_cleaner_item_free(BlueSkyCleanerItem *item)
100 {
101     if (item == NULL)
102         return;
103     g_array_unref(item->links);
104     g_free(item);
105 }
106
107 /* Check the cleaner's logs to find the a more recent checkpoint record.  This
108  * should be called occasionally to see if the cleaner has done any work since
109  * our last check. */
110 static BlueSkyCleanerItem *bluesky_cleaner_find_checkpoint(BlueSkyFS *fs)
111 {
112     char *prefix = g_strdup_printf("log-%08d", BLUESKY_CLOUD_DIR_CLEANER);
113     char *last_segment = bluesky_store_lookup_last(fs->store, prefix);
114     g_free(prefix);
115     if (last_segment == NULL)
116         return NULL;
117
118     g_print("Last cloud log segment: %s\n", last_segment);
119     int seq = atoi(last_segment + 13);
120     g_free(last_segment);
121
122     if (seq <= fs->log_state->latest_cleaner_seq_seen)
123         return NULL;
124
125     g_print("New log segment appeared in cleaner directory: %d\n", seq);
126
127     BlueSkyCacheFile *cachefile;
128     cachefile = bluesky_cachefile_lookup(fs, BLUESKY_CLOUD_DIR_CLEANER, seq,
129                                          TRUE);
130     while (!cachefile->complete)
131         g_cond_wait(cachefile->cond, cachefile->lock);
132
133     g_print("Downloaded latest cleaner segment.\n");
134
135     int64_t offset = -1, length = 0;
136     while (TRUE) {
137         const BlueSkyRangesetItem *item;
138         item = bluesky_rangeset_lookup_next(cachefile->items, offset + 1);
139         if (item == NULL)
140             break;
141         offset = item->start;
142         length = item->length;
143     }
144
145     if (length == 0) {
146         bluesky_cachefile_unref(cachefile);
147         g_mutex_unlock(cachefile->lock);
148         return NULL;
149     }
150
151     g_print("Found a cleaner checkpoint record.\n");
152
153     BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, offset, length);
154     bluesky_cachefile_unref(cachefile);
155     g_mutex_unlock(cachefile->lock);
156
157     BlueSkyCleanerItem *checkpoint = bluesky_cleaner_deserialize(data);
158     checkpoint->location.directory = BLUESKY_CLOUD_DIR_CLEANER;
159     checkpoint->location.directory = seq;
160     bluesky_string_unref(data);
161
162     return checkpoint;
163 }
164
165 static BlueSkyCleanerItem *cleaner_load_item(BlueSkyFS *fs,
166                                              BlueSkyCloudPointer location)
167 {
168     g_print("Loading item %d/%d/%d...\n", location.directory, location.sequence, location.offset);
169
170     BlueSkyCacheFile *cachefile;
171     cachefile = bluesky_cachefile_lookup(fs, location.directory,
172                                          location.sequence, TRUE);
173     while (!cachefile->complete)
174         g_cond_wait(cachefile->cond, cachefile->lock);
175
176     /* TODO: Ought to check that we are loading an item which validated? */
177     BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, location.offset,
178                                                    location.size);
179     bluesky_cachefile_unref(cachefile);
180     g_mutex_unlock(cachefile->lock);
181
182     BlueSkyCleanerItem *item = bluesky_cleaner_deserialize(data);
183     bluesky_string_unref(data);
184
185     return item;
186 }
187
188 /* Does the item at the given cloud location from the cleaner need merging?  An
189  * item in the primary log does not need to be merged, as by definition we
190  * already know about it.  Similarly, old items in the cleaner's log--those
191  * that we have already seen from a previous merge--do not need to be mergd
192  * again. */
193 gboolean needs_merging(BlueSkyFS *fs, BlueSkyCloudPointer location)
194 {
195     if (location.directory == BLUESKY_CLOUD_DIR_PRIMARY)
196         return FALSE;
197
198     if (location.directory == BLUESKY_CLOUD_DIR_CLEANER
199         && location.sequence <= fs->log_state->latest_cleaner_seq_seen)
200         return FALSE;
201
202     return TRUE;
203 }
204
205 /* For an inode which has been modified by the cleaner and must be flushed out
206  * to cloud storage, mark it as appropriately dirty.  We will bypass writingt
207  * the inode to the journal if possible--but if there have ben other
208  * uncommitted changes besides what the cleaner did then we will force a
209  * journal write as well since the cloud shouldn't contain newer data than the
210  * journal.  Inode must be locked. */
211 static void cleaner_flush_inode(BlueSkyInode *inode)
212 {
213     // if (inode->change_commit != inode->change_count) {
214     if (TRUE) {
215         /* bluesky_inode_start_sync schedules a flush to the cloud so we're all
216          * done. */
217         bluesky_inode_start_sync(inode);
218         return;
219     }
220
221     g_assert(inode->unlogged_list == NULL);
222
223     bluesky_list_unlink(&inode->fs->dirty_list, inode->dirty_list);
224     inode->dirty_list = bluesky_list_prepend(&inode->fs->dirty_list, inode);
225     inode->change_cloud = inode->change_count;
226 }
227
228 static void merge_inode(BlueSkyFS *fs, BlueSkyCleanerItem *cleaner_inode)
229 {
230     /* There are two versions we are concerned with: cleaner_ is for the data
231      * stored in the cleaner's log, and proxy_ is for our most recent version,
232      * which the cleaner might or might not know about. */
233     uint64_t inum = cleaner_inode->inum;
234
235     g_print("Merging inode %"PRIu64" from cleaner\n", inum);
236
237     g_mutex_lock(fs->lock);
238     InodeMapEntry *entry = bluesky_inode_map_lookup(fs->inode_map, inum, 0);
239     if (entry == NULL) {
240         /* Inode doesn't exist: it was probably deleted so keep it that way. */
241         g_mutex_unlock(fs->lock);
242         return;
243     }
244
245     BlueSkyCloudLog *proxy_item = entry->item;
246     g_mutex_lock(proxy_item->lock);
247     //BlueSkyCloudPointer proxy_location = entry->item->location;
248     //BlueSkyCloudID proxy_id = entry->item->id;
249     g_mutex_unlock(proxy_item->lock);
250     g_mutex_unlock(fs->lock);
251
252     /* If the cleaner and the proxy have the same ID, and if the proxy's
253      * in-memory copy is unmodified, then we can simply use the cleaner's
254      * version of the inode. */
255     /* TODO */
256
257     /* Merge file data together for a regular file.  Iterate over the file
258      * blocks in the proxy's copy of the inode.  If the block ID is unchanged
259      * in the cleaner but the location was updated, then update the location in
260      * the cleaner because the block was relocated.  Otherwise ignore the
261      * cleaner's version for that block because the proxy's information is more
262      * recent. */
263     BlueSkyInode *proxy_inode = bluesky_get_inode(fs, inum);
264     g_mutex_lock(proxy_inode->lock);
265     if (proxy_inode->type == BLUESKY_REGULAR) {
266         for (int i = 0; i < proxy_inode->blocks->len; i++) {
267             BlueSkyBlock *b = &g_array_index(proxy_inode->blocks,
268                                              BlueSkyBlock, i);
269             if (b->type != BLUESKY_BLOCK_REF)
270                 continue;
271             if (i >= cleaner_inode->links->len)
272                 continue;
273             BlueSkyCleanerLink *cb = &g_array_index(cleaner_inode->links,
274                                                     BlueSkyCleanerLink, i);
275             if (memcmp(&b->ref->id, &cb->id, sizeof(BlueSkyCloudPointer)) != 0)
276                 continue;
277
278             g_print("  Updating block %d pointer\n", i);
279             b->ref->location = cb->location;
280         }
281     }
282     cleaner_flush_inode(proxy_inode);
283     g_mutex_unlock(proxy_inode->lock);
284
285     /* Mark the inode as modified so it will get written back to the cloud.  We
286      * don't actually need to force a synchronous write to our local journal
287      * since there have been no logical modifications. */
288 #if 0
289     g_mutex_lock(inode->fs->lock);
290     bluesky_list_unlink(&inode->fs->unlogged_list, inode->unlogged_list);
291     inode->unlogged_list = bluesky_list_prepend(&inode->fs->unlogged_list, inode);
292     g_mutex_unlock(inode->fs->lock);
293 #endif
294 }
295
296 void bluesky_cleaner_merge(BlueSkyFS *fs)
297 {
298     BlueSkyCleanerItem *checkpoint = bluesky_cleaner_find_checkpoint(fs);
299
300     if (checkpoint == NULL) {
301         g_warning("Unable to load cleaner checkpoint record!");
302         return;
303     }
304
305     /* Iterate over each of the inode map sections in the checkpoint */
306     for (int i = 0; i < checkpoint->links->len; i++) {
307         BlueSkyCleanerLink *link = &g_array_index(checkpoint->links,
308                                                   BlueSkyCleanerLink, i);
309         if (!needs_merging(fs, link->location))
310             continue;
311
312         BlueSkyCleanerItem *imap = cleaner_load_item(fs, link->location);
313         if (imap == NULL) {
314             g_warning("Unable to load cleaner inode map");
315             continue;
316         }
317
318         /* Iterate over all inodes found in the inode map section */
319         for (int j = 0; j < imap->links->len; j++) {
320             BlueSkyCleanerLink *link = &g_array_index(imap->links,
321                                                       BlueSkyCleanerLink, j);
322             if (!needs_merging(fs, link->location))
323                 continue;
324             BlueSkyCleanerItem *inode = cleaner_load_item(fs, link->location);
325             if (inode != NULL) {
326                 merge_inode(fs, inode);
327             }
328             bluesky_cleaner_item_free(inode);
329         }
330
331         bluesky_cleaner_item_free(imap);
332     }
333
334     fs->log_state->latest_cleaner_seq_seen = checkpoint->location.directory;
335     bluesky_cleaner_item_free(checkpoint);
336 }
337
338 /* Run the cleaner as a background task. */
339 static gpointer cleaner_thread(BlueSkyFS *fs)
340 {
341     while (TRUE) {
342         struct timespec delay;
343         delay.tv_sec = 30;
344         delay.tv_nsec = 0;
345         nanosleep(&delay, NULL);
346         bluesky_cleaner_merge(fs);
347     }
348
349     return NULL;
350 }
351
352 void bluesky_cleaner_thread_launch(BlueSkyFS *fs)
353 {
354     g_thread_create((GThreadFunc)cleaner_thread, fs, FALSE, NULL);
355 }