Commit bf6c4c54 authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-eager-create-write-persist' into 'master'

eager path for create-write-persist

See merge request !6
parents 82afdaf9 003691ef
......@@ -29,6 +29,7 @@ struct bake_client
hg_id_t bake_write_id;
hg_id_t bake_persist_id;
hg_id_t bake_create_write_persist_id;
hg_id_t bake_eager_create_write_persist_id;
hg_id_t bake_get_size_id;
hg_id_t bake_get_data_id;
hg_id_t bake_read_id;
......@@ -59,20 +60,21 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
if(flag == HG_TRUE) { /* RPCs already registered */
margo_registered_name(mid, "bake_probe_rpc", &client->bake_probe_id, &flag);
margo_registered_name(mid, "bake_create_rpc", &client->bake_create_id, &flag);
margo_registered_name(mid, "bake_write_rpc", &client->bake_write_id, &flag);
margo_registered_name(mid, "bake_eager_write_rpc", &client->bake_eager_write_id, &flag);
margo_registered_name(mid, "bake_eager_read_rpc", &client->bake_eager_read_id, &flag);
margo_registered_name(mid, "bake_persist_rpc", &client->bake_persist_id, &flag);
margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag);
margo_registered_name(mid, "bake_get_size_rpc", &client->bake_get_size_id, &flag);
margo_registered_name(mid, "bake_get_data_rpc", &client->bake_get_data_id, &flag);
margo_registered_name(mid, "bake_read_rpc", &client->bake_read_id, &flag);
margo_registered_name(mid, "bake_noop_rpc", &client->bake_noop_id, &flag);
margo_registered_name(mid, "bake_remove_rpc", &client->bake_remove_id, &flag);
margo_registered_name(mid, "bake_migrate_region_rpc", &client->bake_migrate_region_id, &flag);
margo_registered_name(mid, "bake_migrate_target_rpc", &client->bake_migrate_target_id, &flag);
margo_registered_name(mid, "bake_probe_rpc", &client->bake_probe_id, &flag);
margo_registered_name(mid, "bake_create_rpc", &client->bake_create_id, &flag);
margo_registered_name(mid, "bake_write_rpc", &client->bake_write_id, &flag);
margo_registered_name(mid, "bake_eager_write_rpc", &client->bake_eager_write_id, &flag);
margo_registered_name(mid, "bake_eager_read_rpc", &client->bake_eager_read_id, &flag);
margo_registered_name(mid, "bake_persist_rpc", &client->bake_persist_id, &flag);
margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag);
margo_registered_name(mid, "bake_eager_create_write_persist_rpc", &client->bake_eager_create_write_persist_id, &flag);
margo_registered_name(mid, "bake_get_size_rpc", &client->bake_get_size_id, &flag);
margo_registered_name(mid, "bake_get_data_rpc", &client->bake_get_data_id, &flag);
margo_registered_name(mid, "bake_read_rpc", &client->bake_read_id, &flag);
margo_registered_name(mid, "bake_noop_rpc", &client->bake_noop_id, &flag);
margo_registered_name(mid, "bake_remove_rpc", &client->bake_remove_id, &flag);
margo_registered_name(mid, "bake_migrate_region_rpc", &client->bake_migrate_region_id, &flag);
margo_registered_name(mid, "bake_migrate_target_rpc", &client->bake_migrate_target_id, &flag);
} else { /* RPCs not already registered */
......@@ -97,6 +99,9 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
client->bake_create_write_persist_id =
MARGO_REGISTER(mid, "bake_create_write_persist_rpc",
bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL);
client->bake_eager_create_write_persist_id =
MARGO_REGISTER(mid, "bake_eager_create_write_persist_rpc",
bake_eager_create_write_persist_in_t, bake_eager_create_write_persist_out_t, NULL);
client->bake_get_size_id =
MARGO_REGISTER(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t, NULL);
......@@ -516,6 +521,53 @@ int bake_persist(
return(ret);
}
static int bake_eager_create_write_persist(
bake_provider_handle_t provider,
bake_target_id_t bti,
void const *buf,
uint64_t buf_size,
bake_region_id_t *rid)
{
hg_return_t hret;
hg_handle_t handle;
bake_eager_create_write_persist_in_t in;
bake_eager_create_write_persist_out_t out;
int ret;
in.bti = bti;
in.buffer = (char*)buf;
in.size = buf_size;
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_eager_create_write_persist_id, &handle);
if(hret != HG_SUCCESS)
{
return BAKE_ERR_MERCURY;
}
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return BAKE_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return BAKE_ERR_MERCURY;
}
ret = out.ret;
if(ret == 0)
*rid = out.rid;
margo_free_output(handle, &out);
margo_destroy(handle);
return(ret);
}
int bake_create_write_persist(
bake_provider_handle_t provider,
bake_target_id_t bti,
......@@ -529,7 +581,8 @@ int bake_create_write_persist(
bake_create_write_persist_out_t out;
int ret;
/* XXX eager path? */
if(buf_size <= provider->eager_limit)
return(bake_eager_create_write_persist(provider, bti, buf, buf_size, rid));
in.bti = bti;
in.bulk_offset = 0;
......
......@@ -41,7 +41,7 @@ typedef struct
{
bake_region_id_t rid;
uint64_t region_offset;
uint32_t size;
uint64_t size;
char * buffer;
} bake_eager_write_in_t;
static inline hg_return_t hg_proc_bake_eager_write_in_t(hg_proc_t proc, void *v_out_p);
......@@ -69,6 +69,18 @@ MERCURY_GEN_PROC(bake_create_write_persist_out_t,
((int32_t)(ret))\
((bake_region_id_t)(rid)))
/* BAKE eager create/write/persist */
typedef struct
{
bake_target_id_t bti;
uint64_t size;
char * buffer;
} bake_eager_create_write_persist_in_t;
static inline hg_return_t hg_proc_bake_eager_create_write_persist_in_t(hg_proc_t proc, void *v_out_p);
MERCURY_GEN_PROC(bake_eager_create_write_persist_out_t,
((int32_t)(ret))\
((bake_region_id_t)(rid)))
/* BAKE get size */
MERCURY_GEN_PROC(bake_get_size_in_t,
((bake_region_id_t)(rid)))
......@@ -177,7 +189,7 @@ static inline hg_return_t hg_proc_bake_eager_write_in_t(hg_proc_t proc, void *v_
hg_proc_bake_region_id_t(proc, &in->rid);
hg_proc_uint64_t(proc, &in->region_offset);
hg_proc_uint32_t(proc, &in->size);
hg_proc_uint64_t(proc, &in->size);
if(in->size)
{
buf = hg_proc_save_ptr(proc, in->size);
......@@ -191,6 +203,26 @@ static inline hg_return_t hg_proc_bake_eager_write_in_t(hg_proc_t proc, void *v_
return(HG_SUCCESS);
}
static inline hg_return_t hg_proc_bake_eager_create_write_persist_in_t(hg_proc_t proc, void *v_out_p)
{
/* TODO: error checking */
bake_eager_create_write_persist_in_t *in = v_out_p;
void *buf = NULL;
hg_proc_bake_target_id_t(proc, &in->bti);
hg_proc_uint64_t(proc, &in->size);
if(in->size)
{
buf = hg_proc_save_ptr(proc, in->size);
if(hg_proc_get_op(proc) == HG_ENCODE)
memcpy(buf, in->buffer, in->size);
if(hg_proc_get_op(proc) == HG_DECODE)
in->buffer = buf;
hg_proc_restore_ptr(proc, buf, in->size);
}
return(HG_SUCCESS);
}
static inline hg_return_t hg_proc_bake_eager_read_out_t(hg_proc_t proc, void *v_out_p)
{
......
......@@ -20,6 +20,7 @@ DECLARE_MARGO_RPC_HANDLER(bake_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_write_ult)
DECLARE_MARGO_RPC_HANDLER(bake_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult)
DECLARE_MARGO_RPC_HANDLER(bake_get_size_ult)
DECLARE_MARGO_RPC_HANDLER(bake_get_data_ult)
DECLARE_MARGO_RPC_HANDLER(bake_read_ult)
......@@ -171,6 +172,10 @@ int bake_provider_register(
bake_create_write_persist_in_t, bake_create_write_persist_out_t,
bake_create_write_persist_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_create_write_persist_rpc",
bake_eager_create_write_persist_in_t, bake_eager_create_write_persist_out_t,
bake_eager_create_write_persist_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t,
bake_get_size_ult, provider_id, abt_pool);
......@@ -739,8 +744,12 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
out.ret = BAKE_ERR_UNKNOWN_TARGET;
goto finish;
}
#ifdef USE_SIZECHECK_HEADERS
size_t content_size = in.bulk_size + sizeof(uint64_t);
#else
size_t content_size = in.bulk_size;
#endif
prid = (pmemobj_region_id_t*)out.rid.data;
ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
......@@ -814,7 +823,97 @@ finish:
}
DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult)
/* service a remote RPC that retrieves the size of a BAKE region */
static void bake_eager_create_write_persist_ult(hg_handle_t handle)
{
bake_eager_create_write_persist_out_t out;
bake_eager_create_write_persist_in_t in;
in.buffer = NULL;
in.size = 0;
char* buffer = NULL;
const struct hg_info *hgi = NULL;
margo_instance_id mid;
hg_return_t hret;
int ret;
pmemobj_region_id_t* prid;
ABT_rwlock lock = ABT_RWLOCK_NULL;
memset(&out, 0, sizeof(out));
mid = margo_hg_handle_get_instance(handle);
assert(mid);
hgi = margo_get_info(handle);
bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
if(!svr_ctx) {
out.ret = BAKE_ERR_UNKNOWN_PROVIDER;
goto finish;
}
/* TODO: this check needs to be somewhere else */
assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE);
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = BAKE_ERR_MERCURY;
goto finish;
}
/* lock provider */
lock = svr_ctx->lock;
ABT_rwlock_rdlock(lock);
/* find the pmem pool */
bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, in.bti);
if(entry == NULL) {
out.ret = BAKE_ERR_UNKNOWN_TARGET;
goto finish;
}
#ifdef USE_SIZECHECK_HEADERS
size_t content_size = in.size + sizeof(uint64_t);
#else
size_t content_size = in.size;
#endif
prid = (pmemobj_region_id_t*)out.rid.data;
ret = pmemobj_alloc(entry->pmem_pool, &prid->oid,
content_size, 0, NULL, NULL);
if(ret != 0)
{
out.ret = BAKE_ERR_PMEM;
goto finish;
}
/* find memory address for target object */
region_content_t* region = pmemobj_direct(prid->oid);
if(!region)
{
out.ret = BAKE_ERR_PMEM;
goto finish;
}
#ifdef USE_SIZECHECK_HEADERS
region->size = in.size;
#endif
buffer = region->data;
memcpy(buffer, in.buffer, in.size);
/* TODO: should this have an abt shim in case it blocks? */
pmemobj_persist(entry->pmem_pool, region, content_size);
out.ret = BAKE_SUCCESS;
finish:
if(lock != ABT_RWLOCK_NULL)
ABT_rwlock_unlock(lock);
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult)
/* service a remote RPC that retrieves the size of a BAKE region */
static void bake_get_size_ult(hg_handle_t handle)
{
bake_get_size_out_t out;
......
......@@ -254,7 +254,8 @@ static void proxy_write_ult(hg_handle_t handle)
assert(ret == 0);
/* persist the BAKE region */
ret = bake_persist(g_proxy_svr_ctx->svr_bph, g_proxy_svr_ctx->the_rid);
ret = bake_persist(g_proxy_svr_ctx->svr_bph, g_proxy_svr_ctx->the_rid,
0, in.bulk_size);
assert(ret == 0);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment