Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10 #
11 # Redistribution and use in source and binary forms, with or without
12 # modification, are permitted provided that the following conditions
13 # are met:
14 # 1. Redistributions of source code must retain the above copyright
15 #    notice, this list of conditions and the following disclaimer.
16 # 2. Redistributions in binary form must reproduce the above copyright
17 #    notice, this list of conditions and the following disclaimer in the
18 #    documentation and/or other materials provided with the distribution.
19 # 3. Neither the name of the University nor the names of its contributors
20 #    may be used to endorse or promote products derived from this software
21 #    without specific prior written permission.
22 #
23 # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
24 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 # ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
27 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 # SUCH DAMAGE.
34
35 import base64, os, re, struct, sys, time
36 import boto
37 from boto.s3.key import Key
38
39 # The BlueSky 'struct cloudlog_header' data type.
40 HEADER_FORMAT = '<4s48sb16sQIII'
41 HEADER_CRYPTBYTES = 48
42 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
43 HEADER_MAGIC2 = 'AgI='          # Encrypted data
44 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
45
46 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
47
48 # Log file to write benchmark data to
49 benchlog = None
50 def benchlog_write(msg, *args):
51     m = msg % args
52     print "LOG:", m
53     if benchlog is not None:
54         benchlog.write(msg % args)
55         benchlog.write("\n")
56
57 class ITEM_TYPE:
58     DATA = '1'
59     INODE = '2'
60     INODE_MAP = '3'
61     CHECKPOINT = '4'
62
63 class Backend:
64     """Base class for BlueSky storage backends."""
65
66     def loc_to_name(self, location):
67         return "log-%08d-%08d" % (location)
68
69     def name_to_loc(self, name):
70         m = re.match(r"^log-(\d+)-(\d+)$", name)
71         if m: return (int(m.group(1)), int(m.group(2)))
72
73     def dump_stats(self):
74         pass
75
76 class FileBackend(Backend):
77     """An interface to BlueSky where the log segments are on local disk.
78
79     This is mainly intended for testing purposes, as the real cleaner would
80     operate where data is being stored in S3."""
81
82     def __init__(self, path):
83         self.path = path
84
85     def list(self, directory=0):
86         """Return a listing of all log segments and their sizes."""
87
88         prefix = "log-%08d-" % (directory,)
89         files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
90         files.sort()
91
92         return [(f, os.stat(os.path.join(self.path, f)).st_size)
93                 for f in files]
94
95     def read(self, filename, offset=0, length=None):
96         fp = open(os.path.join(self.path, filename), 'rb')
97         if offset > 0:
98             fp.seek(offset)
99         if length is None:
100             return fp.read()
101         else:
102             return fp.read(length)
103
104     def write(self, filename, data):
105         fp = open(os.path.join(self.path, filename), 'wb')
106         fp.write(data)
107         fp.close()
108
109     def delete(self, filename):
110         os.unlink(os.path.join(self.path, filename))
111
112 def retry_wrap(method):
113     def wrapped(self, *args, **kwargs):
114         for retries in range(3):
115             try:
116                 return method(self, *args, **kwargs)
117             except:
118                 print >>sys.stderr, "S3 operation failed, retrying..."
119                 print >>sys.stderr, "  %s %s %s" % (method, args, kwargs)
120                 self.connect()
121                 time.sleep(1.0)
122         return method(self, *args, **kwargs)
123     return wrapped
124
125 class S3Backend(Backend):
126     """An interface to BlueSky where the log segments are on in Amazon S3."""
127
128     def __init__(self, bucket, path='', cachedir="."):
129         self.bucket_name = bucket
130         self.path = path
131         self.cachedir = cachedir
132         self.cache = {}
133         for f in os.listdir(cachedir):
134             self.cache[f] = True
135         #print "Initial cache contents:", list(self.cache.keys())
136         self.connect()
137         self.stats_get = [0, 0]
138         self.stats_put = [0, 0]
139
140     def connect(self):
141         self.conn = boto.connect_s3(is_secure=False)
142         self.bucket = self.conn.get_bucket(self.bucket_name)
143
144     def list(self, directory=0):
145         files = []
146         prefix = "log-%08d-" % (directory,)
147         for k in self.bucket.list(self.path + prefix):
148             files.append((k.key, k.size))
149         return files
150
151     @retry_wrap
152     def read(self, filename, offset=0, length=None):
153         if filename in self.cache:
154             fp = open(os.path.join(self.cachedir, filename), 'rb')
155             if offset > 0:
156                 fp.seek(offset)
157             if length is None:
158                 return fp.read()
159             else:
160                 return fp.read(length)
161         else:
162             k = Key(self.bucket)
163             k.key = self.path + filename
164             data = k.get_contents_as_string()
165             fp = open(os.path.join(self.cachedir, filename), 'wb')
166             fp.write(data)
167             fp.close()
168             self.cache[filename] = True
169             self.stats_get[0] += 1
170             self.stats_get[1] += len(data)
171             if offset > 0:
172                 data = data[offset:]
173             if length is not None:
174                 data = data[0:length]
175             return data
176
177     @retry_wrap
178     def write(self, filename, data):
179         k = Key(self.bucket)
180         k.key = self.path + filename
181         k.set_contents_from_string(data)
182         self.stats_put[0] += 1
183         self.stats_put[1] += len(data)
184         if filename in self.cache:
185             del self.cache[filename]
186
187     @retry_wrap
188     def delete(self, filename):
189         k = Key(self.bucket)
190         k.key = self.path + filename
191         k.delete()
192         if filename in self.cache:
193             del self.cache[filename]
194
195     def dump_stats(self):
196         print "S3 statistics:"
197         print "GET: %d ops / %d bytes" % tuple(self.stats_get)
198         print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
199         benchlog_write("s3_get: %d", self.stats_get[1])
200         benchlog_write("s3_put: %d", self.stats_put[1])
201
202 class SimpleBackend(Backend):
203     """An interface to the simple BlueSky test network server."""
204
205     def __init__(self, server=('localhost', 12345), cachedir="."):
206         self.bucket_name = bucket
207         self.server_address = server
208         self.cachedir = cachedir
209         self.cache = {}
210
211     def _get_socket(self):
212         return socket.create_connection(self.server_address).makefile()
213
214     def list(self, directory=0):
215         files = []
216         prefix = "log-%08d-" % (directory,)
217         for k in self.bucket.list(self.path + prefix):
218             files.append((k.key, k.size))
219         return files
220
221     def read(self, filename, offset=0, length=None):
222         if filename in self.cache:
223             fp = open(os.path.join(self.cachedir, filename), 'rb')
224             if offset > 0:
225                 fp.seek(offset)
226             if length is None:
227                 return fp.read()
228             else:
229                 return fp.read(length)
230         else:
231             f = self._get_socket()
232             f.write("GET %s %d %d\n" % (filename, 0, 0))
233             f.flush()
234             datalen = int(f.readline())
235             if datalen < 0:
236                 raise RuntimeError
237             data = f.read(datalen)
238             fp = open(os.path.join(self.cachedir, filename), 'wb')
239             fp.write(data)
240             fp.close()
241             self.cache[filename] = True
242             if offset > 0:
243                 data = data[offset:]
244             if length is not None:
245                 data = data[0:length]
246             return data
247
248     def write(self, filename, data):
249         f = self._get_socket()
250         f.write("PUT %s %d %d\n" % (filename, len(data)))
251         f.write(data)
252         f.flush()
253         result = int(f.readline())
254         if filename in self.cache:
255             del self.cache[filename]
256
257     def delete(self, filename):
258         pass
259
260 class LogItem:
261     """In-memory representation of a single item stored in a log file."""
262
263     def __init__(self):
264         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
265         self.encrypted = False
266
267     def __str__(self):
268         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
269
270     @staticmethod
271     def random_id():
272         return open('/dev/urandom').read(16)
273
274     def serialize(self):
275         link_ids = []
276         link_locs = []
277         for (i, l) in self.links:
278             link_ids.append(i)
279             if i != '\0' * 16:
280                 link_locs.append(struct.pack('<IIII', *l))
281         link_ids = ''.join(link_ids)
282         link_locs = ''.join(link_locs)
283
284         if self.encrypted:
285             magic = HEADER_MAGIC2
286         else:
287             magic = HEADER_MAGIC1
288         header = struct.pack(HEADER_FORMAT,
289                              magic, self.cryptkeys,
290                              ord(self.type), self.id, self.inum,
291                              len(self.data), len(link_ids), len(link_locs))
292         return header + self.data + link_ids + link_locs
293
294 class LogSegment:
295     def __init__(self, backend, location):
296         self.backend = backend
297         self.location = location
298         self.data = []
299
300     def __len__(self):
301         return sum(len(s) for s in self.data)
302
303     def write(self, item):
304         data = item.serialize()
305         offset = len(self)
306         self.data.append(data)
307         item.location = self.location + (offset, len(data))
308
309     def close(self):
310         data = ''.join(self.data)
311         filename = self.backend.loc_to_name(self.location)
312         print "Would write %d bytes of data to %s" % (len(data), filename)
313         self.backend.write(filename, data)
314
315 class LogDirectory:
316     TARGET_SIZE = 4 << 20
317
318     def __init__(self, backend, dir):
319         self.backend = backend
320         self.dir_num = dir
321         self.seq_num = 0
322         for logname in backend.list(dir):
323             #print "Old log file:", logname
324             loc = backend.name_to_loc(logname[0])
325             if loc is not None and loc[0] == dir:
326                 self.seq_num = max(self.seq_num, loc[1] + 1)
327         self.groups = {}
328         print "Starting sequence number is", self.seq_num
329
330     def open_segment(self):
331         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
332         self.seq_num += 1
333         return seg
334
335     def write(self, item, segment_group=0):
336         if segment_group not in self.groups:
337             self.groups[segment_group] = self.open_segment()
338         seg = self.groups[segment_group]
339         seg.write(item)
340         if len(seg) >= LogDirectory.TARGET_SIZE:
341             seg.close()
342             del self.groups[segment_group]
343
344     def close_all(self):
345         for k in list(self.groups.keys()):
346             self.groups[k].close()
347             del self.groups[k]
348
349 class UtilizationTracker:
350     """A simple object that tracks what fraction of each segment is used.
351
352     This data can be used to guide segment cleaning decisions."""
353
354     def __init__(self, backend):
355         self.segments = {}
356         for (segment, size) in backend.list(0) + backend.list(1):
357             self.segments[segment] = [size, 0]
358
359     def add_item(self, item):
360         if isinstance(item, LogItem):
361             item = item.location
362         if item is None: return
363         (dir, seq, offset, size) = item
364         filename = "log-%08d-%08d" % (dir, seq)
365         self.segments[filename][1] += size
366
367 def parse_item(data):
368     if len(data) < HEADER_SIZE: return
369     header = struct.unpack_from(HEADER_FORMAT, data, 0)
370     size = HEADER_SIZE + sum(header[5:8])
371
372     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
373         print "Bad header magic!"
374         return
375
376     if len(data) != size:
377         print "Item size does not match: %d != %d" % (size, len(data))
378         return
379
380     item = LogItem()
381     if header[0] == HEADER_MAGIC2: item.encrypted = True
382     item.cryptkeys = header[1]
383     item.id = header[3]
384     item.inum = header[4]
385     item.location = None
386     item.type = chr(header[2])
387     item.size = size
388     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
389     links = []
390     link_ids = data[HEADER_SIZE + header[5]
391                     : HEADER_SIZE + header[5] + header[6]]
392     link_locs = data[HEADER_SIZE + header[5] + header[6]
393                      : HEADER_SIZE + sum(header[5:8])]
394     for i in range(len(link_ids) // 16):
395         id = link_ids[16*i : 16*i + 16]
396         if id == '\0' * 16:
397             loc = None
398         else:
399             loc = struct.unpack('<IIII', link_locs[0:16])
400             link_locs = link_locs[16:]
401         links.append((id, loc))
402     item.links = links
403     return item
404
405 def load_item(backend, location):
406     """Load the cloud item pointed at by the 4-tuple 'location'.
407
408     The elements of the tuple are (directory, sequence, offset, size)."""
409
410     filename = backend.loc_to_name((location[0], location[1]))
411     data = backend.read(filename, location[2], location[3])
412     item = parse_item(data)
413     item.location = location
414     return item
415
416 def parse_log(data, location=None):
417     """Parse contents of a log file, yielding a sequence of log items."""
418
419     if isinstance(location, str):
420         m = re.match(r"^log-(\d+)-(\d+)$", location)
421         if m:
422             location = (int(m.group(1)), int(m.group(2)))
423         else:
424             location = None
425
426     offset = 0
427     while len(data) - offset >= HEADER_SIZE:
428         header = struct.unpack_from(HEADER_FORMAT, data, offset)
429         size = HEADER_SIZE + sum(header[5:8])
430         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
431             print "Bad header magic!"
432             break
433         if size + offset > len(data):
434             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
435             break
436         item = parse_item(data[offset : offset + size])
437         if location is not None:
438             item.location = location + (offset, size)
439         if item is not None: yield item
440         offset += size
441
442 def load_checkpoint_record(backend, directory=0):
443     for (log, size) in reversed(backend.list(directory)):
444         for item in reversed(list(parse_log(backend.read(log), log))):
445             print item
446             if item.type == ITEM_TYPE.CHECKPOINT:
447                 return item
448
449 class InodeMap:
450     def __init__(self):
451         pass
452
453     def build(self, backend, checkpoint_record):
454         """Reconstruct the inode map from the checkpoint record given.
455
456         This will also build up information about segment utilization."""
457
458         self.version_vector = {}
459         self.checkpoint_record = checkpoint_record
460
461         util = UtilizationTracker(backend)
462         util.add_item(checkpoint_record)
463         inodes = {}
464         self.obsolete_segments = set()
465
466         data = checkpoint_record.data
467         if not data.startswith(CHECKPOINT_MAGIC):
468             raise ValueError, "Invalid checkpoint record!"
469         data = data[len(CHECKPOINT_MAGIC):]
470         (vvlen,) = struct.unpack_from("<I", data, 0)
471         self.vvsize = 4 + 8*vvlen
472         for i in range(vvlen):
473             (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
474             self.version_vector[v1] = v2
475         print self.version_vector
476         self.version_vector[checkpoint_record.location[0]] \
477             = checkpoint_record.location[1]
478         print self.version_vector
479
480         data = data[self.vvsize:]
481
482         #print "Inode map:"
483         for i in range(len(data) // 16):
484             (start, end) = struct.unpack_from("<QQ", data, 16*i)
485             imap = load_item(backend, checkpoint_record.links[i][1])
486             util.add_item(imap)
487             #print "[%d, %d]: %s" % (start, end, imap)
488             for j in range(len(imap.data) // 8):
489                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
490                 inode = load_item(backend, imap.links[j][1])
491                 inodes[inum] = inode
492                 data_segments = set()
493                 util.add_item(inode)
494                 for i in inode.links:
495                     util.add_item(i[1])
496                     if i[1] is not None:
497                         data_segments.add(i[1][0:2])
498                 #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
499
500         #print
501         print "Segment utilizations:"
502         total_data = [0, 0]
503         deletions = [0, 0]
504         for (s, u) in sorted(util.segments.items()):
505             for i in range(2): total_data[i] += u[i]
506             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
507             if u[1] == 0:
508                 print "Would delete..."
509                 (d, n) = backend.name_to_loc(s)
510                 try:
511                     if n < self.version_vector[d]:
512                         backend.delete(s)
513                         deletions[0] += 1
514                         deletions[1] += u[0]
515                     else:
516                         print "Not deleting log file newer than checkpoint!"
517                 except:
518                     print "Error determining age of log segment, keeping"
519
520         self.inodes = inodes
521         self.util = util
522         self.updated_inodes = set()
523
524         print "%d bytes total / %d bytes used" % tuple(total_data)
525         print "would delete %d segments (%d bytes)" % tuple(deletions)
526         benchlog_write("bytes_used: %d", total_data[1])
527         benchlog_write("bytes_wasted: %d", total_data[0] - total_data[1])
528         benchlog_write("bytes_freed: %d", deletions[1])
529
530     def mark_updated(self, inum):
531         self.updated_inodes.add(inum)
532
533     def write(self, backend, log):
534         updated_inodes = sorted(self.updated_inodes, reverse=True)
535
536         new_checkpoint = LogItem()
537         new_checkpoint.id = LogItem.random_id()
538         new_checkpoint.inum = 0
539         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
540         new_checkpoint.data = CHECKPOINT_MAGIC
541         new_checkpoint.links = []
542
543         new_checkpoint.data += struct.pack('<I', len(self.version_vector))
544         for d in sorted(self.version_vector):
545             new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
546
547         data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
548         for i in range(len(data) // 16):
549             (start, end) = struct.unpack_from("<QQ", data, 16*i)
550
551             new_checkpoint.data += data[16*i : 16*i + 16]
552
553             # Case 1: No inodes in this range of the old inode map have
554             # changed.  Simply emit a new pointer to the same inode map block.
555             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
556                 old_location = self.checkpoint_record.links[i][1][0:2]
557                 if old_location not in self.obsolete_segments:
558                     new_checkpoint.links.append(self.checkpoint_record.links[i])
559                     continue
560
561             # Case 2: Some inodes have been updated.  Create a new inode map
562             # block, write it out, and point the new checkpoint at it.
563             inodes = [k for k in self.inodes if k >= start and k <= end]
564             inodes.sort()
565
566             block = LogItem()
567             block.id = LogItem.random_id()
568             block.inum = 0
569             block.type = ITEM_TYPE.INODE_MAP
570             block.links = []
571             block.data = ""
572             for j in inodes:
573                 block.data += struct.pack("<Q", j)
574                 block.links.append((self.inodes[j].id, self.inodes[j].location))
575             log.write(block, 2)
576
577             new_checkpoint.links.append((block.id, block.location))
578
579             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
580                 updated_inodes.pop()
581
582         log.write(new_checkpoint, 2)
583         self.checkpoint_record = new_checkpoint
584
585 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
586     inode = inode_map.inodes[inum]
587     if copy_data:
588         newlinks = []
589         for l in inode.links:
590             if l[1] is not None:
591                 data = load_item(backend, l[1])
592                 log.write(data, 0)
593                 newlinks.append((data.id, data.location))
594             else:
595                 newlinks.append(l)
596         inode.links = newlinks
597     log.write(inode, 1)
598     inode_map.mark_updated(inum)
599
600 def run_cleaner(backend, inode_map, log, repack_inodes=False):
601     # Determine which segments are poorly utilized and should be cleaned.  We
602     # need better heuristics here.
603     for (s, u) in sorted(inode_map.util.segments.items()):
604         if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
605             print "Should clean segment", s
606             loc = backend.name_to_loc(s)
607             if s: inode_map.obsolete_segments.add(loc)
608
609     # TODO: We probably also want heuristics that will find inodes with
610     # badly-fragmented data and rewrite that to achieve better locality.
611
612     # Given that list of segments to clean, scan through those segments to find
613     # data which is still live and mark relevant inodes as needing to be
614     # rewritten.
615     if repack_inodes:
616         dirty_inodes = set(inode_map.inodes)
617     else:
618         dirty_inodes = set()
619     dirty_inode_data = set()
620     for s in inode_map.obsolete_segments:
621         filename = backend.loc_to_name(s)
622         #print "Scanning", filename, "for live data"
623         for item in parse_log(backend.read(filename), filename):
624             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
625                 if item.inum != 0:
626                     inode = inode_map.inodes[item.inum]
627                     if s == inode.location[0:2]:
628                         dirty_inodes.add(item.inum)
629                     if item.inum not in dirty_inode_data:
630                         for b in inode.links:
631                             if b[1] is not None and s == b[1][0:2]:
632                                 dirty_inode_data.add(item.inum)
633                                 break
634
635     #print "Inodes to rewrite:", dirty_inodes
636     #print "Inodes with data to rewrite:", dirty_inode_data
637     for i in sorted(dirty_inodes.union(dirty_inode_data)):
638         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
639
640 if __name__ == '__main__':
641     benchlog = open('cleaner.log', 'a')
642     benchlog_write("*** START CLEANER RUN ***")
643     start_time = time.time()
644     backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
645     #backend = FileBackend(".")
646     chkpt = load_checkpoint_record(backend)
647     #print backend.list()
648     log_dir = LogDirectory(backend, 1)
649     imap = InodeMap()
650     imap.build(backend, chkpt)
651     print chkpt
652
653     print "Version vector:", imap.version_vector
654     print "Last cleaner log file:", log_dir.seq_num - 1
655     if imap.version_vector.get(1, -1) != log_dir.seq_num - 1:
656         print "Proxy hasn't updated to latest cleaner segment yet!"
657         benchlog_write("waiting for proxy...")
658         sys.exit(0)
659
660     run_cleaner(backend, imap, log_dir)
661     print "Version vector:", imap.version_vector
662     imap.write(backend, log_dir)
663     log_dir.close_all()
664     end_time = time.time()
665     backend.dump_stats()
666     benchlog_write("running_time: %s", end_time - start_time)
667     benchlog_write("")