a892e723473b7d1fd816e65f7745db347c8a1c65
[bluesky.git] / bluesky / cleaner.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2010  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <stdint.h>
12 #include <inttypes.h>
13 #include <glib.h>
14 #include <string.h>
15
16 #include "bluesky-private.h"
17
18 /* Proxy component of the file system cleaner.  This consists effectively of
19  * code for merging multiple versions of the file system logs: that generated
20  * by us and that generated by the in-cloud cleaner.  Other logic, such as for
21  * rewriting log segments where needed and deleting old segments, is handled by
22  * the in-cloud cleaner component. */
23
24 /* A specialized function for loading an object from the cloud, used just by
25  * the cleaner.  When the cleaner runs, it will produce new objects that have
26  * the same object ID as before, differing only in the outgoing link locations
27  * as a result of cleaning.  We can't use the normal functions for loading
28  * objects since those can only deal with one version of an object.  Instead,
29  * we load the cleaned object with this function, and then merge the state into
30  * the official object as needed.
31  *
32  * The bluesky_cleaner_deserialize function is like
33  * bluesky_deserialize_cloudlog, but does a more limited deserialization and
34  * keeps the returned item entirely separate from any in-memory BlueSkyCloudLog
35  * objects. */
36
37 typedef struct {
38     BlueSkyCloudLogType type;
39     BlueSkyCloudID id;
40     BlueSkyCloudPointer location;
41     uint64_t inum;
42     GArray *links;
43 } BlueSkyCleanerItem;
44
45 typedef struct {
46     BlueSkyCloudID id;
47     BlueSkyCloudPointer location;
48 } BlueSkyCleanerLink;
49
50 static BlueSkyCleanerItem *bluesky_cleaner_deserialize(BlueSkyRCStr *raw)
51 {
52     const char *data = raw->data;
53     size_t len = raw->len;
54     const char *data1, *data2, *data3;
55     size_t len1, len2, len3;
56
57     g_assert(len > 4);
58     if (len < sizeof(struct cloudlog_header)
59         || memcmp(data, CLOUDLOG_MAGIC, 4) != 0)
60     {
61         g_warning("Deserializing garbage cloud log item from cleaner!");
62         return NULL;
63     };
64
65     struct cloudlog_header *header = (struct cloudlog_header *)data;
66     len1 = GUINT32_FROM_LE(header->size1);
67     len2 = GUINT32_FROM_LE(header->size2);
68     len3 = GUINT32_FROM_LE(header->size3);
69     data1 = data + sizeof(struct cloudlog_header);
70     data2 = data1 + len1;
71     data3 = data2 + len2;
72     g_assert(data3 + len3 - data <= len);
73
74     BlueSkyCleanerItem *item = g_new0(BlueSkyCleanerItem, 1);
75     item->type = header->type - '0';
76     item->inum = GUINT64_FROM_LE(header->inum);
77     memcpy(&item->id, &header->id, sizeof(BlueSkyCloudID));
78
79     int link_count = len2 / sizeof(BlueSkyCloudID);
80     g_print("Outgoing links: %d\n", link_count);
81     item->links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCleanerLink));
82     for (int i = 0; i < link_count; i++) {
83         BlueSkyCleanerLink link;
84
85         g_assert(len2 >= sizeof(link.id));
86         memcpy(&link.id, data2, sizeof(link.id));
87         data2 += sizeof(link.id); len2 -= sizeof(link.id);
88
89         g_assert(len3 >= sizeof(link.location));
90         memcpy(&link.location, data3, sizeof(link.location));
91         data3 += sizeof(link.location); len3 -= sizeof(link.location);
92
93         g_array_append_val(item->links, link);
94     }
95
96     return item;
97 }
98
99 static void bluesky_cleaner_item_free(BlueSkyCleanerItem *item)
100 {
101     if (item == NULL)
102         return;
103     g_array_unref(item->links);
104     g_free(item);
105 }
106
107 /* Check the cleaner's logs to find the a more recent checkpoint record.  This
108  * should be called occasionally to see if the cleaner has done any work since
109  * our last check. */
110 static BlueSkyCleanerItem *bluesky_cleaner_find_checkpoint(BlueSkyFS *fs)
111 {
112     char *prefix = g_strdup_printf("log-%08d", BLUESKY_CLOUD_DIR_CLEANER);
113     char *last_segment = bluesky_store_lookup_last(fs->store, prefix);
114     g_free(prefix);
115     if (last_segment == NULL)
116         return NULL;
117
118     g_print("Last cloud log segment: %s (processed up to %d)\n",
119             last_segment, fs->log_state->latest_cleaner_seq_seen);
120     int seq = atoi(last_segment + 13);
121     g_free(last_segment);
122
123     if (seq <= fs->log_state->latest_cleaner_seq_seen)
124         return NULL;
125
126     g_print("New log segment appeared in cleaner directory: %d\n", seq);
127
128     BlueSkyCacheFile *cachefile;
129     cachefile = bluesky_cachefile_lookup(fs, BLUESKY_CLOUD_DIR_CLEANER, seq,
130                                          TRUE);
131     while (!cachefile->complete)
132         g_cond_wait(cachefile->cond, cachefile->lock);
133
134     g_print("Downloaded latest cleaner segment.\n");
135
136     int64_t offset = -1, length = 0;
137     while (TRUE) {
138         const BlueSkyRangesetItem *item;
139         item = bluesky_rangeset_lookup_next(cachefile->items, offset + 1);
140         if (item == NULL)
141             break;
142         offset = item->start;
143         length = item->length;
144     }
145
146     if (length == 0) {
147         bluesky_cachefile_unref(cachefile);
148         g_mutex_unlock(cachefile->lock);
149         return NULL;
150     }
151
152     g_print("Found a cleaner checkpoint record.\n");
153
154     BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, offset, length);
155     bluesky_cachefile_unref(cachefile);
156     g_mutex_unlock(cachefile->lock);
157
158     BlueSkyCleanerItem *checkpoint = bluesky_cleaner_deserialize(data);
159     checkpoint->location.directory = BLUESKY_CLOUD_DIR_CLEANER;
160     checkpoint->location.sequence = seq;
161     bluesky_string_unref(data);
162
163     return checkpoint;
164 }
165
166 static BlueSkyCleanerItem *cleaner_load_item(BlueSkyFS *fs,
167                                              BlueSkyCloudPointer location)
168 {
169     g_print("Loading item %d/%d/%d...\n", location.directory, location.sequence, location.offset);
170
171     BlueSkyCacheFile *cachefile;
172     cachefile = bluesky_cachefile_lookup(fs, location.directory,
173                                          location.sequence, TRUE);
174     while (!cachefile->complete)
175         g_cond_wait(cachefile->cond, cachefile->lock);
176
177     /* TODO: Ought to check that we are loading an item which validated? */
178     BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, location.offset,
179                                                    location.size);
180     bluesky_cachefile_unref(cachefile);
181     g_mutex_unlock(cachefile->lock);
182
183     BlueSkyCleanerItem *item = bluesky_cleaner_deserialize(data);
184     bluesky_string_unref(data);
185
186     return item;
187 }
188
189 /* Does the item at the given cloud location from the cleaner need merging?  An
190  * item in the primary log does not need to be merged, as by definition we
191  * already know about it.  Similarly, old items in the cleaner's log--those
192  * that we have already seen from a previous merge--do not need to be mergd
193  * again. */
194 gboolean needs_merging(BlueSkyFS *fs, BlueSkyCloudPointer location)
195 {
196     if (location.directory == BLUESKY_CLOUD_DIR_PRIMARY)
197         return FALSE;
198
199     if (location.directory == BLUESKY_CLOUD_DIR_CLEANER
200         && location.sequence <= fs->log_state->latest_cleaner_seq_seen)
201         return FALSE;
202
203     return TRUE;
204 }
205
206 /* For an inode which has been modified by the cleaner and must be flushed out
207  * to cloud storage, mark it as appropriately dirty.  We will bypass writingt
208  * the inode to the journal if possible--but if there have ben other
209  * uncommitted changes besides what the cleaner did then we will force a
210  * journal write as well since the cloud shouldn't contain newer data than the
211  * journal.  Inode must be locked. */
212 static void cleaner_flush_inode(BlueSkyInode *inode)
213 {
214     // if (inode->change_commit != inode->change_count) {
215     if (TRUE) {
216         /* bluesky_inode_start_sync schedules a flush to the cloud so we're all
217          * done. */
218         bluesky_inode_start_sync(inode);
219         return;
220     }
221
222     g_assert(inode->unlogged_list == NULL);
223
224     bluesky_list_unlink(&inode->fs->dirty_list, inode->dirty_list);
225     inode->dirty_list = bluesky_list_prepend(&inode->fs->dirty_list, inode);
226     inode->change_cloud = inode->change_count;
227 }
228
229 static void merge_inode(BlueSkyFS *fs, BlueSkyCleanerItem *cleaner_inode)
230 {
231     /* There are two versions we are concerned with: cleaner_ is for the data
232      * stored in the cleaner's log, and proxy_ is for our most recent version,
233      * which the cleaner might or might not know about. */
234     uint64_t inum = cleaner_inode->inum;
235
236     g_print("Merging inode %"PRIu64" from cleaner\n", inum);
237
238     g_mutex_lock(fs->lock);
239     InodeMapEntry *entry = bluesky_inode_map_lookup(fs->inode_map, inum, 0);
240     if (entry == NULL) {
241         /* Inode doesn't exist: it was probably deleted so keep it that way. */
242         g_mutex_unlock(fs->lock);
243         return;
244     }
245
246     BlueSkyCloudLog *proxy_item = entry->item;
247     g_mutex_lock(proxy_item->lock);
248     //BlueSkyCloudPointer proxy_location = entry->item->location;
249     //BlueSkyCloudID proxy_id = entry->item->id;
250     g_mutex_unlock(proxy_item->lock);
251     g_mutex_unlock(fs->lock);
252
253     /* If the cleaner and the proxy have the same ID, and if the proxy's
254      * in-memory copy is unmodified, then we can simply use the cleaner's
255      * version of the inode. */
256     /* TODO */
257
258     /* Merge file data together for a regular file.  Iterate over the file
259      * blocks in the proxy's copy of the inode.  If the block ID is unchanged
260      * in the cleaner but the location was updated, then update the location in
261      * the cleaner because the block was relocated.  Otherwise ignore the
262      * cleaner's version for that block because the proxy's information is more
263      * recent. */
264     BlueSkyInode *proxy_inode = bluesky_get_inode(fs, inum);
265     g_mutex_lock(proxy_inode->lock);
266     if (proxy_inode->type == BLUESKY_REGULAR) {
267         for (int i = 0; i < proxy_inode->blocks->len; i++) {
268             BlueSkyBlock *b = &g_array_index(proxy_inode->blocks,
269                                              BlueSkyBlock, i);
270             if (b->type != BLUESKY_BLOCK_REF)
271                 continue;
272             if (i >= cleaner_inode->links->len)
273                 continue;
274             BlueSkyCleanerLink *cb = &g_array_index(cleaner_inode->links,
275                                                     BlueSkyCleanerLink, i);
276             if (memcmp(&b->ref->id, &cb->id, sizeof(BlueSkyCloudPointer)) != 0)
277                 continue;
278
279             g_print("  Updating block %d pointer\n", i);
280             b->ref->location = cb->location;
281         }
282     }
283     cleaner_flush_inode(proxy_inode);
284     g_mutex_unlock(proxy_inode->lock);
285
286     /* Mark the inode as modified so it will get written back to the cloud.  We
287      * don't actually need to force a synchronous write to our local journal
288      * since there have been no logical modifications. */
289 #if 0
290     g_mutex_lock(inode->fs->lock);
291     bluesky_list_unlink(&inode->fs->unlogged_list, inode->unlogged_list);
292     inode->unlogged_list = bluesky_list_prepend(&inode->fs->unlogged_list, inode);
293     g_mutex_unlock(inode->fs->lock);
294 #endif
295 }
296
297 void bluesky_cleaner_merge(BlueSkyFS *fs)
298 {
299     BlueSkyCleanerItem *checkpoint = bluesky_cleaner_find_checkpoint(fs);
300
301     if (checkpoint == NULL) {
302         g_warning("Unable to load cleaner checkpoint record!");
303         return;
304     }
305
306     if (checkpoint->type != LOGTYPE_CHECKPOINT) {
307         g_warning("Last cleaner object not a checkpoint; cleaning probably in progress.");
308         bluesky_cleaner_item_free(checkpoint);
309         return;
310     }
311
312     /* Iterate over each of the inode map sections in the checkpoint */
313     for (int i = 0; i < checkpoint->links->len; i++) {
314         BlueSkyCleanerLink *link = &g_array_index(checkpoint->links,
315                                                   BlueSkyCleanerLink, i);
316         if (!needs_merging(fs, link->location))
317             continue;
318
319         BlueSkyCleanerItem *imap = cleaner_load_item(fs, link->location);
320         if (imap == NULL) {
321             g_warning("Unable to load cleaner inode map");
322             continue;
323         }
324
325         /* Iterate over all inodes found in the inode map section */
326         for (int j = 0; j < imap->links->len; j++) {
327             BlueSkyCleanerLink *link = &g_array_index(imap->links,
328                                                       BlueSkyCleanerLink, j);
329             if (!needs_merging(fs, link->location))
330                 continue;
331             BlueSkyCleanerItem *inode = cleaner_load_item(fs, link->location);
332             if (inode != NULL) {
333                 merge_inode(fs, inode);
334             }
335             bluesky_cleaner_item_free(inode);
336         }
337
338         bluesky_cleaner_item_free(imap);
339     }
340
341     fs->log_state->latest_cleaner_seq_seen = checkpoint->location.sequence;
342     bluesky_cleaner_item_free(checkpoint);
343 }
344
345 /* Run the cleaner as a background task. */
346 static gpointer cleaner_thread(BlueSkyFS *fs)
347 {
348     while (TRUE) {
349         struct timespec delay;
350         delay.tv_sec = 30;
351         delay.tv_nsec = 0;
352         nanosleep(&delay, NULL);
353         bluesky_cleaner_merge(fs);
354     }
355
356     return NULL;
357 }
358
359 void bluesky_cleaner_thread_launch(BlueSkyFS *fs)
360 {
361     g_thread_create((GThreadFunc)cleaner_thread, fs, FALSE, NULL);
362 }