Merge git+ssh://c09-44.sysnet.ucsd.edu/scratch/bluesky
author    Michael Vrable <mvrable@cs.ucsd.edu>    Mon, 25 Oct 2010 05:43:24 +0000 (22:43 -0700)
committer Michael Vrable <mvrable@cs.ucsd.edu>    Mon, 25 Oct 2010 05:43:24 +0000 (22:43 -0700)
.gitignore
bluesky/cache.c
bluesky/cloudlog.c
bluesky/crypto.c
bluesky/imap.c
bluesky/log.c
bluesky/store-s3.c
cleaner/cleaner

diff --git a/.gitignore b/.gitignore
index b633b5a..f5ad579 100644
@@ -12,6 +12,7 @@ kvstore/lib*.so
 kvstore/kvstore
 logbench/logbench
 microbench/bench
+microbench/lockmem
 microbench/readbench
 microbench/statbench
 nfs3/nfsproxy
diff --git a/bluesky/cache.c b/bluesky/cache.c
index d1c5c84..bfbfe3c 100644
@@ -6,10 +6,20 @@
  * TODO: Licensing
  */
 
+#define _GNU_SOURCE
+#define _ATFILE_SOURCE
+
+#include <stdio.h>
 #include <stdint.h>
+#include <stdlib.h>
 #include <glib.h>
 #include <string.h>
+#include <errno.h>
 #include <inttypes.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
 
 #include "bluesky-private.h"
 
@@ -217,9 +227,6 @@ static void drop_caches(BlueSkyInode *inode)
             log->data = NULL;
             bluesky_cloudlog_stats_update(log, 1);
         }
-        if (log->location_flags & CLOUDLOG_CLOUD) {
-            log->location_flags &= ~CLOUDLOG_JOURNAL;
-        }
         g_mutex_unlock(log->lock);
     }
 }
@@ -228,8 +235,11 @@ static void drop_caches(BlueSkyInode *inode)
  * memory-mapped from log file or similar, so the kernel can drop this clean
  * data from memory for us and hence memory management isn't too important.
  * Mainly, we'll want to drop references to data that hasn't been accessed in a
- * while so that it is possible to reclaim log segments on disk. */
-static void flushd_clean(BlueSkyFS *fs)
+ * while so that it is possible to reclaim log segments on disk.
+ *
+ * If aggressive is set, try much harder to drop data from the caches to free
+ * up space. */
+static void flushd_clean(BlueSkyFS *fs, int aggressive)
 {
     g_mutex_lock(fs->lock);
 
@@ -244,7 +254,7 @@ static void flushd_clean(BlueSkyFS *fs)
         inode = fs->accessed_list.prev->data;
 
         uint64_t elapsed = bluesky_get_current_time() - inode->access_time;
-        if (elapsed < CACHE_DROP_DELAY)
+        if (elapsed < CACHE_DROP_DELAY && !aggressive)
             break;
 
         if (bluesky_verbose) {
@@ -275,16 +285,125 @@ static void flushd_clean(BlueSkyFS *fs)
     g_mutex_unlock(fs->lock);
 }
 
+/* Scan through all currently-stored files in the journal/cache and garbage
+ * collect old unused ones, if needed. */
+static void gather_cachefiles(gpointer key, gpointer value, gpointer user_data)
+{
+    GList **files = (GList **)user_data;
+    *files = g_list_prepend(*files, value);
+}
+
+static gint compare_cachefiles(gconstpointer a, gconstpointer b)
+{
+    int64_t ta, tb;
+
+    ta = ((BlueSkyCacheFile *)a)->atime;
+    tb = ((BlueSkyCacheFile *)b)->atime;
+    if (ta < tb)
+        return -1;
+    else if (ta > tb)
+        return 1;
+    else
+        return 0;
+}
+
+void bluesky_cachefile_gc(BlueSkyFS *fs)
+{
+    GList *files = NULL;
+
+    g_mutex_lock(fs->log->mmap_lock);
+    g_hash_table_foreach(fs->log->mmap_cache, gather_cachefiles, &files);
+
+    /* Sort based on atime.  The atime should be stable since it shouldn't be
+     * updated except by threads which can grab the mmap_lock, which we already
+     * hold. */
+    files = g_list_sort(files, compare_cachefiles);
+
+    /* Walk the list of files, starting with the oldest, deleting files if
+     * possible until enough space has been reclaimed. */
+    g_print("\nScanning cache: (total size = %d kB)\n", fs->log->disk_used);
+    while (files != NULL) {
+        BlueSkyCacheFile *cachefile = (BlueSkyCacheFile *)files->data;
+        /* Try to lock the structure, but if the lock is held by another thread
+         * then we'll just skip the file on this pass. */
+        if (g_mutex_trylock(cachefile->lock)) {
+            int64_t age = bluesky_get_current_time() - cachefile->atime;
+            g_print("%s addr=%p mapcount=%d refcount=%d atime_age=%f",
+                    cachefile->filename, cachefile->addr, cachefile->mapcount,
+                    cachefile->refcount, age / 1e6);
+            if (cachefile->fetching)
+                g_print(" (fetching)");
+            g_print("\n");
+
+            gboolean deletion_candidate = FALSE;
+            if (g_atomic_int_get(&fs->log->disk_used)
+                    > bluesky_options.cache_size
+                && g_atomic_int_get(&cachefile->refcount) == 0
+                && g_atomic_int_get(&cachefile->mapcount) == 0)
+            {
+                deletion_candidate = TRUE;
+            }
+
+            /* Don't allow journal files to be reclaimed until all data is
+             * known to be durably stored in the cloud. */
+            if (cachefile->type == CLOUDLOG_JOURNAL
+                && cachefile->log_seq >= fs->log->journal_watermark)
+            {
+                deletion_candidate = FALSE;
+            }
+
+            if (deletion_candidate) {
+                g_print("   ...deleting\n");
+                if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) {
+                    fprintf(stderr, "Unable to unlink journal %s: %m\n",
+                            cachefile->filename);
+                }
+
+                g_atomic_int_add(&fs->log->disk_used, -(cachefile->len / 1024));
+                g_hash_table_remove(fs->log->mmap_cache, cachefile->filename);
+                g_mutex_unlock(cachefile->lock);
+                g_mutex_free(cachefile->lock);
+                g_cond_free(cachefile->cond);
+                g_free(cachefile->filename);
+                g_free(cachefile);
+            } else {
+                g_mutex_unlock(cachefile->lock);
+            }
+        }
+        files = g_list_delete_link(files, files);
+    }
+    g_list_free(files);
+    g_print("\nEnding cache size: %d kB\n", fs->log->disk_used);
+
+    g_mutex_unlock(fs->log->mmap_lock);
+}
+
 /* Run the flush daemon for a single iteration, though if it is already
  * executing returns immediately. */
 static gpointer flushd_task(BlueSkyFS *fs)
 {
     if (!g_mutex_trylock(fs->flushd_lock))
         return NULL;
+
+    g_print("\nCloudlog cache: %d dirty, %d writeback, %d journal, %d cloud\n",
+            g_atomic_int_get(&fs->cache_log_dirty),
+            g_atomic_int_get(&fs->cache_log_writeback),
+            g_atomic_int_get(&fs->cache_log_journal),
+            g_atomic_int_get(&fs->cache_log_cloud));
+
     flushd_dirty(fs);
     flushd_cloud(fs);
-    flushd_clean(fs);
+    flushd_clean(fs, 0);
     bluesky_cachefile_gc(fs);
+
+    /* If running out of disk cache space, make another more aggressive pass to
+     * free up space. */
+    if (g_atomic_int_get(&fs->log->disk_used) > bluesky_options.cache_size) {
+        g_print("Still short on disk space, trying again to free space...\n");
+        flushd_clean(fs, 1);
+        bluesky_cachefile_gc(fs);
+    }
+
     g_mutex_unlock(fs->flushd_lock);
 
     return NULL;
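
The flush task above now takes a second, aggressive cleaning pass only when the
disk cache is still over its configured size after the normal pass.  A minimal
sketch of that two-pass policy, using made-up types and reclaim amounts rather
than the real BlueSky structures:

    /* Illustrative only: the normal pass honors the age threshold; the
     * aggressive pass ignores it and so can reclaim more. */
    #include <stdio.h>

    struct cache_state {
        int used_kb;    /* current cache usage */
        int limit_kb;   /* configured cache size */
    };

    static void clean_pass(struct cache_state *c, int aggressive)
    {
        int reclaimed = aggressive ? 400 : 100;   /* pretend reclaim amounts */
        c->used_kb -= reclaimed;
        if (c->used_kb < 0)
            c->used_kb = 0;
        printf("%s pass reclaimed %d kB, now %d kB used\n",
               aggressive ? "aggressive" : "normal", reclaimed, c->used_kb);
    }

    int main(void)
    {
        struct cache_state c = { 600, 256 };

        clean_pass(&c, 0);                  /* normal pass first */
        if (c.used_kb > c.limit_kb) {       /* still short on space? */
            printf("still over the %d kB limit, trying harder\n", c.limit_kb);
            clean_pass(&c, 1);              /* aggressive pass */
        }
        return 0;
    }
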
diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c
index 142104f..e868d99 100644
@@ -259,6 +259,7 @@ BlueSkyCloudLog *bluesky_cloudlog_get(BlueSkyFS *fs, BlueSkyCloudID id)
     item = g_hash_table_lookup(fs->locations, &id);
     if (item == NULL) {
         item = bluesky_cloudlog_new(fs, &id);
+        bluesky_cloudlog_stats_update(item, 1);
         bluesky_cloudlog_insert_locked(item);
     } else {
         bluesky_cloudlog_ref(item);
@@ -282,6 +283,7 @@ void bluesky_cloudlog_fetch(BlueSkyCloudLog *log)
      * Once that is done, we can fall through the case of remapping the data
      * itself. */
     if (log->type == LOGTYPE_UNKNOWN) {
+        bluesky_cloudlog_stats_update(log, -1);
         BlueSkyRCStr *raw = NULL;
         if ((log->location_flags | log->pending_write) & CLOUDLOG_JOURNAL) {
             raw = bluesky_log_map_object(log->fs, -1, log->log_seq,
@@ -300,6 +302,7 @@ void bluesky_cloudlog_fetch(BlueSkyCloudLog *log)
         g_assert(raw != NULL);
         bluesky_deserialize_cloudlog(log, raw->data, raw->len);
         bluesky_string_unref(raw);
+        bluesky_cloudlog_stats_update(log, 1);
     }
 
     /* At this point all metadata should be available and we need only remap
@@ -389,6 +392,7 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log,
         g_mutex_lock(fs->lock);
         InodeMapEntry *entry = bluesky_inode_map_lookup(fs->inode_map,
                                                         log->inum, 1);
+        bluesky_cloudlog_unref_delayed(entry->item);
         entry->item = log;
         bluesky_cloudlog_ref(entry->item);
         g_mutex_unlock(fs->lock);
@@ -529,8 +533,7 @@ void bluesky_cloudlog_decrypt(char *segment, size_t len, BlueSkyCryptKeys *keys)
         if (item_size > remaining_size)
             break;
         if (bluesky_crypt_block_decrypt(data, item_size, keys)) {
-            g_print("Decrypted valid cloud log item at offset %zd\n",
-                    data - segment);
+            /* TODO: Mark block as valid. */
         }
 
         data += item_size;
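
Several of the hunks above bracket a state change with
bluesky_cloudlog_stats_update(log, -1) before the mutation and
bluesky_cloudlog_stats_update(log, 1) after it, so the per-state counters
printed by the flush daemon stay in step with the items they count.  A minimal
sketch of that bracketing pattern, with an illustrative item type and counters
rather than the real ones:

    #include <glib.h>

    enum item_state { STATE_JOURNAL, STATE_CLOUD, STATE_COUNT };

    static gint state_counts[STATE_COUNT];

    struct item {
        enum item_state state;
    };

    static void stats_update(struct item *it, int delta)
    {
        g_atomic_int_add(&state_counts[it->state], delta);
    }

    static void item_set_state(struct item *it, enum item_state new_state)
    {
        stats_update(it, -1);       /* remove the item from its old bucket... */
        it->state = new_state;      /* ...change its state... */
        stats_update(it, 1);        /* ...and count it in the new bucket. */
    }

    int main(void)
    {
        struct item it = { STATE_JOURNAL };

        stats_update(&it, 1);               /* item created */
        item_set_state(&it, STATE_CLOUD);   /* counters stay balanced */
        g_print("journal=%d cloud=%d\n",
                state_counts[STATE_JOURNAL], state_counts[STATE_CLOUD]);
        return 0;
    }
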
diff --git a/bluesky/crypto.c b/bluesky/crypto.c
index 89f6b12..fd47491 100644
@@ -177,7 +177,7 @@ void bluesky_crypt_block_encrypt(gchar *cloud_block, size_t len,
     }
 
     bluesky_crypt_hmac((char *)&header->crypt_iv,
-                       cloud_block + len - (char *)&header->crypt_iv,
+                       cloud_block + len - (char *)&header->crypt_iv - GUINT32_FROM_LE(header->size3),
                        keys->authentication_key,
                        header->crypt_auth);
 
@@ -205,7 +205,7 @@ gboolean bluesky_crypt_block_decrypt(gchar *cloud_block, size_t len,
     }
 
     bluesky_crypt_hmac((char *)&header->crypt_iv,
-                       cloud_block + len - (char *)&header->crypt_iv,
+                       cloud_block + len - (char *)&header->crypt_iv - GUINT32_FROM_LE(header->size3),
                        keys->authentication_key,
                        hmac_check);
     if (memcmp(hmac_check, header->crypt_auth, CRYPTO_HASH_SIZE) != 0) {
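
The two hunks above shrink the HMAC'd region so that the trailing size3 bytes
of the cloud block are excluded from authentication: the covered span runs from
the crypt_iv field in the header to (end of block - size3).  A sketch of that
length computation with an assumed header layout (the real BlueSky header
fields and sizes may differ, and the byte-order conversion is omitted):

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    struct block_header {
        uint32_t magic;
        uint32_t size1, size2, size3;   /* section lengths (stored little-endian) */
        uint8_t  crypt_auth[32];        /* HMAC over the authenticated region */
        uint8_t  crypt_iv[16];          /* authentication starts at this field */
    };

    int main(void)
    {
        char block[4096];
        size_t len = sizeof(block);
        memset(block, 0, len);

        struct block_header *header = (struct block_header *)block;
        header->size3 = 512;    /* pretend 512 trailing bytes are unauthenticated */

        const char *start = (const char *)&header->crypt_iv;
        size_t covered = (block + len - start) - header->size3;

        printf("HMAC covers %zu of %zu bytes\n", covered, len);
        return 0;
    }
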
diff --git a/bluesky/imap.c b/bluesky/imap.c
index 9036263..1b9feb8 100644
@@ -154,6 +154,7 @@ static void bluesky_inode_map_serialize_section(BlueSkyFS *fs,
     log->data = bluesky_string_new_from_gstring(buf);
     bluesky_cloudlog_unref(range->serialized);
     range->serialized = log;
+    bluesky_cloudlog_stats_update(log, 1);
 }
 
 BlueSkyCloudLog *bluesky_inode_map_serialize(BlueSkyFS *fs)
@@ -182,6 +183,7 @@ BlueSkyCloudLog *bluesky_inode_map_serialize(BlueSkyFS *fs)
     }
 
     log->data = bluesky_string_new_from_gstring(buf);
+    bluesky_cloudlog_stats_update(log, 1);
 
     if (updated) {
         return log;
@@ -248,6 +250,7 @@ static void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap)
             InodeMapEntry *entry;
             entry = bluesky_inode_map_lookup(fs->inode_map, *inum, 1);
             entry->inum = GUINT64_FROM_LE(*inum);
+            bluesky_cloudlog_unref_delayed(entry->item);
             entry->item = g_array_index(section->links,
                                         BlueSkyCloudLog *, j);
             bluesky_cloudlog_ref(entry->item);
@@ -312,6 +315,10 @@ gboolean bluesky_checkpoint_load(BlueSkyFS *fs)
         len -= size;
     }
 
+    if (checkpoint_size == 0) {
+        g_error("Unable to locate checkpoint record!\n");
+    }
+
     g_print("Found checkpoint record at %zd (size %zd)\n",
             checkpoint - last->data, checkpoint_size);
 
@@ -327,6 +334,7 @@ gboolean bluesky_checkpoint_load(BlueSkyFS *fs)
     commit->location.offset = checkpoint - last->data;
     commit->location.size = checkpoint_size;
     g_mutex_unlock(commit->lock);
+    bluesky_cloudlog_stats_update(commit, 1);
 
     bluesky_inode_map_deserialize(fs, commit);
     bluesky_cloudlog_unref(commit);
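
The bluesky_cloudlog_unref_delayed(entry->item) calls added above release the
reference an inode-map entry holds on its old object before the entry is
pointed at a new one, closing a reference leak (the unref is deferred in the
real code because fs->lock is held at that point).  A minimal sketch of the
same replace-and-rereference pattern with illustrative refcounting helpers:

    #include <glib.h>

    struct object {
        gint refcount;
        const char *name;
    };

    static struct object *object_ref(struct object *o)
    {
        if (o != NULL)
            g_atomic_int_inc(&o->refcount);
        return o;
    }

    static void object_unref(struct object *o)
    {
        if (o != NULL && g_atomic_int_dec_and_test(&o->refcount))
            g_print("last reference to %s dropped\n", o->name);
    }

    struct map_entry {
        struct object *item;
    };

    static void entry_replace(struct map_entry *entry, struct object *new_item)
    {
        object_unref(entry->item);          /* drop the old item (the leak fix) */
        entry->item = object_ref(new_item); /* install and reference the new one */
    }

    int main(void)
    {
        struct object a = { 1, "a" }, b = { 1, "b" };
        struct map_entry e = { object_ref(&a) };  /* entry holds a second ref on a */

        entry_replace(&e, &b);    /* releases the entry's ref on a, takes one on b */
        object_unref(&a);         /* creator's ref: a is now fully released */
        object_unref(e.item);     /* entry's ref on b */
        object_unref(&b);         /* creator's ref: b is now fully released */
        return 0;
    }
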
diff --git a/bluesky/log.c b/bluesky/log.c
index e5f0321..76d218a 100644
@@ -310,6 +310,7 @@ BlueSkyCloudLog *bluesky_log_get_commit_point(BlueSkyFS *fs)
     BlueSkyCloudLog *marker = bluesky_cloudlog_new(fs, NULL);
     marker->type = LOGTYPE_JOURNAL_MARKER;
     marker->data = bluesky_string_new(g_strdup(""), 0);
+    bluesky_cloudlog_stats_update(marker, 1);
     bluesky_cloudlog_sync(marker);
 
     g_mutex_lock(marker->lock);
@@ -332,6 +333,7 @@ void bluesky_log_write_commit_point(BlueSkyFS *fs, BlueSkyCloudLog *marker)
     g_string_append_len(loc, (const gchar *)&seq, sizeof(seq));
     g_string_append_len(loc, (const gchar *)&offset, sizeof(offset));
     commit->data = bluesky_string_new_from_gstring(loc);
+    bluesky_cloudlog_stats_update(commit, 1);
     bluesky_cloudlog_sync(commit);
 
     g_mutex_lock(commit->lock);
@@ -532,98 +534,6 @@ void bluesky_mmap_unref(BlueSkyCacheFile *mmap)
     }
 }
 
-/* Scan through all currently-stored files in the journal/cache and garbage
- * collect old unused ones, if needed. */
-static void gather_cachefiles(gpointer key, gpointer value, gpointer user_data)
-{
-    GList **files = (GList **)user_data;
-    *files = g_list_prepend(*files, value);
-}
-
-static gint compare_cachefiles(gconstpointer a, gconstpointer b)
-{
-    int64_t ta, tb;
-
-    ta = ((BlueSkyCacheFile *)a)->atime;
-    tb = ((BlueSkyCacheFile *)b)->atime;
-    if (ta < tb)
-        return -1;
-    else if (ta > tb)
-        return 1;
-    else
-        return 0;
-}
-
-void bluesky_cachefile_gc(BlueSkyFS *fs)
-{
-    GList *files = NULL;
-
-    g_mutex_lock(fs->log->mmap_lock);
-    g_hash_table_foreach(fs->log->mmap_cache, gather_cachefiles, &files);
-
-    /* Sort based on atime.  The atime should be stable since it shouldn't be
-     * updated except by threads which can grab the mmap_lock, which we already
-     * hold. */
-    files = g_list_sort(files, compare_cachefiles);
-
-    /* Walk the list of files, starting with the oldest, deleting files if
-     * possible until enough space has been reclaimed. */
-    g_print("\nScanning cache: (total size = %d kB)\n", fs->log->disk_used);
-    while (files != NULL) {
-        BlueSkyCacheFile *cachefile = (BlueSkyCacheFile *)files->data;
-        /* Try to lock the structure, but if the lock is held by another thread
-         * then we'll just skip the file on this pass. */
-        if (g_mutex_trylock(cachefile->lock)) {
-            int64_t age = bluesky_get_current_time() - cachefile->atime;
-            g_print("%s addr=%p mapcount=%d refcount=%d atime_age=%f",
-                    cachefile->filename, cachefile->addr, cachefile->mapcount,
-                    cachefile->refcount, age / 1e6);
-            if (cachefile->fetching)
-                g_print(" (fetching)");
-            g_print("\n");
-
-            gboolean deletion_candidate = FALSE;
-            if (g_atomic_int_get(&fs->log->disk_used)
-                    > bluesky_options.cache_size
-                && g_atomic_int_get(&cachefile->refcount) == 0
-                && g_atomic_int_get(&cachefile->mapcount) == 0)
-            {
-                deletion_candidate = TRUE;
-            }
-
-            /* Don't allow journal files to be reclaimed until all data is
-             * known to be durably stored in the cloud. */
-            if (cachefile->type == CLOUDLOG_JOURNAL
-                && cachefile->log_seq >= fs->log->journal_watermark)
-            {
-                deletion_candidate = FALSE;
-            }
-
-            if (deletion_candidate) {
-                g_print("   ...deleting\n");
-                if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) {
-                    fprintf(stderr, "Unable to unlink journal %s: %m\n",
-                            cachefile->filename);
-                }
-
-                g_atomic_int_add(&fs->log->disk_used, -(cachefile->len / 1024));
-                g_hash_table_remove(fs->log->mmap_cache, cachefile->filename);
-                g_mutex_unlock(cachefile->lock);
-                g_mutex_free(cachefile->lock);
-                g_cond_free(cachefile->cond);
-                g_free(cachefile->filename);
-                g_free(cachefile);
-            } else {
-                g_mutex_unlock(cachefile->lock);
-            }
-        }
-        files = g_list_delete_link(files, files);
-    }
-    g_list_free(files);
-
-    g_mutex_unlock(fs->log->mmap_lock);
-}
-
 /******************************* JOURNAL REPLAY *******************************
  * The journal replay code is used to recover filesystem state after a
  * filesystem restart.  We first look for the most recent commit record in the
@@ -724,9 +634,11 @@ static void reload_item(BlueSkyCloudLog *log_item,
     /*const BlueSkyCloudPointer *data3
         = (const BlueSkyCloudPointer *)(data + len1 + len2);*/
 
+    bluesky_cloudlog_stats_update(log_item, -1);
     bluesky_string_unref(log_item->data);
     log_item->data = NULL;
     log_item->location_flags = CLOUDLOG_JOURNAL;
+    bluesky_cloudlog_stats_update(log_item, 1);
 
     BlueSkyCloudID id0;
     memset(&id0, 0, sizeof(id0));
diff --git a/bluesky/store-s3.c b/bluesky/store-s3.c
index 7f23434..4735de4 100644
@@ -42,6 +42,7 @@ struct put_info {
 struct list_info {
     int success;
     char *last_entry;
+    gboolean truncated;
 };
 
 static S3Status s3store_get_handler(int bufferSize, const char *buffer,
@@ -145,13 +146,14 @@ static S3Status s3store_list_handler(int isTruncated,
         g_free(info->last_entry);
         info->last_entry = g_strdup(contents[contentsCount - 1].key);
     }
+    info->truncated = isTruncated;
     return S3StatusOK;
 }
 
 static char *s3store_lookup_last(gpointer s, const char *prefix)
 {
     S3Store *store = (S3Store *)s;
-    struct list_info info = {0, NULL};
+    struct list_info info = {0, NULL, FALSE};
 
     struct S3ListBucketHandler handler;
     handler.responseHandler.propertiesCallback
@@ -161,9 +163,15 @@ static char *s3store_lookup_last(gpointer s, const char *prefix)
 
     char *marker = NULL;
 
-    S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL, &handler, &info);
+    do {
+        S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
+                       &handler, &info);
+        g_free(marker);
+        marker = g_strdup(info.last_entry);
+        g_print("Last key: %s\n", info.last_entry);
+    } while (info.truncated);
 
-    g_print("Last key: %s\n", info.last_entry);
+    g_free(marker);
 
     return info.last_entry;
 }
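
The listing change above keeps calling S3_list_bucket() until the response is
no longer truncated, feeding the last key returned back in as the marker for
the next request, so prefixes with more than one page of keys are handled.  A
self-contained sketch of that pagination loop, with a fake list_page() standing
in for the libs3 call:

    #include <glib.h>

    struct list_info {
        char *last_entry;
        gboolean truncated;
    };

    /* Pretend backend: returns three pages of keys, then reports completion. */
    static void list_page(const char *marker, struct list_info *info)
    {
        static int page = 0;
        (void)marker;   /* the fake backend ignores the marker */
        g_free(info->last_entry);
        info->last_entry = g_strdup_printf("log-%08d", ++page * 1000);
        info->truncated = (page < 3);
    }

    int main(void)
    {
        struct list_info info = { NULL, FALSE };
        char *marker = NULL;

        do {
            list_page(marker, &info);           /* one page of results */
            g_free(marker);
            marker = g_strdup(info.last_entry); /* resume after the last key seen */
            g_print("last key so far: %s\n", info.last_entry);
        } while (info.truncated);

        g_free(marker);
        g_print("final key: %s\n", info.last_entry);
        g_free(info.last_entry);
        return 0;
    }
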
diff --git a/cleaner/cleaner b/cleaner/cleaner
index 95bbb08..c4b1222 100755
@@ -8,7 +8,7 @@
 # Copyright (C) 2010  The Regents of the University of California
 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
 
-import base64, os, re, struct, sys
+import base64, os, re, struct, sys, time
 import boto
 from boto.s3.key import Key
 
@@ -67,15 +67,31 @@ class FileBackend:
         m = re.match(r"^log-(\d+)-(\d+)$", name)
         if m: return (int(m.group(1)), int(m.group(2)))
 
+def retry_wrap(method):
+    def wrapped(self, *args, **kwargs):
+        for retries in range(3):
+            try:
+                return method(self, *args, **kwargs)
+            except:
+                print >>sys.stderr, "S3 operation failed, retrying..."
+                self.connect()
+                time.sleep(1.0)
+        return method(self, *args, **kwargs)
+    return wrapped
+
 class S3Backend:
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
-        self.conn = boto.connect_s3(is_secure=False)
-        self.bucket = self.conn.get_bucket(bucket)
+        self.bucket_name = bucket
         self.path = path
         self.cachedir = cachedir
         self.cache = {}
+        self.connect()
+
+    def connect(self):
+        self.conn = boto.connect_s3(is_secure=False)
+        self.bucket = self.conn.get_bucket(self.bucket_name)
 
     def list(self):
         files = []
@@ -83,6 +99,7 @@ class S3Backend:
             files.append((k.key, k.size))
         return files
 
+    @retry_wrap
     def read(self, filename, offset=0, length=None):
         if filename in self.cache:
             fp = open(os.path.join(self.cachedir, filename), 'rb')
@@ -106,6 +123,7 @@ class S3Backend:
                 data = data[0:length]
             return data
 
+    @retry_wrap
     def write(self, filename, data):
         k = Key(self.bucket)
         k.key = self.path + filename
@@ -113,6 +131,7 @@ class S3Backend:
         if filename in self.cache:
             del self.cache[filename]
 
+    @retry_wrap
     def delete(self, filename):
         k = Key(self.bucket)
         k.key = self.path + filename