Add S3 backend for the cleaner.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4sb16sQIII'
17 HEADER_MAGIC = 'AgI-'
18 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
19
20 class ITEM_TYPE:
21     DATA = '1'
22     INODE = '2'
23     INODE_MAP = '3'
24     CHECKPOINT = '4'
25
26 class FileBackend:
27     """An interface to BlueSky where the log segments are on local disk.
28
29     This is mainly intended for testing purposes, as the real cleaner would
30     operate where data is being stored in S3."""
31
32     def __init__(self, path):
33         self.path = path
34
35     def list(self):
36         """Return a listing of all log segments and their sizes."""
37
38         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
39         files.sort()
40
41         return [(f, os.stat(os.path.join(self.path, f)).st_size)
42                 for f in files]
43
44     def read(self, filename):
45         fp = open(os.path.join(self.path, filename), 'rb')
46         return fp.read()
47
48     def write(self, filename, data):
49         fp = open(os.path.join(self.path, filename), 'wb')
50         fp.write(data)
51         fp.close()
52
53     def delete(self, filename):
54         os.unlink(os.path.join(self.path, filename))
55
56     def loc_to_name(self, location):
57         return "log-%08d-%08d" % (location)
58
59     def name_to_loc(self, name):
60         m = re.match(r"^log-(\d+)-(\d+)$", name)
61         if m: return (int(m.group(1)), int(m.group(2)))
62
63 class S3Backend:
64     """An interface to BlueSky where the log segments are on in Amazon S3."""
65
66     def __init__(self, bucket, path='', cachedir=None):
67         self.conn = boto.connect_s3(is_secure=False)
68         self.bucket = self.conn.get_bucket(bucket)
69         self.path = path
70
71     def list(self):
72         files = []
73         for k in self.bucket.list(self.path + 'log-'):
74             files.append((k.key, k.size))
75         return files
76
77     def read(self, filename):
78         k = Key(self.bucket)
79         k.key = self.path + filename
80         return k.get_contents_as_string()
81
82     def write(self, filename, data):
83         k = Key(self.bucket)
84         k.key = self.path + filename
85         k.set_contents_from_string(data)
86
87     def delete(self, filename):
88         k = Key(self.bucket)
89         k.key = self.path + filename
90         k.delete()
91
92     def loc_to_name(self, location):
93         return "log-%08d-%08d" % (location)
94
95     def name_to_loc(self, name):
96         m = re.match(r"^log-(\d+)-(\d+)$", name)
97         if m: return (int(m.group(1)), int(m.group(2)))
98
99 class LogItem:
100     """In-memory representation of a single item stored in a log file."""
101
102     def __str__(self):
103         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
104
105     @staticmethod
106     def random_id():
107         return open('/dev/urandom').read(16)
108
109     def serialize(self):
110         link_ids = []
111         link_locs = []
112         for (i, l) in self.links:
113             link_ids.append(i)
114             if i != '\0' * 16:
115                 link_locs.append(struct.pack('<IIII', *l))
116         link_ids = ''.join(link_ids)
117         link_locs = ''.join(link_locs)
118
119         header = struct.pack(HEADER_FORMAT,
120                              HEADER_MAGIC, ord(self.type), self.id, self.inum,
121                              len(self.data), len(link_ids), len(link_locs))
122         return header + self.data + link_ids + link_locs
123
124 class LogSegment:
125     def __init__(self, backend, location):
126         self.backend = backend
127         self.location = location
128         self.data = []
129
130     def __len__(self):
131         return sum(len(s) for s in self.data)
132
133     def write(self, item):
134         data = item.serialize()
135         offset = len(self)
136         self.data.append(data)
137         item.location = self.location + (offset, len(data))
138
139     def close(self):
140         data = ''.join(self.data)
141         filename = self.backend.loc_to_name(self.location)
142         print "Would write %d bytes of data to %s" % (len(data), filename)
143         self.backend.write(filename, data)
144
145 class LogDirectory:
146     TARGET_SIZE = 4 << 20
147
148     def __init__(self, backend, dir):
149         self.backend = backend
150         self.dir_num = dir
151         self.seq_num = 0
152         for logname in backend.list():
153             loc = backend.name_to_loc(logname[0])
154             if loc is not None and loc[0] == dir:
155                 self.seq_num = max(self.seq_num, loc[1] + 1)
156         self.groups = {}
157         print "Starting sequence number is", self.seq_num
158
159     def open_segment(self):
160         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
161         self.seq_num += 1
162         return seg
163
164     def write(self, item, segment_group=0):
165         if segment_group not in self.groups:
166             self.groups[segment_group] = self.open_segment()
167         seg = self.groups[segment_group]
168         seg.write(item)
169         if len(seg) >= LogDirectory.TARGET_SIZE:
170             seg.close()
171             del self.groups[segment_group]
172
173     def close_all(self):
174         for k in list(self.groups.keys()):
175             self.groups[k].close()
176             del self.groups[k]
177
178 class UtilizationTracker:
179     """A simple object that tracks what fraction of each segment is used.
180
181     This data can be used to guide segment cleaning decisions."""
182
183     def __init__(self, backend):
184         self.segments = {}
185         for (segment, size) in backend.list():
186             self.segments[segment] = [size, 0]
187
188     def add_item(self, item):
189         if isinstance(item, LogItem):
190             item = item.location
191         if item is None: return
192         (dir, seq, offset, size) = item
193         filename = "log-%08d-%08d" % (dir, seq)
194         self.segments[filename][1] += size
195
196 def parse_item(data):
197     if len(data) < HEADER_SIZE: return
198     header = struct.unpack_from(HEADER_FORMAT, data, 0)
199     size = HEADER_SIZE + sum(header[4:7])
200
201     if header[0] != HEADER_MAGIC:
202         print "Bad header magic!"
203         return
204
205     if len(data) != size:
206         print "Item size does not match!"
207         return
208
209     item = LogItem()
210     item.id = header[2]
211     item.inum = header[3]
212     item.location = None
213     item.type = chr(header[1])
214     item.size = size
215     item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
216     links = []
217     link_ids = data[HEADER_SIZE + header[4]
218                     : HEADER_SIZE + header[4] + header[5]]
219     link_locs = data[HEADER_SIZE + header[4] + header[5]
220                      : HEADER_SIZE + sum(header[4:7])]
221     for i in range(len(link_ids) // 16):
222         id = link_ids[16*i : 16*i + 16]
223         if id == '\0' * 16:
224             loc = None
225         else:
226             loc = struct.unpack('<IIII', link_locs[0:16])
227             link_locs = link_locs[16:]
228         links.append((id, loc))
229     item.links = links
230     return item
231
232 def load_item(backend, location):
233     """Load the cloud item pointed at by the 4-tuple 'location'.
234
235     The elements of the tuple are (directory, sequence, offset, size)."""
236
237     filename = backend.loc_to_name((location[0], location[1]))
238     data = backend.read(filename)[location[2] : location[2] + location[3]]
239     item = parse_item(data)
240     item.location = location
241     return item
242
243 def parse_log(data, location=None):
244     """Parse contents of a log file, yielding a sequence of log items."""
245
246     if isinstance(location, str):
247         m = re.match(r"^log-(\d+)-(\d+)$", location)
248         if m:
249             location = (int(m.group(1)), int(m.group(2)))
250         else:
251             location = None
252
253     offset = 0
254     while len(data) - offset >= HEADER_SIZE:
255         header = struct.unpack_from(HEADER_FORMAT, data, offset)
256         size = HEADER_SIZE + sum(header[4:7])
257         if header[0] != HEADER_MAGIC:
258             print "Bad header magic!"
259             break
260         if size + offset > len(data):
261             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
262             break
263         item = parse_item(data[offset : offset + size])
264         if location is not None:
265             item.location = location + (offset, size)
266         if item is not None: yield item
267         offset += size
268
269 def load_checkpoint_record(backend):
270     for (log, size) in reversed(backend.list()):
271         for item in reversed(list(parse_log(backend.read(log), log))):
272             print item
273             if item.type == ITEM_TYPE.CHECKPOINT:
274                 return item
275
276 class InodeMap:
277     def __init__(self):
278         pass
279
280     def build(self, backend, checkpoint_record):
281         """Reconstruct the inode map from the checkpoint record given.
282
283         This will also build up information about segment utilization."""
284
285         self.checkpoint_record = checkpoint_record
286
287         util = UtilizationTracker(backend)
288         util.add_item(checkpoint_record)
289         inodes = {}
290         self.obsolete_segments = set()
291
292         print "Inode map:"
293         for i in range(len(checkpoint_record.data) // 16):
294             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
295             imap = load_item(backend, checkpoint_record.links[i][1])
296             util.add_item(imap)
297             print "[%d, %d]: %s" % (start, end, imap)
298             for j in range(len(imap.data) // 8):
299                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
300                 inode = load_item(backend, imap.links[j][1])
301                 inodes[inum] = inode
302                 data_segments = set()
303                 util.add_item(inode)
304                 for i in inode.links:
305                     util.add_item(i[1])
306                     data_segments.add(i[1][0:2])
307                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
308
309         print
310         print "Segment utilizations:"
311         for (s, u) in sorted(util.segments.items()):
312             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
313             if u[1] == 0:
314                 print "Deleting..."
315                 backend.delete(s)
316
317         self.inodes = inodes
318         self.util = util
319         self.updated_inodes = set()
320
321     def mark_updated(self, inum):
322         self.updated_inodes.add(inum)
323
324     def write(self, backend, log):
325         updated_inodes = sorted(self.updated_inodes, reverse=True)
326
327         new_checkpoint = LogItem()
328         new_checkpoint.id = LogItem.random_id()
329         new_checkpoint.inum = 0
330         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
331         new_checkpoint.data = ""
332         new_checkpoint.links = []
333
334         for i in range(len(self.checkpoint_record.data) // 16):
335             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
336
337             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
338
339             # Case 1: No inodes in this range of the old inode map have
340             # changed.  Simply emit a new pointer to the same inode map block.
341             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
342                 old_location = self.checkpoint_record.links[i][1][0:2]
343                 if old_location not in self.obsolete_segments:
344                     new_checkpoint.links.append(self.checkpoint_record.links[i])
345                     continue
346
347             # Case 2: Some inodes have been updated.  Create a new inode map
348             # block, write it out, and point the new checkpoint at it.
349             inodes = [k for k in self.inodes if k >= start and k <= end]
350             inodes.sort()
351
352             block = LogItem()
353             block.id = LogItem.random_id()
354             block.inum = 0
355             block.type = ITEM_TYPE.INODE_MAP
356             block.links = []
357             block.data = ""
358             for j in inodes:
359                 block.data += struct.pack("<Q", j)
360                 block.links.append((self.inodes[j].id, self.inodes[j].location))
361             log.write(block, 2)
362
363             new_checkpoint.links.append((block.id, block.location))
364
365             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
366                 updated_inodes.pop()
367
368         log.write(new_checkpoint, 2)
369         self.checkpoint_record = new_checkpoint
370
371 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
372     inode = inode_map.inodes[inum]
373     if copy_data:
374         blocks = []
375         for l in inode.links:
376             data = load_item(backend, l[1])
377             blocks.append(data)
378             log.write(data, 0)
379         inode.links = [(b.id, b.location) for b in blocks]
380     log.write(inode, 1)
381     inode_map.mark_updated(inum)
382
383 def run_cleaner(backend, inode_map, log):
384     # Determine which segments are poorly utilized and should be cleaned.  We
385     # need better heuristics here.
386     for (s, u) in sorted(inode_map.util.segments.items()):
387         if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
388             print "Should clean segment", s
389             loc = backend.name_to_loc(s)
390             if s: inode_map.obsolete_segments.add(loc)
391
392     # TODO: We probably also want heuristics that will find inodes with
393     # badly-fragmented data and rewrite that to achieve better locality.
394
395     # Given that list of segments to clean, scan through those segments to find
396     # data which is still live and mark relevant inodes as needing to be
397     # rewritten.
398     dirty_inodes = set()
399     dirty_inode_data = set()
400     for s in inode_map.obsolete_segments:
401         filename = backend.loc_to_name(s)
402         print "Scanning", filename, "for live data"
403         for item in parse_log(backend.read(filename), filename):
404             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
405                 if item.inum != 0:
406                     inode = inode_map.inodes[item.inum]
407                     if s == inode.location[0:2]:
408                         dirty_inodes.add(item.inum)
409                     if item.inum not in dirty_inode_data:
410                         for b in inode.links:
411                             if s == b[1][0:2]:
412                                 dirty_inode_data.add(item.inum)
413                                 break
414
415     print "Inodes to rewrite:", dirty_inodes
416     print "Inodes with data to rewrite:", dirty_inode_data
417     for i in sorted(dirty_inodes.union(dirty_inode_data)):
418         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
419
420 if __name__ == '__main__':
421     backend = S3Backend("mvrable-bluesky", cachedir=".")
422     chkpt = load_checkpoint_record(backend)
423     print backend.list()
424     imap = InodeMap()
425     imap.build(backend, chkpt)
426     print chkpt
427
428     log_dir = LogDirectory(backend, 0)
429     run_cleaner(backend, imap, log_dir)
430     imap.write(backend, log_dir)
431     log_dir.close_all()