Updates to the Python cleaner prototype.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13
14 # The BlueSky 'struct cloudlog_header' data type.
15 HEADER_FORMAT = '<4sb16sQIII'
16 HEADER_MAGIC = 'AgI-'
17 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
18
19 class ITEM_TYPE:
20     DATA = '1'
21     INODE = '2'
22     INODE_MAP = '3'
23     CHECKPOINT = '4'
24
25 class FileBackend:
26     """An interface to BlueSky where the log segments are on local disk.
27
28     This is mainly intended for testing purposes, as the real cleaner would
29     operate where data is being stored in S3."""
30
31     def __init__(self, path):
32         self.path = path
33
34     def list(self):
35         """Return a listing of all log segments and their sizes."""
36
37         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
38         files.sort()
39
40         return [(f, os.stat(os.path.join(self.path, f)).st_size)
41                 for f in files]
42
43     def read(self, filename):
44         fp = open(os.path.join(self.path, filename), 'rb')
45         return fp.read()
46
47     def write(self, filename, data):
48         fp = open(os.path.join(self.path, filename), 'wb')
49         fp.write(data)
50         fp.close()
51
52 class LogItem:
53     """In-memory representation of a single item stored in a log file."""
54
55     def __str__(self):
56         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
57
58     @staticmethod
59     def random_id():
60         return open('/dev/urandom').read(16)
61
62     def serialize(self):
63         link_ids = []
64         link_locs = []
65         for (i, l) in self.links:
66             link_ids.append(i)
67             if i != '\0' * 16:
68                 link_locs.append(struct.pack('<IIII', *l))
69         link_ids = ''.join(link_ids)
70         link_locs = ''.join(link_locs)
71
72         header = struct.pack(HEADER_FORMAT,
73                              HEADER_MAGIC, ord(self.type), self.id, self.inum,
74                              len(self.data), len(link_ids), len(link_locs))
75         return header + self.data + link_ids + link_locs
76
77 class LogSegment:
78     def __init__(self, backend, location):
79         self.backend = backend
80         self.location = location
81         self.data = []
82
83     def __len__(self):
84         return sum(len(s) for s in self.data)
85
86     def write(self, item):
87         data = item.serialize()
88         offset = len(self)
89         self.data.append(data)
90         item.location = self.location + (offset, len(data))
91
92     def close(self):
93         data = ''.join(self.data)
94         filename = "log-%08d-%08d" % (self.location)
95         print "Would write %d bytes of data to %s" % (len(data), filename)
96         self.backend.write(filename, data)
97
98 class LogDirectory:
99     TARGET_SIZE = 4 << 20
100
101     def __init__(self, backend, dir, seq):
102         self.backend = backend
103         self.dir_num = dir
104         self.seq_num = seq
105         self.groups = {}
106
107     def open_segment(self):
108         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
109         self.seq_num += 1
110         return seg
111
112     def write(self, item, segment_group=0):
113         if segment_group not in self.groups:
114             self.groups[segment_group] = self.open_segment()
115         seg = self.groups[segment_group]
116         seg.write(item)
117         if len(seg) >= LogDirectory.TARGET_SIZE:
118             seg.close()
119             del self.groups[segment_group]
120
121     def close_all(self):
122         for k in list(self.groups.keys()):
123             self.groups[k].close()
124             del self.groups[k]
125
126 class UtilizationTracker:
127     """A simple object that tracks what fraction of each segment is used.
128
129     This data can be used to guide segment cleaning decisions."""
130
131     def __init__(self, backend):
132         self.segments = {}
133         for (segment, size) in backend.list():
134             self.segments[segment] = [size, 0]
135
136     def add_item(self, item):
137         if isinstance(item, LogItem):
138             item = item.location
139         if item is None: return
140         (dir, seq, offset, size) = item
141         filename = "log-%08d-%08d" % (dir, seq)
142         self.segments[filename][1] += size
143
144 def parse_item(data):
145     if len(data) < HEADER_SIZE: return
146     header = struct.unpack_from(HEADER_FORMAT, data, 0)
147     size = HEADER_SIZE + sum(header[4:7])
148
149     if header[0] != HEADER_MAGIC:
150         print "Bad header magic!"
151         return
152
153     if len(data) != size:
154         print "Item size does not match!"
155         return
156
157     item = LogItem()
158     item.id = header[2]
159     item.inum = header[3]
160     item.location = None
161     item.type = chr(header[1])
162     item.size = size
163     item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
164     links = []
165     link_ids = data[HEADER_SIZE + header[4]
166                     : HEADER_SIZE + header[4] + header[5]]
167     link_locs = data[HEADER_SIZE + header[4] + header[5]
168                      : HEADER_SIZE + sum(header[4:7])]
169     for i in range(len(link_ids) // 16):
170         id = link_ids[16*i : 16*i + 16]
171         if id == '\0' * 16:
172             loc = None
173         else:
174             loc = struct.unpack('<IIII', link_locs[0:16])
175             link_locs = link_locs[16:]
176         links.append((id, loc))
177     item.links = links
178     return item
179
180 def load_item(backend, location):
181     """Load the cloud item pointed at by the 4-tuple 'location'.
182
183     The elements of the tuple are (directory, sequence, offset, size)."""
184
185     filename = "log-%08d-%08d" % (location[0], location[1])
186     data = backend.read(filename)[location[2] : location[2] + location[3]]
187     item = parse_item(data)
188     item.location = location
189     return item
190
191 def parse_log(data, location=None):
192     """Parse contents of a log file, yielding a sequence of log items."""
193
194     if isinstance(location, str):
195         m = re.match(r"^log-(\d+)-(\d+)$", location)
196         if m:
197             location = (int(m.group(1)), int(m.group(2)))
198         else:
199             location = None
200
201     offset = 0
202     while len(data) - offset >= HEADER_SIZE:
203         header = struct.unpack_from(HEADER_FORMAT, data, offset)
204         size = HEADER_SIZE + sum(header[4:7])
205         if header[0] != HEADER_MAGIC:
206             print "Bad header magic!"
207             break
208         if size + offset > len(data):
209             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
210             break
211         item = parse_item(data[offset : offset + size])
212         if location is not None:
213             item.location = location + (offset, size)
214         if item is not None: yield item
215         offset += size
216
217 def load_checkpoint_record(backend):
218     for (log, size) in reversed(backend.list()):
219         for item in reversed(list(parse_log(backend.read(log), log))):
220             if item.type == ITEM_TYPE.CHECKPOINT:
221                 return item
222
223 class InodeMap:
224     def __init__(self):
225         pass
226
227     def build(self, backend, checkpoint_record):
228         """Reconstruct the inode map from the checkpoint record given.
229
230         This will also build up information about segment utilization."""
231
232         self.checkpoint_record = checkpoint_record
233
234         util = UtilizationTracker(backend)
235         util.add_item(checkpoint_record)
236         inodes = {}
237
238         print "Inode map:"
239         for i in range(len(checkpoint_record.data) // 16):
240             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
241             imap = load_item(backend, checkpoint_record.links[i][1])
242             util.add_item(imap)
243             print "[%d, %d]: %s" % (start, end, imap)
244             for j in range(len(imap.data) // 8):
245                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
246                 inode = load_item(backend, imap.links[j][1])
247                 inodes[inum] = inode
248                 data_segments = set()
249                 util.add_item(inode)
250                 for i in inode.links:
251                     util.add_item(i[1])
252                     data_segments.add(i[1][0:2])
253                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
254
255         print
256         print "Segment utilizations:"
257         for (s, u) in sorted(util.segments.items()):
258             #if u[1] > 0:
259             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
260
261         self.inodes = inodes
262         self.util = util
263         self.updated_inodes = set()
264
265     def mark_updated(self, inum):
266         self.updated_inodes.add(inum)
267
268     def write(self, backend, log):
269         updated_inodes = sorted(self.updated_inodes, reverse=True)
270
271         new_checkpoint = LogItem()
272         new_checkpoint.id = LogItem.random_id()
273         new_checkpoint.inum = 0
274         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
275         new_checkpoint.data = ""
276         new_checkpoint.links = []
277
278         for i in range(len(self.checkpoint_record.data) // 16):
279             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
280
281             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
282
283             # Case 1: No inodes in this range of the old inode map have
284             # changed.  Simply emit a new pointer to the same inode map block.
285             # TODO: Add the ability to rewrite the inode map block if we choose
286             # to do so for cleaning, even if no inodes have changed.
287             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
288                 new_checkpoint.links.append(self.checkpoint_record.links[i])
289                 continue
290
291             # Case 2: Some inodes have been updated.  Create a new inode map
292             # block, write it out, and point the new checkpoint at it.
293             inodes = [k for k in self.inodes if k >= start and k <= end]
294             inodes.sort()
295
296             block = LogItem()
297             block.id = LogItem.random_id()
298             block.inum = 0
299             block.type = ITEM_TYPE.INODE_MAP
300             block.links = []
301             block.data = ""
302             for j in inodes:
303                 block.data += struct.pack("<Q", j)
304                 block.links.append((self.inodes[j].id, self.inodes[j].location))
305             log.write(block, 2)
306
307             new_checkpoint.links.append((block.id, block.location))
308
309             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
310                 updated_inodes.pop()
311
312         log.write(new_checkpoint, 2)
313         self.checkpoint_record = new_checkpoint
314
315 def rewrite_inode(backend, inode_map, inum, log):
316     inode = inode_map.inodes[inum]
317     blocks = []
318     for l in inode.links:
319         data = load_item(backend, l[1])
320         blocks.append(data)
321         log.write(data, 0)
322     inode.links = [(b.id, b.location) for b in blocks]
323     log.write(inode, 1)
324     inode_map.mark_updated(inum)
325
326 if __name__ == '__main__':
327     backend = FileBackend(".")
328     chkpt = load_checkpoint_record(backend)
329     imap = InodeMap()
330     imap.build(backend, chkpt)
331     print chkpt
332
333     print repr(chkpt.serialize())
334
335     log_dir = LogDirectory(backend, 1, 0)
336     rewrite_inode(backend, imap, 147, log_dir)
337     imap.write(backend, log_dir)
338     log_dir.close_all()