Commit d0fa8a14 authored by Philip Carns's avatar Philip Carns

expermental: just offload memcpy

parent 4f5f516f
......@@ -107,6 +107,13 @@ typedef struct xfer_args {
int32_t ret; // return value of the xfer_ult function
} xfer_args;
struct memcpy_arg
{
void* to;
void *from;
unsigned long size;
};
static void bake_server_finalize_cb(void *data);
static int bake_target_post_migration_callback(remi_fileset_t fileset, void* provider);
......@@ -114,6 +121,10 @@ static int bake_target_post_migration_callback(remi_fileset_t fileset, void* pro
static void pipeline_ult(void* _args);
static void xfer_ult(xfer_args* args);
ABT_xstream del_xstreams[4];
ABT_sched del_scheds[4];
ABT_pool del_pool;
int bake_makepool(
const char *pool_name,
size_t pool_size,
......@@ -151,6 +162,7 @@ int bake_provider_register(
{
bake_server_context_t *tmp_svr_ctx;
int ret;
int i;
/* check if a provider with the same provider id already exists */
{
hg_id_t id;
......@@ -282,6 +294,18 @@ int bake_provider_register(
return BAKE_ERR_REMI;
}
/* create a pool to delegate some operations to */
ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &del_pool);
assert(ret == 0);
for(i=0; i<4; i++)
{
ret = ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 1, &del_pool,
ABT_SCHED_CONFIG_NULL, &del_scheds[i]);
assert(ret == 0);
ret = ABT_xstream_create(del_scheds[i], &del_xstreams[i]);
assert(ret == 0);
}
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);
......@@ -1953,6 +1977,15 @@ static int bake_target_post_migration_callback(remi_fileset_t fileset, void* uar
return 0;
}
static void memcpy_tasklet(void *_arg)
{
struct memcpy_arg *arg = _arg;
memcpy(arg->to, arg->from, arg->size);
return;
}
static void pipeline_ult(void* _arg)
{
struct pipeline_ult_arg *arg = _arg;
......@@ -1961,6 +1994,7 @@ static void pipeline_ult(void* _arg)
size_t tmp_buf_size;
int tmp_count;
hg_bulk_t local_bulk = HG_BULK_NULL;
ABT_task tid;
ret = margo_bulk_pool_get(arg->buf_pool, &local_bulk);
assert(ret == 0);
......@@ -1976,14 +2010,17 @@ static void pipeline_ult(void* _arg)
local_bulk, 0, arg->local_buf_size);
assert(ret == 0);
/* NOTE: this is the one line of code with more performance impact than
* any other under concurrent load. Need to refactor in a way that
* allocates intermediate buffer on same ES where this memcpy is
* executed to get better locality? The destination is typically going
* to a kernel space so we can't control that one easily, but we should
* be able to get the intermediate buffer right here.
*/
memcpy(arg->local_buf_ptr, local_buf_ptr, arg->local_buf_size);
struct memcpy_arg m_arg;
m_arg.to = arg->local_buf_ptr;
m_arg.from = local_buf_ptr;
m_arg.size = arg->local_buf_size;
ret = ABT_task_create(del_pool, memcpy_tasklet, &m_arg, &tid);
assert(ret == 0);
ret = ABT_task_join(tid);
//memcpy(arg->local_buf_ptr, local_buf_ptr, arg->local_buf_size);
ret = margo_bulk_pool_release(arg->buf_pool, local_bulk);
assert(ret == 0);
......@@ -1991,6 +2028,7 @@ static void pipeline_ult(void* _arg)
return;
}
static void xfer_ult(xfer_args* args)
{
/*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment