X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Flog.c;h=4c522b8788f8c4cc3e02dc9628508f9b6dd768dd;hb=60b4792d65ba4b2a45733894f6a57e6581ddc487;hp=c3d0c2b77e490e541d518b34051e16671c89dd0d;hpb=7298b7a416aed5be1b82b54015c6944b9379eee6;p=bluesky.git diff --git a/bluesky/log.c b/bluesky/log.c index c3d0c2b..4c522b8 100644 --- a/bluesky/log.c +++ b/bluesky/log.c @@ -37,6 +37,21 @@ // no absolute guarantees on the size of a log segment. #define LOG_SEGMENT_SIZE (1 << 23) +#define HEADER_MAGIC 0x676f4c0a +#define FOOTER_MAGIC 0x2e435243 + +struct log_header { + uint32_t magic; // HEADER_MAGIC + uint64_t offset; // Starting byte offset of the log header + uint32_t keysize; // Size of the log key (bytes) + uint32_t size; // Size of the data item (bytes) +} __attribute__((packed)); + +struct log_footer { + uint32_t magic; // FOOTER_MAGIC + uint32_t crc; // Computed from log_header to log_footer.magic +} __attribute__((packed)); + static void writebuf(int fd, const char *buf, size_t len) { while (len > 0) { @@ -97,14 +112,42 @@ static gpointer log_thread(gpointer d) BlueSkyLogItem *item = (BlueSkyLogItem *)g_async_queue_pop(log->queue); g_mutex_lock(item->lock); + + off_t logsize = lseek(log->fd, 0, SEEK_CUR); + struct log_header header; + struct log_footer footer; + + header.magic = GUINT32_TO_LE(HEADER_MAGIC); + header.offset = GUINT64_TO_LE(logsize); + header.keysize = GUINT32_TO_LE(strlen(item->key)); + header.size = GUINT32_TO_LE(item->data->len); + footer.magic = GUINT32_TO_LE(FOOTER_MAGIC); + + uint32_t crc = BLUESKY_CRC32C_SEED; + + writebuf(log->fd, (const char *)&header, sizeof(header)); + crc = crc32c(crc, (const char *)&header, sizeof(header)); + writebuf(log->fd, item->key, strlen(item->key)); + crc = crc32c(crc, item->key, strlen(item->key)); + writebuf(log->fd, item->data->data, item->data->len); + crc = crc32c(crc, item->data->data, item->data->len); + + crc = crc32c(crc, (const char *)&footer, + sizeof(footer) - sizeof(uint32_t)); + footer.crc = crc32c_finalize(crc); + writebuf(log->fd, (const char *)&footer, sizeof(footer)); + + logsize += sizeof(header) + sizeof(footer); + logsize += strlen(item->key) + item->data->len; + committed = g_slist_prepend(committed, item); /* Force an fsync either if we will be closing this log segment and * opening a new file, or if there are no other log items currently * waiting to be written. */ - off_t logsize = lseek(log->fd, 0, SEEK_CUR); + if (logsize >= LOG_SEGMENT_SIZE || g_async_queue_length(log->queue) <= 0) {