Add write throttling based on the size of the uncommitted journal
[bluesky.git] / bluesky / cache.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #define _GNU_SOURCE
10 #define _ATFILE_SOURCE
11
12 #include <stdio.h>
13 #include <stdint.h>
14 #include <stdlib.h>
15 #include <glib.h>
16 #include <string.h>
17 #include <errno.h>
18 #include <inttypes.h>
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23
24 #include "bluesky-private.h"
25
26 #define WRITEBACK_DELAY (20 * 1000000)
27 #define CACHE_DROP_DELAY (20 * 1000000)
28
29 /* Filesystem caching and cache coherency.  There are actually a couple of
30  * different tasks that are performed here:
31  *   - Forcing data to the log if needed to reclaim memory or simply if the
32  *     data has been dirty in memory long enough.
33  *   - Writing batches of data to the cloud.
34  */
35
36 static void flushd_dirty_inode(BlueSkyInode *inode)
37 {
38     BlueSkyFS *fs = inode->fs;
39
40     g_mutex_lock(fs->lock);
41     bluesky_list_unlink(&fs->unlogged_list, inode->unlogged_list);
42     inode->unlogged_list = NULL;
43     g_mutex_unlock(fs->lock);
44
45     /* Inode is clean; nothing to do. */
46     if (inode->change_count == inode->change_commit)
47         return;
48
49     if (bluesky_verbose) {
50         g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
51             "Starting flush of inode %"PRIu64, inode->inum);
52     }
53
54     bluesky_inode_start_sync(inode);
55 }
56
57 /* Check whether memory usage may have dropped below critical thresholds for
58  * waking up waiting threads. */
59 void flushd_check_wakeup(BlueSkyFS *fs)
60 {
61     int dirty = g_atomic_int_get(&fs->cache_dirty);
62     dirty += g_atomic_int_get(&fs->cache_log_dirty);
63
64     if (dirty <= bluesky_watermark_high_dirty)
65         g_cond_broadcast(fs->flushd_cond);
66 }
67
68 /* Try to flush dirty data to disk, either due to memory pressure or due to
69  * timeouts. */
70 static void flushd_dirty(BlueSkyFS *fs)
71 {
72     int64_t start_time = bluesky_get_current_time();
73     g_mutex_lock(fs->lock);
74
75     while (1) {
76         BlueSkyInode *inode;
77         if (fs->unlogged_list.prev == NULL)
78             break;
79         inode = fs->unlogged_list.prev->data;
80
81         if (bluesky_verbose) {
82             g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
83                   "Considering flushing inode %"PRIu64, inode->inum);
84         }
85
86         /* Stop processing dirty inodes if we both have enough memory available
87          * and the oldest inode is sufficiently new that it need not be flushed
88          * out. */
89         uint64_t elapsed = bluesky_get_current_time() - inode->change_time;
90         if (g_atomic_int_get(&fs->cache_dirty) < bluesky_watermark_low_dirty
91                 && elapsed < WRITEBACK_DELAY)
92             break;
93         if (inode->change_time > start_time)
94             break;
95
96         bluesky_inode_ref(inode);
97
98         g_mutex_unlock(fs->lock);
99
100         g_mutex_lock(inode->lock);
101         flushd_dirty_inode(inode);
102         g_mutex_unlock(inode->lock);
103         bluesky_inode_unref(inode);
104
105         g_mutex_lock(fs->lock);
106         flushd_check_wakeup(fs);
107     }
108
109     g_cond_broadcast(fs->flushd_cond);
110
111     g_mutex_unlock(fs->lock);
112 }
113
114 /* Try to flush dirty data to the cloud.  This will take a snapshot of the
115  * entire filesystem (though only point-in-time consistent for isolated inodes
116  * and not the filesystem as a whole) and ensure all data is written to the
117  * cloud.  When the write completes, we will allow old journal segments (those
118  * that were fully written _before_ the snapshot process started) to be garbage
119  * collected.  Newer journal segments can't be collected yet since they may
120  * still contain data which has not been written persistently to the cloud.
121  *
122  * Note that some of this code relies on the fact that only this thread of
123  * control (running flushd_cloud) is manipulating the inode map, and so
124  * concurrent updates to the inode map are prevented even without the
125  * filesystem lock held.  Take great care if allowing multi-threaded access to
126  * the inode map... */
127 static void flushd_cloud(BlueSkyFS *fs)
128 {
129     g_mutex_lock(fs->lock);
130
131     /* TODO: Locking?  Since we're reading a single variable this is probably
132      * atomic but a lock could be safer. */
133     BlueSkyCloudLog *marker = bluesky_log_get_commit_point(fs);
134     int journal_seq_start = fs->log->seq_num;
135
136     while (1) {
137         BlueSkyInode *inode;
138         if (fs->dirty_list.prev == NULL)
139             break;
140         inode = fs->dirty_list.prev->data;
141
142         if (bluesky_verbose) {
143             g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
144                   "Flushing inode %"PRIu64" to cloud", inode->inum);
145         }
146
147         bluesky_inode_ref(inode);
148
149         g_mutex_unlock(fs->lock);
150
151         g_mutex_lock(inode->lock);
152         g_assert(inode->change_cloud == inode->change_commit);
153         g_mutex_lock(fs->lock);
154         bluesky_list_unlink(&fs->dirty_list, inode->dirty_list);
155         inode->dirty_list = NULL;
156         g_mutex_unlock(fs->lock);
157
158         BlueSkyCloudLog *log = inode->committed_item;
159         inode->committed_item = NULL;
160         g_mutex_unlock(inode->lock);
161
162         if (log != NULL)
163             bluesky_cloudlog_serialize(log, fs);
164         bluesky_inode_unref(inode);
165         bluesky_cloudlog_unref(log);
166
167         g_mutex_lock(fs->lock);
168     }
169     g_mutex_unlock(fs->lock);
170
171     /* Write out any updated inode map entries, so that all inodes just written
172      * can be located, and then a final commit record. */
173     BlueSkyCloudLog *commit_record = bluesky_inode_map_serialize(fs);
174     if (commit_record != NULL) {
175         bluesky_cloudlog_serialize(commit_record, fs);
176     } else {
177         g_print("No need for a checkpoint record...\n");
178     }
179
180     bluesky_cloudlog_flush(fs);
181
182     /* Wait until all segments have been written to the cloud, so that it
183      * becomes safe to free up journal segments. */
184     while (fs->log_state->pending_segments != NULL) {
185         SerializedRecord *segment
186             = (SerializedRecord *)fs->log_state->pending_segments->data;
187         g_mutex_lock(segment->lock);
188         while (!segment->complete)
189             g_cond_wait(segment->cond, segment->lock);
190         g_mutex_unlock(segment->lock);
191
192         g_mutex_free(segment->lock);
193         g_cond_free(segment->cond);
194         g_free(segment);
195
196         fs->log_state->pending_segments
197             = g_list_delete_link(fs->log_state->pending_segments,
198                                  fs->log_state->pending_segments);
199     }
200
201     bluesky_log_write_commit_point(fs, marker);
202     bluesky_cloudlog_unref(commit_record);
203
204     g_print("All segments have been flushed, journal < %d is clean\n",
205             journal_seq_start);
206
207     fs->log->journal_watermark = journal_seq_start;
208
209     bluesky_inode_map_minimize(fs);
210 }
211
212 /* Drop cached data for a given inode, if it is clean.  inode must be locked. */
213 static void drop_caches(BlueSkyInode *inode)
214 {
215     if (inode->type == BLUESKY_REGULAR)
216         bluesky_file_drop_cached(inode);
217
218     BlueSkyCloudLog *log = inode->committed_item;
219     if (log != NULL) {
220         g_mutex_lock(log->lock);
221         if (log->data != NULL
222             && g_atomic_int_get(&log->data_lock_count) == 0
223             && (log->location_flags != 0))
224         {
225             bluesky_cloudlog_stats_update(log, -1);
226             bluesky_string_unref(log->data);
227             log->data = NULL;
228             bluesky_cloudlog_stats_update(log, 1);
229         }
230         g_mutex_unlock(log->lock);
231     }
232 }
233
234 /* Drop clean data from the cache if needed.  Clean data should generally be
235  * memory-mapped from log file or similar, so the kernel can drop this clean
236  * data from memory for us and hence memory management isn't too important.
237  * Mainly, we'll want to drop references to data that hasn't been accessed in a
238  * while so that it is possible to reclaim log segments on disk.
239  *
240  * If aggressive is set, try much harder to drop data from the caches to free
241  * up space. */
242 static void flushd_clean(BlueSkyFS *fs, int aggressive)
243 {
244     g_mutex_lock(fs->lock);
245
246     size_t inode_count = g_hash_table_size(fs->inodes);
247     if (!inode_count)
248         inode_count = 1;
249
250     while (inode_count-- > 0) {
251         BlueSkyInode *inode;
252         if (fs->accessed_list.prev == NULL)
253             break;
254         inode = fs->accessed_list.prev->data;
255
256         uint64_t elapsed = bluesky_get_current_time() - inode->access_time;
257         if (elapsed < CACHE_DROP_DELAY && !aggressive)
258             break;
259
260         if (bluesky_verbose) {
261             g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
262                   "Considering dropping cached data for inode %"PRIu64,
263                   inode->inum);
264         }
265
266         bluesky_inode_ref(inode);
267
268         g_mutex_unlock(fs->lock);
269
270         g_mutex_lock(inode->lock);
271
272         g_mutex_lock(fs->lock);
273         bluesky_list_unlink(&fs->accessed_list, inode->accessed_list);
274         inode->accessed_list = bluesky_list_prepend(&fs->accessed_list, inode);
275         g_mutex_unlock(fs->lock);
276
277         drop_caches(inode);
278
279         g_mutex_unlock(inode->lock);
280         bluesky_inode_unref(inode);
281
282         g_mutex_lock(fs->lock);
283     }
284
285     g_mutex_unlock(fs->lock);
286 }
287
288 /* Scan through all currently-stored files in the journal/cache and garbage
289  * collect old unused ones, if needed. */
290 static void gather_cachefiles(gpointer key, gpointer value, gpointer user_data)
291 {
292     GList **files = (GList **)user_data;
293     *files = g_list_prepend(*files, value);
294 }
295
296 static gint compare_cachefiles(gconstpointer a, gconstpointer b)
297 {
298     int64_t ta, tb;
299
300     ta = ((BlueSkyCacheFile *)a)->atime;
301     tb = ((BlueSkyCacheFile *)b)->atime;
302     if (ta < tb)
303         return -1;
304     else if (ta > tb)
305         return 1;
306     else
307         return 0;
308 }
309
310 void bluesky_cachefile_gc(BlueSkyFS *fs)
311 {
312     GList *files = NULL;
313
314     g_mutex_lock(fs->log->mmap_lock);
315     g_hash_table_foreach(fs->log->mmap_cache, gather_cachefiles, &files);
316
317     /* Sort based on atime.  The atime should be stable since it shouln't be
318      * updated except by threads which can grab the mmap_lock, which we already
319      * hold. */
320     files = g_list_sort(files, compare_cachefiles);
321
322     /* Walk the list of files, starting with the oldest, deleting files if
323      * possible until enough space has been reclaimed. */
324     g_print("\nScanning cache: (total size = %d kB)\n", fs->log->disk_used);
325     while (files != NULL) {
326         BlueSkyCacheFile *cachefile = (BlueSkyCacheFile *)files->data;
327         /* Try to lock the structure, but if the lock is held by another thread
328          * then we'll just skip the file on this pass. */
329         if (g_mutex_trylock(cachefile->lock)) {
330             int64_t age = bluesky_get_current_time() - cachefile->atime;
331             if (bluesky_verbose) {
332                 g_print("%s addr=%p mapcount=%d refcount=%d size=%d atime_age=%f",
333                         cachefile->filename, cachefile->addr, cachefile->mapcount,
334                         cachefile->refcount, cachefile->disk_used, age / 1e6);
335                 if (cachefile->fetching)
336                     g_print(" (fetching)");
337                 g_print("\n");
338             }
339
340             gboolean deletion_candidate = FALSE;
341             if (g_atomic_int_get(&fs->log->disk_used)
342                     > bluesky_options.cache_size
343                 && g_atomic_int_get(&cachefile->refcount) == 0
344                 && g_atomic_int_get(&cachefile->mapcount) == 0)
345             {
346                 deletion_candidate = TRUE;
347             }
348
349             /* Don't allow journal files to be reclaimed until all data is
350              * known to be durably stored in the cloud. */
351             if (cachefile->type == CLOUDLOG_JOURNAL
352                 && cachefile->log_seq >= fs->log->journal_watermark)
353             {
354                 deletion_candidate = FALSE;
355             }
356
357             if (deletion_candidate) {
358                 if (bluesky_verbose) {
359                     g_print("   ...deleting\n");
360                 }
361                 if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) {
362                     fprintf(stderr, "Unable to unlink journal %s: %m\n",
363                             cachefile->filename);
364                 }
365
366                 g_atomic_int_add(&fs->log->disk_used, -cachefile->disk_used);
367                 g_hash_table_remove(fs->log->mmap_cache, cachefile->filename);
368                 bluesky_rangeset_free(cachefile->items);
369                 if (cachefile->prefetches != NULL)
370                     bluesky_rangeset_free(cachefile->prefetches);
371                 g_mutex_unlock(cachefile->lock);
372                 g_mutex_free(cachefile->lock);
373                 g_cond_free(cachefile->cond);
374                 g_free(cachefile->filename);
375                 g_free(cachefile);
376             } else {
377                 g_mutex_unlock(cachefile->lock);
378             }
379         }
380         files = g_list_delete_link(files, files);
381     }
382     g_list_free(files);
383     g_print("\nEnding cache size: %d kB\n", fs->log->disk_used);
384
385     g_mutex_unlock(fs->log->mmap_lock);
386 }
387
388 /* Run the flush daemon for a single iteration, though if it is already
389  * executing returns immediately. */
390 static gpointer flushd_task(BlueSkyFS *fs)
391 {
392     if (!g_mutex_trylock(fs->flushd_lock))
393         return NULL;
394
395     g_print("\nCloudlog cache: %d dirty, %d writeback, %d journal, %d cloud\n",
396             g_atomic_int_get(&fs->cache_log_dirty),
397             g_atomic_int_get(&fs->cache_log_writeback),
398             g_atomic_int_get(&fs->cache_log_journal),
399             g_atomic_int_get(&fs->cache_log_cloud));
400
401     flushd_dirty(fs);
402     flushd_cloud(fs);
403     flushd_clean(fs, 0);
404     bluesky_cachefile_gc(fs);
405
406     /* If running out of disk cache space, make another more aggressive pass to
407      * free up space. */
408     if (g_atomic_int_get(&fs->log->disk_used) > bluesky_options.cache_size) {
409         g_print("Still short on disk space, trying again to free space...\n");
410         flushd_clean(fs, 1);
411         bluesky_cachefile_gc(fs);
412     }
413
414     g_mutex_unlock(fs->flushd_lock);
415
416     return NULL;
417 }
418
419 void bluesky_flushd_invoke(BlueSkyFS *fs)
420 {
421     g_thread_create((GThreadFunc)flushd_task, fs, FALSE, NULL);
422 }
423
424 /* How urgent is flushing out data?  Returns one of several values:
425  *   0 - memory state is fine
426  *   1 - should launch flushd if not already running
427  *   2 - should block writers until memory frees up
428  */
429 static int compute_pressure(BlueSkyFS *fs)
430 {
431     /* LEVEL 2 */
432     /* Too much dirty data in memory? */
433     if (g_atomic_int_get(&fs->cache_dirty)
434                 + g_atomic_int_get(&fs->cache_log_dirty)
435            > bluesky_watermark_high_dirty)
436         return 2;
437
438     /* Too much uncommitted data in the journal on disk, not yet flushed to the
439      * cloud? */
440     printf("Dirty journals: %d to %d\n",
441            fs->log->journal_watermark, fs->log->seq_num);
442     int dirty_limit;
443     dirty_limit = bluesky_options.cache_size / (LOG_SEGMENT_SIZE / 1024) / 2;
444     int dirty_journals = fs->log->seq_num - fs->log->journal_watermark + 1;
445     if (dirty_journals > 1 && dirty_journals >= dirty_limit) {
446         printf("Too many dirty journals (%d >= %d)\n",
447                dirty_journals, dirty_limit);
448         return 2;
449     }
450
451     /* LEVEL 1 */
452     if (g_atomic_int_get(&fs->cache_dirty) < bluesky_watermark_medium_dirty)
453         return 1;
454
455     if (dirty_journals > 1 && dirty_journals > dirty_limit / 2) {
456         printf("Many dirty journals (%d), should start writeback\n",
457                dirty_journals);
458         return 1;
459     }
460
461     return 0;
462 }
463
464 void bluesky_flushd_invoke_conditional(BlueSkyFS *fs)
465 {
466     int pressure = compute_pressure(fs);
467     if (pressure == 0)
468         return;
469
470     if (bluesky_verbose) {
471         g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
472               "Too much data; invoking flushd: dirty=%d",
473               g_atomic_int_get(&fs->cache_dirty));
474     }
475
476     bluesky_flushd_invoke(fs);
477
478     /* If the system is under heavy memory pressure, actually delay execution
479      * so the flush daemon can catch up. */
480     while (pressure > 1) {
481         g_log("bluesky/flushd", G_LOG_LEVEL_DEBUG,
482               "Waiting due to memory pressure, dirty=%d + %d",
483               g_atomic_int_get(&fs->cache_dirty),
484               g_atomic_int_get(&fs->cache_log_dirty));
485         g_mutex_lock(fs->lock);
486         pressure = compute_pressure(fs);
487         if (pressure > 1)
488             g_cond_wait(fs->flushd_cond, fs->lock);
489         g_mutex_unlock(fs->lock);
490         pressure = compute_pressure(fs);
491     }
492 }
493
494 /* Start a perpetually-running thread that flushes the cache occasionally. */
495 static gpointer flushd_thread(BlueSkyFS *fs)
496 {
497     while (TRUE) {
498         bluesky_flushd_invoke(fs);
499         struct timespec delay;
500         delay.tv_sec = 2;
501         delay.tv_nsec = 0;
502         nanosleep(&delay, NULL);
503     }
504
505     return NULL;
506 }
507
508 void bluesky_flushd_thread_launch(BlueSkyFS *fs)
509 {
510     g_thread_create((GThreadFunc)flushd_thread, fs, FALSE, NULL);
511 }