1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2010 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
10 #define _ATFILE_SOURCE
18 #include <sys/types.h>
#include <stdlib.h>
23 #include "bluesky-private.h"
25 /* The logging layer for BlueSky. This is used to write filesystem changes
26 * durably to disk so that they can be recovered in the event of a system
29 /* The logging layer takes care of writing out a sequence of log records to
30 * disk. On disk, each record consists of a header, a data payload, and a
31 * footer. The footer contains a checksum of the record, meant to help with
32 * identifying corrupt log records (we would assume because the log record was
33 * only incompletely written out before a crash, which should only happen for
34 * log records that were not considered committed). */
36 // Rough size limit for a log segment. This is not a firm limit and there are
37 // no absolute guarantees on the size of a log segment.
// 1 << 23 bytes = 8 MiB.
38 #define LOG_SEGMENT_SIZE (1 << 23)
// Record framing markers. log_thread stores both through GUINT32_TO_LE, so the
// bytes on disk are 0a 4c 6f 67 ("\nLog") for the header magic and
// 43 52 43 2e ("CRC.") for the footer magic.
40 #define HEADER_MAGIC 0x676f4c0a
41 #define FOOTER_MAGIC 0x2e435243
// On-disk record header; log_thread writes it immediately before the payload.
// magic, offset and size are stored little-endian (GUINT32_TO_LE /
// GUINT64_TO_LE); id is copied from the log item unchanged.
// The struct is packed, so there is no padding between the fields.
44 uint32_t magic; // HEADER_MAGIC
45 uint64_t offset; // Starting byte offset of the log header
46 uint32_t size; // Size of the data item (bytes)
47 BlueSkyCloudID id; // Object identifier
48 } __attribute__((packed));
// On-disk record footer; log_thread writes it immediately after the payload.
// crc is the crc32c() value, seeded with BLUESKY_CRC32C_SEED and passed through
// crc32c_finalize(), over the header bytes, the payload bytes and the footer's
// magic field -- that is, every byte of the record except crc itself.
// NOTE(review): log_thread stores the crc32c_finalize() result directly, without
// the GUINT32_TO_LE conversion applied to the other integer fields -- confirm
// whether the finalize helper already returns a little-endian value.
51 uint32_t magic; // FOOTER_MAGIC
52 uint32_t crc; // Computed from log_header to log_footer.magic
53 } __attribute__((packed));
/* Write exactly len bytes from buf to fd.
 *
 * Short writes are continued from where they stopped and EINTR is retried.
 * Any other write() failure is fatal: the error is reported on stderr and the
 * process aborts, since a journal record that cannot be written cannot be
 * committed.
 *
 * The failure check is an explicit test rather than g_assert(): assertions
 * are removed when GLib is built with G_DISABLE_ASSERT, and a -1 return would
 * then be added to buf and subtracted from len. */
