Allow cleaner to delete unused log segments.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13
14 # The BlueSky 'struct cloudlog_header' data type.
15 HEADER_FORMAT = '<4sb16sQIII'
16 HEADER_MAGIC = 'AgI-'
17 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
18
19 class ITEM_TYPE:
20     DATA = '1'
21     INODE = '2'
22     INODE_MAP = '3'
23     CHECKPOINT = '4'
24
25 class FileBackend:
26     """An interface to BlueSky where the log segments are on local disk.
27
28     This is mainly intended for testing purposes, as the real cleaner would
29     operate where data is being stored in S3."""
30
31     def __init__(self, path):
32         self.path = path
33
34     def list(self):
35         """Return a listing of all log segments and their sizes."""
36
37         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
38         files.sort()
39
40         return [(f, os.stat(os.path.join(self.path, f)).st_size)
41                 for f in files]
42
43     def read(self, filename):
44         fp = open(os.path.join(self.path, filename), 'rb')
45         return fp.read()
46
47     def write(self, filename, data):
48         fp = open(os.path.join(self.path, filename), 'wb')
49         fp.write(data)
50         fp.close()
51
52     def delete(self, filename):
53         os.unlink(os.path.join(self.path, filename))
54
55     def loc_to_name(self, location):
56         return "log-%08d-%08d" % (location)
57
58     def name_to_loc(self, name):
59         m = re.match(r"^log-(\d+)-(\d+)$", name)
60         if m: return (int(m.group(1)), int(m.group(2)))
61
62 class LogItem:
63     """In-memory representation of a single item stored in a log file."""
64
65     def __str__(self):
66         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
67
68     @staticmethod
69     def random_id():
70         return open('/dev/urandom').read(16)
71
72     def serialize(self):
73         link_ids = []
74         link_locs = []
75         for (i, l) in self.links:
76             link_ids.append(i)
77             if i != '\0' * 16:
78                 link_locs.append(struct.pack('<IIII', *l))
79         link_ids = ''.join(link_ids)
80         link_locs = ''.join(link_locs)
81
82         header = struct.pack(HEADER_FORMAT,
83                              HEADER_MAGIC, ord(self.type), self.id, self.inum,
84                              len(self.data), len(link_ids), len(link_locs))
85         return header + self.data + link_ids + link_locs
86
87 class LogSegment:
88     def __init__(self, backend, location):
89         self.backend = backend
90         self.location = location
91         self.data = []
92
93     def __len__(self):
94         return sum(len(s) for s in self.data)
95
96     def write(self, item):
97         data = item.serialize()
98         offset = len(self)
99         self.data.append(data)
100         item.location = self.location + (offset, len(data))
101
102     def close(self):
103         data = ''.join(self.data)
104         filename = self.backend.loc_to_name(self.location)
105         print "Would write %d bytes of data to %s" % (len(data), filename)
106         self.backend.write(filename, data)
107
108 class LogDirectory:
109     TARGET_SIZE = 4 << 20
110
111     def __init__(self, backend, dir):
112         self.backend = backend
113         self.dir_num = dir
114         self.seq_num = 0
115         for logname in backend.list():
116             loc = backend.name_to_loc(logname[0])
117             if loc is not None and loc[0] == dir:
118                 self.seq_num = max(self.seq_num, loc[1] + 1)
119         self.groups = {}
120         print "Starting sequence number is", self.seq_num
121
122     def open_segment(self):
123         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
124         self.seq_num += 1
125         return seg
126
127     def write(self, item, segment_group=0):
128         if segment_group not in self.groups:
129             self.groups[segment_group] = self.open_segment()
130         seg = self.groups[segment_group]
131         seg.write(item)
132         if len(seg) >= LogDirectory.TARGET_SIZE:
133             seg.close()
134             del self.groups[segment_group]
135
136     def close_all(self):
137         for k in list(self.groups.keys()):
138             self.groups[k].close()
139             del self.groups[k]
140
141 class UtilizationTracker:
142     """A simple object that tracks what fraction of each segment is used.
143
144     This data can be used to guide segment cleaning decisions."""
145
146     def __init__(self, backend):
147         self.segments = {}
148         for (segment, size) in backend.list():
149             self.segments[segment] = [size, 0]
150
151     def add_item(self, item):
152         if isinstance(item, LogItem):
153             item = item.location
154         if item is None: return
155         (dir, seq, offset, size) = item
156         filename = "log-%08d-%08d" % (dir, seq)
157         self.segments[filename][1] += size
158
159 def parse_item(data):
160     if len(data) < HEADER_SIZE: return
161     header = struct.unpack_from(HEADER_FORMAT, data, 0)
162     size = HEADER_SIZE + sum(header[4:7])
163
164     if header[0] != HEADER_MAGIC:
165         print "Bad header magic!"
166         return
167
168     if len(data) != size:
169         print "Item size does not match!"
170         return
171
172     item = LogItem()
173     item.id = header[2]
174     item.inum = header[3]
175     item.location = None
176     item.type = chr(header[1])
177     item.size = size
178     item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
179     links = []
180     link_ids = data[HEADER_SIZE + header[4]
181                     : HEADER_SIZE + header[4] + header[5]]
182     link_locs = data[HEADER_SIZE + header[4] + header[5]
183                      : HEADER_SIZE + sum(header[4:7])]
184     for i in range(len(link_ids) // 16):
185         id = link_ids[16*i : 16*i + 16]
186         if id == '\0' * 16:
187             loc = None
188         else:
189             loc = struct.unpack('<IIII', link_locs[0:16])
190             link_locs = link_locs[16:]
191         links.append((id, loc))
192     item.links = links
193     return item
194
195 def load_item(backend, location):
196     """Load the cloud item pointed at by the 4-tuple 'location'.
197
198     The elements of the tuple are (directory, sequence, offset, size)."""
199
200     filename = backend.loc_to_name((location[0], location[1]))
201     data = backend.read(filename)[location[2] : location[2] + location[3]]
202     item = parse_item(data)
203     item.location = location
204     return item
205
206 def parse_log(data, location=None):
207     """Parse contents of a log file, yielding a sequence of log items."""
208
209     if isinstance(location, str):
210         m = re.match(r"^log-(\d+)-(\d+)$", location)
211         if m:
212             location = (int(m.group(1)), int(m.group(2)))
213         else:
214             location = None
215
216     offset = 0
217     while len(data) - offset >= HEADER_SIZE:
218         header = struct.unpack_from(HEADER_FORMAT, data, offset)
219         size = HEADER_SIZE + sum(header[4:7])
220         if header[0] != HEADER_MAGIC:
221             print "Bad header magic!"
222             break
223         if size + offset > len(data):
224             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
225             break
226         item = parse_item(data[offset : offset + size])
227         if location is not None:
228             item.location = location + (offset, size)
229         if item is not None: yield item
230         offset += size
231
232 def load_checkpoint_record(backend):
233     for (log, size) in reversed(backend.list()):
234         for item in reversed(list(parse_log(backend.read(log), log))):
235             if item.type == ITEM_TYPE.CHECKPOINT:
236                 return item
237
238 class InodeMap:
239     def __init__(self):
240         pass
241
242     def build(self, backend, checkpoint_record):
243         """Reconstruct the inode map from the checkpoint record given.
244
245         This will also build up information about segment utilization."""
246
247         self.checkpoint_record = checkpoint_record
248
249         util = UtilizationTracker(backend)
250         util.add_item(checkpoint_record)
251         inodes = {}
252         self.obsolete_segments = set()
253
254         print "Inode map:"
255         for i in range(len(checkpoint_record.data) // 16):
256             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
257             imap = load_item(backend, checkpoint_record.links[i][1])
258             util.add_item(imap)
259             print "[%d, %d]: %s" % (start, end, imap)
260             for j in range(len(imap.data) // 8):
261                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
262                 inode = load_item(backend, imap.links[j][1])
263                 inodes[inum] = inode
264                 data_segments = set()
265                 util.add_item(inode)
266                 for i in inode.links:
267                     util.add_item(i[1])
268                     data_segments.add(i[1][0:2])
269                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
270
271         print
272         print "Segment utilizations:"
273         for (s, u) in sorted(util.segments.items()):
274             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
275             if u[1] == 0:
276                 print "Deleting..."
277                 backend.delete(s)
278
279         self.inodes = inodes
280         self.util = util
281         self.updated_inodes = set()
282
283     def mark_updated(self, inum):
284         self.updated_inodes.add(inum)
285
286     def write(self, backend, log):
287         updated_inodes = sorted(self.updated_inodes, reverse=True)
288
289         new_checkpoint = LogItem()
290         new_checkpoint.id = LogItem.random_id()
291         new_checkpoint.inum = 0
292         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
293         new_checkpoint.data = ""
294         new_checkpoint.links = []
295
296         for i in range(len(self.checkpoint_record.data) // 16):
297             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
298
299             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
300
301             # Case 1: No inodes in this range of the old inode map have
302             # changed.  Simply emit a new pointer to the same inode map block.
303             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
304                 old_location = self.checkpoint_record.links[i][1][0:2]
305                 if old_location not in self.obsolete_segments:
306                     new_checkpoint.links.append(self.checkpoint_record.links[i])
307                     continue
308
309             # Case 2: Some inodes have been updated.  Create a new inode map
310             # block, write it out, and point the new checkpoint at it.
311             inodes = [k for k in self.inodes if k >= start and k <= end]
312             inodes.sort()
313
314             block = LogItem()
315             block.id = LogItem.random_id()
316             block.inum = 0
317             block.type = ITEM_TYPE.INODE_MAP
318             block.links = []
319             block.data = ""
320             for j in inodes:
321                 block.data += struct.pack("<Q", j)
322                 block.links.append((self.inodes[j].id, self.inodes[j].location))
323             log.write(block, 2)
324
325             new_checkpoint.links.append((block.id, block.location))
326
327             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
328                 updated_inodes.pop()
329
330         log.write(new_checkpoint, 2)
331         self.checkpoint_record = new_checkpoint
332
333 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
334     inode = inode_map.inodes[inum]
335     if copy_data:
336         blocks = []
337         for l in inode.links:
338             data = load_item(backend, l[1])
339             blocks.append(data)
340             log.write(data, 0)
341         inode.links = [(b.id, b.location) for b in blocks]
342     log.write(inode, 1)
343     inode_map.mark_updated(inum)
344
345 def run_cleaner(backend, inode_map, log):
346     # Determine which segments are poorly utilized and should be cleaned.  We
347     # need better heuristics here.
348     for (s, u) in sorted(inode_map.util.segments.items()):
349         if float(u[1]) / u[0] < 0.6 and u[1] > 0:
350             print "Should clean segment", s
351             loc = backend.name_to_loc(s)
352             if s: inode_map.obsolete_segments.add(loc)
353
354     # TODO: We probably also want heuristics that will find inodes with
355     # badly-fragmented data and rewrite that to achieve better locality.
356
357     # Given that list of segments to clean, scan through those segments to find
358     # data which is still live and mark relevant inodes as needing to be
359     # rewritten.
360     dirty_inodes = set()
361     dirty_inode_data = set()
362     for s in inode_map.obsolete_segments:
363         filename = backend.loc_to_name(s)
364         print "Scanning", filename, "for live data"
365         for item in parse_log(backend.read(filename), filename):
366             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
367                 if item.inum != 0:
368                     inode = inode_map.inodes[item.inum]
369                     if s == inode.location[0:2]:
370                         dirty_inodes.add(item.inum)
371                     if item.inum not in dirty_inode_data:
372                         for b in inode.links:
373                             if s == b[1][0:2]:
374                                 dirty_inode_data.add(item.inum)
375                                 break
376
377     print "Inodes to rewrite:", dirty_inodes
378     print "Inodes with data to rewrite:", dirty_inode_data
379     for i in sorted(dirty_inodes.union(dirty_inode_data)):
380         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
381
382 if __name__ == '__main__':
383     backend = FileBackend(".")
384     chkpt = load_checkpoint_record(backend)
385     imap = InodeMap()
386     imap.build(backend, chkpt)
387     print chkpt
388
389     log_dir = LogDirectory(backend, 0)
390     run_cleaner(backend, imap, log_dir)
391     imap.write(backend, log_dir)
392     log_dir.close_all()