Improve cleaner: make sure new logs are written after existing ones.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13
14 # The BlueSky 'struct cloudlog_header' data type.
15 HEADER_FORMAT = '<4sb16sQIII'
16 HEADER_MAGIC = 'AgI-'
17 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
18
19 class ITEM_TYPE:
20     DATA = '1'
21     INODE = '2'
22     INODE_MAP = '3'
23     CHECKPOINT = '4'
24
25 class FileBackend:
26     """An interface to BlueSky where the log segments are on local disk.
27
28     This is mainly intended for testing purposes, as the real cleaner would
29     operate where data is being stored in S3."""
30
31     def __init__(self, path):
32         self.path = path
33
34     def list(self):
35         """Return a listing of all log segments and their sizes."""
36
37         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
38         files.sort()
39
40         return [(f, os.stat(os.path.join(self.path, f)).st_size)
41                 for f in files]
42
43     def read(self, filename):
44         fp = open(os.path.join(self.path, filename), 'rb')
45         return fp.read()
46
47     def write(self, filename, data):
48         fp = open(os.path.join(self.path, filename), 'wb')
49         fp.write(data)
50         fp.close()
51
52     def loc_to_name(self, location):
53         return "log-%08d-%08d" % (location)
54
55     def name_to_loc(self, name):
56         m = re.match(r"^log-(\d+)-(\d+)$", name)
57         if m: return (int(m.group(1)), int(m.group(2)))
58
59 class LogItem:
60     """In-memory representation of a single item stored in a log file."""
61
62     def __str__(self):
63         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
64
65     @staticmethod
66     def random_id():
67         return open('/dev/urandom').read(16)
68
69     def serialize(self):
70         link_ids = []
71         link_locs = []
72         for (i, l) in self.links:
73             link_ids.append(i)
74             if i != '\0' * 16:
75                 link_locs.append(struct.pack('<IIII', *l))
76         link_ids = ''.join(link_ids)
77         link_locs = ''.join(link_locs)
78
79         header = struct.pack(HEADER_FORMAT,
80                              HEADER_MAGIC, ord(self.type), self.id, self.inum,
81                              len(self.data), len(link_ids), len(link_locs))
82         return header + self.data + link_ids + link_locs
83
84 class LogSegment:
85     def __init__(self, backend, location):
86         self.backend = backend
87         self.location = location
88         self.data = []
89
90     def __len__(self):
91         return sum(len(s) for s in self.data)
92
93     def write(self, item):
94         data = item.serialize()
95         offset = len(self)
96         self.data.append(data)
97         item.location = self.location + (offset, len(data))
98
99     def close(self):
100         data = ''.join(self.data)
101         filename = self.backend.loc_to_name(self.location)
102         print "Would write %d bytes of data to %s" % (len(data), filename)
103         self.backend.write(filename, data)
104
105 class LogDirectory:
106     TARGET_SIZE = 4 << 20
107
108     def __init__(self, backend, dir):
109         self.backend = backend
110         self.dir_num = dir
111         self.seq_num = 0
112         for logname in backend.list():
113             loc = backend.name_to_loc(logname[0])
114             if loc is not None and loc[0] == dir:
115                 self.seq_num = max(self.seq_num, loc[1] + 1)
116         self.groups = {}
117         print "Starting sequence number is", self.seq_num
118
119     def open_segment(self):
120         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
121         self.seq_num += 1
122         return seg
123
124     def write(self, item, segment_group=0):
125         if segment_group not in self.groups:
126             self.groups[segment_group] = self.open_segment()
127         seg = self.groups[segment_group]
128         seg.write(item)
129         if len(seg) >= LogDirectory.TARGET_SIZE:
130             seg.close()
131             del self.groups[segment_group]
132
133     def close_all(self):
134         for k in list(self.groups.keys()):
135             self.groups[k].close()
136             del self.groups[k]
137
138 class UtilizationTracker:
139     """A simple object that tracks what fraction of each segment is used.
140
141     This data can be used to guide segment cleaning decisions."""
142
143     def __init__(self, backend):
144         self.segments = {}
145         for (segment, size) in backend.list():
146             self.segments[segment] = [size, 0]
147
148     def add_item(self, item):
149         if isinstance(item, LogItem):
150             item = item.location
151         if item is None: return
152         (dir, seq, offset, size) = item
153         filename = "log-%08d-%08d" % (dir, seq)
154         self.segments[filename][1] += size
155
156 def parse_item(data):
157     if len(data) < HEADER_SIZE: return
158     header = struct.unpack_from(HEADER_FORMAT, data, 0)
159     size = HEADER_SIZE + sum(header[4:7])
160
161     if header[0] != HEADER_MAGIC:
162         print "Bad header magic!"
163         return
164
165     if len(data) != size:
166         print "Item size does not match!"
167         return
168
169     item = LogItem()
170     item.id = header[2]
171     item.inum = header[3]
172     item.location = None
173     item.type = chr(header[1])
174     item.size = size
175     item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
176     links = []
177     link_ids = data[HEADER_SIZE + header[4]
178                     : HEADER_SIZE + header[4] + header[5]]
179     link_locs = data[HEADER_SIZE + header[4] + header[5]
180                      : HEADER_SIZE + sum(header[4:7])]
181     for i in range(len(link_ids) // 16):
182         id = link_ids[16*i : 16*i + 16]
183         if id == '\0' * 16:
184             loc = None
185         else:
186             loc = struct.unpack('<IIII', link_locs[0:16])
187             link_locs = link_locs[16:]
188         links.append((id, loc))
189     item.links = links
190     return item
191
192 def load_item(backend, location):
193     """Load the cloud item pointed at by the 4-tuple 'location'.
194
195     The elements of the tuple are (directory, sequence, offset, size)."""
196
197     filename = backend.loc_to_name((location[0], location[1]))
198     data = backend.read(filename)[location[2] : location[2] + location[3]]
199     item = parse_item(data)
200     item.location = location
201     return item
202
203 def parse_log(data, location=None):
204     """Parse contents of a log file, yielding a sequence of log items."""
205
206     if isinstance(location, str):
207         m = re.match(r"^log-(\d+)-(\d+)$", location)
208         if m:
209             location = (int(m.group(1)), int(m.group(2)))
210         else:
211             location = None
212
213     offset = 0
214     while len(data) - offset >= HEADER_SIZE:
215         header = struct.unpack_from(HEADER_FORMAT, data, offset)
216         size = HEADER_SIZE + sum(header[4:7])
217         if header[0] != HEADER_MAGIC:
218             print "Bad header magic!"
219             break
220         if size + offset > len(data):
221             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
222             break
223         item = parse_item(data[offset : offset + size])
224         if location is not None:
225             item.location = location + (offset, size)
226         if item is not None: yield item
227         offset += size
228
229 def load_checkpoint_record(backend):
230     for (log, size) in reversed(backend.list()):
231         for item in reversed(list(parse_log(backend.read(log), log))):
232             if item.type == ITEM_TYPE.CHECKPOINT:
233                 return item
234
235 class InodeMap:
236     def __init__(self):
237         pass
238
239     def build(self, backend, checkpoint_record):
240         """Reconstruct the inode map from the checkpoint record given.
241
242         This will also build up information about segment utilization."""
243
244         self.checkpoint_record = checkpoint_record
245
246         util = UtilizationTracker(backend)
247         util.add_item(checkpoint_record)
248         inodes = {}
249         self.obsolete_segments = set()
250
251         print "Inode map:"
252         for i in range(len(checkpoint_record.data) // 16):
253             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
254             imap = load_item(backend, checkpoint_record.links[i][1])
255             util.add_item(imap)
256             print "[%d, %d]: %s" % (start, end, imap)
257             for j in range(len(imap.data) // 8):
258                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
259                 inode = load_item(backend, imap.links[j][1])
260                 inodes[inum] = inode
261                 data_segments = set()
262                 util.add_item(inode)
263                 for i in inode.links:
264                     util.add_item(i[1])
265                     data_segments.add(i[1][0:2])
266                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
267
268         print
269         print "Segment utilizations:"
270         for (s, u) in sorted(util.segments.items()):
271             #if u[1] > 0:
272             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
273
274         self.inodes = inodes
275         self.util = util
276         self.updated_inodes = set()
277
278     def mark_updated(self, inum):
279         self.updated_inodes.add(inum)
280
281     def write(self, backend, log):
282         updated_inodes = sorted(self.updated_inodes, reverse=True)
283
284         new_checkpoint = LogItem()
285         new_checkpoint.id = LogItem.random_id()
286         new_checkpoint.inum = 0
287         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
288         new_checkpoint.data = ""
289         new_checkpoint.links = []
290
291         for i in range(len(self.checkpoint_record.data) // 16):
292             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
293
294             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
295
296             # Case 1: No inodes in this range of the old inode map have
297             # changed.  Simply emit a new pointer to the same inode map block.
298             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
299                 old_location = self.checkpoint_record.links[i][1][0:2]
300                 if old_location not in self.obsolete_segments:
301                     new_checkpoint.links.append(self.checkpoint_record.links[i])
302                     continue
303
304             # Case 2: Some inodes have been updated.  Create a new inode map
305             # block, write it out, and point the new checkpoint at it.
306             inodes = [k for k in self.inodes if k >= start and k <= end]
307             inodes.sort()
308
309             block = LogItem()
310             block.id = LogItem.random_id()
311             block.inum = 0
312             block.type = ITEM_TYPE.INODE_MAP
313             block.links = []
314             block.data = ""
315             for j in inodes:
316                 block.data += struct.pack("<Q", j)
317                 block.links.append((self.inodes[j].id, self.inodes[j].location))
318             log.write(block, 2)
319
320             new_checkpoint.links.append((block.id, block.location))
321
322             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
323                 updated_inodes.pop()
324
325         log.write(new_checkpoint, 2)
326         self.checkpoint_record = new_checkpoint
327
328 def rewrite_inode(backend, inode_map, inum, log):
329     inode = inode_map.inodes[inum]
330     blocks = []
331     for l in inode.links:
332         data = load_item(backend, l[1])
333         blocks.append(data)
334         log.write(data, 0)
335     inode.links = [(b.id, b.location) for b in blocks]
336     log.write(inode, 1)
337     inode_map.mark_updated(inum)
338
339 def run_cleaner(backend, inode_map, log):
340     # Determine which segments are poorly utilized and should be cleaned.  We
341     # need better heuristics here.
342     for (s, u) in sorted(inode_map.util.segments.items()):
343         if float(u[1]) / u[0] < 0.99 and u[1] > 0:
344             print "Should clean segment", s
345             loc = backend.name_to_loc(s)
346             if s: inode_map.obsolete_segments.add(loc)
347
348     # Given that list of segments to clean, scan through those segments to find
349     # data which is still live and mark relevant inodes as needing to be
350     # rewritten.
351     dirty_inodes = set()
352     for s in inode_map.obsolete_segments:
353         filename = backend.loc_to_name(s)
354         print "Scanning", filename, "for live data"
355         for item in parse_log(backend.read(filename), filename):
356             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
357                 if item.inum != 0:
358                     dirty_inodes.add(item.inum)
359
360     print "Inodes to rewrite:", dirty_inodes
361     for i in sorted(dirty_inodes):
362         rewrite_inode(backend, inode_map, i, log)
363
364 if __name__ == '__main__':
365     backend = FileBackend(".")
366     chkpt = load_checkpoint_record(backend)
367     imap = InodeMap()
368     imap.build(backend, chkpt)
369     print chkpt
370
371     log_dir = LogDirectory(backend, 0)
372     run_cleaner(backend, imap, log_dir)
373     imap.write(backend, log_dir)
374     log_dir.close_all()