7a07694cd772ec338028ea98ea993a6927eaa162
[bluesky.git] / cleaner / azure.py
1 """A simple Python library for accessing the Windows Azure blob service."""
2
3 import base64, hashlib, hmac, httplib, os, time, urllib
4 import xml.dom.minidom
5
6 # The version of the Azure API we implement; sent in the x-ms-version header.
7 API_VERSION = '2009-09-19'
8
9 def uri_decode(s):
10     return urllib.unquote_plus(s)
11
12 def uri_encode(s):
13     return urllib.quote_plus(s)
14
15 def xmlGetText(nodelist):
16     text = []
17     def walk(nodes):
18         for node in nodes:
19             if node.nodeType == node.TEXT_NODE:
20                 text.append(node.data)
21             else:
22                 walk(node.childNodes)
23     walk(nodelist)
24     return ''.join(text)
25
26 def xmlParse(s):
27     return xml.dom.minidom.parseString(s)
28
29 def buildQueryString(uri, params={}):
30     for (k, v) in params.items():
31         if v is None: continue
32         kv = '%s=%s' % (uri_encode(k), uri_encode(v))
33         if '?' not in uri:
34             uri += '?' + kv
35         else:
36             uri += '&' + kv
37     return uri
38
39 def add_auth_headers(headers, method, path, account, key):
40     header_order = ['Content-Encoding', 'Content-Language', 'Content-Length',
41                     'Content-MD5', 'Content-Type', 'Date', 'If-Modified-Since',
42                     'If-Match', 'If-None-Match', 'If-Unmodified-Since',
43                     'Range']
44
45     if not headers.has_key('Date'):
46         headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
47                                         time.gmtime())
48     if not headers.has_key('x-ms-version'):
49         headers['x-ms-version'] = API_VERSION
50
51     StringToSign = method + "\n"
52     for h in header_order:
53         if h in headers:
54             StringToSign += headers[h] + "\n"
55         else:
56             StringToSign += "\n"
57
58     # Add Canonicalized Headers
59     canonized = []
60     for (k, v) in headers.items():
61         k = k.lower()
62         if k.startswith('x-ms-'):
63             canonized.append((k, v))
64     canonized.sort()
65     for (k, v) in canonized:
66         StringToSign += "%s:%s\n" % (k, v)
67
68     resource = "/" + account
69     if '?' not in path:
70         resource += path
71     else:
72         (path, params) = path.split('?', 1)
73         params = [p.split('=') for p in params.split("&")]
74         params = dict((k.lower(), uri_decode(v)) for (k, v) in params)
75         resource += path
76         for k in sorted(params):
77             resource += "\n%s:%s" % (k, params[k])
78     StringToSign += resource
79
80     h = hmac.new(key, digestmod=hashlib.sha256)
81     h.update(StringToSign)
82
83     signature = base64.b64encode(h.digest())
84     headers['Authorization'] = "SharedKey %s:%s" % (account, signature)
85
86 class AzureError(RuntimeError):
87     def __init__(self, response):
88         self.response = response
89         self.details = response.read()
90
91 class AzureConnection:
92     def __init__(self, account=None, key=None):
93         if account is None:
94             account = os.environ['AZURE_ACCOUNT_NAME']
95         self.account = account
96         self.host = account + ".blob.core.windows.net"
97         #self.conn = httplib.HTTPConnection(self.host)
98
99         if key is None:
100             key = os.environ['AZURE_SECRET_KEY']
101         self.key = base64.b64decode(key)
102
103     def _make_request(self, path, method='GET', body="", headers={}):
104         headers = headers.copy()
105         headers['Content-Length'] = str(len(body))
106         if len(body) > 0:
107             headers['Content-MD5'] \
108                 = base64.b64encode(hashlib.md5(body).digest())
109         add_auth_headers(headers, method, path, self.account, self.key)
110
111         conn = httplib.HTTPConnection(self.host)
112         conn.request(method, path, body, headers)
113         response = conn.getresponse()
114         if response.status // 100 != 2:
115             raise AzureError(response)
116         return response
117         #print "Response:", response.status
118         #print "Headers:", response.getheaders()
119         #body = response.read()
120
121     def list(self, container, prefix=''):
122         marker = None
123         while True:
124             path = '/' + container + '?restype=container&comp=list'
125             path = buildQueryString(path, {'prefix': prefix, 'marker': marker})
126             r = self._make_request(path)
127             xml = xmlParse(r.read())
128
129             blobs = xml.getElementsByTagName('Blob')
130             for b in blobs:
131                 yield xmlGetText(b.getElementsByTagName('Name'))
132
133             marker = xmlGetText(xml.getElementsByTagName('NextMarker'))
134             if marker == "":
135                 return
136
137     def get(self, container, key):
138         path = "/%s/%s" % (container, key)
139         r = self._make_request(path)
140         return r.read()
141
142     def put(self, container, key, value):
143         path = "/%s/%s" % (container, key)
144         r = self._make_request(path, method='PUT', body=value,
145                                headers={'x-ms-blob-type': 'BlockBlob'})
146
147     def delete(self, container, key):
148         path = "/%s/%s" % (container, key)
149         r = self._make_request(path, method='DELETE')
150
151 def parallel_delete(container, keys):
152     import Queue
153     from threading import Lock, Thread
154
155     keys = list(iter(keys))
156
157     q = Queue.Queue(16384)
158     l = Lock()
159
160     def deletion_task():
161         conn = AzureConnection()
162         while True:
163             k = q.get()
164             l.acquire()
165             print k
166             l.release()
167             conn.delete(container, k)
168             q.task_done()
169
170     for i in range(128):
171         t = Thread(target=deletion_task)
172         t.setDaemon(True)
173         t.start()
174
175     for k in keys:
176         q.put(k)
177     q.join()
178
179 if __name__ == '__main__':
180     container = 'bluesky'
181     conn = AzureConnection()
182
183     conn.put(container, "testkey", "A" * 40)
184     print "Fetch result:", conn.get(container, "testkey")
185
186     parallel_delete(container, conn.list(container))