Have cleaner wait on proxy when needed
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys, time
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
19 HEADER_MAGIC2 = 'AgI='          # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
21
22 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
23
24 # Log file to write benchmark data to
25 benchlog = None
26 def benchlog_write(msg, *args):
27     m = msg % args
28     print "LOG:", m
29     if benchlog is not None:
30         benchlog.write(msg % args)
31         benchlog.write("\n")
32
33 class ITEM_TYPE:
34     DATA = '1'
35     INODE = '2'
36     INODE_MAP = '3'
37     CHECKPOINT = '4'
38
39 class Backend:
40     """Base class for BlueSky storage backends."""
41
42     def loc_to_name(self, location):
43         return "log-%08d-%08d" % (location)
44
45     def name_to_loc(self, name):
46         m = re.match(r"^log-(\d+)-(\d+)$", name)
47         if m: return (int(m.group(1)), int(m.group(2)))
48
49     def dump_stats(self):
50         pass
51
52 class FileBackend(Backend):
53     """An interface to BlueSky where the log segments are on local disk.
54
55     This is mainly intended for testing purposes, as the real cleaner would
56     operate where data is being stored in S3."""
57
58     def __init__(self, path):
59         self.path = path
60
61     def list(self, directory=0):
62         """Return a listing of all log segments and their sizes."""
63
64         prefix = "log-%08d-" % (directory,)
65         files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
66         files.sort()
67
68         return [(f, os.stat(os.path.join(self.path, f)).st_size)
69                 for f in files]
70
71     def read(self, filename, offset=0, length=None):
72         fp = open(os.path.join(self.path, filename), 'rb')
73         if offset > 0:
74             fp.seek(offset)
75         if length is None:
76             return fp.read()
77         else:
78             return fp.read(length)
79
80     def write(self, filename, data):
81         fp = open(os.path.join(self.path, filename), 'wb')
82         fp.write(data)
83         fp.close()
84
85     def delete(self, filename):
86         os.unlink(os.path.join(self.path, filename))
87
88 def retry_wrap(method):
89     def wrapped(self, *args, **kwargs):
90         for retries in range(3):
91             try:
92                 return method(self, *args, **kwargs)
93             except:
94                 print >>sys.stderr, "S3 operation failed, retrying..."
95                 print >>sys.stderr, "  %s %s %s" % (method, args, kwargs)
96                 self.connect()
97                 time.sleep(1.0)
98         return method(self, *args, **kwargs)
99     return wrapped
100
101 class S3Backend(Backend):
102     """An interface to BlueSky where the log segments are on in Amazon S3."""
103
104     def __init__(self, bucket, path='', cachedir="."):
105         self.bucket_name = bucket
106         self.path = path
107         self.cachedir = cachedir
108         self.cache = {}
109         for f in os.listdir(cachedir):
110             self.cache[f] = True
111         #print "Initial cache contents:", list(self.cache.keys())
112         self.connect()
113         self.stats_get = [0, 0]
114         self.stats_put = [0, 0]
115
116     def connect(self):
117         self.conn = boto.connect_s3(is_secure=False)
118         self.bucket = self.conn.get_bucket(self.bucket_name)
119
120     def list(self, directory=0):
121         files = []
122         prefix = "log-%08d-" % (directory,)
123         for k in self.bucket.list(self.path + prefix):
124             files.append((k.key, k.size))
125         return files
126
127     @retry_wrap
128     def read(self, filename, offset=0, length=None):
129         if filename in self.cache:
130             fp = open(os.path.join(self.cachedir, filename), 'rb')
131             if offset > 0:
132                 fp.seek(offset)
133             if length is None:
134                 return fp.read()
135             else:
136                 return fp.read(length)
137         else:
138             k = Key(self.bucket)
139             k.key = self.path + filename
140             data = k.get_contents_as_string()
141             fp = open(os.path.join(self.cachedir, filename), 'wb')
142             fp.write(data)
143             fp.close()
144             self.cache[filename] = True
145             self.stats_get[0] += 1
146             self.stats_get[1] += len(data)
147             if offset > 0:
148                 data = data[offset:]
149             if length is not None:
150                 data = data[0:length]
151             return data
152
153     @retry_wrap
154     def write(self, filename, data):
155         k = Key(self.bucket)
156         k.key = self.path + filename
157         k.set_contents_from_string(data)
158         self.stats_put[0] += 1
159         self.stats_put[1] += len(data)
160         if filename in self.cache:
161             del self.cache[filename]
162
163     @retry_wrap
164     def delete(self, filename):
165         k = Key(self.bucket)
166         k.key = self.path + filename
167         k.delete()
168         if filename in self.cache:
169             del self.cache[filename]
170
171     def dump_stats(self):
172         print "S3 statistics:"
173         print "GET: %d ops / %d bytes" % tuple(self.stats_get)
174         print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
175         benchlog_write("s3_get: %d", self.stats_get[1])
176         benchlog_write("s3_put: %d", self.stats_put[1])
177
178 class SimpleBackend(Backend):
179     """An interface to the simple BlueSky test network server."""
180
181     def __init__(self, server=('localhost', 12345), cachedir="."):
182         self.bucket_name = bucket
183         self.server_address = server
184         self.cachedir = cachedir
185         self.cache = {}
186
187     def _get_socket(self):
188         return socket.create_connection(self.server_address).makefile()
189
190     def list(self, directory=0):
191         files = []
192         prefix = "log-%08d-" % (directory,)
193         for k in self.bucket.list(self.path + prefix):
194             files.append((k.key, k.size))
195         return files
196
197     def read(self, filename, offset=0, length=None):
198         if filename in self.cache:
199             fp = open(os.path.join(self.cachedir, filename), 'rb')
200             if offset > 0:
201                 fp.seek(offset)
202             if length is None:
203                 return fp.read()
204             else:
205                 return fp.read(length)
206         else:
207             f = self._get_socket()
208             f.write("GET %s %d %d\n" % (filename, 0, 0))
209             f.flush()
210             datalen = int(f.readline())
211             if datalen < 0:
212                 raise RuntimeError
213             data = f.read(datalen)
214             fp = open(os.path.join(self.cachedir, filename), 'wb')
215             fp.write(data)
216             fp.close()
217             self.cache[filename] = True
218             if offset > 0:
219                 data = data[offset:]
220             if length is not None:
221                 data = data[0:length]
222             return data
223
224     def write(self, filename, data):
225         f = self._get_socket()
226         f.write("PUT %s %d %d\n" % (filename, len(data)))
227         f.write(data)
228         f.flush()
229         result = int(f.readline())
230         if filename in self.cache:
231             del self.cache[filename]
232
233     def delete(self, filename):
234         pass
235
236 class LogItem:
237     """In-memory representation of a single item stored in a log file."""
238
239     def __init__(self):
240         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
241         self.encrypted = False
242
243     def __str__(self):
244         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
245
246     @staticmethod
247     def random_id():
248         return open('/dev/urandom').read(16)
249
250     def serialize(self):
251         link_ids = []
252         link_locs = []
253         for (i, l) in self.links:
254             link_ids.append(i)
255             if i != '\0' * 16:
256                 link_locs.append(struct.pack('<IIII', *l))
257         link_ids = ''.join(link_ids)
258         link_locs = ''.join(link_locs)
259
260         if self.encrypted:
261             magic = HEADER_MAGIC2
262         else:
263             magic = HEADER_MAGIC1
264         header = struct.pack(HEADER_FORMAT,
265                              magic, self.cryptkeys,
266                              ord(self.type), self.id, self.inum,
267                              len(self.data), len(link_ids), len(link_locs))
268         return header + self.data + link_ids + link_locs
269
270 class LogSegment:
271     def __init__(self, backend, location):
272         self.backend = backend
273         self.location = location
274         self.data = []
275
276     def __len__(self):
277         return sum(len(s) for s in self.data)
278
279     def write(self, item):
280         data = item.serialize()
281         offset = len(self)
282         self.data.append(data)
283         item.location = self.location + (offset, len(data))
284
285     def close(self):
286         data = ''.join(self.data)
287         filename = self.backend.loc_to_name(self.location)
288         print "Would write %d bytes of data to %s" % (len(data), filename)
289         self.backend.write(filename, data)
290
291 class LogDirectory:
292     TARGET_SIZE = 4 << 20
293
294     def __init__(self, backend, dir):
295         self.backend = backend
296         self.dir_num = dir
297         self.seq_num = 0
298         for logname in backend.list(dir):
299             #print "Old log file:", logname
300             loc = backend.name_to_loc(logname[0])
301             if loc is not None and loc[0] == dir:
302                 self.seq_num = max(self.seq_num, loc[1] + 1)
303         self.groups = {}
304         print "Starting sequence number is", self.seq_num
305
306     def open_segment(self):
307         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
308         self.seq_num += 1
309         return seg
310
311     def write(self, item, segment_group=0):
312         if segment_group not in self.groups:
313             self.groups[segment_group] = self.open_segment()
314         seg = self.groups[segment_group]
315         seg.write(item)
316         if len(seg) >= LogDirectory.TARGET_SIZE:
317             seg.close()
318             del self.groups[segment_group]
319
320     def close_all(self):
321         for k in list(self.groups.keys()):
322             self.groups[k].close()
323             del self.groups[k]
324
325 class UtilizationTracker:
326     """A simple object that tracks what fraction of each segment is used.
327
328     This data can be used to guide segment cleaning decisions."""
329
330     def __init__(self, backend):
331         self.segments = {}
332         for (segment, size) in backend.list(0) + backend.list(1):
333             self.segments[segment] = [size, 0]
334
335     def add_item(self, item):
336         if isinstance(item, LogItem):
337             item = item.location
338         if item is None: return
339         (dir, seq, offset, size) = item
340         filename = "log-%08d-%08d" % (dir, seq)
341         self.segments[filename][1] += size
342
343 def parse_item(data):
344     if len(data) < HEADER_SIZE: return
345     header = struct.unpack_from(HEADER_FORMAT, data, 0)
346     size = HEADER_SIZE + sum(header[5:8])
347
348     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
349         print "Bad header magic!"
350         return
351
352     if len(data) != size:
353         print "Item size does not match: %d != %d" % (size, len(data))
354         return
355
356     item = LogItem()
357     if header[0] == HEADER_MAGIC2: item.encrypted = True
358     item.cryptkeys = header[1]
359     item.id = header[3]
360     item.inum = header[4]
361     item.location = None
362     item.type = chr(header[2])
363     item.size = size
364     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
365     links = []
366     link_ids = data[HEADER_SIZE + header[5]
367                     : HEADER_SIZE + header[5] + header[6]]
368     link_locs = data[HEADER_SIZE + header[5] + header[6]
369                      : HEADER_SIZE + sum(header[5:8])]
370     for i in range(len(link_ids) // 16):
371         id = link_ids[16*i : 16*i + 16]
372         if id == '\0' * 16:
373             loc = None
374         else:
375             loc = struct.unpack('<IIII', link_locs[0:16])
376             link_locs = link_locs[16:]
377         links.append((id, loc))
378     item.links = links
379     return item
380
381 def load_item(backend, location):
382     """Load the cloud item pointed at by the 4-tuple 'location'.
383
384     The elements of the tuple are (directory, sequence, offset, size)."""
385
386     filename = backend.loc_to_name((location[0], location[1]))
387     data = backend.read(filename, location[2], location[3])
388     item = parse_item(data)
389     item.location = location
390     return item
391
392 def parse_log(data, location=None):
393     """Parse contents of a log file, yielding a sequence of log items."""
394
395     if isinstance(location, str):
396         m = re.match(r"^log-(\d+)-(\d+)$", location)
397         if m:
398             location = (int(m.group(1)), int(m.group(2)))
399         else:
400             location = None
401
402     offset = 0
403     while len(data) - offset >= HEADER_SIZE:
404         header = struct.unpack_from(HEADER_FORMAT, data, offset)
405         size = HEADER_SIZE + sum(header[5:8])
406         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
407             print "Bad header magic!"
408             break
409         if size + offset > len(data):
410             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
411             break
412         item = parse_item(data[offset : offset + size])
413         if location is not None:
414             item.location = location + (offset, size)
415         if item is not None: yield item
416         offset += size
417
418 def load_checkpoint_record(backend, directory=0):
419     for (log, size) in reversed(backend.list(directory)):
420         for item in reversed(list(parse_log(backend.read(log), log))):
421             print item
422             if item.type == ITEM_TYPE.CHECKPOINT:
423                 return item
424
425 class InodeMap:
426     def __init__(self):
427         pass
428
429     def build(self, backend, checkpoint_record):
430         """Reconstruct the inode map from the checkpoint record given.
431
432         This will also build up information about segment utilization."""
433
434         self.version_vector = {}
435         self.checkpoint_record = checkpoint_record
436
437         util = UtilizationTracker(backend)
438         util.add_item(checkpoint_record)
439         inodes = {}
440         self.obsolete_segments = set()
441
442         data = checkpoint_record.data
443         if not data.startswith(CHECKPOINT_MAGIC):
444             raise ValueError, "Invalid checkpoint record!"
445         data = data[len(CHECKPOINT_MAGIC):]
446         (vvlen,) = struct.unpack_from("<I", data, 0)
447         self.vvsize = 4 + 8*vvlen
448         for i in range(vvlen):
449             (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
450             self.version_vector[v1] = v2
451         print self.version_vector
452         self.version_vector[checkpoint_record.location[0]] \
453             = checkpoint_record.location[1]
454         print self.version_vector
455
456         data = data[self.vvsize:]
457
458         #print "Inode map:"
459         for i in range(len(data) // 16):
460             (start, end) = struct.unpack_from("<QQ", data, 16*i)
461             imap = load_item(backend, checkpoint_record.links[i][1])
462             util.add_item(imap)
463             #print "[%d, %d]: %s" % (start, end, imap)
464             for j in range(len(imap.data) // 8):
465                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
466                 inode = load_item(backend, imap.links[j][1])
467                 inodes[inum] = inode
468                 data_segments = set()
469                 util.add_item(inode)
470                 for i in inode.links:
471                     util.add_item(i[1])
472                     if i[1] is not None:
473                         data_segments.add(i[1][0:2])
474                 #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
475
476         #print
477         print "Segment utilizations:"
478         total_data = [0, 0]
479         deletions = [0, 0]
480         for (s, u) in sorted(util.segments.items()):
481             for i in range(2): total_data[i] += u[i]
482             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
483             if u[1] == 0:
484                 print "Would delete..."
485                 (d, n) = backend.name_to_loc(s)
486                 try:
487                     if n < self.version_vector[d]:
488                         backend.delete(s)
489                         deletions[0] += 1
490                         deletions[1] += u[0]
491                     else:
492                         print "Not deleting log file newer than checkpoint!"
493                 except:
494                     print "Error determining age of log segment, keeping"
495
496         self.inodes = inodes
497         self.util = util
498         self.updated_inodes = set()
499
500         print "%d bytes total / %d bytes used" % tuple(total_data)
501         print "would delete %d segments (%d bytes)" % tuple(deletions)
502         benchlog_write("bytes_used: %d", total_data[1])
503         benchlog_write("bytes_wasted: %d", total_data[0] - total_data[1])
504         benchlog_write("bytes_freed: %d", deletions[1])
505
506     def mark_updated(self, inum):
507         self.updated_inodes.add(inum)
508
509     def write(self, backend, log):
510         updated_inodes = sorted(self.updated_inodes, reverse=True)
511
512         new_checkpoint = LogItem()
513         new_checkpoint.id = LogItem.random_id()
514         new_checkpoint.inum = 0
515         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
516         new_checkpoint.data = CHECKPOINT_MAGIC
517         new_checkpoint.links = []
518
519         new_checkpoint.data += struct.pack('<I', len(self.version_vector))
520         for d in sorted(self.version_vector):
521             new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
522
523         data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
524         for i in range(len(data) // 16):
525             (start, end) = struct.unpack_from("<QQ", data, 16*i)
526
527             new_checkpoint.data += data[16*i : 16*i + 16]
528
529             # Case 1: No inodes in this range of the old inode map have
530             # changed.  Simply emit a new pointer to the same inode map block.
531             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
532                 old_location = self.checkpoint_record.links[i][1][0:2]
533                 if old_location not in self.obsolete_segments:
534                     new_checkpoint.links.append(self.checkpoint_record.links[i])
535                     continue
536
537             # Case 2: Some inodes have been updated.  Create a new inode map
538             # block, write it out, and point the new checkpoint at it.
539             inodes = [k for k in self.inodes if k >= start and k <= end]
540             inodes.sort()
541
542             block = LogItem()
543             block.id = LogItem.random_id()
544             block.inum = 0
545             block.type = ITEM_TYPE.INODE_MAP
546             block.links = []
547             block.data = ""
548             for j in inodes:
549                 block.data += struct.pack("<Q", j)
550                 block.links.append((self.inodes[j].id, self.inodes[j].location))
551             log.write(block, 2)
552
553             new_checkpoint.links.append((block.id, block.location))
554
555             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
556                 updated_inodes.pop()
557
558         log.write(new_checkpoint, 2)
559         self.checkpoint_record = new_checkpoint
560
561 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
562     inode = inode_map.inodes[inum]
563     if copy_data:
564         newlinks = []
565         for l in inode.links:
566             if l[1] is not None:
567                 data = load_item(backend, l[1])
568                 log.write(data, 0)
569                 newlinks.append((data.id, data.location))
570             else:
571                 newlinks.append(l)
572         inode.links = newlinks
573     log.write(inode, 1)
574     inode_map.mark_updated(inum)
575
576 def run_cleaner(backend, inode_map, log, repack_inodes=False):
577     # Determine which segments are poorly utilized and should be cleaned.  We
578     # need better heuristics here.
579     for (s, u) in sorted(inode_map.util.segments.items()):
580         if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
581             print "Should clean segment", s
582             loc = backend.name_to_loc(s)
583             if s: inode_map.obsolete_segments.add(loc)
584
585     # TODO: We probably also want heuristics that will find inodes with
586     # badly-fragmented data and rewrite that to achieve better locality.
587
588     # Given that list of segments to clean, scan through those segments to find
589     # data which is still live and mark relevant inodes as needing to be
590     # rewritten.
591     if repack_inodes:
592         dirty_inodes = set(inode_map.inodes)
593     else:
594         dirty_inodes = set()
595     dirty_inode_data = set()
596     for s in inode_map.obsolete_segments:
597         filename = backend.loc_to_name(s)
598         #print "Scanning", filename, "for live data"
599         for item in parse_log(backend.read(filename), filename):
600             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
601                 if item.inum != 0:
602                     inode = inode_map.inodes[item.inum]
603                     if s == inode.location[0:2]:
604                         dirty_inodes.add(item.inum)
605                     if item.inum not in dirty_inode_data:
606                         for b in inode.links:
607                             if b[1] is not None and s == b[1][0:2]:
608                                 dirty_inode_data.add(item.inum)
609                                 break
610
611     #print "Inodes to rewrite:", dirty_inodes
612     #print "Inodes with data to rewrite:", dirty_inode_data
613     for i in sorted(dirty_inodes.union(dirty_inode_data)):
614         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
615
616 if __name__ == '__main__':
617     benchlog = open('cleaner.log', 'a')
618     benchlog_write("*** START CLEANER RUN ***")
619     start_time = time.time()
620     backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
621     #backend = FileBackend(".")
622     chkpt = load_checkpoint_record(backend)
623     #print backend.list()
624     log_dir = LogDirectory(backend, 1)
625     imap = InodeMap()
626     imap.build(backend, chkpt)
627     print chkpt
628
629     print "Version vector:", imap.version_vector
630     print "Last cleaner log file:", log_dir.seq_num - 1
631     if imap.version_vector.get(1, -1) != log_dir.seq_num - 1:
632         print "Proxy hasn't updated to latest cleaner segment yet!"
633         benchlog_write("waiting for proxy...")
634         sys.exit(0)
635
636     run_cleaner(backend, imap, log_dir)
637     print "Version vector:", imap.version_vector
638     imap.write(backend, log_dir)
639     log_dir.close_all()
640     end_time = time.time()
641     backend.dump_stats()
642     benchlog_write("running_time: %s", end_time - start_time)
643     benchlog_write("")