3 # A simple cleaner for the BlueSky cloud file system. At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed. Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
8 # Copyright (C) 2010 The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
import base64
import os
import re
import struct
import sys
# The BlueSky 'struct cloudlog_header' data type.
# Little-endian fields: 4-byte magic, 1-byte item type, 16-byte item id,
# one 64-bit integer, then three 32-bit lengths (inline data, link-id
# array, link-location array -- consumed as header[4:7] by parse_item).
# NOTE(review): HEADER_MAGIC and ITEM_TYPE are referenced later in this
# file but not defined in this chunk; presumably they live in a portion
# of the file not shown here -- confirm.
HEADER_FORMAT = '<4sb16sQIII'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
class FileBackend:
    """An interface to BlueSky where the log segments are on local disk.

    This is mainly intended for testing purposes, as the real cleaner would
    operate where data is being stored in S3."""

    def __init__(self, path):
        # Directory containing the log-NNNNNNNN-NNNNNNNN segment files.
        self.path = path

    def list(self):
        """Return a listing of all log segments and their sizes.

        Results are sorted by segment name, so (directory, sequence)
        order is oldest-first; load_checkpoint_record relies on this when
        it scans the listing in reverse."""
        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
        files.sort()
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename):
        """Return the full contents of the named log segment as bytes."""
        fp = open(os.path.join(self.path, filename), 'rb')
        try:
            return fp.read()
        finally:
            # Close explicitly: the original leaked the file handle.
            fp.close()
class LogItem:
    """In-memory representation of a single item stored in a log file.

    Attributes are filled in by parse_item/load_item: type, id, data,
    links, size, and (when known) location."""

    def __str__(self):
        # Show only the first 8 hex digits of the 16-byte id.  The
        # .decode('ascii') keeps the output clean on Python 3, where
        # b16encode returns bytes (on Python 2 it yields text as before).
        short_id = base64.b16encode(self.id).decode('ascii').lower()[0:8]
        return "<Item ty=%s location=%s size=%d id=%s...>" \
            % (self.type, self.location, self.size, short_id)
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        # Maps segment filename -> [total_bytes, used_bytes]; used_bytes
        # starts at zero and grows as items are accounted via add_item.
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        """Credit an item's bytes as live data in its containing segment.

        'item' may be a LogItem (its .location is used) or a raw
        (directory, sequence, offset, size) tuple; None is ignored."""
        if isinstance(item, LogItem):
            # Reduce a LogItem to its location tuple before accounting.
            item = item.location
        if item is None: return
        (dirno, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dirno, seq)
        self.segments[filename][1] += size
def parse_item(data):
    """Decode one on-disk log item into a LogItem object.

    'data' holds the complete encoded item: the cloudlog header, the
    inline data, then the back-link id array and location array.
    Returns None when the buffer is truncated or fails validation."""
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[4:7])

    if header[0] != HEADER_MAGIC:
        print("Bad header magic!")
        return

    # The header's declared payload sizes must account for the whole buffer.
    if len(data) != size:
        print("Item size does not match!")
        return

    item = LogItem()
    item.id = header[2]
    item.inum = header[3]   # presumably the inode number -- TODO confirm
    item.type = chr(header[1])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]

    # Back-links follow the inline data: header[5] bytes of 16-byte ids,
    # then header[6] bytes of 16-byte (dir, seq, offset, size) locations.
    links = []
    link_ids = data[HEADER_SIZE + header[4]
                    : HEADER_SIZE + header[4] + header[5]]
    link_locs = data[HEADER_SIZE + header[4] + header[5]
                     : HEADER_SIZE + sum(header[4:7])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        # NOTE(review): reconstructed from a truncated original -- an
        # all-zero id is assumed to carry no location entry, so locations
        # are consumed only for non-zero ids; confirm against the on-disk
        # format.
        if id == b'\0' * 16:
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item
def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size).
    Returns the parsed LogItem with its .location attribute filled in."""
    filename = "log-%08d-%08d" % (location[0], location[1])
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    # The original dropped the result; callers (build_inode_map) bind it.
    return item
def parse_log(data, logname=None):
    """Parse contents of a log file, yielding a sequence of log items.

    If 'logname' is supplied and matches "log-<dir>-<seq>", each yielded
    item gets .location = (dir, seq, offset, size)."""
    location = None
    if logname is not None:
        m = re.match(r"^log-(\d+)-(\d+)$", logname)
        # Guard: a non-matching name simply leaves location unset.
        if m:
            location = (int(m.group(1)), int(m.group(2)))

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[4:7])
        if header[0] != HEADER_MAGIC:
            # Cannot resynchronize after a corrupt header; stop parsing.
            print("Bad header magic!")
            return
        if size + offset > len(data):
            print("Short data record at end of log: %s < %s"
                  % (len(data) - offset, size))
            return
        item = parse_item(data[offset : offset + size])
        # Check for a failed parse before touching item attributes.
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        # Advance past this record; without this the loop never ends.
        offset += size
def load_checkpoint_record(backend):
    """Return the most recent checkpoint record in the file system log.

    Segments are scanned newest-first (backend.list() is sorted), and
    items within each segment are scanned last-first, so the first
    checkpoint encountered is the latest.  Returns None if none exists."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log)))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                # The original fell through without returning, so the
                # caller always received None.
                return item
    return None
def build_inode_map(backend, checkpoint_record):
    """Reconstruct the inode map, starting from the checkpoint record given.

    This will also build up information about segment utilization."""

    util = UtilizationTracker(backend)
    util.add_item(checkpoint_record)

    # The checkpoint data is an array of 16-byte entries: two 64-bit
    # integers giving an inode-number range, with links[i] pointing at the
    # inode-map section covering that range.
    for i in range(len(checkpoint_record.data) // 16):
        (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
        imap = load_item(backend, checkpoint_record.links[i][1])
        util.add_item(imap)
        print("[%d, %d]: %s" % (start, end, imap))
        # Each inode-map entry is a 64-bit inode number; links[j] points
        # at the inode item itself.
        for j in range(len(imap.data) // 8):
            (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
            inode = load_item(backend, imap.links[j][1])
            util.add_item(inode)
            data_segments = set()
            # Renamed from 'i' to avoid shadowing the outer loop index.
            for link in inode.links:
                # A link may have no location (None); skip those rather
                # than crash on the slice.
                if link[1] is not None:
                    # (directory, sequence) identifies the segment.
                    data_segments.add(link[1][0:2])
            print(" %d: %s (%d data segments)" % (inum, inode, len(data_segments)))

    print("Segment utilizations:")
    for (s, u) in sorted(util.segments.items()):
        print("%s: %s" % (s, u))
if __name__ == '__main__':
    # Offline cleaning pass over the current directory: locate the most
    # recent checkpoint, then dump the inode map and per-segment
    # utilization figures derived from it.
    backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    build_inode_map(backend, chkpt)