Add very basic caching to the cleaner S3 backend.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4sb16sQIII'
17 HEADER_MAGIC = 'AgI-'
18 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
19
20 class ITEM_TYPE:
21     DATA = '1'
22     INODE = '2'
23     INODE_MAP = '3'
24     CHECKPOINT = '4'
25
26 class FileBackend:
27     """An interface to BlueSky where the log segments are on local disk.
28
29     This is mainly intended for testing purposes, as the real cleaner would
30     operate where data is being stored in S3."""
31
32     def __init__(self, path):
33         self.path = path
34
35     def list(self):
36         """Return a listing of all log segments and their sizes."""
37
38         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
39         files.sort()
40
41         return [(f, os.stat(os.path.join(self.path, f)).st_size)
42                 for f in files]
43
44     def read(self, filename):
45         fp = open(os.path.join(self.path, filename), 'rb')
46         return fp.read()
47
48     def write(self, filename, data):
49         fp = open(os.path.join(self.path, filename), 'wb')
50         fp.write(data)
51         fp.close()
52
53     def delete(self, filename):
54         os.unlink(os.path.join(self.path, filename))
55
56     def loc_to_name(self, location):
57         return "log-%08d-%08d" % (location)
58
59     def name_to_loc(self, name):
60         m = re.match(r"^log-(\d+)-(\d+)$", name)
61         if m: return (int(m.group(1)), int(m.group(2)))
62
63 class S3Backend:
64     """An interface to BlueSky where the log segments are on in Amazon S3."""
65
66     def __init__(self, bucket, path='', cachedir="."):
67         self.conn = boto.connect_s3(is_secure=False)
68         self.bucket = self.conn.get_bucket(bucket)
69         self.path = path
70         self.cachedir = cachedir
71         self.cache = {}
72
73     def list(self):
74         files = []
75         for k in self.bucket.list(self.path + 'log-'):
76             files.append((k.key, k.size))
77         return files
78
79     def read(self, filename):
80         if filename in self.cache:
81             fp = open(os.path.join(self.cachedir, filename), 'rb')
82             return fp.read()
83         else:
84             k = Key(self.bucket)
85             k.key = self.path + filename
86             data = k.get_contents_as_string()
87             fp = open(os.path.join(self.cachedir, filename), 'wb')
88             fp.write(data)
89             fp.close()
90             self.cache[filename] = True
91             return data
92
93     def write(self, filename, data):
94         k = Key(self.bucket)
95         k.key = self.path + filename
96         k.set_contents_from_string(data)
97         if filename in self.cache:
98             del self.cache[filename]
99
100     def delete(self, filename):
101         k = Key(self.bucket)
102         k.key = self.path + filename
103         k.delete()
104         if filename in self.cache:
105             del self.cache[filename]
106
107     def loc_to_name(self, location):
108         return "log-%08d-%08d" % (location)
109
110     def name_to_loc(self, name):
111         m = re.match(r"^log-(\d+)-(\d+)$", name)
112         if m: return (int(m.group(1)), int(m.group(2)))
113
114 class LogItem:
115     """In-memory representation of a single item stored in a log file."""
116
117     def __str__(self):
118         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
119
120     @staticmethod
121     def random_id():
122         return open('/dev/urandom').read(16)
123
124     def serialize(self):
125         link_ids = []
126         link_locs = []
127         for (i, l) in self.links:
128             link_ids.append(i)
129             if i != '\0' * 16:
130                 link_locs.append(struct.pack('<IIII', *l))
131         link_ids = ''.join(link_ids)
132         link_locs = ''.join(link_locs)
133
134         header = struct.pack(HEADER_FORMAT,
135                              HEADER_MAGIC, ord(self.type), self.id, self.inum,
136                              len(self.data), len(link_ids), len(link_locs))
137         return header + self.data + link_ids + link_locs
138
139 class LogSegment:
140     def __init__(self, backend, location):
141         self.backend = backend
142         self.location = location
143         self.data = []
144
145     def __len__(self):
146         return sum(len(s) for s in self.data)
147
148     def write(self, item):
149         data = item.serialize()
150         offset = len(self)
151         self.data.append(data)
152         item.location = self.location + (offset, len(data))
153
154     def close(self):
155         data = ''.join(self.data)
156         filename = self.backend.loc_to_name(self.location)
157         print "Would write %d bytes of data to %s" % (len(data), filename)
158         self.backend.write(filename, data)
159
160 class LogDirectory:
161     TARGET_SIZE = 4 << 20
162
163     def __init__(self, backend, dir):
164         self.backend = backend
165         self.dir_num = dir
166         self.seq_num = 0
167         for logname in backend.list():
168             loc = backend.name_to_loc(logname[0])
169             if loc is not None and loc[0] == dir:
170                 self.seq_num = max(self.seq_num, loc[1] + 1)
171         self.groups = {}
172         print "Starting sequence number is", self.seq_num
173
174     def open_segment(self):
175         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
176         self.seq_num += 1
177         return seg
178
179     def write(self, item, segment_group=0):
180         if segment_group not in self.groups:
181             self.groups[segment_group] = self.open_segment()
182         seg = self.groups[segment_group]
183         seg.write(item)
184         if len(seg) >= LogDirectory.TARGET_SIZE:
185             seg.close()
186             del self.groups[segment_group]
187
188     def close_all(self):
189         for k in list(self.groups.keys()):
190             self.groups[k].close()
191             del self.groups[k]
192
193 class UtilizationTracker:
194     """A simple object that tracks what fraction of each segment is used.
195
196     This data can be used to guide segment cleaning decisions."""
197
198     def __init__(self, backend):
199         self.segments = {}
200         for (segment, size) in backend.list():
201             self.segments[segment] = [size, 0]
202
203     def add_item(self, item):
204         if isinstance(item, LogItem):
205             item = item.location
206         if item is None: return
207         (dir, seq, offset, size) = item
208         filename = "log-%08d-%08d" % (dir, seq)
209         self.segments[filename][1] += size
210
211 def parse_item(data):
212     if len(data) < HEADER_SIZE: return
213     header = struct.unpack_from(HEADER_FORMAT, data, 0)
214     size = HEADER_SIZE + sum(header[4:7])
215
216     if header[0] != HEADER_MAGIC:
217         print "Bad header magic!"
218         return
219
220     if len(data) != size:
221         print "Item size does not match!"
222         return
223
224     item = LogItem()
225     item.id = header[2]
226     item.inum = header[3]
227     item.location = None
228     item.type = chr(header[1])
229     item.size = size
230     item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
231     links = []
232     link_ids = data[HEADER_SIZE + header[4]
233                     : HEADER_SIZE + header[4] + header[5]]
234     link_locs = data[HEADER_SIZE + header[4] + header[5]
235                      : HEADER_SIZE + sum(header[4:7])]
236     for i in range(len(link_ids) // 16):
237         id = link_ids[16*i : 16*i + 16]
238         if id == '\0' * 16:
239             loc = None
240         else:
241             loc = struct.unpack('<IIII', link_locs[0:16])
242             link_locs = link_locs[16:]
243         links.append((id, loc))
244     item.links = links
245     return item
246
247 def load_item(backend, location):
248     """Load the cloud item pointed at by the 4-tuple 'location'.
249
250     The elements of the tuple are (directory, sequence, offset, size)."""
251
252     filename = backend.loc_to_name((location[0], location[1]))
253     data = backend.read(filename)[location[2] : location[2] + location[3]]
254     item = parse_item(data)
255     item.location = location
256     return item
257
258 def parse_log(data, location=None):
259     """Parse contents of a log file, yielding a sequence of log items."""
260
261     if isinstance(location, str):
262         m = re.match(r"^log-(\d+)-(\d+)$", location)
263         if m:
264             location = (int(m.group(1)), int(m.group(2)))
265         else:
266             location = None
267
268     offset = 0
269     while len(data) - offset >= HEADER_SIZE:
270         header = struct.unpack_from(HEADER_FORMAT, data, offset)
271         size = HEADER_SIZE + sum(header[4:7])
272         if header[0] != HEADER_MAGIC:
273             print "Bad header magic!"
274             break
275         if size + offset > len(data):
276             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
277             break
278         item = parse_item(data[offset : offset + size])
279         if location is not None:
280             item.location = location + (offset, size)
281         if item is not None: yield item
282         offset += size
283
284 def load_checkpoint_record(backend):
285     for (log, size) in reversed(backend.list()):
286         for item in reversed(list(parse_log(backend.read(log), log))):
287             print item
288             if item.type == ITEM_TYPE.CHECKPOINT:
289                 return item
290
291 class InodeMap:
292     def __init__(self):
293         pass
294
295     def build(self, backend, checkpoint_record):
296         """Reconstruct the inode map from the checkpoint record given.
297
298         This will also build up information about segment utilization."""
299
300         self.checkpoint_record = checkpoint_record
301
302         util = UtilizationTracker(backend)
303         util.add_item(checkpoint_record)
304         inodes = {}
305         self.obsolete_segments = set()
306
307         print "Inode map:"
308         for i in range(len(checkpoint_record.data) // 16):
309             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
310             imap = load_item(backend, checkpoint_record.links[i][1])
311             util.add_item(imap)
312             print "[%d, %d]: %s" % (start, end, imap)
313             for j in range(len(imap.data) // 8):
314                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
315                 inode = load_item(backend, imap.links[j][1])
316                 inodes[inum] = inode
317                 data_segments = set()
318                 util.add_item(inode)
319                 for i in inode.links:
320                     util.add_item(i[1])
321                     data_segments.add(i[1][0:2])
322                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
323
324         print
325         print "Segment utilizations:"
326         for (s, u) in sorted(util.segments.items()):
327             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
328             if u[1] == 0:
329                 print "Deleting..."
330                 backend.delete(s)
331
332         self.inodes = inodes
333         self.util = util
334         self.updated_inodes = set()
335
336     def mark_updated(self, inum):
337         self.updated_inodes.add(inum)
338
339     def write(self, backend, log):
340         updated_inodes = sorted(self.updated_inodes, reverse=True)
341
342         new_checkpoint = LogItem()
343         new_checkpoint.id = LogItem.random_id()
344         new_checkpoint.inum = 0
345         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
346         new_checkpoint.data = ""
347         new_checkpoint.links = []
348
349         for i in range(len(self.checkpoint_record.data) // 16):
350             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
351
352             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
353
354             # Case 1: No inodes in this range of the old inode map have
355             # changed.  Simply emit a new pointer to the same inode map block.
356             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
357                 old_location = self.checkpoint_record.links[i][1][0:2]
358                 if old_location not in self.obsolete_segments:
359                     new_checkpoint.links.append(self.checkpoint_record.links[i])
360                     continue
361
362             # Case 2: Some inodes have been updated.  Create a new inode map
363             # block, write it out, and point the new checkpoint at it.
364             inodes = [k for k in self.inodes if k >= start and k <= end]
365             inodes.sort()
366
367             block = LogItem()
368             block.id = LogItem.random_id()
369             block.inum = 0
370             block.type = ITEM_TYPE.INODE_MAP
371             block.links = []
372             block.data = ""
373             for j in inodes:
374                 block.data += struct.pack("<Q", j)
375                 block.links.append((self.inodes[j].id, self.inodes[j].location))
376             log.write(block, 2)
377
378             new_checkpoint.links.append((block.id, block.location))
379
380             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
381                 updated_inodes.pop()
382
383         log.write(new_checkpoint, 2)
384         self.checkpoint_record = new_checkpoint
385
386 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
387     inode = inode_map.inodes[inum]
388     if copy_data:
389         blocks = []
390         for l in inode.links:
391             data = load_item(backend, l[1])
392             blocks.append(data)
393             log.write(data, 0)
394         inode.links = [(b.id, b.location) for b in blocks]
395     log.write(inode, 1)
396     inode_map.mark_updated(inum)
397
398 def run_cleaner(backend, inode_map, log):
399     # Determine which segments are poorly utilized and should be cleaned.  We
400     # need better heuristics here.
401     for (s, u) in sorted(inode_map.util.segments.items()):
402         if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
403             print "Should clean segment", s
404             loc = backend.name_to_loc(s)
405             if s: inode_map.obsolete_segments.add(loc)
406
407     # TODO: We probably also want heuristics that will find inodes with
408     # badly-fragmented data and rewrite that to achieve better locality.
409
410     # Given that list of segments to clean, scan through those segments to find
411     # data which is still live and mark relevant inodes as needing to be
412     # rewritten.
413     dirty_inodes = set()
414     dirty_inode_data = set()
415     for s in inode_map.obsolete_segments:
416         filename = backend.loc_to_name(s)
417         print "Scanning", filename, "for live data"
418         for item in parse_log(backend.read(filename), filename):
419             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
420                 if item.inum != 0:
421                     inode = inode_map.inodes[item.inum]
422                     if s == inode.location[0:2]:
423                         dirty_inodes.add(item.inum)
424                     if item.inum not in dirty_inode_data:
425                         for b in inode.links:
426                             if s == b[1][0:2]:
427                                 dirty_inode_data.add(item.inum)
428                                 break
429
430     print "Inodes to rewrite:", dirty_inodes
431     print "Inodes with data to rewrite:", dirty_inode_data
432     for i in sorted(dirty_inodes.union(dirty_inode_data)):
433         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
434
435 if __name__ == '__main__':
436     backend = S3Backend("mvrable-bluesky", cachedir=".")
437     chkpt = load_checkpoint_record(backend)
438     print backend.list()
439     imap = InodeMap()
440     imap.build(backend, chkpt)
441     print chkpt
442
443     log_dir = LogDirectory(backend, 0)
444     run_cleaner(backend, imap, log_dir)
445     imap.write(backend, log_dir)
446     log_dir.close_all()