Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / bluesky / cache.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 #define _GNU_SOURCE
32 #define _ATFILE_SOURCE
33
34 #include <stdio.h>
35 #include <stdint.h>
36 #include <stdlib.h>
37 #include <glib.h>
38 #include <string.h>
39 #include <errno.h>
40 #include <inttypes.h>
41 #include <sys/types.h>
42 #include <sys/stat.h>
43 #include <fcntl.h>
44 #include <unistd.h>
45
46 #include "bluesky-private.h"
47
48 #define WRITEBACK_DELAY (20 * 1000000)
49 #define CACHE_DROP_DELAY (20 * 1000000)
50
51 /* Filesystem caching and cache coherency.  There are actually a couple of
52  * different tasks that are performed here:
53  *   - Forcing data to the log if needed to reclaim memory or simply if the
54  *     data has been dirty in memory long enough.
55  *   - Writing batches of data to the cloud.
56  */
57
58 static void flushd_dirty_inode(BlueSkyInode *inode)
59 {
60     BlueSkyFS *fs = inode->fs;
61
62     g_mutex_lock(fs->lock);
63     bluesky_list_unlink(&fs->unlogged_list, inode->unlogged_list);
64     inode->unlogged_list = NULL;
65     g_mutex_unlock(fs->lock);
66
67     /* Inode is clean; nothing to do. */
68     if (inode->change_count == inode->change_commit)
69         return;
70
71     if (bluesky_verbose) {
72         g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
73             "Starting flush of inode %"PRIu64, inode->inum);
74     }
75
76     bluesky_inode_start_sync(inode);
77 }
78
79 /* Check whether memory usage may have dropped below critical thresholds for
80  * waking up waiting threads. */
81 void flushd_check_wakeup(BlueSkyFS *fs)
82 {
83     int dirty = g_atomic_int_get(&fs->cache_dirty);
84     dirty += g_atomic_int_get(&fs->cache_log_dirty);
85
86     if (dirty <= bluesky_watermark_high_dirty)
87         g_cond_broadcast(fs->flushd_cond);
88 }
89
90 /* Try to flush dirty data to disk, either due to memory pressure or due to
91  * timeouts. */
92 static void flushd_dirty(BlueSkyFS *fs)
93 {
94     int64_t start_time = bluesky_get_current_time();
95     g_mutex_lock(fs->lock);
96
97     while (1) {
98         BlueSkyInode *inode;
99         if (fs->unlogged_list.prev == NULL)
100             break;
101         inode = fs->unlogged_list.prev->data;
102
103         if (bluesky_verbose) {
104             g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
105                   "Considering flushing inode %"PRIu64, inode->inum);
106         }
107
108         /* Stop processing dirty inodes if we both have enough memory available
109          * and the oldest inode is sufficiently new that it need not be flushed
110          * out. */
111         uint64_t elapsed = bluesky_get_current_time() - inode->change_time;
112         if (g_atomic_int_get(&fs->cache_dirty) < bluesky_watermark_low_dirty
113                 && elapsed < WRITEBACK_DELAY)
114             break;
115         if (inode->change_time > start_time)
116             break;
117
118         bluesky_inode_ref(inode);
119
120         g_mutex_unlock(fs->lock);
121
122         g_mutex_lock(inode->lock);
123         flushd_dirty_inode(inode);
124         g_mutex_unlock(inode->lock);
125         bluesky_inode_unref(inode);
126
127         g_mutex_lock(fs->lock);
128         flushd_check_wakeup(fs);
129     }
130
131     g_cond_broadcast(fs->flushd_cond);
132
133     g_mutex_unlock(fs->lock);
134 }
135
136 /* Try to flush dirty data to the cloud.  This will take a snapshot of the
137  * entire filesystem (though only point-in-time consistent for isolated inodes
138  * and not the filesystem as a whole) and ensure all data is written to the
139  * cloud.  When the write completes, we will allow old journal segments (those
140  * that were fully written _before_ the snapshot process started) to be garbage
141  * collected.  Newer journal segments can't be collected yet since they may
142  * still contain data which has not been written persistently to the cloud.
143  *
144  * Note that some of this code relies on the fact that only this thread of
145  * control (running flushd_cloud) is manipulating the inode map, and so
146  * concurrent updates to the inode map are prevented even without the
147  * filesystem lock held.  Take great care if allowing multi-threaded access to
148  * the inode map... */
149 static void flushd_cloud(BlueSkyFS *fs)
150 {
151     g_mutex_lock(fs->lock);
152
153     /* TODO: Locking?  Since we're reading a single variable this is probably
154      * atomic but a lock could be safer. */
155     BlueSkyCloudLog *marker = bluesky_log_get_commit_point(fs);
156     int journal_seq_start = fs->log->seq_num;
157
158     while (1) {
159         BlueSkyInode *inode;
160         if (fs->dirty_list.prev == NULL)
161             break;
162         inode = fs->dirty_list.prev->data;
163
164         if (bluesky_verbose) {
165             g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
166                   "Flushing inode %"PRIu64" to cloud", inode->inum);
167         }
168
169         bluesky_inode_ref(inode);
170
171         g_mutex_unlock(fs->lock);
172
173         g_mutex_lock(inode->lock);
174         g_assert(inode->change_cloud == inode->change_commit);
175         g_mutex_lock(fs->lock);
176         bluesky_list_unlink(&fs->dirty_list, inode->dirty_list);
177         inode->dirty_list = NULL;
178         g_mutex_unlock(fs->lock);
179
180         BlueSkyCloudLog *log = inode->committed_item;
181         inode->committed_item = NULL;
182         g_mutex_unlock(inode->lock);
183
184         if (log != NULL)
185             bluesky_cloudlog_serialize(log, fs);
186         bluesky_inode_unref(inode);
187         bluesky_cloudlog_unref(log);
188
189         g_mutex_lock(fs->lock);
190     }
191     g_mutex_unlock(fs->lock);
192
193     /* Write out any updated inode map entries, so that all inodes just written
194      * can be located, and then a final commit record. */
195     BlueSkyCloudLog *commit_record = bluesky_inode_map_serialize(fs);
196     if (commit_record != NULL) {
197         bluesky_cloudlog_serialize(commit_record, fs);
198     } else {
199         g_print("No need for a checkpoint record...\n");
200     }
201
202     bluesky_cloudlog_flush(fs);
203
204     /* Wait until all segments have been written to the cloud, so that it
205      * becomes safe to free up journal segments. */
206     while (fs->log_state->pending_segments != NULL) {
207         SerializedRecord *segment
208             = (SerializedRecord *)fs->log_state->pending_segments->data;
209         g_mutex_lock(segment->lock);
210         while (!segment->complete)
211             g_cond_wait(segment->cond, segment->lock);
212         g_mutex_unlock(segment->lock);
213
214         g_mutex_free(segment->lock);
215         g_cond_free(segment->cond);
216         g_free(segment);
217
218         fs->log_state->pending_segments
219             = g_list_delete_link(fs->log_state->pending_segments,
220                                  fs->log_state->pending_segments);
221     }
222
223     bluesky_log_write_commit_point(fs, marker);
224     bluesky_cloudlog_unref(commit_record);
225
226     g_print("All segments have been flushed, journal < %d is clean\n",
227             journal_seq_start);
228
229     fs->log->journal_watermark = journal_seq_start;
230
231     bluesky_inode_map_minimize(fs);
232 }
233
234 /* Drop cached data for a given inode, if it is clean.  inode must be locked. */
235 static void drop_caches(BlueSkyInode *inode)
236 {
237     if (inode->type == BLUESKY_REGULAR)
238         bluesky_file_drop_cached(inode);
239
240     BlueSkyCloudLog *log = inode->committed_item;
241     if (log != NULL) {
242         g_mutex_lock(log->lock);
243         if (log->data != NULL
244             && g_atomic_int_get(&log->data_lock_count) == 0
245             && (log->location_flags != 0))
246         {
247             bluesky_cloudlog_stats_update(log, -1);
248             bluesky_string_unref(log->data);
249             log->data = NULL;
250             bluesky_cloudlog_stats_update(log, 1);
251         }
252         g_mutex_unlock(log->lock);
253     }
254 }
255
256 /* Drop clean data from the cache if needed.  Clean data should generally be
257  * memory-mapped from log file or similar, so the kernel can drop this clean
258  * data from memory for us and hence memory management isn't too important.
259  * Mainly, we'll want to drop references to data that hasn't been accessed in a
260  * while so that it is possible to reclaim log segments on disk.
261  *
262  * If aggressive is set, try much harder to drop data from the caches to free
263  * up space. */
264 static void flushd_clean(BlueSkyFS *fs, int aggressive)
265 {
266     g_mutex_lock(fs->lock);
267
268     size_t inode_count = g_hash_table_size(fs->inodes);
269     if (!inode_count)
270         inode_count = 1;
271
272     while (inode_count-- > 0) {
273         BlueSkyInode *inode;
274         if (fs->accessed_list.prev == NULL)
275             break;
276         inode = fs->accessed_list.prev->data;
277
278         uint64_t elapsed = bluesky_get_current_time() - inode->access_time;
279         if (elapsed < CACHE_DROP_DELAY && !aggressive)
280             break;
281
282         if (bluesky_verbose) {
283             g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
284                   "Considering dropping cached data for inode %"PRIu64,
285                   inode->inum);
286         }
287
288         bluesky_inode_ref(inode);
289
290         g_mutex_unlock(fs->lock);
291
292         g_mutex_lock(inode->lock);
293
294         g_mutex_lock(fs->lock);
295         bluesky_list_unlink(&fs->accessed_list, inode->accessed_list);
296         inode->accessed_list = bluesky_list_prepend(&fs->accessed_list, inode);
297         g_mutex_unlock(fs->lock);
298
299         drop_caches(inode);
300
301         g_mutex_unlock(inode->lock);
302         bluesky_inode_unref(inode);
303
304         g_mutex_lock(fs->lock);
305     }
306
307     g_mutex_unlock(fs->lock);
308 }
309
310 /* Scan through all currently-stored files in the journal/cache and garbage
311  * collect old unused ones, if needed. */
312 static void gather_cachefiles(gpointer key, gpointer value, gpointer user_data)
313 {
314     GList **files = (GList **)user_data;
315     *files = g_list_prepend(*files, value);
316 }
317
318 static gint compare_cachefiles(gconstpointer a, gconstpointer b)
319 {
320     int64_t ta, tb;
321
322     ta = ((BlueSkyCacheFile *)a)->atime;
323     tb = ((BlueSkyCacheFile *)b)->atime;
324     if (ta < tb)
325         return -1;
326     else if (ta > tb)
327         return 1;
328     else
329         return 0;
330 }
331
332 void bluesky_cachefile_gc(BlueSkyFS *fs)
333 {
334     GList *files = NULL;
335
336     g_mutex_lock(fs->log->mmap_lock);
337     g_hash_table_foreach(fs->log->mmap_cache, gather_cachefiles, &files);
338
339     /* Sort based on atime.  The atime should be stable since it shouln't be
340      * updated except by threads which can grab the mmap_lock, which we already
341      * hold. */
342     files = g_list_sort(files, compare_cachefiles);
343
344     /* Walk the list of files, starting with the oldest, deleting files if
345      * possible until enough space has been reclaimed. */
346     g_print("\nScanning cache: (total size = %d kB)\n", fs->log->disk_used);
347     while (files != NULL) {
348         BlueSkyCacheFile *cachefile = (BlueSkyCacheFile *)files->data;
349         /* Try to lock the structure, but if the lock is held by another thread
350          * then we'll just skip the file on this pass. */
351         if (g_mutex_trylock(cachefile->lock)) {
352             int64_t age = bluesky_get_current_time() - cachefile->atime;
353             if (bluesky_verbose) {
354                 g_print("%s addr=%p mapcount=%d refcount=%d size=%d atime_age=%f",
355                         cachefile->filename, cachefile->addr, cachefile->mapcount,
356                         cachefile->refcount, cachefile->disk_used, age / 1e6);
357                 if (cachefile->fetching)
358                     g_print(" (fetching)");
359                 g_print("\n");
360             }
361
362             gboolean deletion_candidate = FALSE;
363             if (g_atomic_int_get(&fs->log->disk_used)
364                     > bluesky_options.cache_size
365                 && g_atomic_int_get(&cachefile->refcount) == 0
366                 && g_atomic_int_get(&cachefile->mapcount) == 0)
367             {
368                 deletion_candidate = TRUE;
369             }
370
371             /* Don't allow journal files to be reclaimed until all data is
372              * known to be durably stored in the cloud. */
373             if (cachefile->type == CLOUDLOG_JOURNAL
374                 && cachefile->log_seq >= fs->log->journal_watermark)
375             {
376                 deletion_candidate = FALSE;
377             }
378
379             if (deletion_candidate) {
380                 if (bluesky_verbose) {
381                     g_print("   ...deleting\n");
382                 }
383                 if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) {
384                     fprintf(stderr, "Unable to unlink journal %s: %m\n",
385                             cachefile->filename);
386                 }
387
388                 g_atomic_int_add(&fs->log->disk_used, -cachefile->disk_used);
389                 g_hash_table_remove(fs->log->mmap_cache, cachefile->filename);
390                 bluesky_rangeset_free(cachefile->items);
391                 if (cachefile->prefetches != NULL)
392                     bluesky_rangeset_free(cachefile->prefetches);
393                 g_mutex_unlock(cachefile->lock);
394                 g_mutex_free(cachefile->lock);
395                 g_cond_free(cachefile->cond);
396                 g_free(cachefile->filename);
397                 g_free(cachefile);
398             } else {
399                 g_mutex_unlock(cachefile->lock);
400             }
401         }
402         files = g_list_delete_link(files, files);
403     }
404     g_list_free(files);
405     g_print("\nEnding cache size: %d kB\n", fs->log->disk_used);
406
407     g_mutex_unlock(fs->log->mmap_lock);
408 }
409
410 /* Run the flush daemon for a single iteration, though if it is already
411  * executing returns immediately. */
412 static gpointer flushd_task(BlueSkyFS *fs)
413 {
414     if (!g_mutex_trylock(fs->flushd_lock))
415         return NULL;
416
417     g_print("\nCloudlog cache: %d dirty, %d writeback, %d journal, %d cloud\n",
418             g_atomic_int_get(&fs->cache_log_dirty),
419             g_atomic_int_get(&fs->cache_log_writeback),
420             g_atomic_int_get(&fs->cache_log_journal),
421             g_atomic_int_get(&fs->cache_log_cloud));
422
423     flushd_dirty(fs);
424     flushd_cloud(fs);
425     flushd_clean(fs, 0);
426     bluesky_cachefile_gc(fs);
427
428     /* If running out of disk cache space, make another more aggressive pass to
429      * free up space. */
430     if (g_atomic_int_get(&fs->log->disk_used) > bluesky_options.cache_size) {
431         g_print("Still short on disk space, trying again to free space...\n");
432         flushd_clean(fs, 1);
433         bluesky_cachefile_gc(fs);
434     }
435
436     g_mutex_unlock(fs->flushd_lock);
437
438     return NULL;
439 }
440
441 void bluesky_flushd_invoke(BlueSkyFS *fs)
442 {
443     g_thread_create((GThreadFunc)flushd_task, fs, FALSE, NULL);
444 }
445
446 /* How urgent is flushing out data?  Returns one of several values:
447  *   0 - memory state is fine
448  *   1 - should launch flushd if not already running
449  *   2 - should delay writers while memory frees up
450  *   3 - should block writers until memory frees up
451  */
452 static int compute_pressure(BlueSkyFS *fs)
453 {
454     /* LEVEL 3 */
455     /* Too much dirty data in memory? */
456     if (g_atomic_int_get(&fs->cache_dirty)
457                 + g_atomic_int_get(&fs->cache_log_dirty)
458            > bluesky_watermark_high_dirty)
459     {
460         g_print("pressure: too much dirty data (3)\n");
461         return 3;
462     }
463
464     /* Too much uncommitted data in the journal on disk, not yet flushed to the
465      * cloud? */
466     /*printf("Dirty journals: %d to %d\n",
467            fs->log->journal_watermark, fs->log->seq_num);*/
468     int dirty_limit;
469     dirty_limit = bluesky_options.cache_size / (LOG_SEGMENT_SIZE / 1024) / 2;
470     int dirty_journals = fs->log->seq_num - fs->log->journal_watermark + 1;
471     if (dirty_journals > 1 && dirty_journals >= dirty_limit) {
472         printf("pressure: too many dirty journals (%d >= %d) (3)\n",
473                dirty_journals, dirty_limit);
474         return 3;
475     }
476
477     /* LEVEL 2 */
478     if (g_atomic_int_get(&fs->cache_dirty) > bluesky_watermark_med2_dirty) {
479         g_print("pressure: too much dirty data (2)\n");
480         return 2;
481     }
482
483     if (dirty_journals > 1 && dirty_journals > dirty_limit * 3 / 4) {
484         printf("pressure: many dirty journals (%d), should start writeback (2)\n",
485                dirty_journals);
486         return 2;
487     }
488
489     /* LEVEL 1 */
490     if (g_atomic_int_get(&fs->cache_dirty) > bluesky_watermark_medium_dirty) {
491         g_print("pressure: too much dirty data (1)\n");
492         return 1;
493     }
494
495     if (dirty_journals > 1 && dirty_journals > dirty_limit / 2) {
496         printf("pressure: many dirty journals (%d), should start writeback (1)\n",
497                dirty_journals);
498         return 1;
499     }
500
501     return 0;
502 }
503
504 void bluesky_flushd_invoke_conditional(BlueSkyFS *fs)
505 {
506     int pressure = compute_pressure(fs);
507     if (pressure == 0)
508         return;
509
510     if (bluesky_verbose) {
511         g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
512               "Too much data; invoking flushd: dirty=%d",
513               g_atomic_int_get(&fs->cache_dirty));
514     }
515
516     bluesky_flushd_invoke(fs);
517
518     /* If the system is under heavy memory pressure, actually delay execution
519      * so the flush daemon can catch up. */
520     while (pressure > 1) {
521         g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
522               "Waiting due to memory pressure, dirty=%d + %d",
523               g_atomic_int_get(&fs->cache_dirty),
524               g_atomic_int_get(&fs->cache_log_dirty));
525         g_mutex_lock(fs->lock);
526         pressure = compute_pressure(fs);
527         if (pressure > 2) {
528             g_cond_wait(fs->flushd_cond, fs->lock);
529         } else if (pressure > 1) {
530             /* Wait for up to 10 seconds. */
531             GTimeVal timeout;
532             g_get_current_time(&timeout);
533             g_time_val_add(&timeout, 10 * 1000 * 1000);
534             g_cond_timed_wait(fs->flushd_cond, fs->lock, &timeout);
535         }
536         g_mutex_unlock(fs->lock);
537         pressure = compute_pressure(fs);
538         if (pressure == 1)
539             break;  /* Do not loop indefinitely for a pressure of 1 */
540     }
541 }
542
543 /* Start a perpetually-running thread that flushes the cache occasionally. */
544 static gpointer flushd_thread(BlueSkyFS *fs)
545 {
546     while (TRUE) {
547         bluesky_flushd_invoke(fs);
548         struct timespec delay;
549         delay.tv_sec = 2;
550         delay.tv_nsec = 0;
551         nanosleep(&delay, NULL);
552     }
553
554     return NULL;
555 }
556
557 void bluesky_flushd_thread_launch(BlueSkyFS *fs)
558 {
559     g_thread_create((GThreadFunc)flushd_thread, fs, FALSE, NULL);
560 }