Commit ea51f251 authored by Philip Carns's avatar Philip Carns

experimenting with bulk pool per ES

parent 4f5f516f
...@@ -82,15 +82,19 @@ typedef struct bake_server_context_t ...@@ -82,15 +82,19 @@ typedef struct bake_server_context_t
remi_provider_t remi_provider; remi_provider_t remi_provider;
} bake_server_context_t; } bake_server_context_t;
struct xfer_slot;
struct pipeline_ult_arg { struct pipeline_ult_arg {
margo_instance_id mid; // margo instance
void * local_buf_ptr; void * local_buf_ptr;
unsigned long local_buf_size; unsigned long local_buf_size;
margo_bulk_pool_t buf_pool; // pool of buffers
hg_addr_t remote_addr; // remote address hg_addr_t remote_addr; // remote address
hg_bulk_t remote_bulk; // remote bulk handle for transfers hg_bulk_t remote_bulk; // remote bulk handle for transfers
size_t remote_offset; // remote offset at which to take the data size_t remote_offset; // remote offset at which to take the data
int ret; // return value of the xfer_ult function int ret; // return value of the xfer_ult function
struct xfer_slot* xfer_slot;
int last;
region_content_t* region;
size_t content_size;
PMEMobjpool* pmem_pool;
}; };
...@@ -430,6 +434,7 @@ int bake_provider_list_storage_targets( ...@@ -430,6 +434,7 @@ int bake_provider_list_storage_targets(
return BAKE_SUCCESS; return BAKE_SUCCESS;
} }
#if 0
int bake_provider_set_target_xfer_buffer( int bake_provider_set_target_xfer_buffer(
bake_provider_t provider, bake_provider_t provider,
bake_target_id_t target_id, bake_target_id_t target_id,
...@@ -464,6 +469,69 @@ finish: ...@@ -464,6 +469,69 @@ finish:
ABT_rwlock_unlock(provider->lock); ABT_rwlock_unlock(provider->lock);
return ret; return ret;
} }
#else
struct xfer_slot
{
margo_instance_id mid;
ABT_pool pool;
ABT_sched sched;
ABT_xstream xstream;
margo_bulk_pool_t bulk_pool;
int bulk_count;
unsigned long bulk_size;
};
#define XFER_SLOTS 4
struct xfer_slot xfer_slot_array[XFER_SLOTS];
int xfer_slot_array_idx = -1;
static void bulk_pool_maker_ult(void* _arg)
{
struct xfer_slot *slot = _arg;
int ret;
ret = margo_bulk_pool_create(slot->mid, slot->bulk_count, slot->bulk_size, HG_BULK_READWRITE, &slot->bulk_pool);
assert(ret == 0);
return;
}
int bake_provider_set_target_xfer_buffer(
bake_provider_t provider,
bake_target_id_t target_id,
size_t count,
size_t size)
{
int i;
int ret;
ABT_thread tid;
for(i=0; i<XFER_SLOTS; i++)
{
ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &xfer_slot_array[i].pool);
assert(ret == 0);
ret = ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 1, &xfer_slot_array[i].pool,
ABT_SCHED_CONFIG_NULL, &xfer_slot_array[i].sched);
assert(ret == 0);
ret = ABT_xstream_create(xfer_slot_array[i].sched, &xfer_slot_array[i].xstream);
assert(ret == 0);
xfer_slot_array[i].bulk_size = size;
xfer_slot_array[i].bulk_count = count;
xfer_slot_array[i].mid = provider->mid;
/* run one thread on the new pool and sequentially wait for it to
* complete; the only thing this thread will do is allocate a margo
* bulk pool local to that particular ES.
*/
ABT_thread_create(xfer_slot_array[i].pool, bulk_pool_maker_ult, &xfer_slot_array[i], ABT_THREAD_ATTR_NULL, &tid);
ABT_thread_join(tid);
}
xfer_slot_array_idx = 0;
return(0);
}
#endif
int bake_provider_set_target_xfer_concurrency( int bake_provider_set_target_xfer_concurrency(
bake_provider_t provider, bake_provider_t provider,
...@@ -1003,12 +1071,12 @@ static void bake_create_write_persist_ult(hg_handle_t handle) ...@@ -1003,12 +1071,12 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
src_addr = hgi->addr; src_addr = hgi->addr;
} }
#if 0
if(xfer_buf_size == 0 if(xfer_buf_size == 0
|| xfer_buf_count == 0 || xfer_buf_count == 0
#if 0
|| xfer_buf_size > in.bulk_size) { // don't use an intermediate buffer || xfer_buf_size > in.bulk_size) { // don't use an intermediate buffer
#else #else
) { if(xfer_slot_array_idx < 0) { /* don't use intermediate buffer */
#endif #endif
/* create bulk handle for local side of transfer */ /* create bulk handle for local side of transfer */
hret = margo_bulk_create(mid, 1, (void**)(&memory), &in.bulk_size, hret = margo_bulk_create(mid, 1, (void**)(&memory), &in.bulk_size,
...@@ -1089,21 +1157,33 @@ static void bake_create_write_persist_ult(hg_handle_t handle) ...@@ -1089,21 +1157,33 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
{ {
assert(i < 16); /* TODO: dynamic, or reuse as completed */ assert(i < 16); /* TODO: dynamic, or reuse as completed */
arg_array[i].local_buf_ptr = memory + issued; arg_array[i].local_buf_ptr = memory + issued;
arg_array[i].local_buf_size = xfer_buf_size; arg_array[i].local_buf_size = xfer_slot_array[0].bulk_size;
if(arg_array[i].local_buf_size > (in.bulk_size - issued)) if(arg_array[i].local_buf_size > (in.bulk_size - issued))
arg_array[i].local_buf_size = in.bulk_size - issued; arg_array[i].local_buf_size = in.bulk_size - issued;
arg_array[i].mid = mid;
arg_array[i].buf_pool = entry->xfer_bulk_pool;
arg_array[i].remote_addr = src_addr; arg_array[i].remote_addr = src_addr;
arg_array[i].remote_bulk = in.bulk_handle; arg_array[i].remote_bulk = in.bulk_handle;
arg_array[i].remote_offset = issued; arg_array[i].remote_offset = issued;
//fprintf(stderr, "FOO: using xfer slot %d\n", xfer_slot_array_idx);
arg_array[i].xfer_slot = &xfer_slot_array[xfer_slot_array_idx];
xfer_slot_array_idx++; /* TODO: protect with an abt mutex */
if(xfer_slot_array_idx >= XFER_SLOTS)
xfer_slot_array_idx = 0;
arg_array[i].ret = 0; arg_array[i].ret = 0;
/* TODO: use handler pool or a dedicated pool elsewhere? */ /* TODO: use handler pool or a dedicated pool elsewhere? */
ABT_thread_create(handler_pool, pipeline_ult, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]); ABT_thread_create(arg_array[i].xfer_slot->pool, pipeline_ult, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
issued += arg_array[i].local_buf_size; issued += arg_array[i].local_buf_size;
i++; i++;
if(issued < in.bulk_size)
arg_array[i].last = 0;
else
{
arg_array[i].last = 1;
arg_array[i].pmem_pool = entry->pmem_pool;
arg_array[i].region = region;
arg_array[i].content_size = content_size;
}
} }
while(j<i) while(j<i)
...@@ -1117,9 +1197,6 @@ static void bake_create_write_persist_ult(hg_handle_t handle) ...@@ -1117,9 +1197,6 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
TIMERS_END_STEP(3); TIMERS_END_STEP(3);
/* TODO: should this have an abt shim in case it blocks? */
pmemobj_persist(entry->pmem_pool, region, content_size);
out.ret = BAKE_SUCCESS; out.ret = BAKE_SUCCESS;
TIMERS_END_STEP(4); TIMERS_END_STEP(4);
...@@ -1962,7 +2039,7 @@ static void pipeline_ult(void* _arg) ...@@ -1962,7 +2039,7 @@ static void pipeline_ult(void* _arg)
int tmp_count; int tmp_count;
hg_bulk_t local_bulk = HG_BULK_NULL; hg_bulk_t local_bulk = HG_BULK_NULL;
ret = margo_bulk_pool_get(arg->buf_pool, &local_bulk); ret = margo_bulk_pool_get(arg->xfer_slot->bulk_pool, &local_bulk);
assert(ret == 0); assert(ret == 0);
ret = margo_bulk_access(local_bulk, 0, ret = margo_bulk_access(local_bulk, 0,
...@@ -1970,7 +2047,7 @@ static void pipeline_ult(void* _arg) ...@@ -1970,7 +2047,7 @@ static void pipeline_ult(void* _arg)
&local_buf_ptr, &tmp_buf_size, &tmp_count); &local_buf_ptr, &tmp_buf_size, &tmp_count);
assert(ret == 0); assert(ret == 0);
ret = margo_bulk_transfer(arg->mid, HG_BULK_PULL, ret = margo_bulk_transfer(arg->xfer_slot->mid, HG_BULK_PULL,
arg->remote_addr, arg->remote_bulk, arg->remote_addr, arg->remote_bulk,
arg->remote_offset, arg->remote_offset,
local_bulk, 0, arg->local_buf_size); local_bulk, 0, arg->local_buf_size);
...@@ -1985,7 +2062,11 @@ static void pipeline_ult(void* _arg) ...@@ -1985,7 +2062,11 @@ static void pipeline_ult(void* _arg)
*/ */
memcpy(arg->local_buf_ptr, local_buf_ptr, arg->local_buf_size); memcpy(arg->local_buf_ptr, local_buf_ptr, arg->local_buf_size);
ret = margo_bulk_pool_release(arg->buf_pool, local_bulk); ret = margo_bulk_pool_release(arg->xfer_slot->bulk_pool, local_bulk);
if(arg->last)
pmemobj_persist(arg->pmem_pool, arg->region, arg->content_size);
assert(ret == 0); assert(ret == 0);
return; return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment