Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / cleaner / azure.py
1 # Blue Sky: File Systems in the Cloud
2 #
3 # Copyright (C) 2011  The Regents of the University of California
4 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 # 1. Redistributions of source code must retain the above copyright
10 #    notice, this list of conditions and the following disclaimer.
11 # 2. Redistributions in binary form must reproduce the above copyright
12 #    notice, this list of conditions and the following disclaimer in the
13 #    documentation and/or other materials provided with the distribution.
14 # 3. Neither the name of the University nor the names of its contributors
15 #    may be used to endorse or promote products derived from this software
16 #    without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 # ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 # SUCH DAMAGE.
29
30 """A simple Python library for accessing the Windows Azure blob service."""
31
32 import base64, hashlib, hmac, httplib, os, time, urllib
33 import xml.dom.minidom
34
35 # The version of the Azure API we implement; sent in the x-ms-version header.
36 API_VERSION = '2009-09-19'
37
38 def uri_decode(s):
39     return urllib.unquote_plus(s)
40
41 def uri_encode(s):
42     return urllib.quote_plus(s)
43
44 def xmlGetText(nodelist):
45     text = []
46     def walk(nodes):
47         for node in nodes:
48             if node.nodeType == node.TEXT_NODE:
49                 text.append(node.data)
50             else:
51                 walk(node.childNodes)
52     walk(nodelist)
53     return ''.join(text)
54
55 def xmlParse(s):
56     return xml.dom.minidom.parseString(s)
57
58 def buildQueryString(uri, params={}):
59     for (k, v) in params.items():
60         if v is None: continue
61         kv = '%s=%s' % (uri_encode(k), uri_encode(v))
62         if '?' not in uri:
63             uri += '?' + kv
64         else:
65             uri += '&' + kv
66     return uri
67
68 def add_auth_headers(headers, method, path, account, key):
69     header_order = ['Content-Encoding', 'Content-Language', 'Content-Length',
70                     'Content-MD5', 'Content-Type', 'Date', 'If-Modified-Since',
71                     'If-Match', 'If-None-Match', 'If-Unmodified-Since',
72                     'Range']
73
74     if not headers.has_key('Date'):
75         headers['Date'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT",
76                                         time.gmtime())
77     if not headers.has_key('x-ms-version'):
78         headers['x-ms-version'] = API_VERSION
79
80     StringToSign = method + "\n"
81     for h in header_order:
82         if h in headers:
83             StringToSign += headers[h] + "\n"
84         else:
85             StringToSign += "\n"
86
87     # Add Canonicalized Headers
88     canonized = []
89     for (k, v) in headers.items():
90         k = k.lower()
91         if k.startswith('x-ms-'):
92             canonized.append((k, v))
93     canonized.sort()
94     for (k, v) in canonized:
95         StringToSign += "%s:%s\n" % (k, v)
96
97     resource = "/" + account
98     if '?' not in path:
99         resource += path
100     else:
101         (path, params) = path.split('?', 1)
102         params = [p.split('=') for p in params.split("&")]
103         params = dict((k.lower(), uri_decode(v)) for (k, v) in params)
104         resource += path
105         for k in sorted(params):
106             resource += "\n%s:%s" % (k, params[k])
107     StringToSign += resource
108
109     h = hmac.new(key, digestmod=hashlib.sha256)
110     h.update(StringToSign)
111
112     signature = base64.b64encode(h.digest())
113     headers['Authorization'] = "SharedKey %s:%s" % (account, signature)
114
115 class AzureError(RuntimeError):
116     def __init__(self, response):
117         self.response = response
118         self.details = response.read()
119
120 class AzureConnection:
121     def __init__(self, account=None, key=None):
122         if account is None:
123             account = os.environ['AZURE_ACCOUNT_NAME']
124         self.account = account
125         self.host = account + ".blob.core.windows.net"
126         #self.conn = httplib.HTTPConnection(self.host)
127
128         if key is None:
129             key = os.environ['AZURE_SECRET_KEY']
130         self.key = base64.b64decode(key)
131
132     def _make_request(self, path, method='GET', body="", headers={}):
133         headers = headers.copy()
134         headers['Content-Length'] = str(len(body))
135         if len(body) > 0:
136             headers['Content-MD5'] \
137                 = base64.b64encode(hashlib.md5(body).digest())
138         add_auth_headers(headers, method, path, self.account, self.key)
139
140         conn = httplib.HTTPConnection(self.host)
141         conn.request(method, path, body, headers)
142         response = conn.getresponse()
143         if response.status // 100 != 2:
144             raise AzureError(response)
145         return response
146         #print "Response:", response.status
147         #print "Headers:", response.getheaders()
148         #body = response.read()
149
150     def list(self, container, prefix=''):
151         marker = None
152         while True:
153             path = '/' + container + '?restype=container&comp=list'
154             path = buildQueryString(path, {'prefix': prefix, 'marker': marker})
155             r = self._make_request(path)
156             xml = xmlParse(r.read())
157
158             blobs = xml.getElementsByTagName('Blob')
159             for b in blobs:
160                 yield xmlGetText(b.getElementsByTagName('Name'))
161
162             marker = xmlGetText(xml.getElementsByTagName('NextMarker'))
163             if marker == "":
164                 return
165
166     def get(self, container, key):
167         path = "/%s/%s" % (container, key)
168         r = self._make_request(path)
169         return r.read()
170
171     def put(self, container, key, value):
172         path = "/%s/%s" % (container, key)
173         r = self._make_request(path, method='PUT', body=value,
174                                headers={'x-ms-blob-type': 'BlockBlob'})
175
176     def delete(self, container, key):
177         path = "/%s/%s" % (container, key)
178         r = self._make_request(path, method='DELETE')
179
180 def parallel_delete(container, keys):
181     import Queue
182     from threading import Lock, Thread
183
184     keys = list(iter(keys))
185
186     q = Queue.Queue(16384)
187     l = Lock()
188
189     def deletion_task():
190         conn = AzureConnection()
191         while True:
192             k = q.get()
193             l.acquire()
194             print k
195             l.release()
196             conn.delete(container, k)
197             q.task_done()
198
199     for i in range(128):
200         t = Thread(target=deletion_task)
201         t.setDaemon(True)
202         t.start()
203
204     for k in keys:
205         q.put(k)
206     q.join()
207
208 if __name__ == '__main__':
209     container = 'bluesky'
210     conn = AzureConnection()
211
212     conn.put(container, "testkey", "A" * 40)
213     print "Fetch result:", conn.get(container, "testkey")
214
215     parallel_delete(container, conn.list(container))