Add in header fields for per-object encryption/authentication.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC = 'AgI-'
19 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
20
21 class ITEM_TYPE:
22     DATA = '1'
23     INODE = '2'
24     INODE_MAP = '3'
25     CHECKPOINT = '4'
26
27 class FileBackend:
28     """An interface to BlueSky where the log segments are on local disk.
29
30     This is mainly intended for testing purposes, as the real cleaner would
31     operate where data is being stored in S3."""
32
33     def __init__(self, path):
34         self.path = path
35
36     def list(self):
37         """Return a listing of all log segments and their sizes."""
38
39         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
40         files.sort()
41
42         return [(f, os.stat(os.path.join(self.path, f)).st_size)
43                 for f in files]
44
45     def read(self, filename):
46         fp = open(os.path.join(self.path, filename), 'rb')
47         return fp.read()
48
49     def write(self, filename, data):
50         fp = open(os.path.join(self.path, filename), 'wb')
51         fp.write(data)
52         fp.close()
53
54     def delete(self, filename):
55         os.unlink(os.path.join(self.path, filename))
56
57     def loc_to_name(self, location):
58         return "log-%08d-%08d" % (location)
59
60     def name_to_loc(self, name):
61         m = re.match(r"^log-(\d+)-(\d+)$", name)
62         if m: return (int(m.group(1)), int(m.group(2)))
63
64 class S3Backend:
65     """An interface to BlueSky where the log segments are on in Amazon S3."""
66
67     def __init__(self, bucket, path='', cachedir="."):
68         self.conn = boto.connect_s3(is_secure=False)
69         self.bucket = self.conn.get_bucket(bucket)
70         self.path = path
71         self.cachedir = cachedir
72         self.cache = {}
73
74     def list(self):
75         files = []
76         for k in self.bucket.list(self.path + 'log-'):
77             files.append((k.key, k.size))
78         return files
79
80     def read(self, filename):
81         if filename in self.cache:
82             fp = open(os.path.join(self.cachedir, filename), 'rb')
83             return fp.read()
84         else:
85             k = Key(self.bucket)
86             k.key = self.path + filename
87             data = k.get_contents_as_string()
88             fp = open(os.path.join(self.cachedir, filename), 'wb')
89             fp.write(data)
90             fp.close()
91             self.cache[filename] = True
92             return data
93
94     def write(self, filename, data):
95         k = Key(self.bucket)
96         k.key = self.path + filename
97         k.set_contents_from_string(data)
98         if filename in self.cache:
99             del self.cache[filename]
100
101     def delete(self, filename):
102         k = Key(self.bucket)
103         k.key = self.path + filename
104         k.delete()
105         if filename in self.cache:
106             del self.cache[filename]
107
108     def loc_to_name(self, location):
109         return "log-%08d-%08d" % (location)
110
111     def name_to_loc(self, name):
112         m = re.match(r"^log-(\d+)-(\d+)$", name)
113         if m: return (int(m.group(1)), int(m.group(2)))
114
115 class LogItem:
116     """In-memory representation of a single item stored in a log file."""
117
118     def __init__(self):
119         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
120
121     def __str__(self):
122         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
123
124     @staticmethod
125     def random_id():
126         return open('/dev/urandom').read(16)
127
128     def serialize(self):
129         link_ids = []
130         link_locs = []
131         for (i, l) in self.links:
132             link_ids.append(i)
133             if i != '\0' * 16:
134                 link_locs.append(struct.pack('<IIII', *l))
135         link_ids = ''.join(link_ids)
136         link_locs = ''.join(link_locs)
137
138         header = struct.pack(HEADER_FORMAT,
139                              HEADER_MAGIC, self.cryptkeys,
140                              ord(self.type), self.id, self.inum,
141                              len(self.data), len(link_ids), len(link_locs))
142         return header + self.data + link_ids + link_locs
143
144 class LogSegment:
145     def __init__(self, backend, location):
146         self.backend = backend
147         self.location = location
148         self.data = []
149
150     def __len__(self):
151         return sum(len(s) for s in self.data)
152
153     def write(self, item):
154         data = item.serialize()
155         offset = len(self)
156         self.data.append(data)
157         item.location = self.location + (offset, len(data))
158
159     def close(self):
160         data = ''.join(self.data)
161         filename = self.backend.loc_to_name(self.location)
162         print "Would write %d bytes of data to %s" % (len(data), filename)
163         self.backend.write(filename, data)
164
165 class LogDirectory:
166     TARGET_SIZE = 4 << 20
167
168     def __init__(self, backend, dir):
169         self.backend = backend
170         self.dir_num = dir
171         self.seq_num = 0
172         for logname in backend.list():
173             loc = backend.name_to_loc(logname[0])
174             if loc is not None and loc[0] == dir:
175                 self.seq_num = max(self.seq_num, loc[1] + 1)
176         self.groups = {}
177         print "Starting sequence number is", self.seq_num
178
179     def open_segment(self):
180         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
181         self.seq_num += 1
182         return seg
183
184     def write(self, item, segment_group=0):
185         if segment_group not in self.groups:
186             self.groups[segment_group] = self.open_segment()
187         seg = self.groups[segment_group]
188         seg.write(item)
189         if len(seg) >= LogDirectory.TARGET_SIZE:
190             seg.close()
191             del self.groups[segment_group]
192
193     def close_all(self):
194         for k in list(self.groups.keys()):
195             self.groups[k].close()
196             del self.groups[k]
197
198 class UtilizationTracker:
199     """A simple object that tracks what fraction of each segment is used.
200
201     This data can be used to guide segment cleaning decisions."""
202
203     def __init__(self, backend):
204         self.segments = {}
205         for (segment, size) in backend.list():
206             self.segments[segment] = [size, 0]
207
208     def add_item(self, item):
209         if isinstance(item, LogItem):
210             item = item.location
211         if item is None: return
212         (dir, seq, offset, size) = item
213         filename = "log-%08d-%08d" % (dir, seq)
214         self.segments[filename][1] += size
215
216 def parse_item(data):
217     if len(data) < HEADER_SIZE: return
218     header = struct.unpack_from(HEADER_FORMAT, data, 0)
219     size = HEADER_SIZE + sum(header[5:8])
220
221     if header[0] != HEADER_MAGIC:
222         print "Bad header magic!"
223         return
224
225     if len(data) != size:
226         print "Item size does not match: %d != %d" % (size, len(data))
227         return
228
229     item = LogItem()
230     item.cryptkeys = header[1]
231     item.id = header[3]
232     item.inum = header[4]
233     item.location = None
234     item.type = chr(header[2])
235     item.size = size
236     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
237     links = []
238     link_ids = data[HEADER_SIZE + header[5]
239                     : HEADER_SIZE + header[5] + header[6]]
240     link_locs = data[HEADER_SIZE + header[5] + header[6]
241                      : HEADER_SIZE + sum(header[5:8])]
242     for i in range(len(link_ids) // 16):
243         id = link_ids[16*i : 16*i + 16]
244         if id == '\0' * 16:
245             loc = None
246         else:
247             loc = struct.unpack('<IIII', link_locs[0:16])
248             link_locs = link_locs[16:]
249         links.append((id, loc))
250     item.links = links
251     return item
252
253 def load_item(backend, location):
254     """Load the cloud item pointed at by the 4-tuple 'location'.
255
256     The elements of the tuple are (directory, sequence, offset, size)."""
257
258     filename = backend.loc_to_name((location[0], location[1]))
259     data = backend.read(filename)[location[2] : location[2] + location[3]]
260     item = parse_item(data)
261     item.location = location
262     return item
263
264 def parse_log(data, location=None):
265     """Parse contents of a log file, yielding a sequence of log items."""
266
267     if isinstance(location, str):
268         m = re.match(r"^log-(\d+)-(\d+)$", location)
269         if m:
270             location = (int(m.group(1)), int(m.group(2)))
271         else:
272             location = None
273
274     offset = 0
275     while len(data) - offset >= HEADER_SIZE:
276         header = struct.unpack_from(HEADER_FORMAT, data, offset)
277         size = HEADER_SIZE + sum(header[5:8])
278         if header[0] != HEADER_MAGIC:
279             print "Bad header magic!"
280             break
281         if size + offset > len(data):
282             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
283             break
284         item = parse_item(data[offset : offset + size])
285         if location is not None:
286             item.location = location + (offset, size)
287         if item is not None: yield item
288         offset += size
289
290 def load_checkpoint_record(backend):
291     for (log, size) in reversed(backend.list()):
292         for item in reversed(list(parse_log(backend.read(log), log))):
293             print item
294             if item.type == ITEM_TYPE.CHECKPOINT:
295                 return item
296
297 class InodeMap:
298     def __init__(self):
299         pass
300
301     def build(self, backend, checkpoint_record):
302         """Reconstruct the inode map from the checkpoint record given.
303
304         This will also build up information about segment utilization."""
305
306         self.checkpoint_record = checkpoint_record
307
308         util = UtilizationTracker(backend)
309         util.add_item(checkpoint_record)
310         inodes = {}
311         self.obsolete_segments = set()
312
313         print "Inode map:"
314         for i in range(len(checkpoint_record.data) // 16):
315             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
316             imap = load_item(backend, checkpoint_record.links[i][1])
317             util.add_item(imap)
318             print "[%d, %d]: %s" % (start, end, imap)
319             for j in range(len(imap.data) // 8):
320                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
321                 inode = load_item(backend, imap.links[j][1])
322                 inodes[inum] = inode
323                 data_segments = set()
324                 util.add_item(inode)
325                 for i in inode.links:
326                     util.add_item(i[1])
327                     data_segments.add(i[1][0:2])
328                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
329
330         print
331         print "Segment utilizations:"
332         for (s, u) in sorted(util.segments.items()):
333             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
334             if u[1] == 0:
335                 print "Deleting..."
336                 backend.delete(s)
337
338         self.inodes = inodes
339         self.util = util
340         self.updated_inodes = set()
341
342     def mark_updated(self, inum):
343         self.updated_inodes.add(inum)
344
345     def write(self, backend, log):
346         updated_inodes = sorted(self.updated_inodes, reverse=True)
347
348         new_checkpoint = LogItem()
349         new_checkpoint.id = LogItem.random_id()
350         new_checkpoint.inum = 0
351         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
352         new_checkpoint.data = ""
353         new_checkpoint.links = []
354
355         for i in range(len(self.checkpoint_record.data) // 16):
356             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
357
358             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
359
360             # Case 1: No inodes in this range of the old inode map have
361             # changed.  Simply emit a new pointer to the same inode map block.
362             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
363                 old_location = self.checkpoint_record.links[i][1][0:2]
364                 if old_location not in self.obsolete_segments:
365                     new_checkpoint.links.append(self.checkpoint_record.links[i])
366                     continue
367
368             # Case 2: Some inodes have been updated.  Create a new inode map
369             # block, write it out, and point the new checkpoint at it.
370             inodes = [k for k in self.inodes if k >= start and k <= end]
371             inodes.sort()
372
373             block = LogItem()
374             block.id = LogItem.random_id()
375             block.inum = 0
376             block.type = ITEM_TYPE.INODE_MAP
377             block.links = []
378             block.data = ""
379             for j in inodes:
380                 block.data += struct.pack("<Q", j)
381                 block.links.append((self.inodes[j].id, self.inodes[j].location))
382             log.write(block, 2)
383
384             new_checkpoint.links.append((block.id, block.location))
385
386             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
387                 updated_inodes.pop()
388
389         log.write(new_checkpoint, 2)
390         self.checkpoint_record = new_checkpoint
391
392 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
393     inode = inode_map.inodes[inum]
394     if copy_data:
395         blocks = []
396         for l in inode.links:
397             data = load_item(backend, l[1])
398             blocks.append(data)
399             log.write(data, 0)
400         inode.links = [(b.id, b.location) for b in blocks]
401     log.write(inode, 1)
402     inode_map.mark_updated(inum)
403
404 def run_cleaner(backend, inode_map, log):
405     # Determine which segments are poorly utilized and should be cleaned.  We
406     # need better heuristics here.
407     for (s, u) in sorted(inode_map.util.segments.items()):
408         if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
409             print "Should clean segment", s
410             loc = backend.name_to_loc(s)
411             if s: inode_map.obsolete_segments.add(loc)
412
413     # TODO: We probably also want heuristics that will find inodes with
414     # badly-fragmented data and rewrite that to achieve better locality.
415
416     # Given that list of segments to clean, scan through those segments to find
417     # data which is still live and mark relevant inodes as needing to be
418     # rewritten.
419     dirty_inodes = set()
420     dirty_inode_data = set()
421     for s in inode_map.obsolete_segments:
422         filename = backend.loc_to_name(s)
423         print "Scanning", filename, "for live data"
424         for item in parse_log(backend.read(filename), filename):
425             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
426                 if item.inum != 0:
427                     inode = inode_map.inodes[item.inum]
428                     if s == inode.location[0:2]:
429                         dirty_inodes.add(item.inum)
430                     if item.inum not in dirty_inode_data:
431                         for b in inode.links:
432                             if s == b[1][0:2]:
433                                 dirty_inode_data.add(item.inum)
434                                 break
435
436     print "Inodes to rewrite:", dirty_inodes
437     print "Inodes with data to rewrite:", dirty_inode_data
438     for i in sorted(dirty_inodes.union(dirty_inode_data)):
439         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
440
441 if __name__ == '__main__':
442     backend = S3Backend("mvrable-bluesky", cachedir=".")
443     chkpt = load_checkpoint_record(backend)
444     print backend.list()
445     imap = InodeMap()
446     imap.build(backend, chkpt)
447     print chkpt
448
449     log_dir = LogDirectory(backend, 0)
450     run_cleaner(backend, imap, log_dir)
451     imap.write(backend, log_dir)
452     log_dir.close_all()