Commit e9bd50d0 authored by Philip Carns's avatar Philip Carns
Browse files

fill in bulk xfers

parent c85a5cdc
......@@ -57,6 +57,10 @@ struct bw_worker_arg
{
double start_tm;
margo_instance_id mid;
ABT_mutex *cur_off_mutex;
size_t *cur_off;
hg_bulk_t *client_bulk_handle;
const hg_addr_t *target_addr;
};
static void bw_worker(void *_arg);
......@@ -423,19 +427,27 @@ static void usage(void)
static void bw_ult(hg_handle_t handle)
{
int i;
bw_rpc_in_t in;
ABT_thread *tid_array;
struct bw_worker_arg *arg_array;
int ret;
double start_time;
margo_instance_id mid;
const struct hg_info *hgi;
size_t cur_off = 0;
ABT_mutex cur_off_mutex;
ABT_mutex_create(&cur_off_mutex);
/* get handle info and margo instance */
hgi = margo_get_info(handle);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
ret = margo_get_input(handle, &in);
assert(ret == HG_SUCCESS);
tid_array = malloc(g_opts.concurrency * sizeof(*tid_array));
assert(tid_array);
arg_array = malloc(g_opts.concurrency * sizeof(*arg_array));
......@@ -447,6 +459,11 @@ static void bw_ult(hg_handle_t handle)
{
arg_array[i].start_tm = start_time;
arg_array[i].mid = mid;
arg_array[i].cur_off = &cur_off;
arg_array[i].cur_off_mutex = &cur_off_mutex;
arg_array[i].client_bulk_handle = &in.bulk_handle;
arg_array[i].target_addr = &hgi->addr;
ret = ABT_thread_create(g_transfer_pool, bw_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
}
......@@ -458,6 +475,7 @@ static void bw_ult(hg_handle_t handle)
}
margo_respond(handle, NULL);
margo_free_input(handle, &in);
margo_destroy(handle);
free(tid_array);
......@@ -465,6 +483,8 @@ static void bw_ult(hg_handle_t handle)
ABT_eventual_set(g_bw_done_eventual, NULL, 0);
ABT_mutex_free(&cur_off_mutex);
return;
}
DEFINE_MARGO_RPC_HANDLER(bw_ult)
......@@ -487,7 +507,7 @@ static int run_benchmark(hg_id_t id, ssg_member_id_t target,
ret = margo_bulk_create(mid, 1, &buffer, &g_buffer_size, HG_BULK_READWRITE, &in.bulk_handle);
assert(ret == 0);
ret = margo_forward(handle, NULL);
ret = margo_forward(handle, &in);
assert(ret == 0);
margo_bulk_free(in.bulk_handle);
......@@ -568,15 +588,30 @@ static void bw_worker(void *_arg)
{
struct bw_worker_arg *arg = _arg;
double now;
size_t my_off;
int ret;
printf("# DBG: worker started.\n");
now = ABT_get_wtime();
/* TODO: need two runs, one in each direction? */
while((now - arg->start_tm) < g_opts.duration_seconds)
{
/* TODO: run a bulk transfer */
margo_thread_sleep(arg->mid, 1000.0);
/* find the offset for this transfer and then increment for next
* one
*/
ABT_mutex_spinlock(*arg->cur_off_mutex);
my_off = *arg->cur_off;
(*arg->cur_off) += g_opts.xfer_size;
if(*arg->cur_off > g_buffer_size)
*arg->cur_off = 0;
ABT_mutex_unlock(*arg->cur_off_mutex);
ret = margo_bulk_transfer(arg->mid, HG_BULK_PULL,
*arg->target_addr, *arg->client_bulk_handle, my_off, g_bulk_handle, my_off, g_opts.xfer_size);
assert(ret == 0);
now = ABT_get_wtime();
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment