Dump some statistics when the cleaner runs
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys, time
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
19 HEADER_MAGIC2 = 'AgI='          # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
21
22 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
23
24 class ITEM_TYPE:
25     DATA = '1'
26     INODE = '2'
27     INODE_MAP = '3'
28     CHECKPOINT = '4'
29
30 class Backend:
31     """Base class for BlueSky storage backends."""
32
33     def loc_to_name(self, location):
34         return "log-%08d-%08d" % (location)
35
36     def name_to_loc(self, name):
37         m = re.match(r"^log-(\d+)-(\d+)$", name)
38         if m: return (int(m.group(1)), int(m.group(2)))
39
40     def dump_stats(self):
41         pass
42
43 class FileBackend(Backend):
44     """An interface to BlueSky where the log segments are on local disk.
45
46     This is mainly intended for testing purposes, as the real cleaner would
47     operate where data is being stored in S3."""
48
49     def __init__(self, path):
50         self.path = path
51
52     def list(self, directory=0):
53         """Return a listing of all log segments and their sizes."""
54
55         prefix = "log-%08d-" % (directory,)
56         files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
57         files.sort()
58
59         return [(f, os.stat(os.path.join(self.path, f)).st_size)
60                 for f in files]
61
62     def read(self, filename, offset=0, length=None):
63         fp = open(os.path.join(self.path, filename), 'rb')
64         if offset > 0:
65             fp.seek(offset)
66         if length is None:
67             return fp.read()
68         else:
69             return fp.read(length)
70
71     def write(self, filename, data):
72         fp = open(os.path.join(self.path, filename), 'wb')
73         fp.write(data)
74         fp.close()
75
76     def delete(self, filename):
77         os.unlink(os.path.join(self.path, filename))
78
79 def retry_wrap(method):
80     def wrapped(self, *args, **kwargs):
81         for retries in range(3):
82             try:
83                 return method(self, *args, **kwargs)
84             except:
85                 print >>sys.stderr, "S3 operation failed, retrying..."
86                 self.connect()
87                 time.sleep(1.0)
88         return method(self, *args, **kwargs)
89     return wrapped
90
91 class S3Backend(Backend):
92     """An interface to BlueSky where the log segments are on in Amazon S3."""
93
94     def __init__(self, bucket, path='', cachedir="."):
95         self.bucket_name = bucket
96         self.path = path
97         self.cachedir = cachedir
98         self.cache = {}
99         self.connect()
100         self.stats_get = [0, 0]
101         self.stats_put = [0, 0]
102
103     def connect(self):
104         self.conn = boto.connect_s3(is_secure=False)
105         self.bucket = self.conn.get_bucket(self.bucket_name)
106
107     def list(self, directory=0):
108         files = []
109         prefix = "log-%08d-" % (directory,)
110         for k in self.bucket.list(self.path + prefix):
111             files.append((k.key, k.size))
112         return files
113
114     @retry_wrap
115     def read(self, filename, offset=0, length=None):
116         if filename in self.cache:
117             fp = open(os.path.join(self.cachedir, filename), 'rb')
118             if offset > 0:
119                 fp.seek(offset)
120             if length is None:
121                 return fp.read()
122             else:
123                 return fp.read(length)
124         else:
125             k = Key(self.bucket)
126             k.key = self.path + filename
127             data = k.get_contents_as_string()
128             fp = open(os.path.join(self.cachedir, filename), 'wb')
129             fp.write(data)
130             fp.close()
131             self.cache[filename] = True
132             self.stats_get[0] += 1
133             self.stats_get[1] += len(data)
134             if offset > 0:
135                 data = data[offset:]
136             if length is not None:
137                 data = data[0:length]
138             return data
139
140     @retry_wrap
141     def write(self, filename, data):
142         k = Key(self.bucket)
143         k.key = self.path + filename
144         k.set_contents_from_string(data)
145         self.stats_put[0] += 1
146         self.stats_put[1] += len(data)
147         if filename in self.cache:
148             del self.cache[filename]
149
150     @retry_wrap
151     def delete(self, filename):
152         k = Key(self.bucket)
153         k.key = self.path + filename
154         k.delete()
155         if filename in self.cache:
156             del self.cache[filename]
157
158     def dump_stats(self):
159         print "S3 statistics:"
160         print "GET: %d ops / %d bytes" % tuple(self.stats_get)
161         print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
162
163 class SimpleBackend(Backend):
164     """An interface to the simple BlueSky test network server."""
165
166     def __init__(self, server=('localhost', 12345), cachedir="."):
167         self.bucket_name = bucket
168         self.server_address = server
169         self.cachedir = cachedir
170         self.cache = {}
171
172     def _get_socket(self):
173         return socket.create_connection(self.server_address).makefile()
174
175     def list(self, directory=0):
176         files = []
177         prefix = "log-%08d-" % (directory,)
178         for k in self.bucket.list(self.path + prefix):
179             files.append((k.key, k.size))
180         return files
181
182     def read(self, filename, offset=0, length=None):
183         if filename in self.cache:
184             fp = open(os.path.join(self.cachedir, filename), 'rb')
185             if offset > 0:
186                 fp.seek(offset)
187             if length is None:
188                 return fp.read()
189             else:
190                 return fp.read(length)
191         else:
192             f = self._get_socket()
193             f.write("GET %s %d %d\n" % (filename, 0, 0))
194             f.flush()
195             datalen = int(f.readline())
196             if datalen < 0:
197                 raise RuntimeError
198             data = f.read(datalen)
199             fp = open(os.path.join(self.cachedir, filename), 'wb')
200             fp.write(data)
201             fp.close()
202             self.cache[filename] = True
203             if offset > 0:
204                 data = data[offset:]
205             if length is not None:
206                 data = data[0:length]
207             return data
208
209     def write(self, filename, data):
210         f = self._get_socket()
211         f.write("PUT %s %d %d\n" % (filename, len(data)))
212         f.write(data)
213         f.flush()
214         result = int(f.readline())
215         if filename in self.cache:
216             del self.cache[filename]
217
218     def delete(self, filename):
219         pass
220
221 class LogItem:
222     """In-memory representation of a single item stored in a log file."""
223
224     def __init__(self):
225         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
226         self.encrypted = False
227
228     def __str__(self):
229         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
230
231     @staticmethod
232     def random_id():
233         return open('/dev/urandom').read(16)
234
235     def serialize(self):
236         link_ids = []
237         link_locs = []
238         for (i, l) in self.links:
239             link_ids.append(i)
240             if i != '\0' * 16:
241                 link_locs.append(struct.pack('<IIII', *l))
242         link_ids = ''.join(link_ids)
243         link_locs = ''.join(link_locs)
244
245         if self.encrypted:
246             magic = HEADER_MAGIC2
247         else:
248             magic = HEADER_MAGIC1
249         header = struct.pack(HEADER_FORMAT,
250                              magic, self.cryptkeys,
251                              ord(self.type), self.id, self.inum,
252                              len(self.data), len(link_ids), len(link_locs))
253         return header + self.data + link_ids + link_locs
254
255 class LogSegment:
256     def __init__(self, backend, location):
257         self.backend = backend
258         self.location = location
259         self.data = []
260
261     def __len__(self):
262         return sum(len(s) for s in self.data)
263
264     def write(self, item):
265         data = item.serialize()
266         offset = len(self)
267         self.data.append(data)
268         item.location = self.location + (offset, len(data))
269
270     def close(self):
271         data = ''.join(self.data)
272         filename = self.backend.loc_to_name(self.location)
273         print "Would write %d bytes of data to %s" % (len(data), filename)
274         self.backend.write(filename, data)
275
276 class LogDirectory:
277     TARGET_SIZE = 4 << 20
278
279     def __init__(self, backend, dir):
280         self.backend = backend
281         self.dir_num = dir
282         self.seq_num = 0
283         for logname in backend.list(dir):
284             print "Old log file:", logname
285             loc = backend.name_to_loc(logname[0])
286             if loc is not None and loc[0] == dir:
287                 self.seq_num = max(self.seq_num, loc[1] + 1)
288         self.groups = {}
289         print "Starting sequence number is", self.seq_num
290
291     def open_segment(self):
292         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
293         self.seq_num += 1
294         return seg
295
296     def write(self, item, segment_group=0):
297         if segment_group not in self.groups:
298             self.groups[segment_group] = self.open_segment()
299         seg = self.groups[segment_group]
300         seg.write(item)
301         if len(seg) >= LogDirectory.TARGET_SIZE:
302             seg.close()
303             del self.groups[segment_group]
304
305     def close_all(self):
306         for k in list(self.groups.keys()):
307             self.groups[k].close()
308             del self.groups[k]
309
310 class UtilizationTracker:
311     """A simple object that tracks what fraction of each segment is used.
312
313     This data can be used to guide segment cleaning decisions."""
314
315     def __init__(self, backend):
316         self.segments = {}
317         for (segment, size) in backend.list(0) + backend.list(1):
318             self.segments[segment] = [size, 0]
319
320     def add_item(self, item):
321         if isinstance(item, LogItem):
322             item = item.location
323         if item is None: return
324         (dir, seq, offset, size) = item
325         filename = "log-%08d-%08d" % (dir, seq)
326         self.segments[filename][1] += size
327
328 def parse_item(data):
329     if len(data) < HEADER_SIZE: return
330     header = struct.unpack_from(HEADER_FORMAT, data, 0)
331     size = HEADER_SIZE + sum(header[5:8])
332
333     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
334         print "Bad header magic!"
335         return
336
337     if len(data) != size:
338         print "Item size does not match: %d != %d" % (size, len(data))
339         return
340
341     item = LogItem()
342     if header[0] == HEADER_MAGIC2: item.encrypted = True
343     item.cryptkeys = header[1]
344     item.id = header[3]
345     item.inum = header[4]
346     item.location = None
347     item.type = chr(header[2])
348     item.size = size
349     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
350     links = []
351     link_ids = data[HEADER_SIZE + header[5]
352                     : HEADER_SIZE + header[5] + header[6]]
353     link_locs = data[HEADER_SIZE + header[5] + header[6]
354                      : HEADER_SIZE + sum(header[5:8])]
355     for i in range(len(link_ids) // 16):
356         id = link_ids[16*i : 16*i + 16]
357         if id == '\0' * 16:
358             loc = None
359         else:
360             loc = struct.unpack('<IIII', link_locs[0:16])
361             link_locs = link_locs[16:]
362         links.append((id, loc))
363     item.links = links
364     return item
365
366 def load_item(backend, location):
367     """Load the cloud item pointed at by the 4-tuple 'location'.
368
369     The elements of the tuple are (directory, sequence, offset, size)."""
370
371     filename = backend.loc_to_name((location[0], location[1]))
372     data = backend.read(filename, location[2], location[3])
373     item = parse_item(data)
374     item.location = location
375     return item
376
377 def parse_log(data, location=None):
378     """Parse contents of a log file, yielding a sequence of log items."""
379
380     if isinstance(location, str):
381         m = re.match(r"^log-(\d+)-(\d+)$", location)
382         if m:
383             location = (int(m.group(1)), int(m.group(2)))
384         else:
385             location = None
386
387     offset = 0
388     while len(data) - offset >= HEADER_SIZE:
389         header = struct.unpack_from(HEADER_FORMAT, data, offset)
390         size = HEADER_SIZE + sum(header[5:8])
391         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
392             print "Bad header magic!"
393             break
394         if size + offset > len(data):
395             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
396             break
397         item = parse_item(data[offset : offset + size])
398         if location is not None:
399             item.location = location + (offset, size)
400         if item is not None: yield item
401         offset += size
402
403 def load_checkpoint_record(backend, directory=0):
404     for (log, size) in reversed(backend.list(directory)):
405         for item in reversed(list(parse_log(backend.read(log), log))):
406             print item
407             if item.type == ITEM_TYPE.CHECKPOINT:
408                 return item
409
410 class InodeMap:
411     def __init__(self):
412         pass
413
414     def build(self, backend, checkpoint_record):
415         """Reconstruct the inode map from the checkpoint record given.
416
417         This will also build up information about segment utilization."""
418
419         self.version_vector = {}
420         self.checkpoint_record = checkpoint_record
421
422         util = UtilizationTracker(backend)
423         util.add_item(checkpoint_record)
424         inodes = {}
425         self.obsolete_segments = set()
426
427         data = checkpoint_record.data
428         if not data.startswith(CHECKPOINT_MAGIC):
429             raise ValueError, "Invalid checkpoint record!"
430         data = data[len(CHECKPOINT_MAGIC):]
431         (vvlen,) = struct.unpack_from("<I", data, 0)
432         self.vvsize = 4 + 8*vvlen
433         for i in range(vvlen):
434             (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
435             self.version_vector[v1] = v2
436         print self.version_vector
437         self.version_vector[checkpoint_record.location[0]] \
438             = checkpoint_record.location[1]
439         print self.version_vector
440
441         data = data[self.vvsize:]
442
443         print "Inode map:"
444         for i in range(len(data) // 16):
445             (start, end) = struct.unpack_from("<QQ", data, 16*i)
446             imap = load_item(backend, checkpoint_record.links[i][1])
447             util.add_item(imap)
448             print "[%d, %d]: %s" % (start, end, imap)
449             for j in range(len(imap.data) // 8):
450                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
451                 inode = load_item(backend, imap.links[j][1])
452                 inodes[inum] = inode
453                 data_segments = set()
454                 util.add_item(inode)
455                 for i in inode.links:
456                     util.add_item(i[1])
457                     data_segments.add(i[1][0:2])
458                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
459
460         print
461         print "Segment utilizations:"
462         total_data = [0, 0]
463         deletions = [0, 0]
464         for (s, u) in sorted(util.segments.items()):
465             for i in range(2): total_data[i] += u[i]
466             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
467             if u[1] == 0:
468                 print "Would delete..."
469                 #backend.delete(s)
470                 deletions[0] += 1
471                 deletions[1] += u[0]
472
473         self.inodes = inodes
474         self.util = util
475         self.updated_inodes = set()
476
477         print "%d bytes total / %d bytes used" % tuple(total_data)
478         print "would delete %d segments (%d bytes)" % tuple(deletions)
479
480     def mark_updated(self, inum):
481         self.updated_inodes.add(inum)
482
483     def write(self, backend, log):
484         updated_inodes = sorted(self.updated_inodes, reverse=True)
485
486         new_checkpoint = LogItem()
487         new_checkpoint.id = LogItem.random_id()
488         new_checkpoint.inum = 0
489         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
490         new_checkpoint.data = CHECKPOINT_MAGIC
491         new_checkpoint.links = []
492
493         new_checkpoint.data += struct.pack('<I', len(self.version_vector))
494         for d in sorted(self.version_vector):
495             new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
496
497         data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
498         for i in range(len(data) // 16):
499             (start, end) = struct.unpack_from("<QQ", data, 16*i)
500
501             new_checkpoint.data += data[16*i : 16*i + 16]
502
503             # Case 1: No inodes in this range of the old inode map have
504             # changed.  Simply emit a new pointer to the same inode map block.
505             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
506                 old_location = self.checkpoint_record.links[i][1][0:2]
507                 if old_location not in self.obsolete_segments:
508                     new_checkpoint.links.append(self.checkpoint_record.links[i])
509                     continue
510
511             # Case 2: Some inodes have been updated.  Create a new inode map
512             # block, write it out, and point the new checkpoint at it.
513             inodes = [k for k in self.inodes if k >= start and k <= end]
514             inodes.sort()
515
516             block = LogItem()
517             block.id = LogItem.random_id()
518             block.inum = 0
519             block.type = ITEM_TYPE.INODE_MAP
520             block.links = []
521             block.data = ""
522             for j in inodes:
523                 block.data += struct.pack("<Q", j)
524                 block.links.append((self.inodes[j].id, self.inodes[j].location))
525             log.write(block, 2)
526
527             new_checkpoint.links.append((block.id, block.location))
528
529             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
530                 updated_inodes.pop()
531
532         log.write(new_checkpoint, 2)
533         self.checkpoint_record = new_checkpoint
534
535 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
536     inode = inode_map.inodes[inum]
537     if copy_data:
538         blocks = []
539         for l in inode.links:
540             data = load_item(backend, l[1])
541             blocks.append(data)
542             log.write(data, 0)
543         inode.links = [(b.id, b.location) for b in blocks]
544     log.write(inode, 1)
545     inode_map.mark_updated(inum)
546
547 def run_cleaner(backend, inode_map, log, repack_inodes=False):
548     # Determine which segments are poorly utilized and should be cleaned.  We
549     # need better heuristics here.
550     for (s, u) in sorted(inode_map.util.segments.items()):
551         if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
552             print "Should clean segment", s
553             loc = backend.name_to_loc(s)
554             if s: inode_map.obsolete_segments.add(loc)
555
556     # TODO: We probably also want heuristics that will find inodes with
557     # badly-fragmented data and rewrite that to achieve better locality.
558
559     # Given that list of segments to clean, scan through those segments to find
560     # data which is still live and mark relevant inodes as needing to be
561     # rewritten.
562     if repack_inodes:
563         dirty_inodes = set(inode_map.inodes)
564     else:
565         dirty_inodes = set()
566     dirty_inode_data = set()
567     for s in inode_map.obsolete_segments:
568         filename = backend.loc_to_name(s)
569         print "Scanning", filename, "for live data"
570         for item in parse_log(backend.read(filename), filename):
571             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
572                 if item.inum != 0:
573                     inode = inode_map.inodes[item.inum]
574                     if s == inode.location[0:2]:
575                         dirty_inodes.add(item.inum)
576                     if item.inum not in dirty_inode_data:
577                         for b in inode.links:
578                             if s == b[1][0:2]:
579                                 dirty_inode_data.add(item.inum)
580                                 break
581
582     print "Inodes to rewrite:", dirty_inodes
583     print "Inodes with data to rewrite:", dirty_inode_data
584     for i in sorted(dirty_inodes.union(dirty_inode_data)):
585         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
586
587 if __name__ == '__main__':
588     start_time = time.time()
589     backend = S3Backend("mvrable-bluesky-west", cachedir="/export/cache")
590     #backend = FileBackend(".")
591     chkpt = load_checkpoint_record(backend)
592     print backend.list()
593     imap = InodeMap()
594     imap.build(backend, chkpt)
595     print chkpt
596
597     log_dir = LogDirectory(backend, 1)
598     run_cleaner(backend, imap, log_dir)
599     print "Version vector:", imap.version_vector
600     imap.write(backend, log_dir)
601     log_dir.close_all()
602     end_time = time.time()
603     print "Cleaner running time:", end_time - start_time
604     backend.dump_stats()