From: Michael Vrable Date: Mon, 26 Jul 2010 03:19:13 +0000 (-0700) Subject: Some initial work on logging gathering data into cloud log segments. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=60b4792d65ba4b2a45733894f6a57e6581ddc487;p=bluesky.git Some initial work on logging gathering data into cloud log segments. This is still in progress, and needs to be better hooked in to cache management, and needs to actually write out a proper sequence of logs instead of overwriting the same location each time. But it should have the basics of gathering up data for dirty inodes into a segment and writing it. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index baeb141..822e029 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -229,6 +229,9 @@ typedef struct { // TODO: Location in journal/cache + // Pointers to other objects + GArray *pointers; + // Serialized data, if available in memory (otherwise NULL). BlueSkyRCStr *data; } BlueSkyCloudLog; @@ -236,10 +239,13 @@ typedef struct { gboolean bluesky_cloudlog_equal(gconstpointer a, gconstpointer b); guint bluesky_cloudlog_hash(gconstpointer a); BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs); +gchar *bluesky_cloudlog_id_to_string(BlueSkyCloudID id); +BlueSkyCloudID bluesky_cloudlog_id_from_string(const gchar *idstr); void bluesky_cloudlog_ref(BlueSkyCloudLog *log); void bluesky_cloudlog_unref(BlueSkyCloudLog *log); BlueSkyLogItem *bluesky_cloudlog_sync(BlueSkyCloudLog *log); void bluesky_cloudlog_insert(BlueSkyCloudLog *log); +void bluesky_cloudlog_write_log(BlueSkyFS *fs); #ifdef __cplusplus } diff --git a/bluesky/cache.c b/bluesky/cache.c index 00d0fec..6a91873 100644 --- a/bluesky/cache.c +++ b/bluesky/cache.c @@ -270,6 +270,7 @@ static gpointer flushd_task(BlueSkyFS *fs) return NULL; flushd_dirty(fs); flushd_clean(fs); + bluesky_cloudlog_write_log(fs); g_mutex_unlock(fs->flushd_lock); return NULL; diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c index 
0c5db98..84e63d8 100644 --- a/bluesky/cloudlog.c +++ b/bluesky/cloudlog.c @@ -6,6 +6,7 @@ * TODO: Licensing */ +#include #include #include #include @@ -27,6 +28,40 @@ BlueSkyCloudID bluesky_cloudlog_new_id() return id; } +gchar *bluesky_cloudlog_id_to_string(BlueSkyCloudID id) +{ + char buf[sizeof(BlueSkyCloudID) * 2 + 1]; + buf[0] = '\0'; + + for (int i = 0; i < sizeof(BlueSkyCloudID); i++) { + sprintf(&buf[2*i], "%02x", (uint8_t)(id.bytes[i])); + } + + return g_strdup(buf); +} + +BlueSkyCloudID bluesky_cloudlog_id_from_string(const gchar *idstr) +{ + BlueSkyCloudID id; + memset(&id, 0, sizeof(id)); + for (int i = 0; i < 2*sizeof(BlueSkyCloudID); i++) { + char c = idstr[i]; + if (c == '\0') { + g_warning("Short cloud id: %s\n", idstr); + break; + } + int val = 0; + if (c >= '0' && c <= '9') + val = c - '0'; + else if (c >= 'a' && c <= 'f') + val = c - 'a' + 10; + else + g_warning("Bad character in cloud id: %s\n", idstr); + id.bytes[i / 2] += val << (i % 2 ? 0 : 4); + } + return id; +} + gboolean bluesky_cloudlog_equal(gconstpointer a, gconstpointer b) { BlueSkyCloudID *id1 = (BlueSkyCloudID *)a, *id2 = (BlueSkyCloudID *)b; @@ -54,6 +89,7 @@ BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs) log->fs = fs; log->type = LOGTYPE_UNKNOWN; log->id = bluesky_cloudlog_new_id(); + log->pointers = g_array_new(FALSE, TRUE, sizeof(BlueSkyCloudID)); g_atomic_int_set(&log->refcount, 1); return log; @@ -75,7 +111,7 @@ void bluesky_cloudlog_unref(BlueSkyCloudLog *log) BlueSkyLogItem *bluesky_cloudlog_sync(BlueSkyCloudLog *log) { BlueSkyLogItem *log_item = bluesky_log_item_new(); - log_item->key = g_strdup("cloudlog"); + log_item->key = bluesky_cloudlog_id_to_string(log->id); log_item->data = log->data; bluesky_string_ref(log->data); bluesky_log_item_submit(log_item, log->fs->log); @@ -90,3 +126,120 @@ void bluesky_cloudlog_insert(BlueSkyCloudLog *log) g_hash_table_insert(log->fs->locations, &log->id, log); g_mutex_unlock(log->fs->lock); } + +/* Serialize objects into a log 
segment to be written to the cloud. */ +struct log_state { + GString *data; + BlueSkyCloudPointer location; + GList *inode_list; +}; + +struct log_header { + char magic[4]; + uint32_t size; + BlueSkyCloudID id; + uint32_t pointer_count; +} __attribute__((packed)); + +struct logref { + BlueSkyCloudID id; + BlueSkyCloudPointer location; +} __attribute__((packed)); + +struct log_footer { + char refmagic[4]; + struct logref refs[0]; +}; + +BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log, + struct log_state *state) +{ + if (log->location_flags & CLOUDLOG_CLOUD) { + return log->location; + } + + g_print("Flushing object %s to cloud...\n", + bluesky_cloudlog_id_to_string(log->id)); + + for (int i = 0; i < log->pointers->len; i++) { + BlueSkyCloudID id = g_array_index(log->pointers, BlueSkyCloudID, i); + g_print(" ...checking reference %s...\n", + bluesky_cloudlog_id_to_string(id)); + g_mutex_lock(log->fs->lock); + BlueSkyCloudLog *log2 + = (BlueSkyCloudLog *)g_hash_table_lookup(log->fs->locations, &id); + // TODO: refcount + g_mutex_unlock(log->fs->lock); + g_assert(log2 != NULL); + bluesky_cloudlog_serialize(log2, state); + } + + g_assert(log->data != NULL); + + log->location = state->location; + log->location.offset = state->data->len; + log->location.size + = sizeof(struct log_header) + sizeof(BlueSkyCloudID) * 0 + + log->data->len; + + struct log_header header; + memcpy(header.magic, "AgI ", 4); + header.size = GUINT32_TO_LE(log->location.size); + header.id = log->id; + header.pointer_count = GUINT32_TO_LE(0); + + g_string_append_len(state->data, (const char *)&header, sizeof(header)); + g_string_append_len(state->data, log->data->data, log->data->len); + + log->location_flags |= CLOUDLOG_CLOUD; + + return log->location; +} + +static void find_inodes(gpointer key, gpointer value, gpointer user_data) +{ + struct log_state *state = (struct log_state *)user_data; + BlueSkyCloudLog *item = (BlueSkyCloudLog *)value; + + if (item->type != LOGTYPE_INODE) 
+ return; + + bluesky_cloudlog_ref(item); + state->inode_list = g_list_prepend(state->inode_list, item); +} + +void bluesky_cloudlog_write_log(BlueSkyFS *fs) +{ + g_print("Starting cloudlog write...\n"); + + struct log_state state; + state.data = g_string_new(""); + state.location.directory = 0; + state.location.sequence = 0; + state.location.offset = 0; + state.location.size = 0; + state.inode_list = NULL; + + g_mutex_lock(fs->lock); + g_hash_table_foreach(fs->locations, find_inodes, &state); + g_mutex_unlock(fs->lock); + + while (state.inode_list != NULL) { + BlueSkyCloudLog *log = (BlueSkyCloudLog *)state.inode_list->data; + bluesky_cloudlog_serialize(log, &state); + state.inode_list = g_list_delete_link(state.inode_list, + state.inode_list); + } + + g_print("Serialized %zd bytes of data\n", state.data->len); + + BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); + async->op = STORE_OP_PUT; + async->key = g_strdup_printf("log-%08d-%08d", + state.location.directory, + state.location.sequence); + async->data = bluesky_string_new_from_gstring(state.data); + bluesky_store_async_submit(async); + bluesky_store_async_wait(async); + bluesky_store_async_unref(async); +} diff --git a/bluesky/debug.c b/bluesky/debug.c index 3496608..cfb6a81 100644 --- a/bluesky/debug.c +++ b/bluesky/debug.c @@ -42,7 +42,8 @@ static void cloudlog_dump(gpointer key, gpointer value, gpointer user_data) for (int i = 0; i < sizeof(BlueSkyCloudID); i++) { g_print("%02x", (uint8_t)(log->id.bytes[i])); } - g_print(": inode=%"PRIu64" locs=%x\n", log->inum, log->location_flags); + g_print(": ty=%d inode=%"PRIu64" locs=%x\n", + log->type, log->inum, log->location_flags); } /* Dump a summary of filesystem state as it is cached in memory. 
*/ diff --git a/bluesky/file.c b/bluesky/file.c index b107a40..8faae00 100644 --- a/bluesky/file.c +++ b/bluesky/file.c @@ -266,17 +266,14 @@ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block, BlueSkyRCStr *data = block->data; - GChecksum *csum = g_checksum_new(G_CHECKSUM_SHA256); - g_checksum_update(csum, (const guchar *)data->data, data->len); - gchar *name = g_strdup(g_checksum_get_string(csum)); - - /* Start commit to the local log. */ - BlueSkyLogItem *log_item = bluesky_log_item_new(); - log_item->key = g_strdup(name); - log_item->data = data; + BlueSkyCloudLog *cloudlog = bluesky_cloudlog_new(fs); + gchar *name = bluesky_cloudlog_id_to_string(cloudlog->id); + cloudlog->type = LOGTYPE_DATA; + cloudlog->inum = 0; //FIXME + cloudlog->data = data; bluesky_string_ref(data); - bluesky_log_item_submit(log_item, fs->log); - *log_items = g_list_prepend(*log_items, log_item); + *log_items = g_list_prepend(*log_items, bluesky_cloudlog_sync(cloudlog)); + bluesky_cloudlog_insert(cloudlog); /* Store the file data asynchronously, and don't bother waiting for a * response. */ @@ -295,8 +292,6 @@ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block, block->type = BLUESKY_BLOCK_CACHED; g_atomic_int_add(&fs->cache_dirty, -1); - - g_checksum_free(csum); } /* Flush all blocks in a file to stable storage. 
*/ diff --git a/bluesky/inode.c b/bluesky/inode.c index 3e8d686..2780253 100644 --- a/bluesky/inode.c +++ b/bluesky/inode.c @@ -316,11 +316,23 @@ void bluesky_inode_start_sync(BlueSkyInode *inode, BlueSkyStoreAsync *barrier) sprintf(key, "inode-%016"PRIx64, inode->inum); BlueSkyCloudLog *cloudlog = bluesky_cloudlog_new(fs); - cloudlog->type = LOGTYPE_DATA; + cloudlog->type = LOGTYPE_INODE; cloudlog->inum = inode->inum; cloudlog->data = data; bluesky_string_ref(data); + if (inode->type == BLUESKY_REGULAR) { + for (int i = 0; i < inode->blocks->len; i++) { + BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i); + if (b->type == BLUESKY_BLOCK_CACHED + || b->type == BLUESKY_BLOCK_REF) + { + BlueSkyCloudID id = bluesky_cloudlog_id_from_string(b->ref); + g_array_append_val(cloudlog->pointers, id); + } + } + } + log_items = g_list_prepend(log_items, bluesky_cloudlog_sync(cloudlog)); bluesky_cloudlog_insert(cloudlog);