Add pcap dump parser for extracting S3 performance measurements.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 21 Apr 2010 23:13:58 +0000 (16:13 -0700)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 21 Apr 2010 23:13:58 +0000 (16:13 -0700)
parsetrace/README [new file with mode: 0644]
parsetrace/parse.py [new file with mode: 0755]

diff --git a/parsetrace/README b/parsetrace/README
new file mode 100644 (file)
index 0000000..358997d
--- /dev/null
@@ -0,0 +1,2 @@
+Python scripts for parsing pcap dumps of traffic to/from Amazon S3 and
+extracting performance statistics.
diff --git a/parsetrace/parse.py b/parsetrace/parse.py
new file mode 100755 (executable)
index 0000000..4419a35
--- /dev/null
@@ -0,0 +1,140 @@
+#!/usr/bin/python
+
+import impacket, pcapy, re, sys
+import impacket.ImpactDecoder
+
+start_time = None
+
+flows = {}
+
+STATE_START = 0
+STATE_REQ_SENT = 1
+STATE_REQ_ACKED = 2
+STATE_RESP_START = 3
+
+logfile = open('times.data', 'w')
+
+def seq_after(x, y):
+    """Compares whether x >= y in sequence number space."""
+
+    delta = (x - y) % (1 << 32)
+    return delta < (1 << 31)
+
+class Connection:
+    counter = 0
+
+    def __init__(self, endpoints):
+        self.endpoints = endpoints
+        self.packets = []
+        self.state = STATE_START
+        self.id = Connection.counter
+        self.times = []
+        self.transfer_count = 0
+        Connection.counter += 1
+
+    def finish_transfer(self):
+        if len(self.times) > 0:
+            rtt = self.times[0][0]
+            try:
+                start = iter(t[0] for t in self.times if t[1] > 0).next()
+            except:
+                start = 0.0
+            end = self.times[-1][0]
+            data = self.times[-1][1]
+            print "Connection %d Transfer #%d" % (self.id, self.transfer_count)
+            print "Network RTT:", rtt
+            print "Additional response delay:", start - rtt
+            print "Transfer time:", end - start
+            print "Bandwidth:", data / (end - start)
+            print
+            logfile.write("%d\t%d\t%d\t%f\t%f\t%f\n"
+                          % (self.id, self.transfer_count, data,
+                              rtt, start - rtt, end - start))
+            self.transfer_count += 1
+        self.times = []
+        self.state = STATE_START
+
+    def process(self, timestamp, packet):
+        ip = pkt.child()
+        tcp = ip.child()
+        self.packets.append(packet)
+
+        datalen = ip.get_ip_len() - ip.get_header_size() - tcp.get_header_size()
+        data = tcp.get_data_as_string()[0:datalen]
+
+        if tcp.get_th_sport() == 80:
+            # Incoming packet
+            direction = -1
+        elif tcp.get_th_dport() == 80:
+            # Outgoing packet
+            direction = 1
+        else:
+            direction = 0
+
+        seq = (tcp.get_th_seq(), tcp.get_th_seq() + datalen)
+        ack = tcp.get_th_ack()
+
+        # Previous request finished
+        if self.state == STATE_RESP_START and direction > 0 \
+                and data.startswith('GET /'):
+            self.finish_transfer()
+
+        # New request seen on an idle connection...
+        if self.state == STATE_START and direction > 0 \
+                and data.startswith('GET /'):
+            self.startseq = seq[1]
+            self.starttime = timestamp
+            self.state = STATE_REQ_SENT
+
+        # Request is acknowledged, but response not yet seen
+        if self.state == STATE_REQ_SENT and direction < 0 \
+                and seq_after(ack, self.startseq):
+            self.state = STATE_REQ_ACKED
+            self.respseq = seq[0]
+            self.times.append(((timestamp - self.starttime) / 1e6, 0))
+
+        # Response header to request has been seen
+        if self.state == STATE_REQ_ACKED and direction < 0 \
+                and data.startswith("HTTP/1."):
+            self.state = STATE_RESP_START
+
+        # Data packet in response
+        if self.state == STATE_RESP_START and direction < 0 and datalen > 0:
+            self.times.append(((timestamp - self.starttime) / 1e6,
+                               seq[1] - self.respseq))
+
+def handler(header, data):
+    global start_time
+    global pkt
+    (sec, us) = header.getts()
+    ts = sec * 1000000 + us
+    if start_time is None:
+        start_time = ts
+    ts -= start_time
+    pkt = decoder.decode(data)
+
+    ip = pkt.child()
+    tcp = ip.child()
+    src = (ip.get_ip_src(), tcp.get_th_sport())
+    dst = (ip.get_ip_dst(), tcp.get_th_dport())
+    flow = tuple(sorted([src, dst]))
+    if flow not in flows:
+        #print "New flow", flow
+        flows[flow] = Connection(flow)
+
+    flows[flow].process(ts, pkt)
+
+def process(filename):
+    global decoder
+    p = pcapy.open_offline(filename)
+    p.setfilter(r"ip proto \tcp")
+    assert p.datalink() == pcapy.DLT_EN10MB
+    decoder = impacket.ImpactDecoder.EthDecoder()
+    p.loop(0, handler)
+
+    for c in flows.values():
+        c.finish_transfer()
+
+if __name__ == '__main__':
+    for f in sys.argv[1:]:
+        process(f)