Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / bluesky / cleaner.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2010  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 #include <stdio.h>
32 #include <stdlib.h>
33 #include <stdint.h>
34 #include <inttypes.h>
35 #include <glib.h>
36 #include <string.h>
37
38 #include "bluesky-private.h"
39
40 /* Proxy component of the file system cleaner.  This consists effectively of
41  * code for merging multiple versions of the file system logs: that generated
42  * by us and that generated by the in-cloud cleaner.  Other logic, such as for
43  * rewriting log segments where needed and deleting old segments, is handled by
44  * the in-cloud cleaner component. */
45
46 /* A specialized function for loading an object from the cloud, used just by
47  * the cleaner.  When the cleaner runs, it will produce new objects that have
48  * the same object ID as before, differing only in the outgoing link locations
49  * as a result of cleaning.  We can't use the normal functions for loading
50  * objects since those can only deal with one version of an object.  Instead,
51  * we load the cleaned object with this function, and then merge the state into
52  * the official object as needed.
53  *
54  * The bluesky_cleaner_deserialize function is like
55  * bluesky_deserialize_cloudlog, but does a more limited deserialization and
56  * keeps the returned item entirely separate from any in-memory BlueSkyCloudLog
57  * objects. */
58
59 typedef struct {
60     BlueSkyCloudLogType type;
61     BlueSkyCloudID id;
62     BlueSkyCloudPointer location;
63     uint64_t inum;
64     GArray *links;
65 } BlueSkyCleanerItem;
66
67 typedef struct {
68     BlueSkyCloudID id;
69     BlueSkyCloudPointer location;
70 } BlueSkyCleanerLink;
71
72 static BlueSkyCleanerItem *bluesky_cleaner_deserialize(BlueSkyRCStr *raw)
73 {
74     const char *data = raw->data;
75     size_t len = raw->len;
76     const char *data1, *data2, *data3;
77     size_t len1, len2, len3;
78
79     g_assert(len > 4);
80     if (len < sizeof(struct cloudlog_header)
81         || memcmp(data, CLOUDLOG_MAGIC, 4) != 0)
82     {
83         g_warning("Deserializing garbage cloud log item from cleaner!");
84         return NULL;
85     };
86
87     struct cloudlog_header *header = (struct cloudlog_header *)data;
88     len1 = GUINT32_FROM_LE(header->size1);
89     len2 = GUINT32_FROM_LE(header->size2);
90     len3 = GUINT32_FROM_LE(header->size3);
91     data1 = data + sizeof(struct cloudlog_header);
92     data2 = data1 + len1;
93     data3 = data2 + len2;
94     g_assert(data3 + len3 - data <= len);
95
96     BlueSkyCleanerItem *item = g_new0(BlueSkyCleanerItem, 1);
97     item->type = header->type - '0';
98     item->inum = GUINT64_FROM_LE(header->inum);
99     memcpy(&item->id, &header->id, sizeof(BlueSkyCloudID));
100
101     int link_count = len2 / sizeof(BlueSkyCloudID);
102     g_print("Outgoing links: %d\n", link_count);
103     item->links = g_array_new(FALSE, TRUE, sizeof(BlueSkyCleanerLink));
104     for (int i = 0; i < link_count; i++) {
105         BlueSkyCleanerLink link;
106
107         g_assert(len2 >= sizeof(link.id));
108         memcpy(&link.id, data2, sizeof(link.id));
109         data2 += sizeof(link.id); len2 -= sizeof(link.id);
110
111         g_assert(len3 >= sizeof(link.location));
112         memcpy(&link.location, data3, sizeof(link.location));
113         data3 += sizeof(link.location); len3 -= sizeof(link.location);
114
115         g_array_append_val(item->links, link);
116     }
117
118     return item;
119 }
120
121 static void bluesky_cleaner_item_free(BlueSkyCleanerItem *item)
122 {
123     if (item == NULL)
124         return;
125     g_array_unref(item->links);
126     g_free(item);
127 }
128
129 /* Check the cleaner's logs to find the a more recent checkpoint record.  This
130  * should be called occasionally to see if the cleaner has done any work since
131  * our last check. */
132 static BlueSkyCleanerItem *bluesky_cleaner_find_checkpoint(BlueSkyFS *fs)
133 {
134     char *prefix = g_strdup_printf("log-%08d", BLUESKY_CLOUD_DIR_CLEANER);
135     char *last_segment = bluesky_store_lookup_last(fs->store, prefix);
136     g_free(prefix);
137     if (last_segment == NULL)
138         return NULL;
139
140     g_print("Last cloud log segment: %s (processed up to %d)\n",
141             last_segment, fs->log_state->latest_cleaner_seq_seen);
142     int seq = atoi(last_segment + 13);
143     g_free(last_segment);
144
145     if (seq <= fs->log_state->latest_cleaner_seq_seen)
146         return NULL;
147
148     g_print("New log segment appeared in cleaner directory: %d\n", seq);
149
150     BlueSkyCacheFile *cachefile;
151     cachefile = bluesky_cachefile_lookup(fs, BLUESKY_CLOUD_DIR_CLEANER, seq,
152                                          TRUE);
153     while (!cachefile->complete)
154         g_cond_wait(cachefile->cond, cachefile->lock);
155
156     g_print("Downloaded latest cleaner segment.\n");
157
158     int64_t offset = -1, length = 0;
159     while (TRUE) {
160         const BlueSkyRangesetItem *item;
161         item = bluesky_rangeset_lookup_next(cachefile->items, offset + 1);
162         if (item == NULL)
163             break;
164         offset = item->start;
165         length = item->length;
166     }
167
168     if (length == 0) {
169         bluesky_cachefile_unref(cachefile);
170         g_mutex_unlock(cachefile->lock);
171         return NULL;
172     }
173
174     g_print("Found a cleaner checkpoint record.\n");
175
176     BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, offset, length);
177     bluesky_cachefile_unref(cachefile);
178     g_mutex_unlock(cachefile->lock);
179
180     BlueSkyCleanerItem *checkpoint = bluesky_cleaner_deserialize(data);
181     checkpoint->location.directory = BLUESKY_CLOUD_DIR_CLEANER;
182     checkpoint->location.sequence = seq;
183     bluesky_string_unref(data);
184
185     return checkpoint;
186 }
187
188 static BlueSkyCleanerItem *cleaner_load_item(BlueSkyFS *fs,
189                                              BlueSkyCloudPointer location)
190 {
191     g_print("Loading item %d/%d/%d...\n", location.directory, location.sequence, location.offset);
192
193     BlueSkyCacheFile *cachefile;
194     cachefile = bluesky_cachefile_lookup(fs, location.directory,
195                                          location.sequence, TRUE);
196     while (!cachefile->complete)
197         g_cond_wait(cachefile->cond, cachefile->lock);
198
199     /* TODO: Ought to check that we are loading an item which validated? */
200     BlueSkyRCStr *data = bluesky_cachefile_map_raw(cachefile, location.offset,
201                                                    location.size);
202     bluesky_cachefile_unref(cachefile);
203     g_mutex_unlock(cachefile->lock);
204
205     BlueSkyCleanerItem *item = bluesky_cleaner_deserialize(data);
206     bluesky_string_unref(data);
207
208     return item;
209 }
210
211 /* Does the item at the given cloud location from the cleaner need merging?  An
212  * item in the primary log does not need to be merged, as by definition we
213  * already know about it.  Similarly, old items in the cleaner's log--those
214  * that we have already seen from a previous merge--do not need to be mergd
215  * again. */
216 gboolean needs_merging(BlueSkyFS *fs, BlueSkyCloudPointer location)
217 {
218     if (location.directory == BLUESKY_CLOUD_DIR_PRIMARY)
219         return FALSE;
220
221     if (location.directory == BLUESKY_CLOUD_DIR_CLEANER
222         && location.sequence <= fs->log_state->latest_cleaner_seq_seen)
223         return FALSE;
224
225     return TRUE;
226 }
227
228 /* For an inode which has been modified by the cleaner and must be flushed out
229  * to cloud storage, mark it as appropriately dirty.  We will bypass writingt
230  * the inode to the journal if possible--but if there have ben other
231  * uncommitted changes besides what the cleaner did then we will force a
232  * journal write as well since the cloud shouldn't contain newer data than the
233  * journal.  Inode must be locked. */
234 static void cleaner_flush_inode(BlueSkyInode *inode)
235 {
236     // if (inode->change_commit != inode->change_count) {
237     if (TRUE) {
238         /* bluesky_inode_start_sync schedules a flush to the cloud so we're all
239          * done. */
240         bluesky_inode_start_sync(inode);
241         return;
242     }
243
244     g_assert(inode->unlogged_list == NULL);
245
246     bluesky_list_unlink(&inode->fs->dirty_list, inode->dirty_list);
247     inode->dirty_list = bluesky_list_prepend(&inode->fs->dirty_list, inode);
248     inode->change_cloud = inode->change_count;
249 }
250
251 static void merge_inode(BlueSkyFS *fs, BlueSkyCleanerItem *cleaner_inode)
252 {
253     /* There are two versions we are concerned with: cleaner_ is for the data
254      * stored in the cleaner's log, and proxy_ is for our most recent version,
255      * which the cleaner might or might not know about. */
256     uint64_t inum = cleaner_inode->inum;
257
258     g_print("Merging inode %"PRIu64" from cleaner\n", inum);
259
260     g_mutex_lock(fs->lock);
261     InodeMapEntry *entry = bluesky_inode_map_lookup(fs->inode_map, inum, 0);
262     if (entry == NULL) {
263         /* Inode doesn't exist: it was probably deleted so keep it that way. */
264         g_mutex_unlock(fs->lock);
265         return;
266     }
267
268     BlueSkyCloudLog *proxy_item = entry->item;
269     g_mutex_lock(proxy_item->lock);
270     //BlueSkyCloudPointer proxy_location = entry->item->location;
271     //BlueSkyCloudID proxy_id = entry->item->id;
272     g_mutex_unlock(proxy_item->lock);
273     g_mutex_unlock(fs->lock);
274
275     /* If the cleaner and the proxy have the same ID, and if the proxy's
276      * in-memory copy is unmodified, then we can simply use the cleaner's
277      * version of the inode. */
278     /* TODO */
279
280     /* Merge file data together for a regular file.  Iterate over the file
281      * blocks in the proxy's copy of the inode.  If the block ID is unchanged
282      * in the cleaner but the location was updated, then update the location in
283      * the cleaner because the block was relocated.  Otherwise ignore the
284      * cleaner's version for that block because the proxy's information is more
285      * recent. */
286     BlueSkyInode *proxy_inode = bluesky_get_inode(fs, inum);
287     g_mutex_lock(proxy_inode->lock);
288     if (proxy_inode->type == BLUESKY_REGULAR) {
289         for (int i = 0; i < proxy_inode->blocks->len; i++) {
290             BlueSkyBlock *b = &g_array_index(proxy_inode->blocks,
291                                              BlueSkyBlock, i);
292             if (b->type != BLUESKY_BLOCK_REF)
293                 continue;
294             if (i >= cleaner_inode->links->len)
295                 continue;
296             BlueSkyCleanerLink *cb = &g_array_index(cleaner_inode->links,
297                                                     BlueSkyCleanerLink, i);
298             if (memcmp(&b->ref->id, &cb->id, sizeof(BlueSkyCloudPointer)) != 0)
299                 continue;
300
301             g_print("  Updating block %d pointer\n", i);
302             b->ref->location = cb->location;
303         }
304     }
305     cleaner_flush_inode(proxy_inode);
306     g_mutex_unlock(proxy_inode->lock);
307
308     /* Mark the inode as modified so it will get written back to the cloud.  We
309      * don't actually need to force a synchronous write to our local journal
310      * since there have been no logical modifications. */
311 #if 0
312     g_mutex_lock(inode->fs->lock);
313     bluesky_list_unlink(&inode->fs->unlogged_list, inode->unlogged_list);
314     inode->unlogged_list = bluesky_list_prepend(&inode->fs->unlogged_list, inode);
315     g_mutex_unlock(inode->fs->lock);
316 #endif
317 }
318
319 void bluesky_cleaner_merge(BlueSkyFS *fs)
320 {
321     BlueSkyCleanerItem *checkpoint = bluesky_cleaner_find_checkpoint(fs);
322
323     if (checkpoint == NULL) {
324         g_warning("Unable to load cleaner checkpoint record!");
325         return;
326     }
327
328     if (checkpoint->type != LOGTYPE_CHECKPOINT) {
329         g_warning("Last cleaner object not a checkpoint; cleaning probably in progress.");
330         bluesky_cleaner_item_free(checkpoint);
331         return;
332     }
333
334     /* Iterate over each of the inode map sections in the checkpoint */
335     for (int i = 0; i < checkpoint->links->len; i++) {
336         BlueSkyCleanerLink *link = &g_array_index(checkpoint->links,
337                                                   BlueSkyCleanerLink, i);
338         if (!needs_merging(fs, link->location))
339             continue;
340
341         BlueSkyCleanerItem *imap = cleaner_load_item(fs, link->location);
342         if (imap == NULL) {
343             g_warning("Unable to load cleaner inode map");
344             continue;
345         }
346
347         /* Iterate over all inodes found in the inode map section */
348         for (int j = 0; j < imap->links->len; j++) {
349             BlueSkyCleanerLink *link = &g_array_index(imap->links,
350                                                       BlueSkyCleanerLink, j);
351             if (!needs_merging(fs, link->location))
352                 continue;
353             BlueSkyCleanerItem *inode = cleaner_load_item(fs, link->location);
354             if (inode != NULL) {
355                 merge_inode(fs, inode);
356             }
357             bluesky_cleaner_item_free(inode);
358         }
359
360         bluesky_cleaner_item_free(imap);
361     }
362
363     fs->log_state->latest_cleaner_seq_seen = checkpoint->location.sequence;
364     bluesky_cleaner_item_free(checkpoint);
365 }
366
367 /* Run the cleaner as a background task. */
368 static gpointer cleaner_thread(BlueSkyFS *fs)
369 {
370     while (TRUE) {
371         struct timespec delay;
372         delay.tv_sec = 30;
373         delay.tv_nsec = 0;
374         nanosleep(&delay, NULL);
375         bluesky_cleaner_merge(fs);
376     }
377
378     return NULL;
379 }
380
381 void bluesky_cleaner_thread_launch(BlueSkyFS *fs)
382 {
383     g_thread_create((GThreadFunc)cleaner_thread, fs, FALSE, NULL);
384 }