Extend cleaner with a simple policy for choosing segments to clean.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13
14 # The BlueSky 'struct cloudlog_header' data type.
15 HEADER_FORMAT = '<4sb16sQIII'
16 HEADER_MAGIC = 'AgI-'
17 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
18
19 class ITEM_TYPE:
20     DATA = '1'
21     INODE = '2'
22     INODE_MAP = '3'
23     CHECKPOINT = '4'
24
25 class FileBackend:
26     """An interface to BlueSky where the log segments are on local disk.
27
28     This is mainly intended for testing purposes, as the real cleaner would
29     operate where data is being stored in S3."""
30
31     def __init__(self, path):
32         self.path = path
33
34     def list(self):
35         """Return a listing of all log segments and their sizes."""
36
37         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
38         files.sort()
39
40         return [(f, os.stat(os.path.join(self.path, f)).st_size)
41                 for f in files]
42
43     def read(self, filename):
44         fp = open(os.path.join(self.path, filename), 'rb')
45         return fp.read()
46
47     def write(self, filename, data):
48         fp = open(os.path.join(self.path, filename), 'wb')
49         fp.write(data)
50         fp.close()
51
52 class LogItem:
53     """In-memory representation of a single item stored in a log file."""
54
55     def __str__(self):
56         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
57
58     @staticmethod
59     def random_id():
60         return open('/dev/urandom').read(16)
61
62     def serialize(self):
63         link_ids = []
64         link_locs = []
65         for (i, l) in self.links:
66             link_ids.append(i)
67             if i != '\0' * 16:
68                 link_locs.append(struct.pack('<IIII', *l))
69         link_ids = ''.join(link_ids)
70         link_locs = ''.join(link_locs)
71
72         header = struct.pack(HEADER_FORMAT,
73                              HEADER_MAGIC, ord(self.type), self.id, self.inum,
74                              len(self.data), len(link_ids), len(link_locs))
75         return header + self.data + link_ids + link_locs
76
77 class LogSegment:
78     def __init__(self, backend, location):
79         self.backend = backend
80         self.location = location
81         self.data = []
82
83     def __len__(self):
84         return sum(len(s) for s in self.data)
85
86     def write(self, item):
87         data = item.serialize()
88         offset = len(self)
89         self.data.append(data)
90         item.location = self.location + (offset, len(data))
91
92     def close(self):
93         data = ''.join(self.data)
94         filename = "log-%08d-%08d" % (self.location)
95         print "Would write %d bytes of data to %s" % (len(data), filename)
96         self.backend.write(filename, data)
97
98 class LogDirectory:
99     TARGET_SIZE = 4 << 20
100
101     def __init__(self, backend, dir, seq):
102         self.backend = backend
103         self.dir_num = dir
104         self.seq_num = seq
105         self.groups = {}
106
107     def open_segment(self):
108         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
109         self.seq_num += 1
110         return seg
111
112     def write(self, item, segment_group=0):
113         if segment_group not in self.groups:
114             self.groups[segment_group] = self.open_segment()
115         seg = self.groups[segment_group]
116         seg.write(item)
117         if len(seg) >= LogDirectory.TARGET_SIZE:
118             seg.close()
119             del self.groups[segment_group]
120
121     def close_all(self):
122         for k in list(self.groups.keys()):
123             self.groups[k].close()
124             del self.groups[k]
125
126 class UtilizationTracker:
127     """A simple object that tracks what fraction of each segment is used.
128
129     This data can be used to guide segment cleaning decisions."""
130
131     def __init__(self, backend):
132         self.segments = {}
133         for (segment, size) in backend.list():
134             self.segments[segment] = [size, 0]
135
136     def add_item(self, item):
137         if isinstance(item, LogItem):
138             item = item.location
139         if item is None: return
140         (dir, seq, offset, size) = item
141         filename = "log-%08d-%08d" % (dir, seq)
142         self.segments[filename][1] += size
143
144 def parse_item(data):
145     if len(data) < HEADER_SIZE: return
146     header = struct.unpack_from(HEADER_FORMAT, data, 0)
147     size = HEADER_SIZE + sum(header[4:7])
148
149     if header[0] != HEADER_MAGIC:
150         print "Bad header magic!"
151         return
152
153     if len(data) != size:
154         print "Item size does not match!"
155         return
156
157     item = LogItem()
158     item.id = header[2]
159     item.inum = header[3]
160     item.location = None
161     item.type = chr(header[1])
162     item.size = size
163     item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
164     links = []
165     link_ids = data[HEADER_SIZE + header[4]
166                     : HEADER_SIZE + header[4] + header[5]]
167     link_locs = data[HEADER_SIZE + header[4] + header[5]
168                      : HEADER_SIZE + sum(header[4:7])]
169     for i in range(len(link_ids) // 16):
170         id = link_ids[16*i : 16*i + 16]
171         if id == '\0' * 16:
172             loc = None
173         else:
174             loc = struct.unpack('<IIII', link_locs[0:16])
175             link_locs = link_locs[16:]
176         links.append((id, loc))
177     item.links = links
178     return item
179
180 def load_item(backend, location):
181     """Load the cloud item pointed at by the 4-tuple 'location'.
182
183     The elements of the tuple are (directory, sequence, offset, size)."""
184
185     filename = "log-%08d-%08d" % (location[0], location[1])
186     data = backend.read(filename)[location[2] : location[2] + location[3]]
187     item = parse_item(data)
188     item.location = location
189     return item
190
191 def parse_log(data, location=None):
192     """Parse contents of a log file, yielding a sequence of log items."""
193
194     if isinstance(location, str):
195         m = re.match(r"^log-(\d+)-(\d+)$", location)
196         if m:
197             location = (int(m.group(1)), int(m.group(2)))
198         else:
199             location = None
200
201     offset = 0
202     while len(data) - offset >= HEADER_SIZE:
203         header = struct.unpack_from(HEADER_FORMAT, data, offset)
204         size = HEADER_SIZE + sum(header[4:7])
205         if header[0] != HEADER_MAGIC:
206             print "Bad header magic!"
207             break
208         if size + offset > len(data):
209             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
210             break
211         item = parse_item(data[offset : offset + size])
212         if location is not None:
213             item.location = location + (offset, size)
214         if item is not None: yield item
215         offset += size
216
217 def load_checkpoint_record(backend):
218     for (log, size) in reversed(backend.list()):
219         for item in reversed(list(parse_log(backend.read(log), log))):
220             if item.type == ITEM_TYPE.CHECKPOINT:
221                 return item
222
223 class InodeMap:
224     def __init__(self):
225         pass
226
227     def build(self, backend, checkpoint_record):
228         """Reconstruct the inode map from the checkpoint record given.
229
230         This will also build up information about segment utilization."""
231
232         self.checkpoint_record = checkpoint_record
233
234         util = UtilizationTracker(backend)
235         util.add_item(checkpoint_record)
236         inodes = {}
237         self.obsolete_segments = set()
238
239         print "Inode map:"
240         for i in range(len(checkpoint_record.data) // 16):
241             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
242             imap = load_item(backend, checkpoint_record.links[i][1])
243             util.add_item(imap)
244             print "[%d, %d]: %s" % (start, end, imap)
245             for j in range(len(imap.data) // 8):
246                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
247                 inode = load_item(backend, imap.links[j][1])
248                 inodes[inum] = inode
249                 data_segments = set()
250                 util.add_item(inode)
251                 for i in inode.links:
252                     util.add_item(i[1])
253                     data_segments.add(i[1][0:2])
254                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
255
256         print
257         print "Segment utilizations:"
258         for (s, u) in sorted(util.segments.items()):
259             #if u[1] > 0:
260             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
261
262         self.inodes = inodes
263         self.util = util
264         self.updated_inodes = set()
265
266     def mark_updated(self, inum):
267         self.updated_inodes.add(inum)
268
269     def write(self, backend, log):
270         updated_inodes = sorted(self.updated_inodes, reverse=True)
271
272         new_checkpoint = LogItem()
273         new_checkpoint.id = LogItem.random_id()
274         new_checkpoint.inum = 0
275         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
276         new_checkpoint.data = ""
277         new_checkpoint.links = []
278
279         for i in range(len(self.checkpoint_record.data) // 16):
280             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
281
282             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
283
284             # Case 1: No inodes in this range of the old inode map have
285             # changed.  Simply emit a new pointer to the same inode map block.
286             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
287                 old_location = self.checkpoint_record.links[i][1][0:2]
288                 if old_location not in self.obsolete_segments:
289                     new_checkpoint.links.append(self.checkpoint_record.links[i])
290                     continue
291
292             # Case 2: Some inodes have been updated.  Create a new inode map
293             # block, write it out, and point the new checkpoint at it.
294             inodes = [k for k in self.inodes if k >= start and k <= end]
295             inodes.sort()
296
297             block = LogItem()
298             block.id = LogItem.random_id()
299             block.inum = 0
300             block.type = ITEM_TYPE.INODE_MAP
301             block.links = []
302             block.data = ""
303             for j in inodes:
304                 block.data += struct.pack("<Q", j)
305                 block.links.append((self.inodes[j].id, self.inodes[j].location))
306             log.write(block, 2)
307
308             new_checkpoint.links.append((block.id, block.location))
309
310             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
311                 updated_inodes.pop()
312
313         log.write(new_checkpoint, 2)
314         self.checkpoint_record = new_checkpoint
315
316 def rewrite_inode(backend, inode_map, inum, log):
317     inode = inode_map.inodes[inum]
318     blocks = []
319     for l in inode.links:
320         data = load_item(backend, l[1])
321         blocks.append(data)
322         log.write(data, 0)
323     inode.links = [(b.id, b.location) for b in blocks]
324     log.write(inode, 1)
325     inode_map.mark_updated(inum)
326
327 def run_cleaner(backend, inode_map, log):
328     # Determine which segments are poorly utilized and should be cleaned.  We
329     # need better heuristics here.
330     for (s, u) in sorted(inode_map.util.segments.items()):
331         if float(u[1]) / u[0] < 0.99 and u[1] > 0:
332             print "Should clean segment", s
333             m = re.match(r"^log-(\d+)-(\d+)$", s)
334             if m: inode_map.obsolete_segments.add((int(m.group(1)), int(m.group(2))))
335
336     # Given that list of segments to clean, scan through those segments to find
337     # data which is still live and mark relevant inodes as needing to be
338     # rewritten.
339     dirty_inodes = set()
340     for s in inode_map.obsolete_segments:
341         filename = "log-%08d-%08d" % s
342         print "Scanning", filename, "for live data"
343         for item in parse_log(backend.read(filename), filename):
344             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
345                 if item.inum != 0:
346                     dirty_inodes.add(item.inum)
347
348     print "Inodes to rewrite:", dirty_inodes
349     for i in sorted(dirty_inodes):
350         rewrite_inode(backend, inode_map, i, log)
351
352 if __name__ == '__main__':
353     backend = FileBackend(".")
354     chkpt = load_checkpoint_record(backend)
355     imap = InodeMap()
356     imap.build(backend, chkpt)
357     print chkpt
358
359     log_dir = LogDirectory(backend, 1, 0)
360     run_cleaner(backend, imap, log_dir)
361     imap.write(backend, log_dir)
362     log_dir.close_all()