4cda7c7f95ec70dd2742d3442625cb2a52f19954
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys, time
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
19 HEADER_MAGIC2 = 'AgI='          # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
21
22 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
23
24 class ITEM_TYPE:
25     DATA = '1'
26     INODE = '2'
27     INODE_MAP = '3'
28     CHECKPOINT = '4'
29
30 class Backend:
31     """Base class for BlueSky storage backends."""
32
33     def loc_to_name(self, location):
34         return "log-%08d-%08d" % (location)
35
36     def name_to_loc(self, name):
37         m = re.match(r"^log-(\d+)-(\d+)$", name)
38         if m: return (int(m.group(1)), int(m.group(2)))
39
40
41 class FileBackend(Backend):
42     """An interface to BlueSky where the log segments are on local disk.
43
44     This is mainly intended for testing purposes, as the real cleaner would
45     operate where data is being stored in S3."""
46
47     def __init__(self, path):
48         self.path = path
49
50     def list(self, directory=0):
51         """Return a listing of all log segments and their sizes."""
52
53         prefix = "log-%08d-" % (directory,)
54         files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
55         files.sort()
56
57         return [(f, os.stat(os.path.join(self.path, f)).st_size)
58                 for f in files]
59
60     def read(self, filename, offset=0, length=None):
61         fp = open(os.path.join(self.path, filename), 'rb')
62         if offset > 0:
63             fp.seek(offset)
64         if length is None:
65             return fp.read()
66         else:
67             return fp.read(length)
68
69     def write(self, filename, data):
70         fp = open(os.path.join(self.path, filename), 'wb')
71         fp.write(data)
72         fp.close()
73
74     def delete(self, filename):
75         os.unlink(os.path.join(self.path, filename))
76
77 def retry_wrap(method):
78     def wrapped(self, *args, **kwargs):
79         for retries in range(3):
80             try:
81                 return method(self, *args, **kwargs)
82             except:
83                 print >>sys.stderr, "S3 operation failed, retrying..."
84                 self.connect()
85                 time.sleep(1.0)
86         return method(self, *args, **kwargs)
87     return wrapped
88
89 class S3Backend(Backend):
90     """An interface to BlueSky where the log segments are on in Amazon S3."""
91
92     def __init__(self, bucket, path='', cachedir="."):
93         self.bucket_name = bucket
94         self.path = path
95         self.cachedir = cachedir
96         self.cache = {}
97         self.connect()
98
99     def connect(self):
100         self.conn = boto.connect_s3(is_secure=False)
101         self.bucket = self.conn.get_bucket(self.bucket_name)
102
103     def list(self, directory=0):
104         files = []
105         prefix = "log-%08d-" % (directory,)
106         for k in self.bucket.list(self.path + prefix):
107             files.append((k.key, k.size))
108         return files
109
110     @retry_wrap
111     def read(self, filename, offset=0, length=None):
112         if filename in self.cache:
113             fp = open(os.path.join(self.cachedir, filename), 'rb')
114             if offset > 0:
115                 fp.seek(offset)
116             if length is None:
117                 return fp.read()
118             else:
119                 return fp.read(length)
120         else:
121             k = Key(self.bucket)
122             k.key = self.path + filename
123             data = k.get_contents_as_string()
124             fp = open(os.path.join(self.cachedir, filename), 'wb')
125             fp.write(data)
126             fp.close()
127             self.cache[filename] = True
128             if offset > 0:
129                 data = data[offset:]
130             if length is not None:
131                 data = data[0:length]
132             return data
133
134     @retry_wrap
135     def write(self, filename, data):
136         k = Key(self.bucket)
137         k.key = self.path + filename
138         k.set_contents_from_string(data)
139         if filename in self.cache:
140             del self.cache[filename]
141
142     @retry_wrap
143     def delete(self, filename):
144         k = Key(self.bucket)
145         k.key = self.path + filename
146         k.delete()
147         if filename in self.cache:
148             del self.cache[filename]
149
150 class SimpleBackend(Backend):
151     """An interface to the simple BlueSky test network server."""
152
153     def __init__(self, server=('localhost', 12345), cachedir="."):
154         self.bucket_name = bucket
155         self.server_address = server
156         self.cachedir = cachedir
157         self.cache = {}
158
159     def _get_socket(self):
160         return socket.create_connection(self.server_address).makefile()
161
162     def list(self, directory=0):
163         files = []
164         prefix = "log-%08d-" % (directory,)
165         for k in self.bucket.list(self.path + prefix):
166             files.append((k.key, k.size))
167         return files
168
169     def read(self, filename, offset=0, length=None):
170         if filename in self.cache:
171             fp = open(os.path.join(self.cachedir, filename), 'rb')
172             if offset > 0:
173                 fp.seek(offset)
174             if length is None:
175                 return fp.read()
176             else:
177                 return fp.read(length)
178         else:
179             f = self._get_socket()
180             f.write("GET %s %d %d\n" % (filename, 0, 0))
181             f.flush()
182             datalen = int(f.readline())
183             if datalen < 0:
184                 raise RuntimeError
185             data = f.read(datalen)
186             fp = open(os.path.join(self.cachedir, filename), 'wb')
187             fp.write(data)
188             fp.close()
189             self.cache[filename] = True
190             if offset > 0:
191                 data = data[offset:]
192             if length is not None:
193                 data = data[0:length]
194             return data
195
196     def write(self, filename, data):
197         f = self._get_socket()
198         f.write("PUT %s %d %d\n" % (filename, len(data)))
199         f.write(data)
200         f.flush()
201         result = int(f.readline())
202         if filename in self.cache:
203             del self.cache[filename]
204
205     def delete(self, filename):
206         pass
207
208 class LogItem:
209     """In-memory representation of a single item stored in a log file."""
210
211     def __init__(self):
212         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
213         self.encrypted = False
214
215     def __str__(self):
216         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
217
218     @staticmethod
219     def random_id():
220         return open('/dev/urandom').read(16)
221
222     def serialize(self):
223         link_ids = []
224         link_locs = []
225         for (i, l) in self.links:
226             link_ids.append(i)
227             if i != '\0' * 16:
228                 link_locs.append(struct.pack('<IIII', *l))
229         link_ids = ''.join(link_ids)
230         link_locs = ''.join(link_locs)
231
232         if self.encrypted:
233             magic = HEADER_MAGIC2
234         else:
235             magic = HEADER_MAGIC1
236         header = struct.pack(HEADER_FORMAT,
237                              magic, self.cryptkeys,
238                              ord(self.type), self.id, self.inum,
239                              len(self.data), len(link_ids), len(link_locs))
240         return header + self.data + link_ids + link_locs
241
242 class LogSegment:
243     def __init__(self, backend, location):
244         self.backend = backend
245         self.location = location
246         self.data = []
247
248     def __len__(self):
249         return sum(len(s) for s in self.data)
250
251     def write(self, item):
252         data = item.serialize()
253         offset = len(self)
254         self.data.append(data)
255         item.location = self.location + (offset, len(data))
256
257     def close(self):
258         data = ''.join(self.data)
259         filename = self.backend.loc_to_name(self.location)
260         print "Would write %d bytes of data to %s" % (len(data), filename)
261         self.backend.write(filename, data)
262
263 class LogDirectory:
264     TARGET_SIZE = 4 << 20
265
266     def __init__(self, backend, dir):
267         self.backend = backend
268         self.dir_num = dir
269         self.seq_num = 0
270         for logname in backend.list(dir):
271             print "Old log file:", logname
272             loc = backend.name_to_loc(logname[0])
273             if loc is not None and loc[0] == dir:
274                 self.seq_num = max(self.seq_num, loc[1] + 1)
275         self.groups = {}
276         print "Starting sequence number is", self.seq_num
277
278     def open_segment(self):
279         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
280         self.seq_num += 1
281         return seg
282
283     def write(self, item, segment_group=0):
284         if segment_group not in self.groups:
285             self.groups[segment_group] = self.open_segment()
286         seg = self.groups[segment_group]
287         seg.write(item)
288         if len(seg) >= LogDirectory.TARGET_SIZE:
289             seg.close()
290             del self.groups[segment_group]
291
292     def close_all(self):
293         for k in list(self.groups.keys()):
294             self.groups[k].close()
295             del self.groups[k]
296
297 class UtilizationTracker:
298     """A simple object that tracks what fraction of each segment is used.
299
300     This data can be used to guide segment cleaning decisions."""
301
302     def __init__(self, backend):
303         self.segments = {}
304         for (segment, size) in backend.list(0) + backend.list(1):
305             self.segments[segment] = [size, 0]
306
307     def add_item(self, item):
308         if isinstance(item, LogItem):
309             item = item.location
310         if item is None: return
311         (dir, seq, offset, size) = item
312         filename = "log-%08d-%08d" % (dir, seq)
313         self.segments[filename][1] += size
314
315 def parse_item(data):
316     if len(data) < HEADER_SIZE: return
317     header = struct.unpack_from(HEADER_FORMAT, data, 0)
318     size = HEADER_SIZE + sum(header[5:8])
319
320     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
321         print "Bad header magic!"
322         return
323
324     if len(data) != size:
325         print "Item size does not match: %d != %d" % (size, len(data))
326         return
327
328     item = LogItem()
329     if header[0] == HEADER_MAGIC2: item.encrypted = True
330     item.cryptkeys = header[1]
331     item.id = header[3]
332     item.inum = header[4]
333     item.location = None
334     item.type = chr(header[2])
335     item.size = size
336     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
337     links = []
338     link_ids = data[HEADER_SIZE + header[5]
339                     : HEADER_SIZE + header[5] + header[6]]
340     link_locs = data[HEADER_SIZE + header[5] + header[6]
341                      : HEADER_SIZE + sum(header[5:8])]
342     for i in range(len(link_ids) // 16):
343         id = link_ids[16*i : 16*i + 16]
344         if id == '\0' * 16:
345             loc = None
346         else:
347             loc = struct.unpack('<IIII', link_locs[0:16])
348             link_locs = link_locs[16:]
349         links.append((id, loc))
350     item.links = links
351     return item
352
353 def load_item(backend, location):
354     """Load the cloud item pointed at by the 4-tuple 'location'.
355
356     The elements of the tuple are (directory, sequence, offset, size)."""
357
358     filename = backend.loc_to_name((location[0], location[1]))
359     data = backend.read(filename, location[2], location[3])
360     item = parse_item(data)
361     item.location = location
362     return item
363
364 def parse_log(data, location=None):
365     """Parse contents of a log file, yielding a sequence of log items."""
366
367     if isinstance(location, str):
368         m = re.match(r"^log-(\d+)-(\d+)$", location)
369         if m:
370             location = (int(m.group(1)), int(m.group(2)))
371         else:
372             location = None
373
374     offset = 0
375     while len(data) - offset >= HEADER_SIZE:
376         header = struct.unpack_from(HEADER_FORMAT, data, offset)
377         size = HEADER_SIZE + sum(header[5:8])
378         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
379             print "Bad header magic!"
380             break
381         if size + offset > len(data):
382             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
383             break
384         item = parse_item(data[offset : offset + size])
385         if location is not None:
386             item.location = location + (offset, size)
387         if item is not None: yield item
388         offset += size
389
390 def load_checkpoint_record(backend, directory=0):
391     for (log, size) in reversed(backend.list(directory)):
392         for item in reversed(list(parse_log(backend.read(log), log))):
393             print item
394             if item.type == ITEM_TYPE.CHECKPOINT:
395                 return item
396
397 class InodeMap:
398     def __init__(self):
399         pass
400
401     def build(self, backend, checkpoint_record):
402         """Reconstruct the inode map from the checkpoint record given.
403
404         This will also build up information about segment utilization."""
405
406         self.version_vector = {}
407         self.checkpoint_record = checkpoint_record
408
409         util = UtilizationTracker(backend)
410         util.add_item(checkpoint_record)
411         inodes = {}
412         self.obsolete_segments = set()
413
414         data = checkpoint_record.data
415         if not data.startswith(CHECKPOINT_MAGIC):
416             raise ValueError, "Invalid checkpoint record!"
417         data = data[len(CHECKPOINT_MAGIC):]
418         (vvlen,) = struct.unpack_from("<I", data, 0)
419         self.vvsize = 4 + 8*vvlen
420         for i in range(vvlen):
421             (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
422             self.version_vector[v1] = v2
423         print self.version_vector
424         self.version_vector[checkpoint_record.location[0]] \
425             = checkpoint_record.location[1]
426         print self.version_vector
427
428         data = data[self.vvsize:]
429
430         print "Inode map:"
431         for i in range(len(data) // 16):
432             (start, end) = struct.unpack_from("<QQ", data, 16*i)
433             imap = load_item(backend, checkpoint_record.links[i][1])
434             util.add_item(imap)
435             print "[%d, %d]: %s" % (start, end, imap)
436             for j in range(len(imap.data) // 8):
437                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
438                 inode = load_item(backend, imap.links[j][1])
439                 inodes[inum] = inode
440                 data_segments = set()
441                 util.add_item(inode)
442                 for i in inode.links:
443                     util.add_item(i[1])
444                     data_segments.add(i[1][0:2])
445                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
446
447         print
448         print "Segment utilizations:"
449         for (s, u) in sorted(util.segments.items()):
450             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
451             if u[1] == 0:
452                 # print "Deleting..."
453                 # backend.delete(s)
454                 pass
455
456         self.inodes = inodes
457         self.util = util
458         self.updated_inodes = set()
459
460     def mark_updated(self, inum):
461         self.updated_inodes.add(inum)
462
463     def write(self, backend, log):
464         updated_inodes = sorted(self.updated_inodes, reverse=True)
465
466         new_checkpoint = LogItem()
467         new_checkpoint.id = LogItem.random_id()
468         new_checkpoint.inum = 0
469         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
470         new_checkpoint.data = CHECKPOINT_MAGIC
471         new_checkpoint.links = []
472
473         new_checkpoint.data += struct.pack('<I', len(self.version_vector))
474         for d in sorted(self.version_vector):
475             new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
476
477         data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
478         for i in range(len(data) // 16):
479             (start, end) = struct.unpack_from("<QQ", data, 16*i)
480
481             new_checkpoint.data += data[16*i : 16*i + 16]
482
483             # Case 1: No inodes in this range of the old inode map have
484             # changed.  Simply emit a new pointer to the same inode map block.
485             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
486                 old_location = self.checkpoint_record.links[i][1][0:2]
487                 if old_location not in self.obsolete_segments:
488                     new_checkpoint.links.append(self.checkpoint_record.links[i])
489                     continue
490
491             # Case 2: Some inodes have been updated.  Create a new inode map
492             # block, write it out, and point the new checkpoint at it.
493             inodes = [k for k in self.inodes if k >= start and k <= end]
494             inodes.sort()
495
496             block = LogItem()
497             block.id = LogItem.random_id()
498             block.inum = 0
499             block.type = ITEM_TYPE.INODE_MAP
500             block.links = []
501             block.data = ""
502             for j in inodes:
503                 block.data += struct.pack("<Q", j)
504                 block.links.append((self.inodes[j].id, self.inodes[j].location))
505             log.write(block, 2)
506
507             new_checkpoint.links.append((block.id, block.location))
508
509             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
510                 updated_inodes.pop()
511
512         log.write(new_checkpoint, 2)
513         self.checkpoint_record = new_checkpoint
514
515 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
516     inode = inode_map.inodes[inum]
517     if copy_data:
518         blocks = []
519         for l in inode.links:
520             data = load_item(backend, l[1])
521             blocks.append(data)
522             log.write(data, 0)
523         inode.links = [(b.id, b.location) for b in blocks]
524     log.write(inode, 1)
525     inode_map.mark_updated(inum)
526
527 def run_cleaner(backend, inode_map, log, repack_inodes=False):
528     # Determine which segments are poorly utilized and should be cleaned.  We
529     # need better heuristics here.
530     for (s, u) in sorted(inode_map.util.segments.items()):
531         if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
532             print "Should clean segment", s
533             loc = backend.name_to_loc(s)
534             if s: inode_map.obsolete_segments.add(loc)
535
536     # TODO: We probably also want heuristics that will find inodes with
537     # badly-fragmented data and rewrite that to achieve better locality.
538
539     # Given that list of segments to clean, scan through those segments to find
540     # data which is still live and mark relevant inodes as needing to be
541     # rewritten.
542     if repack_inodes:
543         dirty_inodes = set(inode_map.inodes)
544     else:
545         dirty_inodes = set()
546     dirty_inode_data = set()
547     for s in inode_map.obsolete_segments:
548         filename = backend.loc_to_name(s)
549         print "Scanning", filename, "for live data"
550         for item in parse_log(backend.read(filename), filename):
551             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
552                 if item.inum != 0:
553                     inode = inode_map.inodes[item.inum]
554                     if s == inode.location[0:2]:
555                         dirty_inodes.add(item.inum)
556                     if item.inum not in dirty_inode_data:
557                         for b in inode.links:
558                             if s == b[1][0:2]:
559                                 dirty_inode_data.add(item.inum)
560                                 break
561
562     print "Inodes to rewrite:", dirty_inodes
563     print "Inodes with data to rewrite:", dirty_inode_data
564     for i in sorted(dirty_inodes.union(dirty_inode_data)):
565         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
566
567 if __name__ == '__main__':
568     backend = S3Backend("mvrable-bluesky", cachedir=".")
569     #backend = FileBackend(".")
570     chkpt = load_checkpoint_record(backend)
571     print backend.list()
572     imap = InodeMap()
573     imap.build(backend, chkpt)
574     print chkpt
575
576     log_dir = LogDirectory(backend, 1)
577     run_cleaner(backend, imap, log_dir)
578     print "Version vector:", imap.version_vector
579     imap.write(backend, log_dir)
580     log_dir.close_all()