Begin work on a segment cleaner prototype.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, struct, sys
12 import boto
13
14 # The BlueSky 'struct cloudlog_header' data type.
15 HEADER_FORMAT = '<4sb16sQIII'
16 HEADER_MAGIC = 'AgI-'
17 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
18
19 class ITEM_TYPE:
20     DATA = '1'
21     INODE = '2'
22     INODE_MAP = '3'
23     CHECKPOINT = '4'
24
25 class FileBackend:
26     """An interface to BlueSky where the log segments are on local disk.
27
28     This is mainly intended for testing purposes, as the real cleaner would
29     operate where data is being stored in S3."""
30
31     def __init__(self, path):
32         self.path = path
33
34     def list(self):
35         """Return a listing of all log segments and their sizes."""
36
37         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
38         files.sort()
39
40         return [(f, os.stat(os.path.join(self.path, f)).st_size)
41                 for f in files]
42
43     def read(self, filename):
44         fp = open(os.path.join(self.path, filename), 'rb')
45         return fp.read()
46
47 class LogItem:
48     """In-memory representation of a single item stored in a log file."""
49
50     def __str__(self):
51         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
52
53 class UtilizationTracker:
54     """A simple object that tracks what fraction of each segment is used.
55
56     This data can be used to guide segment cleaning decisions."""
57
58     def __init__(self, backend):
59         self.segments = {}
60         for (segment, size) in backend.list():
61             self.segments[segment] = [size, 0]
62
63     def add_item(self, item):
64         if isinstance(item, LogItem):
65             item = item.location
66         if item is None: return
67         (dir, seq, offset, size) = item
68         filename = "log-%08d-%08d" % (dir, seq)
69         self.segments[filename][1] += size
70
71 def parse_item(data):
72     if len(data) < HEADER_SIZE: return
73     header = struct.unpack_from(HEADER_FORMAT, data, 0)
74     size = HEADER_SIZE + sum(header[4:7])
75
76     if header[0] != HEADER_MAGIC:
77         print "Bad header magic!"
78         return
79
80     if len(data) != size:
81         print "Item size does not match!"
82         return
83
84     item = LogItem()
85     item.id = header[2]
86     item.location = None
87     item.type = chr(header[1])
88     item.size = size
89     item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
90     links = []
91     link_ids = data[HEADER_SIZE + header[4]
92                     : HEADER_SIZE + header[4] + header[5]]
93     link_locs = data[HEADER_SIZE + header[4] + header[5]
94                      : HEADER_SIZE + sum(header[4:7])]
95     for i in range(len(link_ids) // 16):
96         id = link_ids[16*i : 16*i + 16]
97         if id == '\0' * 16:
98             loc = None
99         else:
100             loc = struct.unpack('<IIII', link_locs[0:16])
101             link_locs = link_locs[16:]
102         links.append((id, loc))
103     item.links = links
104     return item
105
106 def load_item(backend, location):
107     """Load the cloud item pointed at by the 4-tuple 'location'.
108
109     The elements of the tuple are (directory, sequence, offset, size)."""
110
111     filename = "log-%08d-%08d" % (location[0], location[1])
112     data = backend.read(filename)[location[2] : location[2] + location[3]]
113     item = parse_item(data)
114     item.location = location
115     return item
116
117 def parse_log(data, logname=None):
118     """Parse contents of a log file, yielding a sequence of log items."""
119
120     location = None
121     if logname is not None:
122         m = re.match(r"^log-(\d+)-(\d+)$", logname)
123         if m:
124             location = (int(m.group(1)), int(m.group(2)))
125
126     offset = 0
127     while len(data) - offset >= HEADER_SIZE:
128         header = struct.unpack_from(HEADER_FORMAT, data, offset)
129         size = HEADER_SIZE + sum(header[4:7])
130         if header[0] != HEADER_MAGIC:
131             print "Bad header magic!"
132             break
133         if size + offset > len(data):
134             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
135             break
136         item = parse_item(data[offset : offset + size])
137         if location is not None:
138             item.location = location + (offset, size)
139         if item is not None: yield item
140         offset += size
141
142 def load_checkpoint_record(backend):
143     for (log, size) in reversed(backend.list()):
144         for item in reversed(list(parse_log(backend.read(log)))):
145             if item.type == ITEM_TYPE.CHECKPOINT:
146                 return item
147
148 def build_inode_map(backend, checkpoint_record):
149     """Reconstruct the inode map, starting from the checkpoint record given.
150
151     This will also build up information about segment utilization."""
152
153     util = UtilizationTracker(backend)
154     util.add_item(checkpoint_record)
155
156     print "Inode map:"
157     for i in range(len(checkpoint_record.data) // 16):
158         (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
159         imap = load_item(backend, checkpoint_record.links[i][1])
160         util.add_item(imap)
161         print "[%d, %d]: %s" % (start, end, imap)
162         for j in range(len(imap.data) // 8):
163             (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
164             inode = load_item(backend, imap.links[j][1])
165             data_segments = set()
166             util.add_item(inode)
167             for i in inode.links:
168                 util.add_item(i[1])
169                 data_segments.add(i[1][0:2])
170             print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
171
172     print
173     print "Segment utilizations:"
174     for (s, u) in sorted(util.segments.items()):
175         print "%s: %s" % (s, u)
176
177 if __name__ == '__main__':
178     backend = FileBackend(".")
179     chkpt = load_checkpoint_record(backend)
180     build_inode_map(backend, chkpt)