Move encryption of cloud log segments into background threads.
[bluesky.git] / parsetrace / parse.py
1 #!/usr/bin/python
2
3 import impacket, pcapy, re, sys
4 import impacket.ImpactDecoder, impacket.ImpactPacket
5
# Timestamp of the first packet handed to handler(); None until one is seen.
# Later packet times are expressed relative to this value.
start_time = None

# Connection objects, keyed by the flow tuple built in handler().
flows = dict()

# States of the per-connection request/response tracker, in the order a
# transfer moves through them.
(STATE_START,
 STATE_REQ_SENT,
 STATE_REQ_ACKED,
 STATE_RESP_START) = (0, 1, 2, 3)

# One tab-separated line of statistics is written here per finished transfer.
logfile = open('times.data', mode='w')
16
def seq_after(x, y):
    """Return True when x is at or past y in 32-bit sequence-number space.

    The distance from y forward to x is taken modulo 2**32, so a value that
    has wrapped around zero still compares as later than one just below the
    wrap point.  x counts as "after" y when that forward distance is less
    than half of the number space.
    """
    space = 1 << 32
    half_space = 1 << 31
    forward_distance = (x - y) % space
    return forward_distance < half_space
22
class Connection:
    """State machine that times HTTP transfers seen on one TCP flow.

    Packets are classified by port: source port 80 is treated as incoming
    (direction -1), destination port 80 as outgoing (direction 1), anything
    else as direction 0.  A transfer starts at an outgoing packet whose
    payload begins with 'GET /' and is closed either when the next such
    request arrives or when finish_transfer() is called.

    self.times collects (seconds since the request, sequence offset) samples
    for the transfer in progress.  The first sample is taken when the request
    is acknowledged and has an offset of 0; later samples hold the offset of
    the end of each response data packet from the start of the response.
    """

    # Number of Connection objects created so far; used to hand out ids.
    counter = 0

    # Id of the connection whose packets get a per-packet window trace on
    # stdout.  Override on the class or an instance to trace another one.
    debug_id = 21

    def __init__(self, endpoints):
        """Create a tracker for the flow identified by *endpoints*.

        endpoints is only stored and printed by this class.
        """
        self.endpoints = endpoints
        self.packets = []
        self.state = STATE_START
        self.id = Connection.counter
        self.times = []
        self.transfer_count = 0
        Connection.counter += 1
        # IP id of the most recent incoming packet (for gap detection).
        self.last_id = 0
        # Window-scale shift announced for each direction.
        self.winscale = {1: 0, -1: 0}
        # Filled in by process() as a request/response pair is recognised.
        self.startseq = None
        self.starttime = None
        self.respseq = None

    def finish_transfer(self):
        """Report the transfer in progress (if any) and reset to idle.

        When samples have been collected, a summary is printed and one
        tab-separated line (id, transfer number, bytes, RTT, additional
        response delay, transfer time, endpoints) is written to logfile.
        The sample list and state are reset whether or not anything was
        reported.
        """
        if self.times:
            rtt = self.times[0][0]
            # Time of the first sample that carries response data; 0.0 when
            # the response never got past the acknowledgement.
            start = next((t[0] for t in self.times if t[1] > 0), 0.0)
            end = self.times[-1][0]
            data = self.times[-1][1]
            # Single-argument print(...) produces the same output under the
            # Python 2 print statement and also parses on Python 3.
            print("Connection %d Transfer #%d" % (self.id,
                                                  self.transfer_count))
            print("Network RTT: %s" % (rtt,))
            print("Additional response delay: %s" % (start - rtt,))
            print("Transfer time: %s" % (end - start,))
            if end - start > 0:
                print("Bandwidth: %s" % (data / (end - start),))
            print("")
            logfile.write("%d\t%d\t%d\t%f\t%f\t%f\t# %s\n"
                          % (self.id, self.transfer_count, data,
                             rtt, start - rtt, end - start, self.endpoints))
            self.transfer_count += 1
        self.times = []
        self.state = STATE_START

    def process(self, timestamp, packet):
        """Advance the transfer state machine with one decoded packet.

        timestamp -- packet time; differences are divided by 1e6 before being
                     stored, so microseconds are expected
        packet    -- decoded frame whose child() is the IP header and whose
                     grandchild is the TCP header
        """
        # Work on the packet that was passed in, not on a module-level global.
        ip = packet.child()
        tcp = ip.child()
        # NOTE(review): every packet is retained and nothing visible in this
        # file reads the list back, so memory grows with the trace size.
        self.packets.append(packet)

        # Payload length according to the IP header; the slice drops anything
        # beyond it (presumably link-layer padding -- TODO confirm).
        datalen = (ip.get_ip_len() - ip.get_header_size()
                   - tcp.get_header_size())
        data = tcp.get_data_as_string()[0:datalen]

        if tcp.get_th_sport() == 80:
            # Incoming packet
            direction = -1
        elif tcp.get_th_dport() == 80:
            # Outgoing packet
            direction = 1
        else:
            direction = 0

        for o in tcp.get_options():
            if o.get_kind() == o.TCPOPT_WINDOW:
                self.winscale[direction] = o.get_shift_cnt()
                print("window scale for dir %d is %d" % (direction,
                                                         o.get_shift_cnt()))

        if direction < 0:
            # IP ids are compared modulo 2**16; a small jump of more than one
            # between consecutive incoming packets is reported as a gap.
            gap = (ip.get_ip_id() - self.last_id) & 0xffff
            if 1 < gap < 256:
                print("Gap of %s packets on connection %s"
                      % (gap, self.endpoints))
            self.last_id = ip.get_ip_id()

        # (first sequence number, sequence number just past this payload)
        seq = (tcp.get_th_seq(), tcp.get_th_seq() + datalen)
        ack = tcp.get_th_ack()

        # The checks below are deliberately independent "if"s rather than an
        # elif chain: one packet may trigger several transitions in a row.

        # Previous request finished
        if self.state == STATE_RESP_START and direction > 0 \
                and data.startswith('GET /'):
            self.finish_transfer()

        # New request seen on an idle connection...
        if self.state == STATE_START and direction > 0 \
                and data.startswith('GET /'):
            self.startseq = seq[1]
            self.starttime = timestamp
            self.state = STATE_REQ_SENT

        # Request is acknowledged, but response not yet seen
        if self.state == STATE_REQ_SENT and direction < 0 \
                and seq_after(ack, self.startseq):
            self.state = STATE_REQ_ACKED
            self.respseq = seq[0]
            self.times.append(((timestamp - self.starttime) / 1e6, 0))

        # Response header to request has been seen
        if self.state == STATE_REQ_ACKED and direction < 0 \
                and data.startswith("HTTP/1."):
            self.state = STATE_RESP_START

        # Data packet in response
        if self.state == STATE_RESP_START and direction < 0 and datalen > 0:
            self.times.append(((timestamp - self.starttime) / 1e6,
                               seq[1] - self.respseq))

        if self.id == self.debug_id:
            winsize = tcp.get_th_win()
            # The window field of a SYN segment is never scaled.
            if not tcp.get_SYN():
                # .get() keeps direction 0 (no port-80 endpoint) from raising
                # KeyError when no window-scale option was seen for it.
                winsize <<= self.winscale.get(direction, 0)
            print("got packet, data=%d win=%d" % (datalen, winsize))