static void writebuf(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t written = write(fd, buf, len);

        if (written < 0) {
            if (errno == EINTR)
                continue;
            fprintf(stderr, "Error writing to log: %m\n");
            abort();
        }

        /* written is non-negative here, so the conversion is safe. */
        buf += written;
        len -= (size_t)written;
    }
}
68 /* All log writes (at least for a single log) are made by one thread, so we
69 * don't need to worry about concurrent access to the log file. Log items to
70 * write are pulled off a queue (and so may be posted by any thread).
71 * fdatasync() is used to ensure the log items are stable on disk.
73 * The log is broken up into separate files, roughly of size LOG_SEGMENT_SIZE
74 * each. If a log segment is not currently open (log->fd is negative), a new
75 * one is created. Log segment filenames are assigned sequentially.
77 * Log replay ought to be implemented later, and ought to set the initial
78 * sequence number appropriately.
// Log writer thread entry point. d is the BlueSkyLog handed to
// g_thread_create() by bluesky_log_new(). The thread pops BlueSkyCloudLog items
// from log->queue and appends each one to the current segment file as
// header + payload + footer.
80 static gpointer log_thread(gpointer d)
82 BlueSkyLog *log = (BlueSkyLog *)d;
84 /* If there are multiple log items to write, we may write more than one
85 * before calling fsync(). The committed list is used to track all the
86 * items that should be marked as committed once that final fsync() is
88 GSList *committed = NULL;
// Directory descriptor used as the base for openat() when creating segments.
90 int dirfd = open(log->log_directory, O_DIRECTORY);
92 fprintf(stderr, "Unable to open logging directory: %m\n");
// Segment files are named "log-" plus the zero-padded sequence number.
// O_EXCL makes creation fail with EEXIST instead of reusing an existing file.
99 g_snprintf(logfile, sizeof(logfile), "log-%08d", log->seq_num);
100 log->fd = openat(dirfd, logfile, O_CREAT|O_WRONLY|O_EXCL, 0600);
101 if (log->fd < 0 && errno == EEXIST) {
102 fprintf(stderr, "Log file %s already exists...\n", logfile);
105 } else if (log->fd < 0) {
106 fprintf(stderr, "Error opening logfile %s: %m\n", logfile);
// Take the next item from the queue; g_async_queue_pop() blocks while the
// queue is empty. The item's fields are only touched with item->lock held.
113 BlueSkyCloudLog *item
114 = (BlueSkyCloudLog *)g_async_queue_pop(log->queue);
115 g_mutex_lock(item->lock);
116 g_assert(item->data != NULL);
// An item that is already in the journal, or already has a journal write
// pending, is not written again: unlock it and drop a reference.
118 if ((item->location_flags | item->pending_write) & CLOUDLOG_JOURNAL) {
119 g_mutex_unlock(item->lock);
120 bluesky_cloudlog_unref(item);
// Mark the journal write as in progress until the data has been synced.
124 item->pending_write |= CLOUDLOG_JOURNAL;
// The current file offset is where this record's header will start.
126 off_t logsize = lseek(log->fd, 0, SEEK_CUR);
127 struct log_header header;
128 struct log_footer footer;
// Integer header/footer fields are converted to little-endian; id is copied
// as-is.
130 header.magic = GUINT32_TO_LE(HEADER_MAGIC);
131 header.offset = GUINT64_TO_LE(logsize);
132 header.size = GUINT32_TO_LE(item->data->len);
133 header.id = item->id;
134 footer.magic = GUINT32_TO_LE(FOOTER_MAGIC);
// The checksum is accumulated over the same bytes that are written: the
// header, then the payload, then the footer minus its trailing uint32_t crc.
136 uint32_t crc = BLUESKY_CRC32C_SEED;
138 writebuf(log->fd, (const char *)&header, sizeof(header));
139 crc = crc32c(crc, (const char *)&header, sizeof(header));
141 writebuf(log->fd, item->data->data, item->data->len);
142 crc = crc32c(crc, item->data->data, item->data->len);
144 crc = crc32c(crc, (const char *)&footer,
145 sizeof(footer) - sizeof(uint32_t));
146 footer.crc = crc32c_finalize(crc);
147 writebuf(log->fd, (const char *)&footer, sizeof(footer));
// Record where the payload lives: segment number, offset just past the
// header, and payload length.
149 item->log_seq = log->seq_num;
150 item->log_offset = logsize + sizeof(header);
151 item->log_size = item->data->len;
// Advance logsize past the whole record so it can be compared against
// LOG_SEGMENT_SIZE below.
153 logsize += sizeof(header) + sizeof(footer) + item->data->len;
// Remember the item so it can be flagged as journaled after the next sync.
155 committed = g_slist_prepend(committed, item);
156 g_mutex_unlock(item->lock);
158 /* Force an fsync either if we will be closing this log segment and
159 * opening a new file, or if there are no other log items currently
160 * waiting to be written. */
162 if (logsize >= LOG_SEGMENT_SIZE
163 || g_async_queue_length(log->queue) <= 0)
// Drain the committed list: under each item's lock, clear the pending flag,
// set the location flag, and signal item->cond, which
// bluesky_log_finish_all() waits on.
// NOTE(review): g_cond_signal() wakes at most one waiter -- confirm that no
// more than one thread can wait on the same item.
167 while (committed != NULL) {
168 item = (BlueSkyCloudLog *)committed->data;
169 g_mutex_lock(item->lock);
170 item->pending_write &= ~CLOUDLOG_JOURNAL;
171 item->location_flags |= CLOUDLOG_JOURNAL;
172 g_cond_signal(item->cond);
173 g_mutex_unlock(item->lock);
174 committed = g_slist_delete_link(committed, committed);
177 /* if (batchsize > 1)
178 g_print("Log batch size: %d\n", batchsize); */
// Segment rollover test.
// NOTE(review): logsize has already been advanced by the record size above,
// so a -1 returned by lseek() would no longer be negative here -- confirm what
// the logsize < 0 case is meant to catch.
181 if (logsize < 0 || logsize >= LOG_SEGMENT_SIZE) {
// Allocate a BlueSkyLog for the given directory and start its writer thread.
// log_directory is duplicated, so the caller keeps ownership of its string.
191 BlueSkyLog *bluesky_log_new(const char *log_directory)
// g_new0() returns zero-filled memory.
193 BlueSkyLog *log = g_new0(BlueSkyLog, 1);
195 log->log_directory = g_strdup(log_directory);
// Queue through which bluesky_log_item_submit() hands items to log_thread.
198 log->queue = g_async_queue_new();
// The thread is created non-joinable (FALSE) with no GError out-parameter.
// NOTE(review): the return value of g_thread_create() is not checked, so a
// failure to start the writer thread goes unnoticed here.
200 g_thread_create(log_thread, log, FALSE, NULL);
205 void bluesky_log_item_submit(BlueSkyCloudLog *item, BlueSkyLog *log)
207 bluesky_cloudlog_ref(item);
208 g_async_queue_push(log->queue, item);
211 void bluesky_log_finish_all(GList *log_items)
213 while (log_items != NULL) {
214 BlueSkyCloudLog *item = (BlueSkyCloudLog *)log_items->data;
216 g_mutex_lock(item->lock);
217 while ((item->pending_write & CLOUDLOG_JOURNAL))
218 g_cond_wait(item->cond, item->lock);
219 g_mutex_unlock(item->lock);
220 bluesky_cloudlog_unref(item);
222 log_items = g_list_delete_link(log_items, log_items);