1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
14 #include "bluesky-private.h"
17 /* Interface to Amazon S3 storage. */
/* Per-backend state, reached through S3Store pointers elsewhere in this file. */
/* Worker pool whose threads run s3store_task; requests are pushed onto it
 * by s3store_submit. */
20 GThreadPool *thread_pool;
/* Bucket description (name, protocol, URI style, credentials) handed to
 * S3_get_object and S3_put_object. */
21 S3BucketContext bucket;
/* Key material produced by bluesky_crypt_hash_key and used for
 * bluesky_crypt_encrypt / bluesky_crypt_decrypt. */
22 uint8_t encryption_key[CRYPTO_KEY_SIZE];
/* Operation selector with two values, S3_GET and S3_PUT.
 * NOTE(review): the rest of this file dispatches on async->op using
 * STORE_OP_GET / STORE_OP_PUT, so this member may be a leftover -- confirm
 * whether anything still reads it. */
26 enum { S3_GET, S3_PUT } op;
/* Data callback for S3 GET requests (installed as getObjectDataCallback).
 * Appends the bufferSize bytes at buffer to the GString held in the
 * struct get_info that arrives as callbackData. */
41 static S3Status s3store_get_handler(int bufferSize, const char *buffer,
44 struct get_info *info = (struct get_info *)callbackData;
/* g_string_append_len copies exactly bufferSize bytes, so the data does not
 * need to be NUL-terminated. */
45 g_string_append_len(info->buf, buffer, bufferSize);
/* Data callback for S3 PUT requests (installed as putObjectDataCallback).
 * Copies the next chunk of the value being uploaded into buffer and
 * advances the read position kept in the struct put_info passed as
 * callbackData. */
49 static int s3store_put_handler(int bufferSize, char *buffer,
52 struct put_info *info = (struct put_info *)callbackData;
/* Copy at most bufferSize bytes and never more than what is left of the
 * value.  NOTE(review): assumes info->offset never exceeds info->val->len
 * and that the remainder fits in an int; otherwise bytes could be negative
 * when handed to memcpy -- confirm the invariant. */
53 gint bytes = MIN(bufferSize, (int)(info->val->len - info->offset));
54 memcpy(buffer, (char *)info->val->data + info->offset, bytes);
/* Remember how far we have read so the next call continues from here. */
55 info->offset += bytes;
/* Response-properties callback shared by the GET and PUT handlers.
 * It only prints a trace line; the properties argument is not examined in
 * the code shown here. */
59 static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
62 g_print("(Properties callback)\n");
/* Completion callback shared by the GET and PUT handlers.
 * Prints the final status name with a timestamp and, when libs3 supplies
 * error details, the error message. */
66 static void s3store_response_callback(S3Status status,
67 const S3ErrorDetails *errorDetails,
/* NOTE(review): callbackData is always viewed as a struct get_info here,
 * although the PUT path works with a struct put_info.  If the PUT path
 * passes its put_info as callback data, any field touched through info
 * must sit at the same offset in both structs -- confirm. */
70 struct get_info *info = (struct get_info *)callbackData;
/* NOTE(review): %ld assumes bluesky_now_hires() returns a long -- confirm
 * the return type, since a mismatch is undefined behaviour. */
72 g_print("S3 operation complete, status=%s, now=%ld\n",
73 S3_get_status_name(status), bluesky_now_hires());
/* Error details are optional, so only print them when present. */
79 if (errorDetails != NULL) {
80 g_print(" Error message: %s\n", errorDetails->message);
/* Thread-pool worker that carries out one queued request.
 *   a: the BlueSkyStoreAsync pushed by s3store_submit
 *   s: the S3Store supplied as user data when the pool was created
 * Dispatches on async->op (GET or PUT), then marks the request complete and
 * drops one reference on it. */
84 static void s3store_task(gpointer a, gpointer s)
86 BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
87 S3Store *store = (S3Store *)s;
89 g_print("Start task [key=%s]...\n", async->key);
90 async->status = ASYNC_RUNNING;
/* ---- GET: download the object stored under async->key ---- */
92 if (async->op == STORE_OP_GET) {
/* Start with an empty buffer for the downloaded bytes; presumably info is
 * handed to S3_get_object as callback data so s3store_get_handler can
 * append to it (the trailing call arguments are cut off) -- confirm. */
94 info.buf = g_string_new("");
97 struct S3GetObjectHandler handler;
98 handler.responseHandler.propertiesCallback = s3store_properties_callback;
99 handler.responseHandler.completeCallback = s3store_response_callback;
100 handler.getObjectDataCallback = s3store_get_handler;
102 g_print("Starting fetch of %s from S3...\n", async->key);
103 S3_get_object(&store->bucket, async->key, NULL, 0, 0, NULL,
/* Turn the downloaded GString into a BlueSkyRCStr, decrypt it with the
 * store key, release the still-encrypted copy and publish the result
 * through async->data. */
107 BlueSkyRCStr *raw, *decrypted;
108 raw = bluesky_string_new_from_gstring(info.buf);
109 decrypted = bluesky_crypt_decrypt(raw, store->encryption_key);
110 bluesky_string_unref(raw);
111 async->data = decrypted;
/* Discard the download buffer together with its contents.
 * NOTE(review): the condition that selects between the decrypt path above
 * and this free is not shown here -- confirm they are mutually exclusive. */
114 g_string_free(info.buf, TRUE);
/* ---- PUT: upload async->data under async->key ---- */
117 } else if (async->op == STORE_OP_PUT) {
/* Data is encrypted with the store key before it leaves the process. */
118 BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(async->data,
119 store->encryption_key);
121 struct put_info info;
122 info.val = encrypted;
125 struct S3PutObjectHandler handler;
126 handler.responseHandler.propertiesCallback
127 = s3store_properties_callback;
128 handler.responseHandler.completeCallback = s3store_response_callback;
129 handler.putObjectDataCallback = s3store_put_handler;
131 g_print("Starting store of %s to S3 at %ld...\n",
132 async->key, bluesky_now_hires());
/* encrypted->len is given to libs3 as the length of the object body. */
133 S3_put_object(&store->bucket, async->key, encrypted->len, NULL, NULL,
/* The encrypted copy was only needed for the upload. */
136 bluesky_string_unref(encrypted);
141 g_print("Finish task...\n");
/* Signal completion, then give up the reference that s3store_submit took
 * on behalf of this worker. */
142 bluesky_store_async_mark_complete(async);
143 bluesky_store_async_unref(async);
/* Constructor registered as the backend's .create hook.
 * Allocates an S3Store, creates its worker pool, fills in the bucket
 * description and derives the encryption key from the BLUESKY_KEY
 * environment variable. */
146 static gpointer s3store_new()
/* g_new does not zero the allocation; the fields are assigned below. */
148 S3Store *store = g_new(S3Store, 1);
/* Workers run s3store_task with this store as user data; -1 places no limit
 * on the number of threads and FALSE makes the pool non-exclusive. */
149 store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
/* The bucket name is hard-coded and requests use plain HTTP with
 * path-style URIs.  NOTE(review): payloads are encrypted in this file, but
 * confirm that unencrypted transport is acceptable for the credentials. */
151 store->bucket.bucketName = "mvrable-bluesky";
152 store->bucket.protocol = S3ProtocolHTTP;
153 store->bucket.uriStyle = S3UriStylePath;
/* Credentials come from the environment.  NOTE(review): getenv returns NULL
 * for an unset variable and no check for that is visible here. */
154 store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
155 store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
157 const char *key = getenv("BLUESKY_KEY");
/* g_error logs the message and aborts the program. */
159 g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
/* Hash the user-supplied key string into store->encryption_key. */
163 bluesky_crypt_hash_key(key, store->encryption_key);
/* NOTE(review): this prints the raw BLUESKY_KEY value, i.e. the secret the
 * encryption key is derived from, alongside the access key id.  Consider
 * dropping the key from the message. */
165 g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
166 store->bucket.bucketName, store->bucket.accessKeyId, key);
/* Destructor registered as the backend's .destroy hook; receives the
 * pointer returned by s3store_new.
 * NOTE(review): the body is not shown here -- confirm whether the thread
 * pool and the store allocation are released. */
171 static void s3store_destroy(gpointer store)
/* Submit hook: queues a request for asynchronous execution.
 *   s:     the S3Store created by s3store_new
 *   async: the request; must be in state ASYNC_NEW with an operation set
 * Accepted requests are marked pending and pushed to the worker pool;
 * requests with an unrecognised operation are completed immediately with a
 * warning. */
176 static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
178 S3Store *store = (S3Store *)s;
/* Precondition checks: on failure g_return_if_fail logs a critical message
 * and returns without touching the request. */
179 g_return_if_fail(async->status == ASYNC_NEW);
180 g_return_if_fail(async->op != STORE_OP_NONE);
/* Take a reference for the worker before handing the request over;
 * s3store_task releases it when it finishes.  The GError argument is NULL,
 * so a failure to push is not reported. */
185 async->status = ASYNC_PENDING;
186 bluesky_store_async_ref(async);
187 g_thread_pool_push(store->thread_pool, async, NULL);
/* Fallback for operations this backend does not implement.
 * NOTE(review): the message contains the typo "Uknown". */
191 g_warning("Uknown operation type for S3Store: %d\n", async->op);
192 bluesky_store_async_mark_complete(async);
/* Cleanup hook: releases backend-private data attached to a request.
 * Treats async->store_private as a GString, frees it together with its
 * character data and clears the pointer.  The store argument is not used
 * in the code shown here. */
197 static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
199 GString *buf = (GString *)async->store_private;
/* NOTE(review): any guard around this free is not shown here -- confirm
 * that store_private is checked before it is freed. */
202 g_string_free(buf, TRUE);
/* Clear the pointer so it cannot be freed a second time. */
203 async->store_private = NULL;
/* Operations table for the S3 backend, registered under the name "s3" by
 * bluesky_store_init_s3. */
207 static BlueSkyStoreImplementation store_impl = {
208 .create = s3store_new,
209 .destroy = s3store_destroy,
210 .submit = s3store_submit,
211 .cleanup = s3store_cleanup,
/* Public entry point: initialises libs3 and registers the S3 backend under
 * the name "s3". */
214 void bluesky_store_init_s3(void)
/* NOTE(review): the status returned by S3_initialize is not checked. */
216 S3_initialize(NULL, S3_INIT_ALL);
217 bluesky_store_register(&store_impl, "s3");