Make cleaner cache persistent across runs
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys, time
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
19 HEADER_MAGIC2 = 'AgI='          # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
21
22 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
23
24 class ITEM_TYPE:
25     DATA = '1'
26     INODE = '2'
27     INODE_MAP = '3'
28     CHECKPOINT = '4'
29
30 class Backend:
31     """Base class for BlueSky storage backends."""
32
33     def loc_to_name(self, location):
34         return "log-%08d-%08d" % (location)
35
36     def name_to_loc(self, name):
37         m = re.match(r"^log-(\d+)-(\d+)$", name)
38         if m: return (int(m.group(1)), int(m.group(2)))
39
40     def dump_stats(self):
41         pass
42
43 class FileBackend(Backend):
44     """An interface to BlueSky where the log segments are on local disk.
45
46     This is mainly intended for testing purposes, as the real cleaner would
47     operate where data is being stored in S3."""
48
49     def __init__(self, path):
50         self.path = path
51
52     def list(self, directory=0):
53         """Return a listing of all log segments and their sizes."""
54
55         prefix = "log-%08d-" % (directory,)
56         files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
57         files.sort()
58
59         return [(f, os.stat(os.path.join(self.path, f)).st_size)
60                 for f in files]
61
62     def read(self, filename, offset=0, length=None):
63         fp = open(os.path.join(self.path, filename), 'rb')
64         if offset > 0:
65             fp.seek(offset)
66         if length is None:
67             return fp.read()
68         else:
69             return fp.read(length)
70
71     def write(self, filename, data):
72         fp = open(os.path.join(self.path, filename), 'wb')
73         fp.write(data)
74         fp.close()
75
76     def delete(self, filename):
77         os.unlink(os.path.join(self.path, filename))
78
79 def retry_wrap(method):
80     def wrapped(self, *args, **kwargs):
81         for retries in range(3):
82             try:
83                 return method(self, *args, **kwargs)
84             except:
85                 print >>sys.stderr, "S3 operation failed, retrying..."
86                 self.connect()
87                 time.sleep(1.0)
88         return method(self, *args, **kwargs)
89     return wrapped
90
91 class S3Backend(Backend):
92     """An interface to BlueSky where the log segments are on in Amazon S3."""
93
94     def __init__(self, bucket, path='', cachedir="."):
95         self.bucket_name = bucket
96         self.path = path
97         self.cachedir = cachedir
98         self.cache = {}
99         for f in os.listdir(cachedir):
100             self.cache[f] = True
101         print "Initial cache contents:", list(self.cache.keys())
102         self.connect()
103         self.stats_get = [0, 0]
104         self.stats_put = [0, 0]
105
106     def connect(self):
107         self.conn = boto.connect_s3(is_secure=False)
108         self.bucket = self.conn.get_bucket(self.bucket_name)
109
110     def list(self, directory=0):
111         files = []
112         prefix = "log-%08d-" % (directory,)
113         for k in self.bucket.list(self.path + prefix):
114             files.append((k.key, k.size))
115         return files
116
117     @retry_wrap
118     def read(self, filename, offset=0, length=None):
119         if filename in self.cache:
120             fp = open(os.path.join(self.cachedir, filename), 'rb')
121             if offset > 0:
122                 fp.seek(offset)
123             if length is None:
124                 return fp.read()
125             else:
126                 return fp.read(length)
127         else:
128             k = Key(self.bucket)
129             k.key = self.path + filename
130             data = k.get_contents_as_string()
131             fp = open(os.path.join(self.cachedir, filename), 'wb')
132             fp.write(data)
133             fp.close()
134             self.cache[filename] = True
135             self.stats_get[0] += 1
136             self.stats_get[1] += len(data)
137             if offset > 0:
138                 data = data[offset:]
139             if length is not None:
140                 data = data[0:length]
141             return data
142
143     @retry_wrap
144     def write(self, filename, data):
145         k = Key(self.bucket)
146         k.key = self.path + filename
147         k.set_contents_from_string(data)
148         self.stats_put[0] += 1
149         self.stats_put[1] += len(data)
150         if filename in self.cache:
151             del self.cache[filename]
152
153     @retry_wrap
154     def delete(self, filename):
155         k = Key(self.bucket)
156         k.key = self.path + filename
157         k.delete()
158         if filename in self.cache:
159             del self.cache[filename]
160
161     def dump_stats(self):
162         print "S3 statistics:"
163         print "GET: %d ops / %d bytes" % tuple(self.stats_get)
164         print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
165
166 class SimpleBackend(Backend):
167     """An interface to the simple BlueSky test network server."""
168
169     def __init__(self, server=('localhost', 12345), cachedir="."):
170         self.bucket_name = bucket
171         self.server_address = server
172         self.cachedir = cachedir
173         self.cache = {}
174
175     def _get_socket(self):
176         return socket.create_connection(self.server_address).makefile()
177
178     def list(self, directory=0):
179         files = []
180         prefix = "log-%08d-" % (directory,)
181         for k in self.bucket.list(self.path + prefix):
182             files.append((k.key, k.size))
183         return files
184
185     def read(self, filename, offset=0, length=None):
186         if filename in self.cache:
187             fp = open(os.path.join(self.cachedir, filename), 'rb')
188             if offset > 0:
189                 fp.seek(offset)
190             if length is None:
191                 return fp.read()
192             else:
193                 return fp.read(length)
194         else:
195             f = self._get_socket()
196             f.write("GET %s %d %d\n" % (filename, 0, 0))
197             f.flush()
198             datalen = int(f.readline())
199             if datalen < 0:
200                 raise RuntimeError
201             data = f.read(datalen)
202             fp = open(os.path.join(self.cachedir, filename), 'wb')
203             fp.write(data)
204             fp.close()
205             self.cache[filename] = True
206             if offset > 0:
207                 data = data[offset:]
208             if length is not None:
209                 data = data[0:length]
210             return data
211
212     def write(self, filename, data):
213         f = self._get_socket()
214         f.write("PUT %s %d %d\n" % (filename, len(data)))
215         f.write(data)
216         f.flush()
217         result = int(f.readline())
218         if filename in self.cache:
219             del self.cache[filename]
220
221     def delete(self, filename):
222         pass
223
224 class LogItem:
225     """In-memory representation of a single item stored in a log file."""
226
227     def __init__(self):
228         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
229         self.encrypted = False
230
231     def __str__(self):
232         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
233
234     @staticmethod
235     def random_id():
236         return open('/dev/urandom').read(16)
237
238     def serialize(self):
239         link_ids = []
240         link_locs = []
241         for (i, l) in self.links:
242             link_ids.append(i)
243             if i != '\0' * 16:
244                 link_locs.append(struct.pack('<IIII', *l))
245         link_ids = ''.join(link_ids)
246         link_locs = ''.join(link_locs)
247
248         if self.encrypted:
249             magic = HEADER_MAGIC2
250         else:
251             magic = HEADER_MAGIC1
252         header = struct.pack(HEADER_FORMAT,
253                              magic, self.cryptkeys,
254                              ord(self.type), self.id, self.inum,
255                              len(self.data), len(link_ids), len(link_locs))
256         return header + self.data + link_ids + link_locs
257
258 class LogSegment:
259     def __init__(self, backend, location):
260         self.backend = backend
261         self.location = location
262         self.data = []
263
264     def __len__(self):
265         return sum(len(s) for s in self.data)
266
267     def write(self, item):
268         data = item.serialize()
269         offset = len(self)
270         self.data.append(data)
271         item.location = self.location + (offset, len(data))
272
273     def close(self):
274         data = ''.join(self.data)
275         filename = self.backend.loc_to_name(self.location)
276         print "Would write %d bytes of data to %s" % (len(data), filename)
277         self.backend.write(filename, data)
278
279 class LogDirectory:
280     TARGET_SIZE = 4 << 20
281
282     def __init__(self, backend, dir):
283         self.backend = backend
284         self.dir_num = dir
285         self.seq_num = 0
286         for logname in backend.list(dir):
287             print "Old log file:", logname
288             loc = backend.name_to_loc(logname[0])
289             if loc is not None and loc[0] == dir:
290                 self.seq_num = max(self.seq_num, loc[1] + 1)
291         self.groups = {}
292         print "Starting sequence number is", self.seq_num
293
294     def open_segment(self):
295         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
296         self.seq_num += 1
297         return seg
298
299     def write(self, item, segment_group=0):
300         if segment_group not in self.groups:
301             self.groups[segment_group] = self.open_segment()
302         seg = self.groups[segment_group]
303         seg.write(item)
304         if len(seg) >= LogDirectory.TARGET_SIZE:
305             seg.close()
306             del self.groups[segment_group]
307
308     def close_all(self):
309         for k in list(self.groups.keys()):
310             self.groups[k].close()
311             del self.groups[k]
312
313 class UtilizationTracker:
314     """A simple object that tracks what fraction of each segment is used.
315
316     This data can be used to guide segment cleaning decisions."""
317
318     def __init__(self, backend):
319         self.segments = {}
320         for (segment, size) in backend.list(0) + backend.list(1):
321             self.segments[segment] = [size, 0]
322
323     def add_item(self, item):
324         if isinstance(item, LogItem):
325             item = item.location
326         if item is None: return
327         (dir, seq, offset, size) = item
328         filename = "log-%08d-%08d" % (dir, seq)
329         self.segments[filename][1] += size
330
331 def parse_item(data):
332     if len(data) < HEADER_SIZE: return
333     header = struct.unpack_from(HEADER_FORMAT, data, 0)
334     size = HEADER_SIZE + sum(header[5:8])
335
336     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
337         print "Bad header magic!"
338         return
339
340     if len(data) != size:
341         print "Item size does not match: %d != %d" % (size, len(data))
342         return
343
344     item = LogItem()
345     if header[0] == HEADER_MAGIC2: item.encrypted = True
346     item.cryptkeys = header[1]
347     item.id = header[3]
348     item.inum = header[4]
349     item.location = None
350     item.type = chr(header[2])
351     item.size = size
352     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
353     links = []
354     link_ids = data[HEADER_SIZE + header[5]
355                     : HEADER_SIZE + header[5] + header[6]]
356     link_locs = data[HEADER_SIZE + header[5] + header[6]
357                      : HEADER_SIZE + sum(header[5:8])]
358     for i in range(len(link_ids) // 16):
359         id = link_ids[16*i : 16*i + 16]
360         if id == '\0' * 16:
361             loc = None
362         else:
363             loc = struct.unpack('<IIII', link_locs[0:16])
364             link_locs = link_locs[16:]
365         links.append((id, loc))
366     item.links = links
367     return item
368
369 def load_item(backend, location):
370     """Load the cloud item pointed at by the 4-tuple 'location'.
371
372     The elements of the tuple are (directory, sequence, offset, size)."""
373
374     filename = backend.loc_to_name((location[0], location[1]))
375     data = backend.read(filename, location[2], location[3])
376     item = parse_item(data)
377     item.location = location
378     return item
379
380 def parse_log(data, location=None):
381     """Parse contents of a log file, yielding a sequence of log items."""
382
383     if isinstance(location, str):
384         m = re.match(r"^log-(\d+)-(\d+)$", location)
385         if m:
386             location = (int(m.group(1)), int(m.group(2)))
387         else:
388             location = None
389
390     offset = 0
391     while len(data) - offset >= HEADER_SIZE:
392         header = struct.unpack_from(HEADER_FORMAT, data, offset)
393         size = HEADER_SIZE + sum(header[5:8])
394         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
395             print "Bad header magic!"
396             break
397         if size + offset > len(data):
398             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
399             break
400         item = parse_item(data[offset : offset + size])
401         if location is not None:
402             item.location = location + (offset, size)
403         if item is not None: yield item
404         offset += size
405
406 def load_checkpoint_record(backend, directory=0):
407     for (log, size) in reversed(backend.list(directory)):
408         for item in reversed(list(parse_log(backend.read(log), log))):
409             print item
410             if item.type == ITEM_TYPE.CHECKPOINT:
411                 return item
412
413 class InodeMap:
414     def __init__(self):
415         pass
416
417     def build(self, backend, checkpoint_record):
418         """Reconstruct the inode map from the checkpoint record given.
419
420         This will also build up information about segment utilization."""
421
422         self.version_vector = {}
423         self.checkpoint_record = checkpoint_record
424
425         util = UtilizationTracker(backend)
426         util.add_item(checkpoint_record)
427         inodes = {}
428         self.obsolete_segments = set()
429
430         data = checkpoint_record.data
431         if not data.startswith(CHECKPOINT_MAGIC):
432             raise ValueError, "Invalid checkpoint record!"
433         data = data[len(CHECKPOINT_MAGIC):]
434         (vvlen,) = struct.unpack_from("<I", data, 0)
435         self.vvsize = 4 + 8*vvlen
436         for i in range(vvlen):
437             (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
438             self.version_vector[v1] = v2
439         print self.version_vector
440         self.version_vector[checkpoint_record.location[0]] \
441             = checkpoint_record.location[1]
442         print self.version_vector
443
444         data = data[self.vvsize:]
445
446         print "Inode map:"
447         for i in range(len(data) // 16):
448             (start, end) = struct.unpack_from("<QQ", data, 16*i)
449             imap = load_item(backend, checkpoint_record.links[i][1])
450             util.add_item(imap)
451             print "[%d, %d]: %s" % (start, end, imap)
452             for j in range(len(imap.data) // 8):
453                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
454                 inode = load_item(backend, imap.links[j][1])
455                 inodes[inum] = inode
456                 data_segments = set()
457                 util.add_item(inode)
458                 for i in inode.links:
459                     util.add_item(i[1])
460                     data_segments.add(i[1][0:2])
461                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
462
463         print
464         print "Segment utilizations:"
465         total_data = [0, 0]
466         deletions = [0, 0]
467         for (s, u) in sorted(util.segments.items()):
468             for i in range(2): total_data[i] += u[i]
469             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
470             if u[1] == 0:
471                 print "Would delete..."
472                 backend.delete(s)
473                 deletions[0] += 1
474                 deletions[1] += u[0]
475
476         self.inodes = inodes
477         self.util = util
478         self.updated_inodes = set()
479
480         print "%d bytes total / %d bytes used" % tuple(total_data)
481         print "would delete %d segments (%d bytes)" % tuple(deletions)
482
483     def mark_updated(self, inum):
484         self.updated_inodes.add(inum)
485
486     def write(self, backend, log):
487         updated_inodes = sorted(self.updated_inodes, reverse=True)
488
489         new_checkpoint = LogItem()
490         new_checkpoint.id = LogItem.random_id()
491         new_checkpoint.inum = 0
492         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
493         new_checkpoint.data = CHECKPOINT_MAGIC
494         new_checkpoint.links = []
495
496         new_checkpoint.data += struct.pack('<I', len(self.version_vector))
497         for d in sorted(self.version_vector):
498             new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
499
500         data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
501         for i in range(len(data) // 16):
502             (start, end) = struct.unpack_from("<QQ", data, 16*i)
503
504             new_checkpoint.data += data[16*i : 16*i + 16]
505
506             # Case 1: No inodes in this range of the old inode map have
507             # changed.  Simply emit a new pointer to the same inode map block.
508             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
509                 old_location = self.checkpoint_record.links[i][1][0:2]
510                 if old_location not in self.obsolete_segments:
511                     new_checkpoint.links.append(self.checkpoint_record.links[i])
512                     continue
513
514             # Case 2: Some inodes have been updated.  Create a new inode map
515             # block, write it out, and point the new checkpoint at it.
516             inodes = [k for k in self.inodes if k >= start and k <= end]
517             inodes.sort()
518
519             block = LogItem()
520             block.id = LogItem.random_id()
521             block.inum = 0
522             block.type = ITEM_TYPE.INODE_MAP
523             block.links = []
524             block.data = ""
525             for j in inodes:
526                 block.data += struct.pack("<Q", j)
527                 block.links.append((self.inodes[j].id, self.inodes[j].location))
528             log.write(block, 2)
529
530             new_checkpoint.links.append((block.id, block.location))
531
532             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
533                 updated_inodes.pop()
534
535         log.write(new_checkpoint, 2)
536         self.checkpoint_record = new_checkpoint
537
538 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
539     inode = inode_map.inodes[inum]
540     if copy_data:
541         blocks = []
542         for l in inode.links:
543             data = load_item(backend, l[1])
544             blocks.append(data)
545             log.write(data, 0)
546         inode.links = [(b.id, b.location) for b in blocks]
547     log.write(inode, 1)
548     inode_map.mark_updated(inum)
549
550 def run_cleaner(backend, inode_map, log, repack_inodes=False):
551     # Determine which segments are poorly utilized and should be cleaned.  We
552     # need better heuristics here.
553     for (s, u) in sorted(inode_map.util.segments.items()):
554         if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
555             print "Should clean segment", s
556             loc = backend.name_to_loc(s)
557             if s: inode_map.obsolete_segments.add(loc)
558
559     # TODO: We probably also want heuristics that will find inodes with
560     # badly-fragmented data and rewrite that to achieve better locality.
561
562     # Given that list of segments to clean, scan through those segments to find
563     # data which is still live and mark relevant inodes as needing to be
564     # rewritten.
565     if repack_inodes:
566         dirty_inodes = set(inode_map.inodes)
567     else:
568         dirty_inodes = set()
569     dirty_inode_data = set()
570     for s in inode_map.obsolete_segments:
571         filename = backend.loc_to_name(s)
572         print "Scanning", filename, "for live data"
573         for item in parse_log(backend.read(filename), filename):
574             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
575                 if item.inum != 0:
576                     inode = inode_map.inodes[item.inum]
577                     if s == inode.location[0:2]:
578                         dirty_inodes.add(item.inum)
579                     if item.inum not in dirty_inode_data:
580                         for b in inode.links:
581                             if s == b[1][0:2]:
582                                 dirty_inode_data.add(item.inum)
583                                 break
584
585     print "Inodes to rewrite:", dirty_inodes
586     print "Inodes with data to rewrite:", dirty_inode_data
587     for i in sorted(dirty_inodes.union(dirty_inode_data)):
588         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
589
590 if __name__ == '__main__':
591     start_time = time.time()
592     backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
593     #backend = FileBackend(".")
594     chkpt = load_checkpoint_record(backend)
595     print backend.list()
596     imap = InodeMap()
597     imap.build(backend, chkpt)
598     print chkpt
599
600     log_dir = LogDirectory(backend, 1)
601     run_cleaner(backend, imap, log_dir)
602     print "Version vector:", imap.version_vector
603     imap.write(backend, log_dir)
604     log_dir.close_all()
605     end_time = time.time()
606     print "Cleaner running time:", end_time - start_time
607     backend.dump_stats()