X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Flog.c;h=4c522b8788f8c4cc3e02dc9628508f9b6dd768dd;hb=db4915b4f7fc8f74958c4d1891dc69b76cbbe383;hp=a4a81505d7565d522150e9be6df5bc79b3fecf99;hpb=0ba2a52ec1386c8928b397f775e4a4f97339fa2a;p=bluesky.git diff --git a/bluesky/log.c b/bluesky/log.c index a4a8150..4c522b8 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -35,7 +35,22 @@ // Rough size limit for a log segment. This is not a firm limit and there are // no absolute guarantees on the size of a log segment. -#define LOG_SEGMENT_SIZE (1 << 20) +#define LOG_SEGMENT_SIZE (1 << 23) + +#define HEADER_MAGIC 0x676f4c0a +#define FOOTER_MAGIC 0x2e435243 + +struct log_header { + uint32_t magic; // HEADER_MAGIC + uint64_t offset; // Starting byte offset of the log header + uint32_t keysize; // Size of the log key (bytes) + uint32_t size; // Size of the data item (bytes) +} __attribute__((packed)); + +struct log_footer { + uint32_t magic; // FOOTER_MAGIC + uint32_t crc; // Computed from log_header to log_footer.magic +} __attribute__((packed)); static void writebuf(int fd, const char *buf, size_t len) { @@ -66,6 +81,12 @@ static gpointer log_thread(gpointer d) { BlueSkyLog *log = (BlueSkyLog *)d; + /* If there are multiple log items to write, we may write more than one + * before calling fsync(). The committed list is used to track all the + * items that should be marked as committed once that final fsync() is + * done. */ + GSList *committed = NULL; + int dirfd = open(log->log_directory, O_DIRECTORY); if (dirfd < 0) { fprintf(stderr, "Unable to open logging directory: %m\n"); @@ -91,14 +112,59 @@ static gpointer log_thread(gpointer d) BlueSkyLogItem *item = (BlueSkyLogItem *)g_async_queue_pop(log->queue); g_mutex_lock(item->lock); + + off_t logsize = lseek(log->fd, 0, SEEK_CUR); + struct log_header header; + struct log_footer footer; + + header.magic = GUINT32_TO_LE(HEADER_MAGIC); + header.offset = GUINT64_TO_LE(logsize); + header.keysize = GUINT32_TO_LE(strlen(item->key)); + header.size = GUINT32_TO_LE(item->data->len); + footer.magic = GUINT32_TO_LE(FOOTER_MAGIC); + + uint32_t crc = BLUESKY_CRC32C_SEED; + + writebuf(log->fd, (const char *)&header, sizeof(header)); + crc = crc32c(crc, (const char *)&header, sizeof(header)); + writebuf(log->fd, item->key, strlen(item->key)); + crc = crc32c(crc, item->key, strlen(item->key)); + writebuf(log->fd, item->data->data, item->data->len); - fdatasync(log->fd); - item->committed = TRUE; - g_cond_signal(item->cond); - g_mutex_unlock(item->lock); + crc = crc32c(crc, item->data->data, item->data->len); + + crc = crc32c(crc, (const char *)&footer, + sizeof(footer) - sizeof(uint32_t)); + footer.crc = crc32c_finalize(crc); + writebuf(log->fd, (const char *)&footer, sizeof(footer)); + + logsize += sizeof(header) + sizeof(footer); + logsize += strlen(item->key) + item->data->len; + + committed = g_slist_prepend(committed, item); + + /* Force an fsync either if we will be closing this log segment and + * opening a new file, or if there are no other log items currently + * waiting to be written. */ + + if (logsize >= LOG_SEGMENT_SIZE + || g_async_queue_length(log->queue) <= 0) + { + int batchsize = 0; + fdatasync(log->fd); + while (committed != NULL) { + item = (BlueSkyLogItem *)committed->data; + item->committed = TRUE; + g_cond_signal(item->cond); + g_mutex_unlock(item->lock); + committed = g_slist_delete_link(committed, committed); + batchsize++; + } + /* if (batchsize > 1) + g_print("Log batch size: %d\n", batchsize); */ + } - off_t logsize = lseek(log->fd, 0, SEEK_CUR); if (logsize < 0 || logsize >= LOG_SEGMENT_SIZE) { close(log->fd); log->fd = -1;