/* Interface to Amazon S3 storage. */
-/* Simple in-memory data store for test purposes. */
typedef struct {
+ GThreadPool *thread_pool;
S3BucketContext bucket;
+ uint8_t encryption_key[CRYPTO_KEY_SIZE];
} S3Store;
+typedef struct {
+ enum { S3_GET, S3_PUT } op;
+ gchar *key;
+ BlueSkyRCStr *data;
+} S3Op;
+
+static void s3store_task(gpointer s, gpointer o);
+
static gpointer s3store_new()
{
S3Store *store = g_new(S3Store, 1);
+ store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
+ NULL);
store->bucket.bucketName = "mvrable-bluesky";
store->bucket.protocol = S3ProtocolHTTP;
store->bucket.uriStyle = S3UriStylePath;
store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
- g_print("Initializing S3 with bucket %s, access key %s\n",
- store->bucket.bucketName, store->bucket.accessKeyId);
+ const char *key = getenv("BLUESKY_KEY");
+ if (key == NULL) {
+ g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
+ exit(1);
+ }
+
+ bluesky_crypt_hash_key(key, store->encryption_key);
+
+ g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
+ store->bucket.bucketName, store->bucket.accessKeyId, key);
return store;
}
}
struct get_info {
- gchar *buf;
- gint offset;
+ GString *buf;
};
static S3Status s3store_get_handler(int bufferSize, const char *buffer,
void *callbackData)
{
struct get_info *info = (struct get_info *)callbackData;
- gint bytes = MIN(bufferSize, (int)(BLUESKY_BLOCK_SIZE - info->offset));
- memcpy(info->buf + info->offset, buffer, bytes);
- info->offset += bytes;
+ g_string_append_len(info->buf, buffer, bufferSize);
return S3StatusOK;
}
const S3ErrorDetails *errorDetails,
void *callbackData)
{
- g_print("S3 operation complete, status=%s\n",
- S3_get_status_name(status));
+ g_print("S3 operation complete, status=%s, now=%ld\n",
+ S3_get_status_name(status), bluesky_now_hires());
if (errorDetails != NULL) {
g_print(" Error message: %s\n", errorDetails->message);
}
S3Store *store = (S3Store *)s;
struct get_info info;
- info.buf = (char *)g_malloc0(BLUESKY_BLOCK_SIZE);
- info.offset = 0;
+ info.buf = g_string_new("");
struct S3GetObjectHandler handler;
handler.responseHandler.propertiesCallback = s3store_properties_callback;
S3_get_object(&store->bucket, key, NULL, 0, 0, NULL,
&handler, &info);
- return bluesky_string_new(info.buf, BLUESKY_BLOCK_SIZE);
+ BlueSkyRCStr *raw, *decrypted;
+ raw = bluesky_string_new_from_gstring(info.buf);
+ decrypted = bluesky_crypt_decrypt(raw, store->encryption_key);
+ bluesky_string_unref(raw);
+ return decrypted;
}
static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val)
{
S3Store *store = (S3Store *)s;
- struct put_info info;
- info.val = val;
- info.offset = 0;
+ S3Op *op = g_new(S3Op, 1);
+ op->op = S3_PUT;
+ op->key = g_strdup(key);
+ bluesky_string_ref(val);
+ op->data = val;
- struct S3PutObjectHandler handler;
- handler.responseHandler.propertiesCallback = s3store_properties_callback;
- handler.responseHandler.completeCallback = s3store_response_callback;
- handler.putObjectDataCallback = s3store_put_handler;
+ g_thread_pool_push(store->thread_pool, op, NULL);
+}
- g_print("Starting store of %s to S3...\n", key);
- S3_put_object(&store->bucket, key, val->len, NULL, NULL,
- &handler, &info);
+static void s3store_task(gpointer o, gpointer s)
+{
+ S3Store *store = (S3Store *)s;
+ S3Op *op = (S3Op *)o;
+
+ g_print("Start task [key=%s]...\n", op->key);
+
+ if (op->op == S3_PUT) {
+ BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(op->data,
+ store->encryption_key);
+
+ struct put_info info;
+ info.val = encrypted;
+ info.offset = 0;
+
+ struct S3PutObjectHandler handler;
+ handler.responseHandler.propertiesCallback
+ = s3store_properties_callback;
+ handler.responseHandler.completeCallback = s3store_response_callback;
+ handler.putObjectDataCallback = s3store_put_handler;
+
+ g_print("Starting store of %s to S3 at %ld...\n",
+ op->key, bluesky_now_hires());
+ S3_put_object(&store->bucket, op->key, encrypted->len, NULL, NULL,
+ &handler, &info);
+
+ bluesky_string_unref(encrypted);
+ }
+
+ bluesky_string_unref(op->data);
+ g_free(op->key);
+ g_free(op);
+
+ g_print("Finish task...\n");
}
static BlueSkyStoreImplementation store_impl = {