Cleaner updates
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys, time
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
19 HEADER_MAGIC2 = 'AgI='          # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
21
22 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
23
24 class ITEM_TYPE:
25     DATA = '1'
26     INODE = '2'
27     INODE_MAP = '3'
28     CHECKPOINT = '4'
29
30 class Backend:
31     """Base class for BlueSky storage backends."""
32
33     def loc_to_name(self, location):
34         return "log-%08d-%08d" % (location)
35
36     def name_to_loc(self, name):
37         m = re.match(r"^log-(\d+)-(\d+)$", name)
38         if m: return (int(m.group(1)), int(m.group(2)))
39
40     def dump_stats(self):
41         pass
42
43 class FileBackend(Backend):
44     """An interface to BlueSky where the log segments are on local disk.
45
46     This is mainly intended for testing purposes, as the real cleaner would
47     operate where data is being stored in S3."""
48
49     def __init__(self, path):
50         self.path = path
51
52     def list(self, directory=0):
53         """Return a listing of all log segments and their sizes."""
54
55         prefix = "log-%08d-" % (directory,)
56         files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
57         files.sort()
58
59         return [(f, os.stat(os.path.join(self.path, f)).st_size)
60                 for f in files]
61
62     def read(self, filename, offset=0, length=None):
63         fp = open(os.path.join(self.path, filename), 'rb')
64         if offset > 0:
65             fp.seek(offset)
66         if length is None:
67             return fp.read()
68         else:
69             return fp.read(length)
70
71     def write(self, filename, data):
72         fp = open(os.path.join(self.path, filename), 'wb')
73         fp.write(data)
74         fp.close()
75
76     def delete(self, filename):
77         os.unlink(os.path.join(self.path, filename))
78
79 def retry_wrap(method):
80     def wrapped(self, *args, **kwargs):
81         for retries in range(3):
82             try:
83                 return method(self, *args, **kwargs)
84             except:
85                 print >>sys.stderr, "S3 operation failed, retrying..."
86                 print >>sys.stderr, "  %s %s %s" % (method, args, kwargs)
87                 self.connect()
88                 time.sleep(1.0)
89         return method(self, *args, **kwargs)
90     return wrapped
91
92 class S3Backend(Backend):
93     """An interface to BlueSky where the log segments are on in Amazon S3."""
94
95     def __init__(self, bucket, path='', cachedir="."):
96         self.bucket_name = bucket
97         self.path = path
98         self.cachedir = cachedir
99         self.cache = {}
100         for f in os.listdir(cachedir):
101             self.cache[f] = True
102         #print "Initial cache contents:", list(self.cache.keys())
103         self.connect()
104         self.stats_get = [0, 0]
105         self.stats_put = [0, 0]
106
107     def connect(self):
108         self.conn = boto.connect_s3(is_secure=False)
109         self.bucket = self.conn.get_bucket(self.bucket_name)
110
111     def list(self, directory=0):
112         files = []
113         prefix = "log-%08d-" % (directory,)
114         for k in self.bucket.list(self.path + prefix):
115             files.append((k.key, k.size))
116         return files
117
118     @retry_wrap
119     def read(self, filename, offset=0, length=None):
120         if filename in self.cache:
121             fp = open(os.path.join(self.cachedir, filename), 'rb')
122             if offset > 0:
123                 fp.seek(offset)
124             if length is None:
125                 return fp.read()
126             else:
127                 return fp.read(length)
128         else:
129             k = Key(self.bucket)
130             k.key = self.path + filename
131             data = k.get_contents_as_string()
132             fp = open(os.path.join(self.cachedir, filename), 'wb')
133             fp.write(data)
134             fp.close()
135             self.cache[filename] = True
136             self.stats_get[0] += 1
137             self.stats_get[1] += len(data)
138             if offset > 0:
139                 data = data[offset:]
140             if length is not None:
141                 data = data[0:length]
142             return data
143
144     @retry_wrap
145     def write(self, filename, data):
146         k = Key(self.bucket)
147         k.key = self.path + filename
148         k.set_contents_from_string(data)
149         self.stats_put[0] += 1
150         self.stats_put[1] += len(data)
151         if filename in self.cache:
152             del self.cache[filename]
153
154     @retry_wrap
155     def delete(self, filename):
156         k = Key(self.bucket)
157         k.key = self.path + filename
158         k.delete()
159         if filename in self.cache:
160             del self.cache[filename]
161
162     def dump_stats(self):
163         print "S3 statistics:"
164         print "GET: %d ops / %d bytes" % tuple(self.stats_get)
165         print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
166
167 class SimpleBackend(Backend):
168     """An interface to the simple BlueSky test network server."""
169
170     def __init__(self, server=('localhost', 12345), cachedir="."):
171         self.bucket_name = bucket
172         self.server_address = server
173         self.cachedir = cachedir
174         self.cache = {}
175
176     def _get_socket(self):
177         return socket.create_connection(self.server_address).makefile()
178
179     def list(self, directory=0):
180         files = []
181         prefix = "log-%08d-" % (directory,)
182         for k in self.bucket.list(self.path + prefix):
183             files.append((k.key, k.size))
184         return files
185
186     def read(self, filename, offset=0, length=None):
187         if filename in self.cache:
188             fp = open(os.path.join(self.cachedir, filename), 'rb')
189             if offset > 0:
190                 fp.seek(offset)
191             if length is None:
192                 return fp.read()
193             else:
194                 return fp.read(length)
195         else:
196             f = self._get_socket()
197             f.write("GET %s %d %d\n" % (filename, 0, 0))
198             f.flush()
199             datalen = int(f.readline())
200             if datalen < 0:
201                 raise RuntimeError
202             data = f.read(datalen)
203             fp = open(os.path.join(self.cachedir, filename), 'wb')
204             fp.write(data)
205             fp.close()
206             self.cache[filename] = True
207             if offset > 0:
208                 data = data[offset:]
209             if length is not None:
210                 data = data[0:length]
211             return data
212
213     def write(self, filename, data):
214         f = self._get_socket()
215         f.write("PUT %s %d %d\n" % (filename, len(data)))
216         f.write(data)
217         f.flush()
218         result = int(f.readline())
219         if filename in self.cache:
220             del self.cache[filename]
221
222     def delete(self, filename):
223         pass
224
225 class LogItem:
226     """In-memory representation of a single item stored in a log file."""
227
228     def __init__(self):
229         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
230         self.encrypted = False
231
232     def __str__(self):
233         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
234
235     @staticmethod
236     def random_id():
237         return open('/dev/urandom').read(16)
238
239     def serialize(self):
240         link_ids = []
241         link_locs = []
242         for (i, l) in self.links:
243             link_ids.append(i)
244             if i != '\0' * 16:
245                 link_locs.append(struct.pack('<IIII', *l))
246         link_ids = ''.join(link_ids)
247         link_locs = ''.join(link_locs)
248
249         if self.encrypted:
250             magic = HEADER_MAGIC2
251         else:
252             magic = HEADER_MAGIC1
253         header = struct.pack(HEADER_FORMAT,
254                              magic, self.cryptkeys,
255                              ord(self.type), self.id, self.inum,
256                              len(self.data), len(link_ids), len(link_locs))
257         return header + self.data + link_ids + link_locs
258
259 class LogSegment:
260     def __init__(self, backend, location):
261         self.backend = backend
262         self.location = location
263         self.data = []
264
265     def __len__(self):
266         return sum(len(s) for s in self.data)
267
268     def write(self, item):
269         data = item.serialize()
270         offset = len(self)
271         self.data.append(data)
272         item.location = self.location + (offset, len(data))
273
274     def close(self):
275         data = ''.join(self.data)
276         filename = self.backend.loc_to_name(self.location)
277         print "Would write %d bytes of data to %s" % (len(data), filename)
278         self.backend.write(filename, data)
279
280 class LogDirectory:
281     TARGET_SIZE = 4 << 20
282
283     def __init__(self, backend, dir):
284         self.backend = backend
285         self.dir_num = dir
286         self.seq_num = 0
287         for logname in backend.list(dir):
288             #print "Old log file:", logname
289             loc = backend.name_to_loc(logname[0])
290             if loc is not None and loc[0] == dir:
291                 self.seq_num = max(self.seq_num, loc[1] + 1)
292         self.groups = {}
293         print "Starting sequence number is", self.seq_num
294
295     def open_segment(self):
296         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
297         self.seq_num += 1
298         return seg
299
300     def write(self, item, segment_group=0):
301         if segment_group not in self.groups:
302             self.groups[segment_group] = self.open_segment()
303         seg = self.groups[segment_group]
304         seg.write(item)
305         if len(seg) >= LogDirectory.TARGET_SIZE:
306             seg.close()
307             del self.groups[segment_group]
308
309     def close_all(self):
310         for k in list(self.groups.keys()):
311             self.groups[k].close()
312             del self.groups[k]
313
314 class UtilizationTracker:
315     """A simple object that tracks what fraction of each segment is used.
316
317     This data can be used to guide segment cleaning decisions."""
318
319     def __init__(self, backend):
320         self.segments = {}
321         for (segment, size) in backend.list(0) + backend.list(1):
322             self.segments[segment] = [size, 0]
323
324     def add_item(self, item):
325         if isinstance(item, LogItem):
326             item = item.location
327         if item is None: return
328         (dir, seq, offset, size) = item
329         filename = "log-%08d-%08d" % (dir, seq)
330         self.segments[filename][1] += size
331
332 def parse_item(data):
333     if len(data) < HEADER_SIZE: return
334     header = struct.unpack_from(HEADER_FORMAT, data, 0)
335     size = HEADER_SIZE + sum(header[5:8])
336
337     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
338         print "Bad header magic!"
339         return
340
341     if len(data) != size:
342         print "Item size does not match: %d != %d" % (size, len(data))
343         return
344
345     item = LogItem()
346     if header[0] == HEADER_MAGIC2: item.encrypted = True
347     item.cryptkeys = header[1]
348     item.id = header[3]
349     item.inum = header[4]
350     item.location = None
351     item.type = chr(header[2])
352     item.size = size
353     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
354     links = []
355     link_ids = data[HEADER_SIZE + header[5]
356                     : HEADER_SIZE + header[5] + header[6]]
357     link_locs = data[HEADER_SIZE + header[5] + header[6]
358                      : HEADER_SIZE + sum(header[5:8])]
359     for i in range(len(link_ids) // 16):
360         id = link_ids[16*i : 16*i + 16]
361         if id == '\0' * 16:
362             loc = None
363         else:
364             loc = struct.unpack('<IIII', link_locs[0:16])
365             link_locs = link_locs[16:]
366         links.append((id, loc))
367     item.links = links
368     return item
369
370 def load_item(backend, location):
371     """Load the cloud item pointed at by the 4-tuple 'location'.
372
373     The elements of the tuple are (directory, sequence, offset, size)."""
374
375     filename = backend.loc_to_name((location[0], location[1]))
376     data = backend.read(filename, location[2], location[3])
377     item = parse_item(data)
378     item.location = location
379     return item
380
381 def parse_log(data, location=None):
382     """Parse contents of a log file, yielding a sequence of log items."""
383
384     if isinstance(location, str):
385         m = re.match(r"^log-(\d+)-(\d+)$", location)
386         if m:
387             location = (int(m.group(1)), int(m.group(2)))
388         else:
389             location = None
390
391     offset = 0
392     while len(data) - offset >= HEADER_SIZE:
393         header = struct.unpack_from(HEADER_FORMAT, data, offset)
394         size = HEADER_SIZE + sum(header[5:8])
395         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
396             print "Bad header magic!"
397             break
398         if size + offset > len(data):
399             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
400             break
401         item = parse_item(data[offset : offset + size])
402         if location is not None:
403             item.location = location + (offset, size)
404         if item is not None: yield item
405         offset += size
406
407 def load_checkpoint_record(backend, directory=0):
408     for (log, size) in reversed(backend.list(directory)):
409         for item in reversed(list(parse_log(backend.read(log), log))):
410             print item
411             if item.type == ITEM_TYPE.CHECKPOINT:
412                 return item
413
414 class InodeMap:
415     def __init__(self):
416         pass
417
418     def build(self, backend, checkpoint_record):
419         """Reconstruct the inode map from the checkpoint record given.
420
421         This will also build up information about segment utilization."""
422
423         self.version_vector = {}
424         self.checkpoint_record = checkpoint_record
425
426         util = UtilizationTracker(backend)
427         util.add_item(checkpoint_record)
428         inodes = {}
429         self.obsolete_segments = set()
430
431         data = checkpoint_record.data
432         if not data.startswith(CHECKPOINT_MAGIC):
433             raise ValueError, "Invalid checkpoint record!"
434         data = data[len(CHECKPOINT_MAGIC):]
435         (vvlen,) = struct.unpack_from("<I", data, 0)
436         self.vvsize = 4 + 8*vvlen
437         for i in range(vvlen):
438             (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
439             self.version_vector[v1] = v2
440         print self.version_vector
441         self.version_vector[checkpoint_record.location[0]] \
442             = checkpoint_record.location[1]
443         print self.version_vector
444
445         data = data[self.vvsize:]
446
447         #print "Inode map:"
448         for i in range(len(data) // 16):
449             (start, end) = struct.unpack_from("<QQ", data, 16*i)
450             imap = load_item(backend, checkpoint_record.links[i][1])
451             util.add_item(imap)
452             #print "[%d, %d]: %s" % (start, end, imap)
453             for j in range(len(imap.data) // 8):
454                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
455                 inode = load_item(backend, imap.links[j][1])
456                 inodes[inum] = inode
457                 data_segments = set()
458                 util.add_item(inode)
459                 for i in inode.links:
460                     util.add_item(i[1])
461                     if i[1] is not None:
462                         data_segments.add(i[1][0:2])
463                 #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
464
465         #print
466         print "Segment utilizations:"
467         total_data = [0, 0]
468         deletions = [0, 0]
469         for (s, u) in sorted(util.segments.items()):
470             for i in range(2): total_data[i] += u[i]
471             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
472             if u[1] == 0:
473                 print "Would delete..."
474                 (d, n) = backend.name_to_loc(s)
475                 try:
476                     if n < self.version_vector[d]:
477                         backend.delete(s)
478                         deletions[0] += 1
479                         deletions[1] += u[0]
480                     else:
481                         print "Not deleting log file newer than checkpoint!"
482                 except:
483                     print "Error determining age of log segment, keeping"
484
485         self.inodes = inodes
486         self.util = util
487         self.updated_inodes = set()
488
489         print "%d bytes total / %d bytes used" % tuple(total_data)
490         print "would delete %d segments (%d bytes)" % tuple(deletions)
491
492     def mark_updated(self, inum):
493         self.updated_inodes.add(inum)
494
495     def write(self, backend, log):
496         updated_inodes = sorted(self.updated_inodes, reverse=True)
497
498         new_checkpoint = LogItem()
499         new_checkpoint.id = LogItem.random_id()
500         new_checkpoint.inum = 0
501         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
502         new_checkpoint.data = CHECKPOINT_MAGIC
503         new_checkpoint.links = []
504
505         new_checkpoint.data += struct.pack('<I', len(self.version_vector))
506         for d in sorted(self.version_vector):
507             new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
508
509         data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
510         for i in range(len(data) // 16):
511             (start, end) = struct.unpack_from("<QQ", data, 16*i)
512
513             new_checkpoint.data += data[16*i : 16*i + 16]
514
515             # Case 1: No inodes in this range of the old inode map have
516             # changed.  Simply emit a new pointer to the same inode map block.
517             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
518                 old_location = self.checkpoint_record.links[i][1][0:2]
519                 if old_location not in self.obsolete_segments:
520                     new_checkpoint.links.append(self.checkpoint_record.links[i])
521                     continue
522
523             # Case 2: Some inodes have been updated.  Create a new inode map
524             # block, write it out, and point the new checkpoint at it.
525             inodes = [k for k in self.inodes if k >= start and k <= end]
526             inodes.sort()
527
528             block = LogItem()
529             block.id = LogItem.random_id()
530             block.inum = 0
531             block.type = ITEM_TYPE.INODE_MAP
532             block.links = []
533             block.data = ""
534             for j in inodes:
535                 block.data += struct.pack("<Q", j)
536                 block.links.append((self.inodes[j].id, self.inodes[j].location))
537             log.write(block, 2)
538
539             new_checkpoint.links.append((block.id, block.location))
540
541             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
542                 updated_inodes.pop()
543
544         log.write(new_checkpoint, 2)
545         self.checkpoint_record = new_checkpoint
546
547 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
548     inode = inode_map.inodes[inum]
549     if copy_data:
550         newlinks = []
551         for l in inode.links:
552             if l[1] is not None:
553                 data = load_item(backend, l[1])
554                 log.write(data, 0)
555                 newlinks.append((data.id, data.location))
556             else:
557                 newlinks.append(l)
558         inode.links = newlinks
559     log.write(inode, 1)
560     inode_map.mark_updated(inum)
561
562 def run_cleaner(backend, inode_map, log, repack_inodes=False):
563     # Determine which segments are poorly utilized and should be cleaned.  We
564     # need better heuristics here.
565     for (s, u) in sorted(inode_map.util.segments.items()):
566         if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
567             print "Should clean segment", s
568             loc = backend.name_to_loc(s)
569             if s: inode_map.obsolete_segments.add(loc)
570
571     # TODO: We probably also want heuristics that will find inodes with
572     # badly-fragmented data and rewrite that to achieve better locality.
573
574     # Given that list of segments to clean, scan through those segments to find
575     # data which is still live and mark relevant inodes as needing to be
576     # rewritten.
577     if repack_inodes:
578         dirty_inodes = set(inode_map.inodes)
579     else:
580         dirty_inodes = set()
581     dirty_inode_data = set()
582     for s in inode_map.obsolete_segments:
583         filename = backend.loc_to_name(s)
584         #print "Scanning", filename, "for live data"
585         for item in parse_log(backend.read(filename), filename):
586             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
587                 if item.inum != 0:
588                     inode = inode_map.inodes[item.inum]
589                     if s == inode.location[0:2]:
590                         dirty_inodes.add(item.inum)
591                     if item.inum not in dirty_inode_data:
592                         for b in inode.links:
593                             if b[1] is not None and s == b[1][0:2]:
594                                 dirty_inode_data.add(item.inum)
595                                 break
596
597     #print "Inodes to rewrite:", dirty_inodes
598     #print "Inodes with data to rewrite:", dirty_inode_data
599     for i in sorted(dirty_inodes.union(dirty_inode_data)):
600         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
601
602 if __name__ == '__main__':
603     start_time = time.time()
604     backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
605     #backend = FileBackend(".")
606     chkpt = load_checkpoint_record(backend)
607     #print backend.list()
608     log_dir = LogDirectory(backend, 1)
609     imap = InodeMap()
610     imap.build(backend, chkpt)
611     print chkpt
612
613     run_cleaner(backend, imap, log_dir)
614     print "Version vector:", imap.version_vector
615     imap.write(backend, log_dir)
616     log_dir.close_all()
617     end_time = time.time()
618     print "Cleaner running time:", end_time - start_time
619     backend.dump_stats()