# A simple cleaner for the BlueSky cloud file system. At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed. Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010 The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4sb16sQIII'
HEADER_MAGIC = 'AgI-'           # magic bytes at the start of every item
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

# Codes for the item 'type' byte in the header.
class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'
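
# A sketch of the on-disk item layout implied by HEADER_FORMAT (the field
# names are assumptions, chosen to match how parse_item() and
# LogItem.serialize() below use the unpacked tuple):
#
#   offset 0   magic      4s   -- HEADER_MAGIC
#   offset 4   type       b    -- one of the ITEM_TYPE codes
#   offset 5   id         16s  -- unique 128-bit item identifier
#   offset 21  inum       Q    -- inode number the item belongs to (0 if none)
#   offset 29  data_size  I    -- length of the payload following the header
#   offset 33  id_size    I    -- length of the table of linked item ids
#   offset 37  loc_size   I    -- length of the table of linked item locations
#
# so a full item is the 41-byte header, then data_size bytes of payload, then
# the two link tables.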
26 """An interface to BlueSky where the log segments are on local disk.
28 This is mainly intended for testing purposes, as the real cleaner would
29 operate where data is being stored in S3."""

    def __init__(self, path):
        self.path = path
35 """Return a listing of all log segments and their sizes."""
37 files = [f for f in os.listdir(self.path) if f.startswith('log-')]
40 return [(f, os.stat(os.path.join(self.path, f)).st_size)

    def read(self, filename):
        fp = open(os.path.join(self.path, filename), 'rb')
        return fp.read()

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()
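
# Example (sketch): pointing the backend at a directory of log segments.
# The path and segment name here are made up for illustration.
#
#   backend = FileBackend("/tmp/bluesky")
#   for (name, size) in backend.list():
#       print name, size
#   raw = backend.read("log-00000000-00000000")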
53 """In-memory representation of a single item stored in a log file."""
56 return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])

    @staticmethod
    def random_id():
        return open('/dev/urandom').read(16)

    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            # A null id carries no location, so it contributes no entry to
            # the location table; parse_item() mirrors this when decoding.
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        header = struct.pack(HEADER_FORMAT,
                             HEADER_MAGIC, ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs
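
# Example (sketch): building an item by hand and round-tripping it through
# serialize()/parse_item() (defined below). The field values are made up
# for illustration.
#
#   item = LogItem()
#   item.id = LogItem.random_id()
#   item.inum = 0
#   item.type = ITEM_TYPE.DATA
#   item.data = "example payload"
#   item.links = []
#   assert parse_item(item.serialize()).data == "example payload"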

class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = "log-%08d-%08d" % (self.location)
        print "Would write %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    # Target size for a completed log segment; the 4 MB figure here is an
    # assumed value for testing.
    TARGET_SIZE = 4 << 20

    def __init__(self, backend, dir, seq):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = seq
        self.groups = {}

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]
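
# Example (sketch): grouping items by expected lifetime. The cleaner below
# writes data blocks to group 0, inodes to group 1, and inode map blocks
# plus the checkpoint to group 2, so items likely to die together are
# stored together:
#
#   log_dir = LogDirectory(backend, 1, 0)
#   log_dir.write(data_item, 0)
#   log_dir.write(inode_item, 1)
#   log_dir.close_all()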

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size
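
# Example (sketch): once every live item has been added, a segment's
# utilization is live bytes over total bytes. The segment name is made up.
#
#   util = UtilizationTracker(backend)
#   util.add_item(some_item)
#   (total, used) = util.segments["log-00000000-00000000"]
#   print float(used) / total       # fraction of the segment still live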

def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[4:7])

    if header[0] != HEADER_MAGIC:
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match!"
        return

    item = LogItem()
    item.id = header[2]
    item.inum = header[3]
    item.location = None
    item.type = chr(header[1])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
    links = []
    link_ids = data[HEADER_SIZE + header[4]
                    : HEADER_SIZE + header[4] + header[5]]
    link_locs = data[HEADER_SIZE + header[4] + header[5]
                     : HEADER_SIZE + sum(header[4:7])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            # A null id has no location stored, so it does not consume an
            # entry from the location table.
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = "log-%08d-%08d" % (location[0], location[1])
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    return item
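
# Example (sketch): the location tuple mirrors how items refer to one
# another on disk. The directory, sequence, offset, and size values here
# are made up for illustration:
#
#   item = load_item(backend, (0, 0, 0, 64))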

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items.

    If 'location' is given as a (directory, sequence) tuple or as a
    "log-XXXXXXXX-XXXXXXXX" filename, each item's location is filled in."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[4:7])
        if header[0] != HEADER_MAGIC:
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" \
                % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size
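
# Example (sketch): dumping every item in one segment. The segment name is
# made up; passing the filename as 'location' lets parse_log fill in each
# item's location tuple.
#
#   for item in parse_log(backend.read("log-00000000-00000000"),
#                         "log-00000000-00000000"):
#       print item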

def load_checkpoint_record(backend):
    """Return the most recent checkpoint record found in the logs."""

    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def __init__(self):
        pass

    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)

        print "Inode map:"
        inodes = {}
        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                util.add_item(inode)
                data_segments = set()
                for l in inode.links:
                    if l[1] is not None:
                        util.add_item(l[1])
                        data_segments.add(l[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))

        print
        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])

        self.inodes = inodes
        self.util = util
        self.updated_inodes = set()
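
    # Layout summary (inferred from the unpacking above): the checkpoint's
    # data is a sequence of <QQ> (start, end) inode-number ranges, where
    # links[i] points at the inode map block covering that range; each inode
    # map block's data is a sequence of <Q> inode numbers, where links[j]
    # points at the corresponding inode item.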

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed. Simply emit a new pointer to the same inode map block.
            # TODO: Add the ability to rewrite the inode map block if we
            # choose to do so for cleaning, even if no inodes have changed.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                new_checkpoint.links.append(self.checkpoint_record.links[i])
                continue

            # Case 2: Some inodes have been updated. Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            # Drop the inode numbers covered by this range from the list of
            # pending updates.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log):
    """Copy an inode and all of its data blocks to the head of the log."""

    inode = inode_map.inodes[inum]
    blocks = []
    for l in inode.links:
        data = load_item(backend, l[1])
        blocks.append(data)
        log.write(data, 0)
    inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)
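
# Example (sketch): a simple utilization-driven policy for choosing what to
# clean. The 40% threshold is made up for illustration; the test code in
# __main__ below just rewrites one hard-coded inode instead.
#
#   candidates = [s for (s, (total, used)) in imap.util.segments.items()
#                 if total > 0 and float(used) / total < 0.4]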

if __name__ == '__main__':
    backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)
    print
    print repr(chkpt.serialize())

    log_dir = LogDirectory(backend, 1, 0)
    rewrite_inode(backend, imap, 147, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()
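
# When run directly, this performs a single offline cleaning pass over the
# segments in the current directory: load the newest checkpoint, rebuild the
# inode map (printing segment utilizations along the way), rewrite one
# hard-coded test inode, then write out a new inode map and checkpoint.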