Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / bluesky / store-azure.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 #include <stdint.h>
32 #include <stdlib.h>
33 #include <glib.h>
34 #include <string.h>
35 #include <gcrypt.h>
36 #include <curl/curl.h>
37
38 #include "bluesky-private.h"
39 #include "libs3.h"
40
41 #define AZURE_API_VERSION "2009-09-19"
42
43 /* Fixed headers that are used in calculating the request signature for Azure,
44  * in the order that they are included. */
45 static const char *signature_headers[] = {
46     "Content-Encoding", "Content-Language", "Content-Length", "Content-MD5",
47     "Content-Type", "Date", "If-Modified-Since", "If-Match", "If-None-Match",
48     "If-Unmodified-Since", "Range", NULL
49 };
50
51 /* Prototype Windows Azure backend for BlueSky.  This is intended to be
52  * minimally functional, but could use additional work for production use. */
53
54 #define MAX_IDLE_CONNECTIONS 8
55
56 typedef struct {
57     GThreadPool *thread_pool;
58     char *account, *container;
59     uint8_t *key;
60     size_t key_len;
61
62     /* A pool of available idle connections that could be used. */
63     GQueue *curl_pool;
64     GMutex *curl_pool_lock;
65 } AzureStore;
66
67 static CURL *get_connection(AzureStore *store)
68 {
69     CURL *curl = NULL;
70
71     g_mutex_lock(store->curl_pool_lock);
72     if (!g_queue_is_empty(store->curl_pool)) {
73         curl = (CURL *)(g_queue_pop_head(store->curl_pool));
74     }
75     g_mutex_unlock(store->curl_pool_lock);
76
77     if (curl == NULL)
78         curl = curl_easy_init();
79
80     return curl;
81 }
82
83 static void put_connection(AzureStore *store, CURL *curl)
84 {
85     g_mutex_lock(store->curl_pool_lock);
86     g_queue_push_head(store->curl_pool, curl);
87     while (g_queue_get_length(store->curl_pool) > MAX_IDLE_CONNECTIONS) {
88         curl = (CURL *)(g_queue_pop_tail(store->curl_pool));
89         curl_easy_cleanup(curl);
90     }
91     g_mutex_unlock(store->curl_pool_lock);
92 }
93
94 static void get_extra_headers(gchar *key, gchar *value, GList **headers)
95 {
96     key = g_ascii_strdown(key, strlen(key));
97     if (strncmp(key, "x-ms-", strlen("x-ms-")) == 0) {
98         *headers = g_list_prepend(*headers,
99                                   g_strdup_printf("%s:%s\n", key, value));
100     }
101     g_free(key);
102 }
103
104 static void get_curl_headers(gchar *key, gchar *value,
105                              struct curl_slist **curl_headers)
106 {
107     char *line = g_strdup_printf("%s: %s", key, value);
108     *curl_headers = curl_slist_append(*curl_headers, line);
109     g_free(line);
110 }
111
112 struct curlbuf {
113     /* For reading */
114     const char *readbuf;
115     size_t readsize;
116 };
117
118 static size_t curl_readfunc(void *ptr, size_t size, size_t nmemb,
119                             void *userdata)
120 {
121     struct curlbuf *buf = (struct curlbuf *)userdata;
122
123     if (buf == NULL)
124         return 0;
125
126     size_t copied = size * nmemb;
127     if (copied > buf->readsize)
128         copied = buf->readsize;
129
130     memcpy(ptr, buf->readbuf, copied);
131     buf->readbuf += copied;
132     buf->readsize -= copied;
133
134     return copied;
135 }
136
137 static size_t curl_writefunc(void *ptr, size_t size, size_t nmemb,
138                              void *userdata)
139 {
140     GString *buf = (GString *)userdata;
141     if (buf != NULL)
142         g_string_append_len(buf, ptr, size * nmemb);
143     return size * nmemb;
144 }
145
146 /* Compute the signature for a request to Azure and add it to the headers. */
147 static void azure_compute_signature(AzureStore *store,
148                                     GHashTable *headers,
149                                     const char *method, const char *path)
150 {
151     if (g_hash_table_lookup(headers, "Date") == NULL) {
152         time_t t;
153         struct tm now;
154         char timebuf[4096];
155         time(&t);
156         gmtime_r(&t, &now);
157         strftime(timebuf, sizeof(timebuf), "%a, %d %b %Y %H:%M:%S GMT", &now);
158         g_hash_table_insert(headers, g_strdup("Date"), g_strdup(timebuf));
159     }
160
161     g_hash_table_insert(headers, g_strdup("x-ms-version"),
162                         g_strdup(AZURE_API_VERSION));
163
164     GString *to_sign = g_string_new("");
165     g_string_append_printf(to_sign, "%s\n", method);
166     for (const char **h = signature_headers; *h != NULL; h++) {
167         const char *val = g_hash_table_lookup(headers, *h);
168         g_string_append_printf(to_sign, "%s\n", val ? val : "");
169     }
170
171     GList *extra_headers = NULL;
172     g_hash_table_foreach(headers, (GHFunc)get_extra_headers, &extra_headers);
173     extra_headers = g_list_sort(extra_headers, (GCompareFunc)g_strcmp0);
174     while (extra_headers != NULL) {
175         g_string_append(to_sign, extra_headers->data);
176         g_free(extra_headers->data);
177         extra_headers = g_list_delete_link(extra_headers, extra_headers);
178     }
179
180     /* FIXME: Doesn't handle query parameters (after '?') */
181     g_string_append_printf(to_sign, "/%s/%s/%s",
182                            store->account, store->container, path);
183
184     /* Compute an HMAC-SHA-256 of the encoded parameters */
185     gcry_error_t status;
186     gcry_md_hd_t handle;
187     status = gcry_md_open(&handle, GCRY_MD_SHA256, GCRY_MD_FLAG_HMAC);
188     g_assert(status == 0);
189     status = gcry_md_setkey(handle, store->key, store->key_len);
190     g_assert(status == 0);
191     gcry_md_write(handle, to_sign->str, to_sign->len);
192     unsigned char *digest = gcry_md_read(handle, GCRY_MD_SHA256);
193     gchar *signature = g_base64_encode(digest,
194                                        gcry_md_get_algo_dlen(GCRY_MD_SHA256));
195     g_hash_table_insert(headers, g_strdup("Authorization"),
196                         g_strdup_printf("SharedKey %s:%s",
197                                         store->account, signature));
198     g_free(signature);
199     gcry_md_close(handle);
200     g_string_free(to_sign, TRUE);
201 }
202
203 /* Submit an HTTP request using CURL.  Takes as input the Azure storage backend
204  * we are acting for, as well as the method (GET, PUT, etc.), HTTP path, other
205  * HTTP headers, and an optional body.  If body is not NULL, an empty body is
206  * sent.  This will compute an Azure authentication signature before sending
207  * the request. */
208 static BlueSkyRCStr *submit_request(AzureStore *store,
209                                     CURL *curl,
210                                     const char *method,
211                                     const char *path,
212                                     GHashTable *headers,
213                                     BlueSkyRCStr *body)
214 {
215     BlueSkyRCStr *result = NULL;
216
217     g_hash_table_insert(headers,
218                         g_strdup("Content-Length"),
219                         g_strdup_printf("%zd", body != NULL ? body->len : 0));
220
221     if (body != 0 && body->len > 0) {
222         GChecksum *csum = g_checksum_new(G_CHECKSUM_MD5);
223         g_checksum_update(csum, (uint8_t *)body->data, body->len);
224         uint8_t md5[16];
225         gsize md5_len = sizeof(md5);
226         g_checksum_get_digest(csum, md5, &md5_len);
227         g_hash_table_insert(headers,
228                             g_strdup("Content-MD5"),
229                             g_base64_encode(md5, md5_len));
230         g_checksum_free(csum);
231     }
232
233     azure_compute_signature(store, headers, method, path);
234
235     CURLcode status;
236
237 #define curl_easy_setopt_safe(opt, val)                                     \
238     if ((status = curl_easy_setopt(curl, (opt), (val))) != CURLE_OK) {      \
239         fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));   \
240         goto cleanup;                                                       \
241     }
242
243     curl_easy_setopt_safe(CURLOPT_NOSIGNAL, 1);
244     curl_easy_setopt_safe(CURLOPT_NOPROGRESS, 1);
245     curl_easy_setopt_safe(CURLOPT_NETRC, CURL_NETRC_IGNORED);
246     curl_easy_setopt_safe(CURLOPT_FOLLOWLOCATION, 1);
247     curl_easy_setopt_safe(CURLOPT_MAXREDIRS, 10);
248
249     curl_easy_setopt_safe(CURLOPT_HEADERFUNCTION, curl_writefunc);
250     curl_easy_setopt_safe(CURLOPT_HEADERDATA, NULL);
251
252     struct curlbuf readbuf;
253     if (body != NULL) {
254         readbuf.readbuf = body->data;
255         readbuf.readsize = body->len;
256     }
257     curl_easy_setopt_safe(CURLOPT_READFUNCTION, curl_readfunc);
258     curl_easy_setopt_safe(CURLOPT_READDATA, body ? &readbuf : NULL);
259
260     GString *result_body = g_string_new("");
261     curl_easy_setopt_safe(CURLOPT_WRITEFUNCTION, curl_writefunc);
262     curl_easy_setopt_safe(CURLOPT_WRITEDATA, result_body);
263
264     struct curl_slist *curl_headers = NULL;
265     g_hash_table_foreach(headers, (GHFunc)get_curl_headers, &curl_headers);
266     curl_easy_setopt_safe(CURLOPT_HTTPHEADER, curl_headers);
267
268     char *uri = g_strdup_printf("http://%s.blob.core.windows.net/%s/%s",
269                                 store->account, store->container, path);
270     printf("URI: %s\n", uri);
271     curl_easy_setopt_safe(CURLOPT_URL, uri);
272
273     if (strcmp(method, "GET") == 0) {
274         /* nothing special needed */
275     } else if (strcmp(method, "PUT") == 0) {
276         curl_easy_setopt_safe(CURLOPT_UPLOAD, 1);
277     } else if (strcmp(method, "DELETE") == 0) {
278         curl_easy_setopt_safe(CURLOPT_CUSTOMREQUEST, "DELETE");
279     }
280
281     status = curl_easy_perform(curl);
282     if (status != 0) {
283         fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
284         goto cleanup;
285     }
286
287     long response_code = 0;
288     status = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
289     if (status != 0) {
290         fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
291         goto cleanup;
292     }
293
294     if (response_code / 100 == 2) {
295         result = bluesky_string_new_from_gstring(result_body);
296         result_body = NULL;
297     } else {
298         fprintf(stderr, "HTTP response code: %ld!\n", response_code);
299         goto cleanup;
300     }
301
302 cleanup:
303     if (result != NULL && result_body != NULL)
304         g_string_free(result_body, TRUE);
305     curl_easy_reset(curl);
306     curl_slist_free_all(curl_headers);
307     g_free(uri);
308
309     return result;
310 }
311
312 static void azurestore_task(gpointer a, gpointer s)
313 {
314     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
315     AzureStore *store = (AzureStore *)s;
316
317     async->status = ASYNC_RUNNING;
318     async->exec_time = bluesky_now_hires();
319
320     GHashTable *headers = g_hash_table_new_full(g_str_hash, g_str_equal,
321                                                 g_free, g_free);
322
323     BlueSkyRCStr *result = NULL;
324     CURL *curl = get_connection(store);
325
326     if (async->op == STORE_OP_GET) {
327         /* FIXME: We ought to check that the response returned the requested
328          * byte range. */
329         if (async->start != 0 && async->len != 0) {
330             g_hash_table_insert(headers,
331                                 g_strdup("Range"),
332                                 g_strdup_printf("bytes=%zd-%zd", async->start,
333                                                 async->start + async->len));
334             async->range_done = TRUE;
335         } else if (async->start != 0) {
336             g_hash_table_insert(headers,
337                                 g_strdup("Range"),
338                                 g_strdup_printf("bytes=%zd-", async->start));
339             async->range_done = TRUE;
340         }
341         result = submit_request(store, curl, "GET", async->key, headers, NULL);
342         if (result != NULL) {
343             async->data = result;
344             async->result = 0;
345         }
346     } else if (async->op == STORE_OP_PUT) {
347         g_hash_table_insert(headers,
348                             g_strdup("x-ms-blob-type"),
349                             g_strdup("BlockBlob"));
350         g_hash_table_insert(headers,
351                             g_strdup("Transfer-Encoding"),
352                             g_strdup(""));
353         result = submit_request(store, curl, "PUT", async->key,
354                                 headers, async->data);
355         if (result != NULL) {
356             async->result = 0;
357         }
358         bluesky_string_unref(result);
359     }
360
361     bluesky_store_async_mark_complete(async);
362     bluesky_store_async_unref(async);
363     g_hash_table_unref(headers);
364     put_connection(store, curl);
365 }
366
367 static gpointer azurestore_new(const gchar *path)
368 {
369     AzureStore *store = g_new(AzureStore, 1);
370     store->thread_pool = g_thread_pool_new(azurestore_task, store, -1, FALSE,
371                                            NULL);
372     if (path == NULL || strlen(path) == 0)
373         store->container = g_strdup("bluesky");
374     else
375         store->container = g_strdup(path);
376
377     store->account = g_strdup(getenv("AZURE_ACCOUNT_NAME"));
378
379     const char *key = getenv("AZURE_SECRET_KEY");
380     store->key = g_base64_decode(key, &store->key_len);
381
382     g_print("Initializing Azure with account %s, container %s\n",
383             store->account, store->container);
384
385     store->curl_pool = g_queue_new();
386     store->curl_pool_lock = g_mutex_new();
387
388     return store;
389 }
390
391 static void azurestore_destroy(gpointer store)
392 {
393     /* TODO: Clean up resources */
394 }
395
396 static void azurestore_submit(gpointer s, BlueSkyStoreAsync *async)
397 {
398     AzureStore *store = (AzureStore *)s;
399     g_return_if_fail(async->status == ASYNC_NEW);
400     g_return_if_fail(async->op != STORE_OP_NONE);
401
402     switch (async->op) {
403     case STORE_OP_GET:
404     case STORE_OP_PUT:
405         async->status = ASYNC_PENDING;
406         bluesky_store_async_ref(async);
407         g_thread_pool_push(store->thread_pool, async, NULL);
408         break;
409
410     default:
411         g_warning("Uknown operation type for AzureStore: %d\n", async->op);
412         bluesky_store_async_mark_complete(async);
413         break;
414     }
415 }
416
417 static void azurestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
418 {
419 }
420
421 static char *azurestore_lookup_last(gpointer s, const char *prefix)
422 {
423     return NULL;
424 }
425
426 static BlueSkyStoreImplementation store_impl = {
427     .create = azurestore_new,
428     .destroy = azurestore_destroy,
429     .submit = azurestore_submit,
430     .cleanup = azurestore_cleanup,
431     .lookup_last = azurestore_lookup_last,
432 };
433
434 void bluesky_store_init_azure(void)
435 {
436     bluesky_store_register(&store_impl, "azure");
437 }