Initial work on cloud log-structured storage.
[bluesky.git] / bluesky / log.c
index a4a8150..4c522b8 100644 (file)
 
 // Rough size limit for a log segment.  This is not a firm limit and there are
 // no absolute guarantees on the size of a log segment.
-#define LOG_SEGMENT_SIZE (1 << 20)
+#define LOG_SEGMENT_SIZE (1 << 23)  // 8 MiB
+
+#define HEADER_MAGIC 0x676f4c0a  // Written little-endian, so the bytes on disk read "\nLog"
+#define FOOTER_MAGIC 0x2e435243  // Written little-endian, so the bytes on disk read "CRC."
+
+struct log_header {             // On-disk header written before each item's key and data
+    uint32_t magic;             // HEADER_MAGIC, stored little-endian
+    uint64_t offset;            // File offset (as reported by lseek) where this header starts; little-endian
+    uint32_t keysize;           // Length of the log key in bytes (strlen; no NUL terminator is written)
+    uint32_t size;              // Length of the data item in bytes; little-endian
+} __attribute__((packed));      // Packed: 20 bytes, no padding; followed by key, data, then log_footer
+
+struct log_footer {             // On-disk trailer written after each item's data
+    uint32_t magic;             // FOOTER_MAGIC, stored little-endian
+    uint32_t crc;               // CRC-32C over log_header, key, data and footer.magic (the crc field itself is excluded)
+} __attribute__((packed));      // NOTE(review): crc is stored without an explicit GUINT32_TO_LE; confirm what byte order crc32c_finalize() returns
 
 static void writebuf(int fd, const char *buf, size_t len)
 {
@@ -66,6 +81,12 @@ static gpointer log_thread(gpointer d)
 {
     BlueSkyLog *log = (BlueSkyLog *)d;
 
+    /* If there are multiple log items to write, we may write more than one
+     * before calling fsync().  The committed list is used to track all the
+     * items that should be marked as committed once that final fsync() is
+     * done. */
+    GSList *committed = NULL;
+
     int dirfd = open(log->log_directory, O_DIRECTORY);
     if (dirfd < 0) {
         fprintf(stderr, "Unable to open logging directory: %m\n");
@@ -91,14 +112,59 @@ static gpointer log_thread(gpointer d)
 
         BlueSkyLogItem *item = (BlueSkyLogItem *)g_async_queue_pop(log->queue);
         g_mutex_lock(item->lock);
+
+        off_t logsize = lseek(log->fd, 0, SEEK_CUR);
+        struct log_header header;
+        struct log_footer footer;
+
+        header.magic = GUINT32_TO_LE(HEADER_MAGIC);
+        header.offset = GUINT64_TO_LE(logsize);
+        header.keysize = GUINT32_TO_LE(strlen(item->key));
+        header.size = GUINT32_TO_LE(item->data->len);
+        footer.magic = GUINT32_TO_LE(FOOTER_MAGIC);
+
+        uint32_t crc = BLUESKY_CRC32C_SEED;
+
+        writebuf(log->fd, (const char *)&header, sizeof(header));
+        crc = crc32c(crc, (const char *)&header, sizeof(header));
+
         writebuf(log->fd, item->key, strlen(item->key));
+        crc = crc32c(crc, item->key, strlen(item->key));
+
         writebuf(log->fd, item->data->data, item->data->len);
-        fdatasync(log->fd);
-        item->committed = TRUE;
-        g_cond_signal(item->cond);
-        g_mutex_unlock(item->lock);
+        crc = crc32c(crc, item->data->data, item->data->len);
+
+        crc = crc32c(crc, (const char *)&footer,
+                     sizeof(footer) - sizeof(uint32_t));
+        footer.crc = crc32c_finalize(crc);
+        writebuf(log->fd, (const char *)&footer, sizeof(footer));
+
+        logsize += sizeof(header) + sizeof(footer);
+        logsize += strlen(item->key) + item->data->len;
+
+        committed  = g_slist_prepend(committed, item);
+
+        /* Force an fsync either if we will be closing this log segment and
+         * opening a new file, or if there are no other log items currently
+         * waiting to be written. */
+
+        if (logsize >= LOG_SEGMENT_SIZE
+            || g_async_queue_length(log->queue) <= 0)
+        {
+            int batchsize = 0;
+            fdatasync(log->fd);
+            while (committed != NULL) {
+                item = (BlueSkyLogItem *)committed->data;
+                item->committed = TRUE;
+                g_cond_signal(item->cond);
+                g_mutex_unlock(item->lock);
+                committed = g_slist_delete_link(committed, committed);
+                batchsize++;
+            }
+            /* if (batchsize > 1)
+                g_print("Log batch size: %d\n", batchsize); */
+        }
 
-        off_t logsize = lseek(log->fd, 0, SEEK_CUR);
         if (logsize < 0 || logsize >= LOG_SEGMENT_SIZE) {
             close(log->fd);
             log->fd = -1;