diff --git a/perf-regression/bake-p2p-bw.c b/perf-regression/bake-p2p-bw.c index 82f502d49d94796af61499c13154b6d0a5dce7e4..a8af20e83232ffd654771a198e051d1ae210f82b 100644 --- a/perf-regression/bake-p2p-bw.c +++ b/perf-regression/bake-p2p-bw.c @@ -45,6 +45,8 @@ struct bench_worker_arg { bake_provider_handle_t bph; bake_target_id_t bti; + ABT_mutex *cur_off_mutex; + unsigned long *cur_off; }; /* defealt to 512 MiB total xfer unless specified otherwise */ @@ -358,12 +360,16 @@ static int run_benchmark(struct options *opts, bake_provider_handle_t bph, int i; ABT_thread *tid_array; struct bench_worker_arg *arg_array; + ABT_mutex cur_off_mutex; + unsigned long cur_off = 0; tid_array = malloc(g_opts.concurrency * sizeof(*tid_array)); assert(tid_array); arg_array = malloc(g_opts.concurrency * sizeof(*arg_array)); assert(arg_array); + ABT_mutex_create(&cur_off_mutex); + ret = ABT_xstream_self(&xstream); assert(ret == 0); @@ -374,6 +380,8 @@ static int run_benchmark(struct options *opts, bake_provider_handle_t bph, { arg_array[i].bph = bph; arg_array[i].bti = bti; + arg_array[i].cur_off_mutex = &cur_off_mutex; + arg_array[i].cur_off = &cur_off; ret = ABT_thread_create(pool, bench_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]); assert(ret == 0); @@ -386,6 +394,7 @@ static int run_benchmark(struct options *opts, bake_provider_handle_t bph, } free(tid_array); + ABT_mutex_free(&cur_off_mutex); return(0); } @@ -395,14 +404,22 @@ static void bench_worker(void *_arg) struct bench_worker_arg* arg = _arg; bake_region_id_t rid; int ret; + char* this_buffer; - /* TODO: iterate through g_buffer to end and stop, transfering xfer_size - * at a time, coordinating with other workers on offset. Stop benchmark - * when the end is reached and do not reuse memory buffers. - */ - ret = bake_create_write_persist(arg->bph, arg->bti, g_buffer, - g_opts.xfer_size, &rid); - assert(ret == 0); + ABT_mutex_spinlock(*arg->cur_off_mutex); + while(*arg->cur_off < g_opts.total_mem_size) + { + this_buffer = (char*)((unsigned long)g_buffer + *arg->cur_off); + (*arg->cur_off) += g_opts.xfer_size; + ABT_mutex_unlock(*arg->cur_off_mutex); + + ret = bake_create_write_persist(arg->bph, arg->bti, this_buffer, + g_opts.xfer_size, &rid); + assert(ret == 0); + ABT_mutex_spinlock(*arg->cur_off_mutex); + } + + ABT_mutex_unlock(*arg->cur_off_mutex); return; }