projects
/
bluesky.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
c0d5239
)
Track journal files which contain dirty data and which can be reclaimed.
author
Michael Vrable
<mvrable@cs.ucsd.edu>
Wed, 18 Aug 2010 20:45:40 +0000
(13:45 -0700)
committer
Michael Vrable
<mvrable@cs.ucsd.edu>
Wed, 18 Aug 2010 20:45:40 +0000
(13:45 -0700)
bluesky/bluesky-private.h
patch
|
blob
|
history
bluesky/cloudlog.c
patch
|
blob
|
history
bluesky/debug.c
patch
|
blob
|
history
bluesky/inode.c
patch
|
blob
|
history
bluesky/log.c
patch
|
blob
|
history
diff --git
a/bluesky/bluesky-private.h
b/bluesky/bluesky-private.h
index
73f38e1
..
c066fd3
100644
(file)
--- a/
bluesky/bluesky-private.h
+++ b/
bluesky/bluesky-private.h
@@
-192,6
+192,13
@@
struct _BlueSkyCloudLog {
int location_flags;
int pending_read, pending_write;
int location_flags;
int pending_read, pending_write;
+ // If the object is not yet flushed to cloud storage but is written to a
+ // journal file locally, a reference to that journal file so that we can
+ // keep the dirty_refs count updated. When the object is deleted or
+ // becomes clean, decrement the dirty_refs counter of the journal file and
+ // set this pointer to NULL.
+ BlueSkyCacheFile *dirty_journal;
+
// A stable identifier for the object (only changes when authenticated data
// is written out, but stays the same when the in-cloud cleaner relocates
// the object).
// A stable identifier for the object (only changes when authenticated data
// is written out, but stays the same when the in-cloud cleaner relocates
// the object).
@@
-248,12
+255,16
@@
uint32_t crc32c(uint32_t crc, const char *buf, unsigned int length);
uint32_t crc32c_finalize(uint32_t crc);
struct _BlueSkyLog {
uint32_t crc32c_finalize(uint32_t crc);
struct _BlueSkyLog {
+ BlueSkyFS *fs;
char *log_directory;
GAsyncQueue *queue;
int fd, dirfd;
int seq_num;
GSList *committed;
char *log_directory;
GAsyncQueue *queue;
int fd, dirfd;
int seq_num;
GSList *committed;
+ /* The currently-open log file. */
+ BlueSkyCacheFile *current_log;
+
/* Cache of log segments which have been memory-mapped. */
GMutex *mmap_lock;
GHashTable *mmap_cache;
/* Cache of log segments which have been memory-mapped. */
GMutex *mmap_lock;
GHashTable *mmap_cache;
@@
-279,8
+290,13
@@
struct _BlueSkyCacheFile {
size_t len;
BlueSkyFS *fs;
BlueSkyLog *log;
size_t len;
BlueSkyFS *fs;
BlueSkyLog *log;
- gboolean fetching, ready;
+ gboolean fetching, ready;
// Cloud data: downloading or ready for use
int64_t atime; // Access time, for cache management
int64_t atime; // Access time, for cache management
+
+ /* Journal: Count of objects which are not yet committed to cloud storage
+ * but need to be; a non-zero value prevents the journal file from being
+ * deleted. */
+ gint dirty_refs;
};
BlueSkyLog *bluesky_log_new(const char *log_directory);
};
BlueSkyLog *bluesky_log_new(const char *log_directory);
@@
-289,7
+305,10
@@
void bluesky_log_finish_all(GList *log_items);
BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir, int log_seq,
int log_offset, int log_size);
void bluesky_mmap_unref(BlueSkyCacheFile *mmap);
BlueSkyRCStr *bluesky_log_map_object(BlueSkyFS *fs, int log_dir, int log_seq,
int log_offset, int log_size);
void bluesky_mmap_unref(BlueSkyCacheFile *mmap);
+void bluesky_cachefile_unref(BlueSkyCacheFile *cachefile);
+BlueSkyCacheFile *bluesky_cachefile_lookup(BlueSkyFS *fs,
+ int clouddir, int log_seq);
void bluesky_cachefile_gc(BlueSkyFS *fs);
#ifdef __cplusplus
void bluesky_cachefile_gc(BlueSkyFS *fs);
#ifdef __cplusplus
diff --git
a/bluesky/cloudlog.c
b/bluesky/cloudlog.c
index
3809b00
..
196eeb8
100644
(file)
--- a/
bluesky/cloudlog.c
+++ b/
bluesky/cloudlog.c
@@
-158,6
+158,8
@@
void bluesky_cloudlog_unref(BlueSkyCloudLog *log)
}
g_array_unref(log->links);
bluesky_string_unref(log->data);
}
g_array_unref(log->links);
bluesky_string_unref(log->data);
+ if (log->dirty_journal != NULL)
+ g_atomic_int_add(&log->dirty_journal->dirty_refs, -1);
g_free(log);
}
}
g_free(log);
}
}
@@
-260,8
+262,14
@@
BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log,
g_string_append_len(state->data, (const char *)&header, sizeof(header));
g_string_append_len(state->data, log->data->data, log->data->len);
g_string_append_len(state->data, (const char *)&header, sizeof(header));
g_string_append_len(state->data, log->data->data, log->data->len);
+ /* TODO: We should mark the objects as committed on the cloud until the
+ * data is flushed and acknowledged. */
log->location_flags |= CLOUDLOG_CLOUD;
bluesky_cloudlog_stats_update(log, 1);
log->location_flags |= CLOUDLOG_CLOUD;
bluesky_cloudlog_stats_update(log, 1);
+ if (log->dirty_journal != NULL) {
+ g_atomic_int_add(&log->dirty_journal->dirty_refs, -1);
+ log->dirty_journal = NULL;
+ }
g_mutex_unlock(log->lock);
if (state->data->len > CLOUDLOG_SEGMENT_SIZE)
g_mutex_unlock(log->lock);
if (state->data->len > CLOUDLOG_SEGMENT_SIZE)
diff --git
a/bluesky/debug.c
b/bluesky/debug.c
index
207cb7b
..
0a5f254
100644
(file)
--- a/
bluesky/debug.c
+++ b/
bluesky/debug.c
@@
-56,9
+56,9
@@
static void cache_dump(gpointer key, gpointer value, gpointer user_data)
BlueSkyCacheFile *cache = (BlueSkyCacheFile *)value;
int64_t age = bluesky_get_current_time() - cache->atime;
BlueSkyCacheFile *cache = (BlueSkyCacheFile *)value;
int64_t age = bluesky_get_current_time() - cache->atime;
- g_print("%s addr=%p mapcount=%d refcount=%d atime_age=%f",
+ g_print("%s addr=%p mapcount=%d refcount=%d
dirty=%d
atime_age=%f",
cache->filename, cache->addr, cache->mapcount, cache->refcount,
cache->filename, cache->addr, cache->mapcount, cache->refcount,
- age / 1e6);
+
cache->dirty_refs,
age / 1e6);
if (cache->fetching)
g_print(" (fetching)");
g_print("\n");
if (cache->fetching)
g_print(" (fetching)");
g_print("\n");
diff --git
a/bluesky/inode.c
b/bluesky/inode.c
index
2696713
..
d284479
100644
(file)
--- a/
bluesky/inode.c
+++ b/
bluesky/inode.c
@@
-109,6
+109,7
@@
BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store)
if (fs != NULL) {
fs->store = store;
fs->log = bluesky_log_new("journal");
if (fs != NULL) {
fs->store = store;
fs->log = bluesky_log_new("journal");
+ fs->log->fs = fs;
g_print("Loaded filesystem superblock\n");
g_free(fs->name);
fs->name = g_strdup(name);
g_print("Loaded filesystem superblock\n");
g_free(fs->name);
fs->name = g_strdup(name);
@@
-121,6
+122,7
@@
BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store)
BlueSkyFS *fs = bluesky_new_fs(name);
fs->store = store;
fs->log = bluesky_log_new("journal");
BlueSkyFS *fs = bluesky_new_fs(name);
fs->store = store;
fs->log = bluesky_log_new("journal");
+ fs->log->fs = fs;
BlueSkyInode *root = bluesky_new_inode(BLUESKY_ROOT_INUM, fs,
BLUESKY_DIRECTORY);
BlueSkyInode *root = bluesky_new_inode(BLUESKY_ROOT_INUM, fs,
BLUESKY_DIRECTORY);
diff --git
a/bluesky/log.c
b/bluesky/log.c
index
976e32e
..
66b4edb
100644
(file)
--- a/
bluesky/log.c
+++ b/
bluesky/log.c
@@
-103,6
+103,11
@@
static gboolean log_open(BlueSkyLog *log)
log->fd = -1;
}
log->fd = -1;
}
+ if (log->current_log != NULL) {
+ bluesky_cachefile_unref(log->current_log);
+ log->current_log = NULL;
+ }
+
while (log->fd < 0) {
g_snprintf(logname, sizeof(logname), "journal-%08d", log->seq_num);
log->fd = openat(log->dirfd, logname, O_CREAT|O_WRONLY|O_EXCL, 0600);
while (log->fd < 0) {
g_snprintf(logname, sizeof(logname), "journal-%08d", log->seq_num);
log->fd = openat(log->dirfd, logname, O_CREAT|O_WRONLY|O_EXCL, 0600);
@@
-116,6
+121,10
@@
static gboolean log_open(BlueSkyLog *log)
}
}
}
}
+ log->current_log = bluesky_cachefile_lookup(log->fs, -1, log->seq_num);
+ g_assert(log->current_log != NULL);
+ g_mutex_unlock(log->current_log->lock);
+
if (ftruncate(log->fd, LOG_SEGMENT_SIZE) < 0) {
fprintf(stderr, "Unable to truncate logfile %s: %m\n", logname);
}
if (ftruncate(log->fd, LOG_SEGMENT_SIZE) < 0) {
fprintf(stderr, "Unable to truncate logfile %s: %m\n", logname);
}
@@
-204,6
+213,13
@@
static gpointer log_thread(gpointer d)
offset += sizeof(header) + sizeof(footer) + item->data->len;
offset += sizeof(header) + sizeof(footer) + item->data->len;
+ /* Since we have just written a new dirty object to the journal,
+ * increment the count of live dirty objects in that journal file. The
+ * count will be decremented when objects are deleted or written to the
+ * cloud. */
+ g_atomic_int_add(&log->current_log->dirty_refs, 1);
+ item->dirty_journal = log->current_log;
+
/* Replace the log item's string data with a memory-mapped copy of the
* data, now that it has been written to the log file. (Even if it
* isn't yet on disk, it should at least be in the page cache and so
/* Replace the log item's string data with a memory-mapped copy of the
* data, now that it has been written to the log file. (Even if it
* isn't yet on disk, it should at least be in the page cache and so
@@
-481,16
+497,16
@@
void bluesky_cachefile_gc(BlueSkyFS *fs)
* then we'll just skip the file on this pass. */
if (g_mutex_trylock(cachefile->lock)) {
int64_t age = bluesky_get_current_time() - cachefile->atime;
* then we'll just skip the file on this pass. */
if (g_mutex_trylock(cachefile->lock)) {
int64_t age = bluesky_get_current_time() - cachefile->atime;
- g_print("%s addr=%p mapcount=%d refcount=%d atime_age=%f",
+ g_print("%s addr=%p mapcount=%d refcount=%d
dirty=%d
atime_age=%f",
cachefile->filename, cachefile->addr, cachefile->mapcount,
cachefile->filename, cachefile->addr, cachefile->mapcount,
- cachefile->refcount, age / 1e6);
+ cachefile->refcount,
cachefile->dirty_refs,
age / 1e6);
if (cachefile->fetching)
g_print(" (fetching)");
g_print("\n");
if (g_atomic_int_get(&cachefile->refcount) == 0
&& g_atomic_int_get(&cachefile->mapcount) == 0
if (cachefile->fetching)
g_print(" (fetching)");
g_print("\n");
if (g_atomic_int_get(&cachefile->refcount) == 0
&& g_atomic_int_get(&cachefile->mapcount) == 0
- &&
cachefile->type == CLOUDLOG_CLOUD /* FIXME: journals too */
)
+ &&
g_atomic_int_get(&cachefile->dirty_refs) == 0
)
{
g_print(" ...deleting\n");
if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) {
{
g_print(" ...deleting\n");
if (unlinkat(fs->log->dirfd, cachefile->filename, 0) < 0) {