Add a debug option to dump statistics counters every ten seconds
[bluesky.git] / bluesky / store-azure.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdint.h>
10 #include <stdlib.h>
11 #include <glib.h>
12 #include <string.h>
13 #include <gcrypt.h>
14 #include <curl/curl.h>
15
16 #include "bluesky-private.h"
17 #include "libs3.h"
18
19 #define AZURE_API_VERSION "2009-09-19"
20
21 /* Fixed headers that are used in calculating the request signature for Azure,
22  * in the order that they are included. */
23 static const char *signature_headers[] = {
24     "Content-Encoding", "Content-Language", "Content-Length", "Content-MD5",
25     "Content-Type", "Date", "If-Modified-Since", "If-Match", "If-None-Match",
26     "If-Unmodified-Since", "Range", NULL
27 };
28
29 /* Prototype Windows Azure backend for BlueSky.  This is intended to be
30  * minimally functional, but could use additional work for production use. */
31
32 #define MAX_IDLE_CONNECTIONS 8
33
34 typedef struct {
35     GThreadPool *thread_pool;
36     char *account, *container;
37     uint8_t *key;
38     size_t key_len;
39
40     /* A pool of available idle connections that could be used. */
41     GQueue *curl_pool;
42     GMutex *curl_pool_lock;
43 } AzureStore;
44
45 static CURL *get_connection(AzureStore *store)
46 {
47     CURL *curl = NULL;
48
49     g_mutex_lock(store->curl_pool_lock);
50     if (!g_queue_is_empty(store->curl_pool)) {
51         curl = (CURL *)(g_queue_pop_head(store->curl_pool));
52     }
53     g_mutex_unlock(store->curl_pool_lock);
54
55     if (curl == NULL)
56         curl = curl_easy_init();
57
58     return curl;
59 }
60
61 static void put_connection(AzureStore *store, CURL *curl)
62 {
63     g_mutex_lock(store->curl_pool_lock);
64     g_queue_push_head(store->curl_pool, curl);
65     while (g_queue_get_length(store->curl_pool) > MAX_IDLE_CONNECTIONS) {
66         curl = (CURL *)(g_queue_pop_tail(store->curl_pool));
67         curl_easy_cleanup(curl);
68     }
69     g_mutex_unlock(store->curl_pool_lock);
70 }
71
72 static void get_extra_headers(gchar *key, gchar *value, GList **headers)
73 {
74     key = g_ascii_strdown(key, strlen(key));
75     if (strncmp(key, "x-ms-", strlen("x-ms-")) == 0) {
76         *headers = g_list_prepend(*headers,
77                                   g_strdup_printf("%s:%s\n", key, value));
78     }
79     g_free(key);
80 }
81
82 static void get_curl_headers(gchar *key, gchar *value,
83                              struct curl_slist **curl_headers)
84 {
85     char *line = g_strdup_printf("%s: %s", key, value);
86     *curl_headers = curl_slist_append(*curl_headers, line);
87     g_free(line);
88 }
89
90 struct curlbuf {
91     /* For reading */
92     const char *readbuf;
93     size_t readsize;
94 };
95
96 static size_t curl_readfunc(void *ptr, size_t size, size_t nmemb,
97                             void *userdata)
98 {
99     struct curlbuf *buf = (struct curlbuf *)userdata;
100
101     if (buf == NULL)
102         return 0;
103
104     size_t copied = size * nmemb;
105     if (copied > buf->readsize)
106         copied = buf->readsize;
107
108     memcpy(ptr, buf->readbuf, copied);
109     buf->readbuf += copied;
110     buf->readsize -= copied;
111
112     return copied;
113 }
114
115 static size_t curl_writefunc(void *ptr, size_t size, size_t nmemb,
116                              void *userdata)
117 {
118     GString *buf = (GString *)userdata;
119     if (buf != NULL)
120         g_string_append_len(buf, ptr, size * nmemb);
121     return size * nmemb;
122 }
123
124 /* Compute the signature for a request to Azure and add it to the headers. */
125 static void azure_compute_signature(AzureStore *store,
126                                     GHashTable *headers,
127                                     const char *method, const char *path)
128 {
129     if (g_hash_table_lookup(headers, "Date") == NULL) {
130         time_t t;
131         struct tm now;
132         char timebuf[4096];
133         time(&t);
134         gmtime_r(&t, &now);
135         strftime(timebuf, sizeof(timebuf), "%a, %d %b %Y %H:%M:%S GMT", &now);
136         g_hash_table_insert(headers, g_strdup("Date"), g_strdup(timebuf));
137     }
138
139     g_hash_table_insert(headers, g_strdup("x-ms-version"),
140                         g_strdup(AZURE_API_VERSION));
141
142     GString *to_sign = g_string_new("");
143     g_string_append_printf(to_sign, "%s\n", method);
144     for (const char **h = signature_headers; *h != NULL; h++) {
145         const char *val = g_hash_table_lookup(headers, *h);
146         g_string_append_printf(to_sign, "%s\n", val ? val : "");
147     }
148
149     GList *extra_headers = NULL;
150     g_hash_table_foreach(headers, (GHFunc)get_extra_headers, &extra_headers);
151     extra_headers = g_list_sort(extra_headers, (GCompareFunc)g_strcmp0);
152     while (extra_headers != NULL) {
153         g_string_append(to_sign, extra_headers->data);
154         g_free(extra_headers->data);
155         extra_headers = g_list_delete_link(extra_headers, extra_headers);
156     }
157
158     /* FIXME: Doesn't handle query parameters (after '?') */
159     g_string_append_printf(to_sign, "/%s/%s/%s",
160                            store->account, store->container, path);
161
162     /* Compute an HMAC-SHA-256 of the encoded parameters */
163     gcry_error_t status;
164     gcry_md_hd_t handle;
165     status = gcry_md_open(&handle, GCRY_MD_SHA256, GCRY_MD_FLAG_HMAC);
166     g_assert(status == 0);
167     status = gcry_md_setkey(handle, store->key, store->key_len);
168     g_assert(status == 0);
169     gcry_md_write(handle, to_sign->str, to_sign->len);
170     unsigned char *digest = gcry_md_read(handle, GCRY_MD_SHA256);
171     gchar *signature = g_base64_encode(digest,
172                                        gcry_md_get_algo_dlen(GCRY_MD_SHA256));
173     g_hash_table_insert(headers, g_strdup("Authorization"),
174                         g_strdup_printf("SharedKey %s:%s",
175                                         store->account, signature));
176     g_free(signature);
177     gcry_md_close(handle);
178     g_string_free(to_sign, TRUE);
179 }
180
181 /* Submit an HTTP request using CURL.  Takes as input the Azure storage backend
182  * we are acting for, as well as the method (GET, PUT, etc.), HTTP path, other
183  * HTTP headers, and an optional body.  If body is not NULL, an empty body is
184  * sent.  This will compute an Azure authentication signature before sending
185  * the request. */
186 static BlueSkyRCStr *submit_request(AzureStore *store,
187                                     CURL *curl,
188                                     const char *method,
189                                     const char *path,
190                                     GHashTable *headers,
191                                     BlueSkyRCStr *body)
192 {
193     BlueSkyRCStr *result = NULL;
194
195     g_hash_table_insert(headers,
196                         g_strdup("Content-Length"),
197                         g_strdup_printf("%zd", body != NULL ? body->len : 0));
198
199     if (body != 0 && body->len > 0) {
200         GChecksum *csum = g_checksum_new(G_CHECKSUM_MD5);
201         g_checksum_update(csum, (uint8_t *)body->data, body->len);
202         uint8_t md5[16];
203         gsize md5_len = sizeof(md5);
204         g_checksum_get_digest(csum, md5, &md5_len);
205         g_hash_table_insert(headers,
206                             g_strdup("Content-MD5"),
207                             g_base64_encode(md5, md5_len));
208         g_checksum_free(csum);
209     }
210
211     azure_compute_signature(store, headers, method, path);
212
213     CURLcode status;
214
215 #define curl_easy_setopt_safe(opt, val)                                     \
216     if ((status = curl_easy_setopt(curl, (opt), (val))) != CURLE_OK) {      \
217         fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));   \
218         goto cleanup;                                                       \
219     }
220
221     curl_easy_setopt_safe(CURLOPT_NOSIGNAL, 1);
222     curl_easy_setopt_safe(CURLOPT_NOPROGRESS, 1);
223     curl_easy_setopt_safe(CURLOPT_NETRC, CURL_NETRC_IGNORED);
224     curl_easy_setopt_safe(CURLOPT_FOLLOWLOCATION, 1);
225     curl_easy_setopt_safe(CURLOPT_MAXREDIRS, 10);
226
227     curl_easy_setopt_safe(CURLOPT_HEADERFUNCTION, curl_writefunc);
228     curl_easy_setopt_safe(CURLOPT_HEADERDATA, NULL);
229
230     struct curlbuf readbuf;
231     if (body != NULL) {
232         readbuf.readbuf = body->data;
233         readbuf.readsize = body->len;
234     }
235     curl_easy_setopt_safe(CURLOPT_READFUNCTION, curl_readfunc);
236     curl_easy_setopt_safe(CURLOPT_READDATA, body ? &readbuf : NULL);
237
238     GString *result_body = g_string_new("");
239     curl_easy_setopt_safe(CURLOPT_WRITEFUNCTION, curl_writefunc);
240     curl_easy_setopt_safe(CURLOPT_WRITEDATA, result_body);
241
242     struct curl_slist *curl_headers = NULL;
243     g_hash_table_foreach(headers, (GHFunc)get_curl_headers, &curl_headers);
244     curl_easy_setopt_safe(CURLOPT_HTTPHEADER, curl_headers);
245
246     char *uri = g_strdup_printf("http://%s.blob.core.windows.net/%s/%s",
247                                 store->account, store->container, path);
248     printf("URI: %s\n", uri);
249     curl_easy_setopt_safe(CURLOPT_URL, uri);
250
251     if (strcmp(method, "GET") == 0) {
252         /* nothing special needed */
253     } else if (strcmp(method, "PUT") == 0) {
254         curl_easy_setopt_safe(CURLOPT_UPLOAD, 1);
255     } else if (strcmp(method, "DELETE") == 0) {
256         curl_easy_setopt_safe(CURLOPT_CUSTOMREQUEST, "DELETE");
257     }
258
259     status = curl_easy_perform(curl);
260     if (status != 0) {
261         fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
262         goto cleanup;
263     }
264
265     long response_code = 0;
266     status = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
267     if (status != 0) {
268         fprintf(stderr, "CURL error: %s!\n", curl_easy_strerror(status));
269         goto cleanup;
270     }
271
272     if (response_code / 100 == 2) {
273         result = bluesky_string_new_from_gstring(result_body);
274         result_body = NULL;
275     } else {
276         fprintf(stderr, "HTTP response code: %ld!\n", response_code);
277         goto cleanup;
278     }
279
280 cleanup:
281     if (result != NULL && result_body != NULL)
282         g_string_free(result_body, TRUE);
283     curl_easy_reset(curl);
284     curl_slist_free_all(curl_headers);
285     g_free(uri);
286
287     return result;
288 }
289
290 static void azurestore_task(gpointer a, gpointer s)
291 {
292     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
293     AzureStore *store = (AzureStore *)s;
294
295     async->status = ASYNC_RUNNING;
296     async->exec_time = bluesky_now_hires();
297
298     GHashTable *headers = g_hash_table_new_full(g_str_hash, g_str_equal,
299                                                 g_free, g_free);
300
301     BlueSkyRCStr *result = NULL;
302     CURL *curl = get_connection(store);
303
304     if (async->op == STORE_OP_GET) {
305         /* FIXME: We ought to check that the response returned the requested
306          * byte range. */
307         if (async->start != 0 && async->len != 0) {
308             g_hash_table_insert(headers,
309                                 g_strdup("Range"),
310                                 g_strdup_printf("bytes=%zd-%zd", async->start,
311                                                 async->start + async->len));
312             async->range_done = TRUE;
313         } else if (async->start != 0) {
314             g_hash_table_insert(headers,
315                                 g_strdup("Range"),
316                                 g_strdup_printf("bytes=%zd-", async->start));
317             async->range_done = TRUE;
318         }
319         result = submit_request(store, curl, "GET", async->key, headers, NULL);
320         if (result != NULL) {
321             async->data = result;
322             async->result = 0;
323         }
324     } else if (async->op == STORE_OP_PUT) {
325         g_hash_table_insert(headers,
326                             g_strdup("x-ms-blob-type"),
327                             g_strdup("BlockBlob"));
328         g_hash_table_insert(headers,
329                             g_strdup("Transfer-Encoding"),
330                             g_strdup(""));
331         result = submit_request(store, curl, "PUT", async->key,
332                                 headers, async->data);
333         if (result != NULL) {
334             async->result = 0;
335         }
336         bluesky_string_unref(result);
337     }
338
339     bluesky_store_async_mark_complete(async);
340     bluesky_store_async_unref(async);
341     g_hash_table_unref(headers);
342     put_connection(store, curl);
343 }
344
345 static gpointer azurestore_new(const gchar *path)
346 {
347     AzureStore *store = g_new(AzureStore, 1);
348     store->thread_pool = g_thread_pool_new(azurestore_task, store, -1, FALSE,
349                                            NULL);
350     if (path == NULL || strlen(path) == 0)
351         store->container = g_strdup("bluesky");
352     else
353         store->container = g_strdup(path);
354
355     store->account = g_strdup(getenv("AZURE_ACCOUNT_NAME"));
356
357     const char *key = getenv("AZURE_SECRET_KEY");
358     store->key = g_base64_decode(key, &store->key_len);
359
360     g_print("Initializing Azure with account %s, container %s\n",
361             store->account, store->container);
362
363     store->curl_pool = g_queue_new();
364     store->curl_pool_lock = g_mutex_new();
365
366     return store;
367 }
368
369 static void azurestore_destroy(gpointer store)
370 {
371     /* TODO: Clean up resources */
372 }
373
374 static void azurestore_submit(gpointer s, BlueSkyStoreAsync *async)
375 {
376     AzureStore *store = (AzureStore *)s;
377     g_return_if_fail(async->status == ASYNC_NEW);
378     g_return_if_fail(async->op != STORE_OP_NONE);
379
380     switch (async->op) {
381     case STORE_OP_GET:
382     case STORE_OP_PUT:
383         async->status = ASYNC_PENDING;
384         bluesky_store_async_ref(async);
385         g_thread_pool_push(store->thread_pool, async, NULL);
386         break;
387
388     default:
389         g_warning("Uknown operation type for AzureStore: %d\n", async->op);
390         bluesky_store_async_mark_complete(async);
391         break;
392     }
393 }
394
395 static void azurestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
396 {
397 }
398
399 static char *azurestore_lookup_last(gpointer s, const char *prefix)
400 {
401     return NULL;
402 }
403
404 static BlueSkyStoreImplementation store_impl = {
405     .create = azurestore_new,
406     .destroy = azurestore_destroy,
407     .submit = azurestore_submit,
408     .cleanup = azurestore_cleanup,
409     .lookup_last = azurestore_lookup_last,
410 };
411
412 void bluesky_store_init_azure(void)
413 {
414     bluesky_store_register(&store_impl, "azure");
415 }