diff --git a/src/bake-client.c b/src/bake-client.c index db4fa706970a0b46e78d5288bd28f1904e1aa397..17de8f4e91012a9658690e22497f7aebc57eda65 100644 --- a/src/bake-client.c +++ b/src/bake-client.c @@ -29,6 +29,7 @@ struct bake_client hg_id_t bake_write_id; hg_id_t bake_persist_id; hg_id_t bake_create_write_persist_id; + hg_id_t bake_eager_create_write_persist_id; hg_id_t bake_get_size_id; hg_id_t bake_get_data_id; hg_id_t bake_read_id; @@ -59,20 +60,21 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid) if(flag == HG_TRUE) { /* RPCs already registered */ - margo_registered_name(mid, "bake_probe_rpc", &client->bake_probe_id, &flag); - margo_registered_name(mid, "bake_create_rpc", &client->bake_create_id, &flag); - margo_registered_name(mid, "bake_write_rpc", &client->bake_write_id, &flag); - margo_registered_name(mid, "bake_eager_write_rpc", &client->bake_eager_write_id, &flag); - margo_registered_name(mid, "bake_eager_read_rpc", &client->bake_eager_read_id, &flag); - margo_registered_name(mid, "bake_persist_rpc", &client->bake_persist_id, &flag); - margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag); - margo_registered_name(mid, "bake_get_size_rpc", &client->bake_get_size_id, &flag); - margo_registered_name(mid, "bake_get_data_rpc", &client->bake_get_data_id, &flag); - margo_registered_name(mid, "bake_read_rpc", &client->bake_read_id, &flag); - margo_registered_name(mid, "bake_noop_rpc", &client->bake_noop_id, &flag); - margo_registered_name(mid, "bake_remove_rpc", &client->bake_remove_id, &flag); - margo_registered_name(mid, "bake_migrate_region_rpc", &client->bake_migrate_region_id, &flag); - margo_registered_name(mid, "bake_migrate_target_rpc", &client->bake_migrate_target_id, &flag); + margo_registered_name(mid, "bake_probe_rpc", &client->bake_probe_id, &flag); + margo_registered_name(mid, "bake_create_rpc", &client->bake_create_id, &flag); + margo_registered_name(mid, "bake_write_rpc", &client->bake_write_id, &flag); + margo_registered_name(mid, "bake_eager_write_rpc", &client->bake_eager_write_id, &flag); + margo_registered_name(mid, "bake_eager_read_rpc", &client->bake_eager_read_id, &flag); + margo_registered_name(mid, "bake_persist_rpc", &client->bake_persist_id, &flag); + margo_registered_name(mid, "bake_create_write_persist_rpc", &client->bake_create_write_persist_id, &flag); + margo_registered_name(mid, "bake_eager_create_write_persist_rpc", &client->bake_eager_create_write_persist_id, &flag); + margo_registered_name(mid, "bake_get_size_rpc", &client->bake_get_size_id, &flag); + margo_registered_name(mid, "bake_get_data_rpc", &client->bake_get_data_id, &flag); + margo_registered_name(mid, "bake_read_rpc", &client->bake_read_id, &flag); + margo_registered_name(mid, "bake_noop_rpc", &client->bake_noop_id, &flag); + margo_registered_name(mid, "bake_remove_rpc", &client->bake_remove_id, &flag); + margo_registered_name(mid, "bake_migrate_region_rpc", &client->bake_migrate_region_id, &flag); + margo_registered_name(mid, "bake_migrate_target_rpc", &client->bake_migrate_target_id, &flag); } else { /* RPCs not already registered */ @@ -97,6 +99,9 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid) client->bake_create_write_persist_id = MARGO_REGISTER(mid, "bake_create_write_persist_rpc", bake_create_write_persist_in_t, bake_create_write_persist_out_t, NULL); + client->bake_eager_create_write_persist_id = + MARGO_REGISTER(mid, "bake_eager_create_write_persist_rpc", + bake_eager_create_write_persist_in_t, bake_eager_create_write_persist_out_t, NULL); client->bake_get_size_id = MARGO_REGISTER(mid, "bake_get_size_rpc", bake_get_size_in_t, bake_get_size_out_t, NULL); @@ -516,6 +521,53 @@ int bake_persist( return(ret); } +static int bake_eager_create_write_persist( + bake_provider_handle_t provider, + bake_target_id_t bti, + void const *buf, + uint64_t buf_size, + bake_region_id_t *rid) +{ + hg_return_t hret; + hg_handle_t handle; + bake_eager_create_write_persist_in_t in; + bake_eager_create_write_persist_out_t out; + int ret; + + in.bti = bti; + in.buffer = (char*)buf; + in.size = buf_size; + + hret = margo_create(provider->client->mid, provider->addr, + provider->client->bake_eager_create_write_persist_id, &handle); + if(hret != HG_SUCCESS) + { + return BAKE_ERR_MERCURY; + } + + hret = margo_provider_forward(provider->provider_id, handle, &in); + if(hret != HG_SUCCESS) + { + margo_destroy(handle); + return BAKE_ERR_MERCURY; + } + + hret = margo_get_output(handle, &out); + if(hret != HG_SUCCESS) + { + margo_destroy(handle); + return BAKE_ERR_MERCURY; + } + + ret = out.ret; + if(ret == 0) + *rid = out.rid; + + margo_free_output(handle, &out); + margo_destroy(handle); + return(ret); +} + int bake_create_write_persist( bake_provider_handle_t provider, bake_target_id_t bti, @@ -529,7 +581,8 @@ int bake_create_write_persist( bake_create_write_persist_out_t out; int ret; - /* XXX eager path? */ + if(buf_size <= provider->eager_limit) + return(bake_eager_create_write_persist(provider, bti, buf, buf_size, rid)); in.bti = bti; in.bulk_offset = 0; diff --git a/src/bake-rpc.h b/src/bake-rpc.h index e16db69d2d5da22b6a632c3de545cb8cd1e08001..80c2c13bcc83dc16d67af1a92f47dcb0ffc615ec 100644 --- a/src/bake-rpc.h +++ b/src/bake-rpc.h @@ -41,7 +41,7 @@ typedef struct { bake_region_id_t rid; uint64_t region_offset; - uint32_t size; + uint64_t size; char * buffer; } bake_eager_write_in_t; static inline hg_return_t hg_proc_bake_eager_write_in_t(hg_proc_t proc, void *v_out_p); @@ -69,6 +69,18 @@ MERCURY_GEN_PROC(bake_create_write_persist_out_t, ((int32_t)(ret))\ ((bake_region_id_t)(rid))) +/* BAKE eager create/write/persist */ +typedef struct +{ + bake_target_id_t bti; + uint64_t size; + char * buffer; +} bake_eager_create_write_persist_in_t; +static inline hg_return_t hg_proc_bake_eager_create_write_persist_in_t(hg_proc_t proc, void *v_out_p); +MERCURY_GEN_PROC(bake_eager_create_write_persist_out_t, + ((int32_t)(ret))\ + ((bake_region_id_t)(rid))) + /* BAKE get size */ MERCURY_GEN_PROC(bake_get_size_in_t, ((bake_region_id_t)(rid))) @@ -177,7 +189,7 @@ static inline hg_return_t hg_proc_bake_eager_write_in_t(hg_proc_t proc, void *v_ hg_proc_bake_region_id_t(proc, &in->rid); hg_proc_uint64_t(proc, &in->region_offset); - hg_proc_uint32_t(proc, &in->size); + hg_proc_uint64_t(proc, &in->size); if(in->size) { buf = hg_proc_save_ptr(proc, in->size); @@ -191,6 +203,26 @@ static inline hg_return_t hg_proc_bake_eager_write_in_t(hg_proc_t proc, void *v_ return(HG_SUCCESS); } +static inline hg_return_t hg_proc_bake_eager_create_write_persist_in_t(hg_proc_t proc, void *v_out_p) +{ + /* TODO: error checking */ + bake_eager_create_write_persist_in_t *in = v_out_p; + void *buf = NULL; + + hg_proc_bake_target_id_t(proc, &in->bti); + hg_proc_uint64_t(proc, &in->size); + if(in->size) + { + buf = hg_proc_save_ptr(proc, in->size); + if(hg_proc_get_op(proc) == HG_ENCODE) + memcpy(buf, in->buffer, in->size); + if(hg_proc_get_op(proc) == HG_DECODE) + in->buffer = buf; + hg_proc_restore_ptr(proc, buf, in->size); + } + + return(HG_SUCCESS); +} static inline hg_return_t hg_proc_bake_eager_read_out_t(hg_proc_t proc, void *v_out_p) { diff --git a/src/bake-server.c b/src/bake-server.c index d19f26dfc8b8155b0c041c723762fa14d943a5e0..3ab4808a533d1a6b9c23a12d1d6672c62705abec 100644 --- a/src/bake-server.c +++ b/src/bake-server.c @@ -20,6 +20,7 @@ DECLARE_MARGO_RPC_HANDLER(bake_write_ult) DECLARE_MARGO_RPC_HANDLER(bake_eager_write_ult) DECLARE_MARGO_RPC_HANDLER(bake_persist_ult) DECLARE_MARGO_RPC_HANDLER(bake_create_write_persist_ult) +DECLARE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult) DECLARE_MARGO_RPC_HANDLER(bake_get_size_ult) DECLARE_MARGO_RPC_HANDLER(bake_get_data_ult) DECLARE_MARGO_RPC_HANDLER(bake_read_ult) @@ -171,6 +172,10 @@ int bake_provider_register( bake_create_write_persist_in_t, bake_create_write_persist_out_t, bake_create_write_persist_ult, provider_id, abt_pool); margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); + rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_create_write_persist_rpc", + bake_eager_create_write_persist_in_t, bake_eager_create_write_persist_out_t, + bake_eager_create_write_persist_ult, provider_id, abt_pool); + margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL); rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_size_rpc", bake_get_size_in_t, bake_get_size_out_t, bake_get_size_ult, provider_id, abt_pool); @@ -739,8 +744,12 @@ static void bake_create_write_persist_ult(hg_handle_t handle) out.ret = BAKE_ERR_UNKNOWN_TARGET; goto finish; } - +#ifdef USE_SIZECHECK_HEADERS size_t content_size = in.bulk_size + sizeof(uint64_t); +#else + size_t content_size = in.bulk_size; +#endif + prid = (pmemobj_region_id_t*)out.rid.data; ret = pmemobj_alloc(entry->pmem_pool, &prid->oid, @@ -814,7 +823,97 @@ finish: } DEFINE_MARGO_RPC_HANDLER(bake_create_write_persist_ult) - /* service a remote RPC that retrieves the size of a BAKE region */ +static void bake_eager_create_write_persist_ult(hg_handle_t handle) +{ + bake_eager_create_write_persist_out_t out; + bake_eager_create_write_persist_in_t in; + in.buffer = NULL; + in.size = 0; + char* buffer = NULL; + const struct hg_info *hgi = NULL; + margo_instance_id mid; + hg_return_t hret; + int ret; + + pmemobj_region_id_t* prid; + ABT_rwlock lock = ABT_RWLOCK_NULL; + + memset(&out, 0, sizeof(out)); + + mid = margo_hg_handle_get_instance(handle); + assert(mid); + hgi = margo_get_info(handle); + bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id); + if(!svr_ctx) { + out.ret = BAKE_ERR_UNKNOWN_PROVIDER; + goto finish; + } + + /* TODO: this check needs to be somewhere else */ + assert(sizeof(pmemobj_region_id_t) <= BAKE_REGION_ID_DATA_SIZE); + + hret = margo_get_input(handle, &in); + if(hret != HG_SUCCESS) + { + out.ret = BAKE_ERR_MERCURY; + goto finish; + } + + /* lock provider */ + lock = svr_ctx->lock; + ABT_rwlock_rdlock(lock); + /* find the pmem pool */ + bake_pmem_entry_t* entry = find_pmem_entry(svr_ctx, in.bti); + if(entry == NULL) { + out.ret = BAKE_ERR_UNKNOWN_TARGET; + goto finish; + } + +#ifdef USE_SIZECHECK_HEADERS + size_t content_size = in.size + sizeof(uint64_t); +#else + size_t content_size = in.size; +#endif + prid = (pmemobj_region_id_t*)out.rid.data; + + ret = pmemobj_alloc(entry->pmem_pool, &prid->oid, + content_size, 0, NULL, NULL); + if(ret != 0) + { + out.ret = BAKE_ERR_PMEM; + goto finish; + } + + /* find memory address for target object */ + region_content_t* region = pmemobj_direct(prid->oid); + if(!region) + { + out.ret = BAKE_ERR_PMEM; + goto finish; + } +#ifdef USE_SIZECHECK_HEADERS + region->size = in.size; +#endif + buffer = region->data; + + memcpy(buffer, in.buffer, in.size); + + /* TODO: should this have an abt shim in case it blocks? */ + pmemobj_persist(entry->pmem_pool, region, content_size); + + out.ret = BAKE_SUCCESS; + +finish: + if(lock != ABT_RWLOCK_NULL) + ABT_rwlock_unlock(lock); + margo_respond(handle, &out); + margo_free_input(handle, &in); + margo_destroy(handle); + return; +} +DEFINE_MARGO_RPC_HANDLER(bake_eager_create_write_persist_ult) + +/* service a remote RPC that retrieves the size of a BAKE region */ static void bake_get_size_ult(hg_handle_t handle) { bake_get_size_out_t out; diff --git a/tests/proxy/proxy-server-daemon.c b/tests/proxy/proxy-server-daemon.c index da97ca29ed0162ab81ee97ccc72382d6ab78a41a..d5e1cb32a068b4daa60e4b61340fe51db3663bb6 100644 --- a/tests/proxy/proxy-server-daemon.c +++ b/tests/proxy/proxy-server-daemon.c @@ -254,7 +254,8 @@ static void proxy_write_ult(hg_handle_t handle) assert(ret == 0); /* persist the BAKE region */ - ret = bake_persist(g_proxy_svr_ctx->svr_bph, g_proxy_svr_ctx->the_rid); + ret = bake_persist(g_proxy_svr_ctx->svr_bph, g_proxy_svr_ctx->the_rid, + 0, in.bulk_size); assert(ret == 0); }