@@ -82,6 +82,18 @@ typedef struct bake_server_context_t
    remi_provider_t remi_provider;
} bake_server_context_t;
struct pipeline_ult_arg {
    margo_instance_id mid;        // margo instance
    void *local_buf_ptr;          // destination in the target region for this piece
    unsigned long local_buf_size; // size of the piece handled by this ULT
    margo_bulk_pool_t buf_pool;   // pool of intermediate buffers
    hg_addr_t remote_addr;        // remote address
    hg_bulk_t remote_bulk;        // remote bulk handle for transfers
    size_t remote_offset;         // remote offset at which to take the data
    int ret;                      // return value of the pipeline_ult function
};
typedef struct xfer_args {
    margo_instance_id mid;        // margo instance
    size_t size;                  // size of data to transfer
@@ -95,12 +107,38 @@ typedef struct xfer_args {
    int32_t ret;                  // return value of the xfer_ult function
} xfer_args;
struct memcpy_arg
{
    void *to;
    void *from;
    unsigned long size;
};

struct persist_arg
{
    PMEMobjpool *pool;
    const void *addr;
    size_t len;
};

struct alloc_arg
{
    PMEMobjpool *pool;
    PMEMoid oid;
    size_t size;
};
static void bake_server_finalize_cb(void *data);
static int bake_target_post_migration_callback(remi_fileset_t fileset, void* provider);
static void pipeline_ult(void* _args);
static void xfer_ult(xfer_args* args);
/* dedicated execution streams, schedulers, and pool used to delegate
 * blocking work (pmem alloc/persist and memcpy) off of the RPC handler ESs */
ABT_xstream del_xstreams[4];
ABT_sched del_scheds[4];
ABT_pool del_pool;
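/* Nothing in this patch tears the delegation resources down.  A minimal
 * sketch of a cleanup step (not part of this change; it assumes
 * bake_server_finalize_cb() would be the right place to call it, and that
 * del_pool/del_scheds, created as "automatic", are reclaimed by Argobots
 * once the xstreams are freed):
 */
#if 0
static void del_pool_teardown(void)
{
    int i;
    for(i = 0; i < 4; i++)
    {
        ABT_xstream_join(del_xstreams[i]);
        ABT_xstream_free(&del_xstreams[i]);
    }
}
#endif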
int bake_makepool(
    const char *pool_name,
    size_t pool_size,
@@ -138,6 +176,7 @@ int bake_provider_register(
{
    bake_server_context_t *tmp_svr_ctx;
    int ret;
    int i;

    /* check if a provider with the same provider id already exists */
    {
        hg_id_t id;
@@ -269,6 +308,18 @@ int bake_provider_register(
        return BAKE_ERR_REMI;
    }
    /* create a dedicated pool (serviced by 4 execution streams) to delegate
     * some operations to; the FIFO_WAIT pool and BASIC_WAIT scheduler
     * variants let those streams sleep rather than spin when idle */
    ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &del_pool);
    assert(ret == 0);
    for(i = 0; i < 4; i++)
    {
        ret = ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 1, &del_pool,
            ABT_SCHED_CONFIG_NULL, &del_scheds[i]);
        assert(ret == 0);
        ret = ABT_xstream_create(del_scheds[i], &del_xstreams[i]);
        assert(ret == 0);
    }
    /* install the bake server finalize callback */
    margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);
@@ -884,6 +935,23 @@ finish:
}
DEFINE_MARGO_RPC_HANDLER(bake_persist_ult)
static void alloc_tasklet(void *_arg)
{
    struct alloc_arg *arg = _arg;
    /* note: the return value of pmemobj_alloc() is not propagated back */
    pmemobj_alloc(arg->pool, &arg->oid, arg->size, 0, NULL, NULL);
    return;
}
static void persist_tasklet(void *_arg)
{
    struct persist_arg *arg = _arg;
    pmemobj_persist(arg->pool, arg->addr, arg->len);
    return;
}
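/* The alloc/persist tasklets above (and memcpy_tasklet further below) are
 * all dispatched with the same ABT_task_create()/ABT_task_join()/
 * ABT_task_free() sequence on del_pool.  A minimal sketch of a helper that
 * could factor that pattern out (not part of this patch; the name and error
 * handling are illustrative only):
 */
#if 0
static int run_on_del_pool(void (*fn)(void *), void *arg)
{
    ABT_task tid;
    int ret;

    ret = ABT_task_create(del_pool, fn, arg, &tid);
    if(ret != ABT_SUCCESS) return ret;
    ret = ABT_task_join(tid);
    if(ret != ABT_SUCCESS) return ret;
    return ABT_task_free(&tid);
}
#endif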
static void bake_create_write_persist_ult(hg_handle_t handle)
{
    TIMERS_INITIALIZE("start","alloc","bulk_create","bulk_xfer","persist","respond");
@@ -905,6 +973,7 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
    int ret;
    pmemobj_region_id_t* prid;
    ABT_rwlock lock = ABT_RWLOCK_NULL;
    ABT_task tid;

    memset(&out, 0, sizeof(out));
@@ -952,6 +1021,7 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
    prid = (pmemobj_region_id_t*)out.rid.data;

#if 1
    ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
            content_size, 0, NULL, NULL);
    if(ret != 0)
@@ -960,6 +1030,18 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
        goto finish;
    }
#else
    struct alloc_arg a_arg;
    a_arg.pool = entry->pmem_pool;
    a_arg.size = content_size;

    ret = ABT_task_create(del_pool, alloc_tasklet, &a_arg, &tid);
    assert(ret == 0);
    ret = ABT_task_join(tid);
    assert(ret == 0);
    /* release the completed task handle */
    ret = ABT_task_free(&tid);
    assert(ret == 0);

    prid->oid = a_arg.oid;
#endif
    TIMERS_END_STEP(1);

    /* find memory address for target object */
@@ -992,8 +1074,11 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
    if(xfer_buf_size == 0
            || xfer_buf_count == 0
#if 0
            || xfer_buf_size > in.bulk_size) { // don't use an intermediate buffer
#else
            ) {
#endif
        /* create bulk handle for local side of transfer */
        hret = margo_bulk_create(mid, 1, (void**)(&memory), &in.bulk_size,
                HG_BULK_WRITE_ONLY, &bulk_handle);
@@ -1015,6 +1100,7 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
    } else {

#if 0
        // (1) compute the maximum number of ULTs that can handle this transfer
        // as well as the number of individual transfers needed given the buffer sizes
@@ -1060,13 +1146,58 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
        // (3) join and free the ULTs
        ABT_thread_join_many(num_threads, ults);
        ABT_thread_free_many(num_threads, ults);
#else
        /* experimental pipelining implementation */
        int i = 0;
        int j = 0;
        ABT_thread tid_array[16]; /* TODO: dynamic, or reuse as completed (see sketch after this block) */
        struct pipeline_ult_arg arg_array[16];
        unsigned long issued = 0;

        while(issued < in.bulk_size)
        {
            assert(i < 16); /* TODO: dynamic, or reuse as completed */
            arg_array[i].local_buf_ptr = memory + issued;
            /* each ULT handles at most one intermediate buffer's worth of data */
            arg_array[i].local_buf_size = xfer_buf_size;
            if(arg_array[i].local_buf_size > (in.bulk_size - issued))
                arg_array[i].local_buf_size = in.bulk_size - issued;
            arg_array[i].mid = mid;
            arg_array[i].buf_pool = entry->xfer_bulk_pool;
            arg_array[i].remote_addr = src_addr;
            arg_array[i].remote_bulk = in.bulk_handle;
            arg_array[i].remote_offset = issued;
            arg_array[i].ret = 0;
            /* TODO: use handler pool or a dedicated pool elsewhere? */
            ABT_thread_create(handler_pool, pipeline_ult, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
            issued += arg_array[i].local_buf_size;
            i++;
        }

        while(j < i)
        {
            ABT_thread_join(tid_array[j]);
            ABT_thread_free(&tid_array[j]);
            j++;
        }
#endif
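        /* A minimal sketch of the "dynamic" TODO above (not part of this
         * patch; variable names are illustrative): size the arrays from the
         * transfer and buffer sizes instead of the fixed 16. */
#if 0
        size_t max_ults = (in.bulk_size + xfer_buf_size - 1) / xfer_buf_size;
        ABT_thread *dyn_tids = malloc(max_ults * sizeof(*dyn_tids));
        struct pipeline_ult_arg *dyn_args = malloc(max_ults * sizeof(*dyn_args));
        /* ... same issue/join loops as above, using dyn_tids/dyn_args ... */
        free(dyn_tids);
        free(dyn_args);
#endif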
    }

    TIMERS_END_STEP(3);
    /* TODO: should this have an abt shim in case it blocks? */
    pmemobj_persist(entry->pmem_pool, region, content_size);
    struct persist_arg p_arg;
    p_arg.pool = entry->pmem_pool;
    p_arg.addr = region;
    p_arg.len = content_size;

    ret = ABT_task_create(del_pool, persist_tasklet, &p_arg, &tid);
    assert(ret == 0);
    ret = ABT_task_join(tid);
    assert(ret == 0);
    ret = ABT_task_free(&tid);
    assert(ret == 0);
    //pmemobj_persist(entry->pmem_pool, region, content_size);
    out.ret = BAKE_SUCCESS;
@@ -1901,6 +2032,62 @@ static int bake_target_post_migration_callback(remi_fileset_t fileset, void* uar
    return 0;
}
static void memcpy_tasklet(void *_arg)
{
    struct memcpy_arg *arg = _arg;
    memcpy(arg->to, arg->from, arg->size);
    return;
}
static void pipeline_ult(void* _arg)
{
    struct pipeline_ult_arg *arg = _arg;
    int ret;
    void *local_buf_ptr;
    size_t tmp_buf_size;
    int tmp_count;
    hg_bulk_t local_bulk = HG_BULK_NULL;
    ABT_task tid;

    /* grab an intermediate buffer from the provider's bulk pool */
    ret = margo_bulk_pool_get(arg->buf_pool, &local_bulk);
    assert(ret == 0);

    ret = margo_bulk_access(local_bulk, 0,
        arg->local_buf_size, HG_BULK_READWRITE, 1,
        &local_buf_ptr, &tmp_buf_size, &tmp_count);
    assert(ret == 0);

    /* pull this piece of the remote data into the intermediate buffer */
    ret = margo_bulk_transfer(arg->mid, HG_BULK_PULL,
        arg->remote_addr, arg->remote_bulk,
        arg->remote_offset,
        local_bulk, 0, arg->local_buf_size);
    assert(ret == 0);

    /* delegate the copy into the target region to a tasklet on del_pool */
    struct memcpy_arg m_arg;
    m_arg.to = arg->local_buf_ptr;
    m_arg.from = local_buf_ptr;
    m_arg.size = arg->local_buf_size;
    ret = ABT_task_create(del_pool, memcpy_tasklet, &m_arg, &tid);
    assert(ret == 0);
    ret = ABT_task_join(tid);
    assert(ret == 0);
    ret = ABT_task_free(&tid);
    assert(ret == 0);
    //memcpy(arg->local_buf_ptr, local_buf_ptr, arg->local_buf_size);

    ret = margo_bulk_pool_release(arg->buf_pool, local_bulk);
    assert(ret == 0);

    return;
}
static void xfer_ult(xfer_args* args)
{
/*
......