1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the University nor the names of its contributors
15 * may be used to endorse or promote products derived from this software
16 * without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
36 #include "bluesky-private.h"
39 /* Interface to Amazon S3 storage. */
/* Per-store state for the S3 backend. */
/* Worker pool; created in s3store_new() with s3store_task as its worker. */
42 GThreadPool *thread_pool;
/* Bucket name, protocol, URI style and credentials handed to the S3_* calls. */
43 S3BucketContext bucket;
/* Filled in by bluesky_crypt_hash_key() from the BLUESKY_KEY variable. */
44 uint8_t encryption_key[CRYPTO_KEY_SIZE];
/* Request kind selector: S3_GET or S3_PUT.
 * NOTE(review): s3store_task() dispatches on async->op using
 * STORE_OP_GET/STORE_OP_PUT rather than these constants; confirm whether
 * this enum is still in use. */
48 enum { S3_GET, S3_PUT } op;
/* Data callback installed as getObjectDataCallback for S3_get_object().
 * Appends the bufferSize bytes found at buffer to the GString held in the
 * struct get_info that was passed as callbackData. */
70 static S3Status s3store_get_handler(int bufferSize, const char *buffer,
73 struct get_info *info = (struct get_info *)callbackData;
/* Accumulate this chunk of the object body. */
74 g_string_append_len(info->buf, buffer, bufferSize);
/* Data callback installed as putObjectDataCallback for S3_put_object().
 * Copies the next piece of info->val into buffer and advances info->offset
 * so that successive calls walk through the value. */
78 static int s3store_put_handler(int bufferSize, char *buffer,
81 struct put_info *info = (struct put_info *)callbackData;
/* Send at most bufferSize bytes, and never more than remain past offset. */
82 gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
83 memcpy(buffer, (char *)info->val->data + info->offset, bytes);
/* Remember how far we got for the next invocation. */
84 info->offset += bytes;
/* Response-properties callback; installed as
 * responseHandler.propertiesCallback by the GET, PUT and list-bucket
 * requests in this file. */
88 static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
/* Request-completion callback; installed as
 * responseHandler.completeCallback by the GET, PUT and list-bucket requests
 * in this file.  When error details with a message are supplied, the
 * message is printed. */
94 static void s3store_response_callback(S3Status status,
95 const S3ErrorDetails *errorDetails,
/* NOTE(review): callbackData is viewed as a struct get_info here, but this
 * callback is also registered for PUT and list requests, whose data
 * callbacks treat callbackData as struct put_info / struct list_info.
 * Confirm that whatever is accessed through info is laid out identically
 * in all three structures. */
98 struct get_info *info = (struct get_info *)callbackData;
/* Only print when an error message is actually available. */
104 if (errorDetails != NULL && errorDetails->message != NULL) {
105 g_print(" Error message: %s\n", errorDetails->message);
/* Thread-pool worker (registered with g_thread_pool_new() in s3store_new()).
 * Executes one queued request against the store's bucket.
 *   a: the BlueSkyStoreAsync request pushed by s3store_submit()
 *   s: the S3Store that owns the pool
 * On exit the request is marked complete and the reference taken by
 * s3store_submit() is dropped. */
109 static void s3store_task(gpointer a, gpointer s)
111 BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
112 S3Store *store = (S3Store *)s;
/* Record that execution has begun and when. */
114 async->status = ASYNC_RUNNING;
115 async->exec_time = bluesky_now_hires();
/* GET: collect the object body into a GString via s3store_get_handler. */
117 if (async->op == STORE_OP_GET) {
118 struct get_info info;
119 info.buf = g_string_new("");
122 struct S3GetObjectHandler handler;
123 handler.responseHandler.propertiesCallback = s3store_properties_callback;
124 handler.responseHandler.completeCallback = s3store_response_callback;
125 handler.getObjectDataCallback = s3store_get_handler;
/* Request async->len bytes starting at offset async->start of the object. */
127 S3_get_object(&store->bucket, async->key, NULL,
128 async->start, async->len, NULL, &handler, &info);
/* The range restriction was applied by the request itself. */
129 async->range_done = TRUE;
/* Hand the accumulated bytes to the request as its result data. */
132 async->data = bluesky_string_new_from_gstring(info.buf);
/* Discard the accumulated buffer together with its character data. */
135 g_string_free(info.buf, TRUE);
/* PUT: upload async->data, fed to the library by s3store_put_handler. */
138 } else if (async->op == STORE_OP_PUT) {
139 struct put_info info;
141 info.val = async->data;
144 struct S3PutObjectHandler handler;
145 handler.responseHandler.propertiesCallback
146 = s3store_properties_callback;
147 handler.responseHandler.completeCallback = s3store_response_callback;
148 handler.putObjectDataCallback = s3store_put_handler;
/* The content length is announced up front as async->data->len. */
150 S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL,
/* No retry is attempted here; the failure is only reported. */
156 g_warning("Error completing S3 put operation; client must retry!");
/* Signal completion, then release the reference taken in s3store_submit(). */
160 bluesky_store_async_mark_complete(async);
161 bluesky_store_async_unref(async);
/* List-bucket callback (installed as listBucketCallback).
 * Remembers a private copy of the key of the final entry in contents,
 * replacing any previously remembered key, and records whether the listing
 * was truncated so the caller knows to request more. */
164 static S3Status s3store_list_handler(int isTruncated,
165 const char *nextMarker,
167 const S3ListBucketContent *contents,
168 int commonPrefixesCount,
169 const char **commonPrefixes,
172 struct list_info *info = (struct list_info *)callbackData;
/* An empty batch leaves the previously remembered key untouched. */
173 if (contentsCount > 0) {
174 g_free(info->last_entry);
175 info->last_entry = g_strdup(contents[contentsCount - 1].key);
177 info->truncated = isTruncated;
/* lookup_last hook: lists the bucket under prefix, repeating the request
 * while the result is reported as truncated, and returns the last key
 * recorded by s3store_list_handler().
 * The returned string was allocated with g_strdup() in the list handler;
 * presumably the caller is expected to g_free() it, and it is NULL when no
 * entry was ever reported (confirm against callers). */
181 static char *s3store_lookup_last(gpointer s, const char *prefix)
183 S3Store *store = (S3Store *)s;
184 struct list_info info = {0, NULL, FALSE};
186 struct S3ListBucketHandler handler;
187 handler.responseHandler.propertiesCallback
188 = s3store_properties_callback;
189 handler.responseHandler.completeCallback = s3store_response_callback;
190 handler.listBucketCallback = s3store_list_handler;
/* Ask for up to 1024 keys per request, continuing after marker. */
195 S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
/* The next request resumes after the last key seen so far. */
198 marker = g_strdup(info.last_entry);
199 g_print("Last key: %s\n", info.last_entry);
200 } while (info.truncated);
204 return info.last_entry;
/* create hook: builds the S3Store for the bucket named by path.
 *   path: bucket name; NULL or "" selects the default "mvrable-bluesky".
 * Credentials are read from AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, and
 * the encryption key is derived from BLUESKY_KEY. */
207 static gpointer s3store_new(const gchar *path)
209 S3Store *store = g_new(S3Store, 1);
/* -1: no limit on the number of worker threads; FALSE: non-exclusive pool. */
210 store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
212 if (path == NULL || strlen(path) == 0)
/* NOTE(review): this branch stores a string literal while the other stores
 * a g_strdup() copy, so bucketName must not be freed unconditionally. */
213 store->bucket.bucketName = "mvrable-bluesky";
215 store->bucket.bucketName = g_strdup(path);
216 store->bucket.protocol = S3ProtocolHTTP;
217 store->bucket.uriStyle = S3UriStyleVirtualHost;
/* NOTE(review): getenv() returns NULL when a variable is unset; confirm the
 * credentials are validated before they are used. */
218 store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
219 store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
221 const char *key = getenv("BLUESKY_KEY");
/* g_error() is fatal: the process does not continue without a key. */
223 g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
/* Derive the fixed-size key stored in the S3Store from the user's string. */
227 bluesky_crypt_hash_key(key, store->encryption_key);
/* NOTE(review): this prints the raw BLUESKY_KEY value to standard output;
 * consider dropping the key from the message so the secret is not logged. */
229 g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
230 store->bucket.bucketName, store->bucket.accessKeyId, key);
/* destroy hook registered in store_impl; receives the pointer returned by
 * s3store_new(). */
235 static void s3store_destroy(gpointer store)
/* submit hook: queues async on the store's thread pool for s3store_task().
 * The request must still be in state ASYNC_NEW and must have an operation
 * set; otherwise the function returns without doing anything.  Operation
 * types that are not recognised are reported and completed immediately. */
240 static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
242 S3Store *store = (S3Store *)s;
243 g_return_if_fail(async->status == ASYNC_NEW);
244 g_return_if_fail(async->op != STORE_OP_NONE);
249 async->status = ASYNC_PENDING;
/* Reference held on behalf of the worker; s3store_task() drops it. */
250 bluesky_store_async_ref(async);
251 g_thread_pool_push(store->thread_pool, async, NULL);
/* NOTE(review): "Uknown" in the message below is a typo for "Unknown", and
 * g_warning() already ends the line, so the trailing "\n" looks redundant. */
255 g_warning("Uknown operation type for S3Store: %d\n", async->op);
256 bluesky_store_async_mark_complete(async);
/* cleanup hook: releases the GString kept in async->store_private
 * (including its character data) and clears the pointer so that it cannot
 * be freed twice. */
261 static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
263 GString *buf = (GString *)async->store_private;
266 g_string_free(buf, TRUE);
267 async->store_private = NULL;
/* Operations table for the S3 backend; registered under the name "s3" by
 * bluesky_store_init_s3(). */
271 static BlueSkyStoreImplementation store_impl = {
272 .create = s3store_new,
273 .destroy = s3store_destroy,
274 .submit = s3store_submit,
275 .cleanup = s3store_cleanup,
276 .lookup_last = s3store_lookup_last,
/* Initializes the S3 client library and registers this backend with the
 * BlueSky store layer under the name "s3". */
279 void bluesky_store_init_s3(void)
/* NOTE(review): the status returned by S3_initialize() is not checked. */
281 S3_initialize(NULL, S3_INIT_ALL, NULL);
282 bluesky_store_register(&store_impl, "s3");