From 3b95f5ea43b13f2487598dff69a81bcbdc09bf3a Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Mon, 5 Apr 2010 17:08:53 -0700 Subject: [PATCH] Pull in updated tool for measuring S3 read performance. --- CMakeLists.txt | 1 + s3bench/CMakeLists.txt | 10 ++ s3bench/readbench.c | 277 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 288 insertions(+) create mode 100644 s3bench/CMakeLists.txt create mode 100644 s3bench/readbench.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 50bf118..e774ebd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -12,3 +12,4 @@ add_subdirectory(kvstore) add_subdirectory(bluesky) add_subdirectory(nfs3) add_subdirectory(microbench) +add_subdirectory(s3bench) diff --git a/s3bench/CMakeLists.txt b/s3bench/CMakeLists.txt new file mode 100644 index 0000000..29aabe1 --- /dev/null +++ b/s3bench/CMakeLists.txt @@ -0,0 +1,10 @@ +include_directories("${LIBS3_BUILD_DIR}/include") +link_directories("${LIBS3_BUILD_DIR}/lib") + +add_executable(s3readbench readbench.c) + +set(CMAKE_C_FLAGS "-Wall -std=gnu99 ${CMAKE_C_FLAGS}") +set(INSTALL_RPATH_USE_LINK_PATH 1) + +target_link_libraries(s3readbench pthread s3) + diff --git a/s3bench/readbench.c b/s3bench/readbench.c new file mode 100644 index 0000000..277e1f3 --- /dev/null +++ b/s3bench/readbench.c @@ -0,0 +1,277 @@ +/* Simple benchmark for Amazon S3: measures download speeds for + * differently-sized objects and with a variable number of parallel + * connections. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "libs3.h" + +FILE *statsfile; + +S3BucketContext bucket; + +struct thread_state { + pthread_t thread; + int thread_num; + long long timestamp; + + // Time when first bytes of the response were received + long long first_byte_timestamp; + + // Statistics for computing mean and standard deviation + int n; + size_t bytes_sent; + double sum_x, sum_x2; + + double sum_f; +}; + +struct callback_state { + struct thread_state *ts; + size_t bytes_remaining; +}; + +#define MAX_THREADS 128 +struct thread_state threads[MAX_THREADS]; + +int experiment_threads, experiment_size, experiment_objects; + +pthread_mutex_t barrier_mutex; +pthread_cond_t barrier_cond; +int barrier_val; + +enum phase { LAUNCH, MEASURE, TERMINATE }; +volatile enum phase test_phase; + +void barrier_signal() +{ + pthread_mutex_lock(&barrier_mutex); + barrier_val--; + printf("Barrier: %d left\n", barrier_val); + if (barrier_val == 0) + pthread_cond_signal(&barrier_cond); + pthread_mutex_unlock(&barrier_mutex); +} + +long long get_ns() +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + + return ts.tv_sec * 1000000000LL + ts.tv_nsec; +} + +static S3Status data_callback(int bufferSize, const char *buffer, + void *callbackData) +{ + struct callback_state *state = (struct callback_state *)callbackData; + state->bytes_remaining -= bufferSize; + if (state->ts->first_byte_timestamp == 0) + state->ts->first_byte_timestamp = get_ns(); + return S3StatusOK; +} + +static S3Status properties_callback(const S3ResponseProperties *properties, + void *callbackData) +{ + return S3StatusOK; +} + +static void complete_callback(S3Status status, + const S3ErrorDetails *errorDetails, + void *callbackData) +{ +} + +static void do_get(const char *key, size_t bytes, struct thread_state *ts) +{ + struct callback_state state; + struct S3GetObjectHandler handler; + + state.bytes_remaining = bytes; + state.ts = ts; + handler.responseHandler.propertiesCallback = properties_callback; + handler.responseHandler.completeCallback = complete_callback; + handler.getObjectDataCallback = data_callback; + + S3_get_object(&bucket, key, NULL, 0, 0, NULL, &handler, &state); +} + +void *benchmark_thread(void *arg) +{ + struct thread_state *ts = (struct thread_state *)arg; + char namebuf[64]; + int i = 0; + int stage = 0; + int measuring = 0; + + ts->n = 0; + ts->sum_x = ts->sum_x2 = ts->sum_f = 0.0; + ts->bytes_sent = 0; + + ts->timestamp = get_ns(); + while (test_phase != TERMINATE) { + int object = random() % experiment_objects; + sprintf(namebuf, "file-%d-%d", experiment_size, object); + ts->first_byte_timestamp = 0; + do_get(namebuf, experiment_size, ts); + long long timestamp = get_ns(); + long long elapsed = timestamp - ts->timestamp; + + printf("Elapsed[%d-%d]: %lld ns\n", ts->thread_num, i, elapsed); + printf(" first data after: %lld ns\n", + ts->first_byte_timestamp - ts->timestamp); + if (measuring && test_phase == MEASURE) { + double e = elapsed / 1e9; + double f = (ts->first_byte_timestamp - ts->timestamp) / 1e9; + ts->n++; + ts->sum_x += e; + ts->sum_x2 += e * e; + ts->sum_f += f; + ts->bytes_sent += experiment_size; + } + + i++; + if (stage == 0 && i > 2) { + barrier_signal(); + stage = 1; + } else if (stage == 1 && ts->n >= 2) { + barrier_signal(); + stage = 2; + } + + ts->timestamp = timestamp; + if (test_phase == MEASURE) + measuring = 1; + } + + return NULL; +} + +void launch_thread(int n) +{ + threads[n].thread_num = n; + if (pthread_create(&threads[n].thread, NULL, benchmark_thread, &threads[n]) != 0) { + fprintf(stderr, "Error launching thread!\n"); + exit(1); + } +} + +void wait_thread(int n) +{ + void *result; + pthread_join(threads[n].thread, &result); +} + +void launch_test(int thread_count) +{ + int i; + long long start_time = get_ns(); + + test_phase = LAUNCH; + barrier_val = thread_count; + assert(thread_count <= MAX_THREADS); + + printf("Launching...\n"); + + for (i = 0; i < thread_count; i++) + launch_thread(i); + + /* Wait until all threads are ready. */ + pthread_mutex_lock(&barrier_mutex); + while (barrier_val > 0) { + pthread_cond_wait(&barrier_cond, &barrier_mutex); + } + pthread_mutex_unlock(&barrier_mutex); + + printf("Measuring...\n"); + barrier_val = thread_count; + test_phase = MEASURE; + + /* Ensure all threads have measured some activity, then a bit more. */ + pthread_mutex_lock(&barrier_mutex); + while (barrier_val > 0) { + pthread_cond_wait(&barrier_cond, &barrier_mutex); + } + pthread_mutex_unlock(&barrier_mutex); + printf("Data in from all threads...\n"); + sleep(5); + + printf("Terminating...\n"); + test_phase = TERMINATE; + + for (i = 0; i < thread_count; i++) + wait_thread(i); + + int n = 0; + double sum_x = 0.0, sum_x2 = 0.0, sum_f = 0.0; + double bandwidth = 0.0; + for (i = 0; i < thread_count; i++) { + n += threads[i].n; + sum_x += threads[i].sum_x; + sum_x2 += threads[i].sum_x2; + sum_f += threads[i].sum_f; + bandwidth += threads[i].bytes_sent / threads[i].sum_x; + } + + double elapsed = (get_ns() - start_time) / 1e9; + printf("*** %d threads, %d byte objects\n", + experiment_threads, experiment_size); + printf("Elapsed: %f s\n", elapsed); + printf("Data points: %d\n", n); + double mx = sum_x / n; + double sx = sqrt((sum_x2 - 2*sum_x*mx + n*mx*mx) / (n - 1)); + printf("Time: %f ± %f s\n", mx, sx); + printf("Latency to first byte: %f\n", sum_f / n); + printf("Bandwidth: %f B/s\n", bandwidth); + + fprintf(statsfile, "%d\t%d\t%f\t%d\t%f\t%f\t%f\n", + experiment_threads, experiment_size, elapsed, n, + mx, sx, bandwidth); + + printf("Finished.\n"); +} + +int main(int argc, char *argv[]) +{ + statsfile = fopen("readbench.data", "a"); + if (statsfile == NULL) { + perror("open stats file"); + return 1; + } + + S3_initialize(NULL, S3_INIT_ALL); + + bucket.bucketName = "mvrable-benchmark"; + bucket.protocol = S3ProtocolHTTP; + bucket.uriStyle = S3UriStylePath; + bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID"); + bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY"); + + pthread_mutex_init(&barrier_mutex, NULL); + pthread_cond_init(&barrier_cond, NULL); + + if (argc < 4) { + fprintf(stderr, "Usage: %s \n", argv[0]); + return 1; + } + + experiment_threads = atoi(argv[1]); + experiment_size = atoi(argv[2]); + experiment_objects = atoi(argv[3]); + assert(experiment_objects > 0); + launch_test(experiment_threads); + + printf("Done.\n"); + fclose(statsfile); + + return 0; +} -- 2.20.1