Support encrypted log items in the cleaner.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
19 HEADER_MAGIC2 = 'AgI='          # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
21
22 class ITEM_TYPE:
23     DATA = '1'
24     INODE = '2'
25     INODE_MAP = '3'
26     CHECKPOINT = '4'
27
28 class FileBackend:
29     """An interface to BlueSky where the log segments are on local disk.
30
31     This is mainly intended for testing purposes, as the real cleaner would
32     operate where data is being stored in S3."""
33
34     def __init__(self, path):
35         self.path = path
36
37     def list(self):
38         """Return a listing of all log segments and their sizes."""
39
40         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
41         files.sort()
42
43         return [(f, os.stat(os.path.join(self.path, f)).st_size)
44                 for f in files]
45
46     def read(self, filename, offset=0, length=None):
47         fp = open(os.path.join(self.path, filename), 'rb')
48         if offset > 0:
49             fp.seek(offset)
50         if legnth is None:
51             return fp.read()
52         else:
53             return fp.read(length)
54
55     def write(self, filename, data):
56         fp = open(os.path.join(self.path, filename), 'wb')
57         fp.write(data)
58         fp.close()
59
60     def delete(self, filename):
61         os.unlink(os.path.join(self.path, filename))
62
63     def loc_to_name(self, location):
64         return "log-%08d-%08d" % (location)
65
66     def name_to_loc(self, name):
67         m = re.match(r"^log-(\d+)-(\d+)$", name)
68         if m: return (int(m.group(1)), int(m.group(2)))
69
70 class S3Backend:
71     """An interface to BlueSky where the log segments are on in Amazon S3."""
72
73     def __init__(self, bucket, path='', cachedir="."):
74         self.conn = boto.connect_s3(is_secure=False)
75         self.bucket = self.conn.get_bucket(bucket)
76         self.path = path
77         self.cachedir = cachedir
78         self.cache = {}
79
80     def list(self):
81         files = []
82         for k in self.bucket.list(self.path + 'log-'):
83             files.append((k.key, k.size))
84         return files
85
86     def read(self, filename, offset=0, length=None):
87         if filename in self.cache:
88             fp = open(os.path.join(self.cachedir, filename), 'rb')
89             if offset > 0:
90                 fp.seek(offset)
91             if length is None:
92                 return fp.read()
93             else:
94                 return fp.read(length)
95         else:
96             k = Key(self.bucket)
97             k.key = self.path + filename
98             data = k.get_contents_as_string()
99             fp = open(os.path.join(self.cachedir, filename), 'wb')
100             fp.write(data)
101             fp.close()
102             self.cache[filename] = True
103             if offset > 0:
104                 data = data[offset:]
105             if length is not None:
106                 data = data[0:length]
107             return data
108
109     def write(self, filename, data):
110         k = Key(self.bucket)
111         k.key = self.path + filename
112         k.set_contents_from_string(data)
113         if filename in self.cache:
114             del self.cache[filename]
115
116     def delete(self, filename):
117         k = Key(self.bucket)
118         k.key = self.path + filename
119         k.delete()
120         if filename in self.cache:
121             del self.cache[filename]
122
123     def loc_to_name(self, location):
124         return "log-%08d-%08d" % (location)
125
126     def name_to_loc(self, name):
127         m = re.match(r"^log-(\d+)-(\d+)$", name)
128         if m: return (int(m.group(1)), int(m.group(2)))
129
130 class LogItem:
131     """In-memory representation of a single item stored in a log file."""
132
133     def __init__(self):
134         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
135         self.encrypted = False
136
137     def __str__(self):
138         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
139
140     @staticmethod
141     def random_id():
142         return open('/dev/urandom').read(16)
143
144     def serialize(self):
145         link_ids = []
146         link_locs = []
147         for (i, l) in self.links:
148             link_ids.append(i)
149             if i != '\0' * 16:
150                 link_locs.append(struct.pack('<IIII', *l))
151         link_ids = ''.join(link_ids)
152         link_locs = ''.join(link_locs)
153
154         if self.encrypted:
155             magic = HEADER_MAGIC2
156         else:
157             magic = HEADER_MAGIC1
158         header = struct.pack(HEADER_FORMAT,
159                              magic, self.cryptkeys,
160                              ord(self.type), self.id, self.inum,
161                              len(self.data), len(link_ids), len(link_locs))
162         return header + self.data + link_ids + link_locs
163
164 class LogSegment:
165     def __init__(self, backend, location):
166         self.backend = backend
167         self.location = location
168         self.data = []
169
170     def __len__(self):
171         return sum(len(s) for s in self.data)
172
173     def write(self, item):
174         data = item.serialize()
175         offset = len(self)
176         self.data.append(data)
177         item.location = self.location + (offset, len(data))
178
179     def close(self):
180         data = ''.join(self.data)
181         filename = self.backend.loc_to_name(self.location)
182         print "Would write %d bytes of data to %s" % (len(data), filename)
183         self.backend.write(filename, data)
184
185 class LogDirectory:
186     TARGET_SIZE = 4 << 20
187
188     def __init__(self, backend, dir):
189         self.backend = backend
190         self.dir_num = dir
191         self.seq_num = 0
192         for logname in backend.list():
193             loc = backend.name_to_loc(logname[0])
194             if loc is not None and loc[0] == dir:
195                 self.seq_num = max(self.seq_num, loc[1] + 1)
196         self.groups = {}
197         print "Starting sequence number is", self.seq_num
198
199     def open_segment(self):
200         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
201         self.seq_num += 1
202         return seg
203
204     def write(self, item, segment_group=0):
205         if segment_group not in self.groups:
206             self.groups[segment_group] = self.open_segment()
207         seg = self.groups[segment_group]
208         seg.write(item)
209         if len(seg) >= LogDirectory.TARGET_SIZE:
210             seg.close()
211             del self.groups[segment_group]
212
213     def close_all(self):
214         for k in list(self.groups.keys()):
215             self.groups[k].close()
216             del self.groups[k]
217
218 class UtilizationTracker:
219     """A simple object that tracks what fraction of each segment is used.
220
221     This data can be used to guide segment cleaning decisions."""
222
223     def __init__(self, backend):
224         self.segments = {}
225         for (segment, size) in backend.list():
226             self.segments[segment] = [size, 0]
227
228     def add_item(self, item):
229         if isinstance(item, LogItem):
230             item = item.location
231         if item is None: return
232         (dir, seq, offset, size) = item
233         filename = "log-%08d-%08d" % (dir, seq)
234         self.segments[filename][1] += size
235
236 def parse_item(data):
237     if len(data) < HEADER_SIZE: return
238     header = struct.unpack_from(HEADER_FORMAT, data, 0)
239     size = HEADER_SIZE + sum(header[5:8])
240
241     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
242         print "Bad header magic!"
243         return
244
245     if len(data) != size:
246         print "Item size does not match: %d != %d" % (size, len(data))
247         return
248
249     item = LogItem()
250     if header[0] == HEADER_MAGIC2: item.encrypted = True
251     item.cryptkeys = header[1]
252     item.id = header[3]
253     item.inum = header[4]
254     item.location = None
255     item.type = chr(header[2])
256     item.size = size
257     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
258     links = []
259     link_ids = data[HEADER_SIZE + header[5]
260                     : HEADER_SIZE + header[5] + header[6]]
261     link_locs = data[HEADER_SIZE + header[5] + header[6]
262                      : HEADER_SIZE + sum(header[5:8])]
263     for i in range(len(link_ids) // 16):
264         id = link_ids[16*i : 16*i + 16]
265         if id == '\0' * 16:
266             loc = None
267         else:
268             loc = struct.unpack('<IIII', link_locs[0:16])
269             link_locs = link_locs[16:]
270         links.append((id, loc))
271     item.links = links
272     return item
273
274 def load_item(backend, location):
275     """Load the cloud item pointed at by the 4-tuple 'location'.
276
277     The elements of the tuple are (directory, sequence, offset, size)."""
278
279     filename = backend.loc_to_name((location[0], location[1]))
280     data = backend.read(filename, location[2], location[3])
281     item = parse_item(data)
282     item.location = location
283     return item
284
285 def parse_log(data, location=None):
286     """Parse contents of a log file, yielding a sequence of log items."""
287
288     if isinstance(location, str):
289         m = re.match(r"^log-(\d+)-(\d+)$", location)
290         if m:
291             location = (int(m.group(1)), int(m.group(2)))
292         else:
293             location = None
294
295     offset = 0
296     while len(data) - offset >= HEADER_SIZE:
297         header = struct.unpack_from(HEADER_FORMAT, data, offset)
298         size = HEADER_SIZE + sum(header[5:8])
299         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
300             print "Bad header magic!"
301             break
302         if size + offset > len(data):
303             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
304             break
305         item = parse_item(data[offset : offset + size])
306         if location is not None:
307             item.location = location + (offset, size)
308         if item is not None: yield item
309         offset += size
310
311 def load_checkpoint_record(backend):
312     for (log, size) in reversed(backend.list()):
313         for item in reversed(list(parse_log(backend.read(log), log))):
314             print item
315             if item.type == ITEM_TYPE.CHECKPOINT:
316                 return item
317
318 class InodeMap:
319     def __init__(self):
320         pass
321
322     def build(self, backend, checkpoint_record):
323         """Reconstruct the inode map from the checkpoint record given.
324
325         This will also build up information about segment utilization."""
326
327         self.checkpoint_record = checkpoint_record
328
329         util = UtilizationTracker(backend)
330         util.add_item(checkpoint_record)
331         inodes = {}
332         self.obsolete_segments = set()
333
334         print "Inode map:"
335         for i in range(len(checkpoint_record.data) // 16):
336             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
337             imap = load_item(backend, checkpoint_record.links[i][1])
338             util.add_item(imap)
339             print "[%d, %d]: %s" % (start, end, imap)
340             for j in range(len(imap.data) // 8):
341                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
342                 inode = load_item(backend, imap.links[j][1])
343                 inodes[inum] = inode
344                 data_segments = set()
345                 util.add_item(inode)
346                 for i in inode.links:
347                     util.add_item(i[1])
348                     data_segments.add(i[1][0:2])
349                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
350
351         print
352         print "Segment utilizations:"
353         for (s, u) in sorted(util.segments.items()):
354             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
355             if u[1] == 0:
356                 print "Deleting..."
357                 backend.delete(s)
358
359         self.inodes = inodes
360         self.util = util
361         self.updated_inodes = set()
362
363     def mark_updated(self, inum):
364         self.updated_inodes.add(inum)
365
366     def write(self, backend, log):
367         updated_inodes = sorted(self.updated_inodes, reverse=True)
368
369         new_checkpoint = LogItem()
370         new_checkpoint.id = LogItem.random_id()
371         new_checkpoint.inum = 0
372         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
373         new_checkpoint.data = ""
374         new_checkpoint.links = []
375
376         for i in range(len(self.checkpoint_record.data) // 16):
377             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
378
379             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
380
381             # Case 1: No inodes in this range of the old inode map have
382             # changed.  Simply emit a new pointer to the same inode map block.
383             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
384                 old_location = self.checkpoint_record.links[i][1][0:2]
385                 if old_location not in self.obsolete_segments:
386                     new_checkpoint.links.append(self.checkpoint_record.links[i])
387                     continue
388
389             # Case 2: Some inodes have been updated.  Create a new inode map
390             # block, write it out, and point the new checkpoint at it.
391             inodes = [k for k in self.inodes if k >= start and k <= end]
392             inodes.sort()
393
394             block = LogItem()
395             block.id = LogItem.random_id()
396             block.inum = 0
397             block.type = ITEM_TYPE.INODE_MAP
398             block.links = []
399             block.data = ""
400             for j in inodes:
401                 block.data += struct.pack("<Q", j)
402                 block.links.append((self.inodes[j].id, self.inodes[j].location))
403             log.write(block, 2)
404
405             new_checkpoint.links.append((block.id, block.location))
406
407             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
408                 updated_inodes.pop()
409
410         log.write(new_checkpoint, 2)
411         self.checkpoint_record = new_checkpoint
412
413 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
414     inode = inode_map.inodes[inum]
415     if copy_data:
416         blocks = []
417         for l in inode.links:
418             data = load_item(backend, l[1])
419             blocks.append(data)
420             log.write(data, 0)
421         inode.links = [(b.id, b.location) for b in blocks]
422     log.write(inode, 1)
423     inode_map.mark_updated(inum)
424
425 def run_cleaner(backend, inode_map, log, repack_inodes=False):
426     # Determine which segments are poorly utilized and should be cleaned.  We
427     # need better heuristics here.
428     for (s, u) in sorted(inode_map.util.segments.items()):
429         if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
430             print "Should clean segment", s
431             loc = backend.name_to_loc(s)
432             if s: inode_map.obsolete_segments.add(loc)
433
434     # TODO: We probably also want heuristics that will find inodes with
435     # badly-fragmented data and rewrite that to achieve better locality.
436
437     # Given that list of segments to clean, scan through those segments to find
438     # data which is still live and mark relevant inodes as needing to be
439     # rewritten.
440     if repack_inodes:
441         dirty_inodes = set(inode_map.inodes)
442     else:
443         dirty_inodes = set()
444     dirty_inode_data = set()
445     for s in inode_map.obsolete_segments:
446         filename = backend.loc_to_name(s)
447         print "Scanning", filename, "for live data"
448         for item in parse_log(backend.read(filename), filename):
449             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
450                 if item.inum != 0:
451                     inode = inode_map.inodes[item.inum]
452                     if s == inode.location[0:2]:
453                         dirty_inodes.add(item.inum)
454                     if item.inum not in dirty_inode_data:
455                         for b in inode.links:
456                             if s == b[1][0:2]:
457                                 dirty_inode_data.add(item.inum)
458                                 break
459
460     print "Inodes to rewrite:", dirty_inodes
461     print "Inodes with data to rewrite:", dirty_inode_data
462     for i in sorted(dirty_inodes.union(dirty_inode_data)):
463         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
464
465 if __name__ == '__main__':
466     backend = S3Backend("mvrable-bluesky", cachedir=".")
467     chkpt = load_checkpoint_record(backend)
468     print backend.list()
469     imap = InodeMap()
470     imap.build(backend, chkpt)
471     print chkpt
472
473     log_dir = LogDirectory(backend, 0)
474     run_cleaner(backend, imap, log_dir)
475     imap.write(backend, log_dir)
476     log_dir.close_all()