# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>
import base64, os, re, struct, sys, time

import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.  The header fields, in
# order, are: magic, cryptkeys, type, id, inum, and then the byte counts of
# the data, link ids, and link locations regions that follow the header.
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48
HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
HEADER_MAGIC2 = 'AgI='          # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
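
# Codes for the type byte of each log item.  The names are those used
# throughout the cleaner below; the single-character values are assumptions
# about the on-disk format.
class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'
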
29 """An interface to BlueSky where the log segments are on local disk.
31 This is mainly intended for testing purposes, as the real cleaner would
32 operate where data is being stored in S3."""
    def __init__(self, path):
        self.path = path

    def list(self):
        """Return a listing of all log segments and their sizes."""

        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
        files.sort()
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename, offset=0, length=None):
        fp = open(os.path.join(self.path, filename), 'rb')
        if offset > 0:
            fp.seek(offset)
        if length is None:
            return fp.read()
        else:
            return fp.read(length)

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))

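# Example (hypothetical path): for offline testing the cleaner can be pointed
# at a local directory of log segments rather than at S3:
#
#     backend = FileBackend('/tmp/bluesky-logs')
#     for (name, size) in backend.list():
#         print name, size
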
def retry_wrap(method):
    """Decorator that retries a failed S3 operation a few times."""
    def wrapped(self, *args, **kwargs):
        for retries in range(3):
            try:
                return method(self, *args, **kwargs)
            except:
                print >>sys.stderr, "S3 operation failed, retrying..."
                self.connect()      # assumed recovery step: reconnect
                time.sleep(1.0)     # and back off briefly before retrying
        return method(self, *args, **kwargs)    # final attempt; may raise
    return wrapped

83 """An interface to BlueSky where the log segments are on in Amazon S3."""
85 def __init__(self, bucket, path='', cachedir="."):
86 self.bucket_name = bucket
88 self.cachedir = cachedir
93 self.conn = boto.connect_s3(is_secure=False)
94 self.bucket = self.conn.get_bucket(self.bucket_name)
98 for k in self.bucket.list(self.path + 'log-'):
99 files.append((k.key, k.size))
    @retry_wrap
    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            fp.seek(offset)
            if length is None:
                return fp.read()
            return fp.read(length)
        k = Key(self.bucket)
        k.key = self.path + filename
        data = k.get_contents_as_string()
        fp = open(os.path.join(self.cachedir, filename), 'wb')
        fp.write(data)
        fp.close()
        self.cache[filename] = True
        data = data[offset:]
        if length is not None:
            data = data[0:length]
        return data

    @retry_wrap
    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        if filename in self.cache:
            del self.cache[filename]

    @retry_wrap
    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))

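# Log segments are named by (directory, sequence number) pairs: for example,
# loc_to_name((0, 42)) gives 'log-00000000-00000042', and name_to_loc maps
# that name back to the tuple (0, 42).
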
150 """In-memory representation of a single item stored in a log file."""
153 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
154 self.encrypted = False
157 return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
161 return open('/dev/urandom').read(16)
    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            if l is not None:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        if self.encrypted:
            magic = HEADER_MAGIC2
        else:
            magic = HEADER_MAGIC1
        header = struct.pack(HEADER_FORMAT,
                             magic, self.cryptkeys,
                             ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs

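# A serialized item is thus the packed header followed immediately by the data
# payload, the 16-byte link ids, and the packed link locations; parse_item()
# below performs the inverse transformation.
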
class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20           # Target size of a log segment: 4 MiB

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        self.groups = {}
        for logname in backend.list():
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]

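# Segment groups separate items with different lifetimes: the cleaner below
# writes data blocks, inodes, and inode map/checkpoint records through
# different groups (see rewrite_inode() and InodeMap.write()) so that each
# kind of item is packed into its own segments.
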
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size

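# Each entry in UtilizationTracker.segments maps a segment name to a pair
# [total bytes, live bytes]; for example, [1048576, 262144] describes a 1 MiB
# segment of which 256 KiB is still referenced (utilization 0.25).
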
def parse_item(data):
    """Parse a single log item from 'data', returning a LogItem or None."""
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    if header[0] == HEADER_MAGIC2: item.encrypted = True
    item.cryptkeys = header[1]
    item.id = header[3]
    item.inum = header[4]
    item.location = None
    item.type = chr(header[2])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]

    links = []
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            # An all-zero id is a null link and carries no location.
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item

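# For example, the location (0, 3, 1024, 512) names the 512 bytes at offset
# 1024 within segment 'log-00000000-00000003'.
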
def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items.

    If 'location' is given (either a (directory, sequence) tuple or a log
    file name), the location field of each item is filled in as well."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" \
                % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

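# Example: scanning a single segment for all the items it contains:
#
#     name = 'log-00000000-00000001'    # hypothetical segment name
#     for item in parse_log(backend.read(name), name):
#         print item
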
def load_checkpoint_record(backend):
    """Return the most recent checkpoint record found in the cloud log."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        inodes = {}
        self.obsolete_segments = set()

        print "Inode map:"
        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data,
                                              16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                util.add_item(inode)
                data_segments = set()
                for l in inode.links:
                    util.add_item(l[1])
                    data_segments.add(l[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode,
                                                       len(data_segments))

        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])

        self.inodes = inodes
        self.util = util
        self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ",
                                              self.checkpoint_record.data,
                                              16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block,
            # unless that block sits in a segment scheduled for cleaning.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = sorted(k for k in self.inodes if k >= start and k <= end)

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.data = ""
            block.links = []
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            # Drop the updated inodes that fall within this range; they have
            # now been written out.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        # Copy the inode's data blocks to fresh segments, then update the
        # inode to point at the new locations.
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

def run_cleaner(backend, inode_map, log, repack_inodes=False):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)

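    # Worked example: a segment with utilization [1048576, 262144] is selected
    # since 262144/1048576 = 0.25 < 0.6, and so is any segment with less than
    # 32 KiB of live data; segments with no live data at all are skipped, as
    # there is nothing in them to copy out.
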
    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark the relevant inodes as needing
    # to be rewritten.
    if repack_inodes:
        dirty_inodes = set(inode_map.inodes)
    else:
        dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        for b in inode.links:
                            if s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break

    print "Inodes to rewrite:", dirty_inodes
    print "Inodes with data to rewrite:", dirty_inode_data
    for i in sorted(dirty_inodes.union(dirty_inode_data)):
        rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)

if __name__ == '__main__':
    backend = S3Backend("mvrable-bluesky", cachedir=".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()
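
    # To run the cleaner against a local copy of the log segments instead of
    # S3, swap in the file backend (hypothetical path):
    #     backend = FileBackend('/path/to/segment/dir')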