126
def handler(header, data):
    """Per-packet callback: decode one captured frame and route it to its flow.

    header -- capture header; getts() yields a (seconds, microseconds) pair
    data   -- raw frame bytes, decoded with the module-level decoder

    The packet time is converted to microseconds relative to the first packet
    seen (start_time).  The flow key is the sorted pair of (address, port)
    endpoints, so both directions of a connection share one Connection.
    """
    global start_time
    global pkt

    seconds, micros = header.getts()
    now = seconds * 1000000 + micros
    if start_time is None:
        start_time = now
    elapsed = now - start_time

    # The decoded packet is kept in a module global as well as being passed
    # on explicitly; other code in this file may read it from there.
    pkt = decoder.decode(data)

    ip_layer = pkt.child()
    tcp_layer = ip_layer.child()
    source = (ip_layer.get_ip_src(), tcp_layer.get_th_sport())
    destination = (ip_layer.get_ip_dst(), tcp_layer.get_th_dport())
    key = tuple(sorted((source, destination)))

    # Create the Connection the first time this endpoint pair shows up.
    if key not in flows:
        flows[key] = Connection(key)
    connection = flows[key]

    connection.process(elapsed, pkt)
147
def process(filename):
    """Run one saved capture file through handler() and close out all flows.

    filename -- path of a capture file readable by pcapy.open_offline

    The capture is filtered with "ip proto \\tcp" and must have an Ethernet
    (DLT_EN10MB) link type.  The module-level decoder is (re)created here
    before packets are fed to handler().  Afterwards finish_transfer() is
    called on every known connection so that transfers still in progress at
    the end of the trace are reported.
    """
    global decoder

    capture = pcapy.open_offline(filename)
    capture.setfilter(r"ip proto \tcp")
    assert capture.datalink() == pcapy.DLT_EN10MB

    decoder = impacket.ImpactDecoder.EthDecoder()
    capture.loop(0, handler)

    for connection in flows.values():
        connection.finish_transfer()
158
if __name__ == '__main__':
    # Analyse every capture file named on the command line, in order.
    try:
        for f in sys.argv[1:]:
            process(f)
    finally:
        # Close the statistics file explicitly so buffered lines reach
        # times.data even when a capture fails part-way through.
        logfile.close()