1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
14 #include "bluesky-private.h"
17 /* Interface to Amazon S3 storage. */
20 GThreadPool *thread_pool;
21 S3BucketContext bucket;
22 uint8_t encryption_key[CRYPTO_KEY_SIZE];
26 enum { S3_GET, S3_PUT } op;
48 static S3Status s3store_get_handler(int bufferSize, const char *buffer,
51 struct get_info *info = (struct get_info *)callbackData;
52 g_string_append_len(info->buf, buffer, bufferSize);
56 static int s3store_put_handler(int bufferSize, char *buffer,
59 struct put_info *info = (struct put_info *)callbackData;
60 gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
61 memcpy(buffer, (char *)info->val->data + info->offset, bytes);
62 info->offset += bytes;
66 static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
72 static void s3store_response_callback(S3Status status,
73 const S3ErrorDetails *errorDetails,
76 struct get_info *info = (struct get_info *)callbackData;
82 if (errorDetails != NULL && errorDetails->message != NULL) {
83 g_print(" Error message: %s\n", errorDetails->message);
87 static void s3store_task(gpointer a, gpointer s)
89 BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
90 S3Store *store = (S3Store *)s;
92 async->status = ASYNC_RUNNING;
93 async->exec_time = bluesky_now_hires();
95 if (async->op == STORE_OP_GET) {
97 info.buf = g_string_new("");
100 struct S3GetObjectHandler handler;
101 handler.responseHandler.propertiesCallback = s3store_properties_callback;
102 handler.responseHandler.completeCallback = s3store_response_callback;
103 handler.getObjectDataCallback = s3store_get_handler;
105 S3_get_object(&store->bucket, async->key, NULL, 0, 0, NULL,
109 async->data = bluesky_string_new_from_gstring(info.buf);
112 g_string_free(info.buf, TRUE);
115 } else if (async->op == STORE_OP_PUT) {
116 struct put_info info;
117 info.val = async->data;
120 struct S3PutObjectHandler handler;
121 handler.responseHandler.propertiesCallback
122 = s3store_properties_callback;
123 handler.responseHandler.completeCallback = s3store_response_callback;
124 handler.putObjectDataCallback = s3store_put_handler;
126 S3_put_object(&store->bucket, async->key, async->data->len, NULL, NULL,
132 bluesky_store_async_mark_complete(async);
133 bluesky_store_async_unref(async);
136 static S3Status s3store_list_handler(int isTruncated,
137 const char *nextMarker,
139 const S3ListBucketContent *contents,
140 int commonPrefixesCount,
141 const char **commonPrefixes,
144 struct list_info *info = (struct list_info *)callbackData;
145 if (contentsCount > 0) {
146 g_free(info->last_entry);
147 info->last_entry = g_strdup(contents[contentsCount - 1].key);
149 info->truncated = isTruncated;
153 static char *s3store_lookup_last(gpointer s, const char *prefix)
155 S3Store *store = (S3Store *)s;
156 struct list_info info = {0, NULL, FALSE};
158 struct S3ListBucketHandler handler;
159 handler.responseHandler.propertiesCallback
160 = s3store_properties_callback;
161 handler.responseHandler.completeCallback = s3store_response_callback;
162 handler.listBucketCallback = s3store_list_handler;
167 S3_list_bucket(&store->bucket, prefix, marker, NULL, 1024, NULL,
170 marker = g_strdup(info.last_entry);
171 g_print("Last key: %s\n", info.last_entry);
172 } while (info.truncated);
176 return info.last_entry;
179 static gpointer s3store_new(const gchar *path)
181 S3Store *store = g_new(S3Store, 1);
182 store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
184 if (path == NULL || strlen(path) == 0)
185 store->bucket.bucketName = "mvrable-bluesky";
187 store->bucket.bucketName = g_strdup(path);
188 store->bucket.protocol = S3ProtocolHTTP;
189 store->bucket.uriStyle = S3UriStylePath;
190 store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
191 store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
193 const char *key = getenv("BLUESKY_KEY");
195 g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
199 bluesky_crypt_hash_key(key, store->encryption_key);
201 g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
202 store->bucket.bucketName, store->bucket.accessKeyId, key);
207 static void s3store_destroy(gpointer store)
212 static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
214 S3Store *store = (S3Store *)s;
215 g_return_if_fail(async->status == ASYNC_NEW);
216 g_return_if_fail(async->op != STORE_OP_NONE);
221 async->status = ASYNC_PENDING;
222 bluesky_store_async_ref(async);
223 g_thread_pool_push(store->thread_pool, async, NULL);
227 g_warning("Uknown operation type for S3Store: %d\n", async->op);
228 bluesky_store_async_mark_complete(async);
233 static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
235 GString *buf = (GString *)async->store_private;
238 g_string_free(buf, TRUE);
239 async->store_private = NULL;
243 static BlueSkyStoreImplementation store_impl = {
244 .create = s3store_new,
245 .destroy = s3store_destroy,
246 .submit = s3store_submit,
247 .cleanup = s3store_cleanup,
248 .lookup_last = s3store_lookup_last,
251 void bluesky_store_init_s3(void)
253 S3_initialize(NULL, S3_INIT_ALL);
254 bluesky_store_register(&store_impl, "s3");