Commit 931ce85d authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-provider-id' into 'master'

Adapted to new provider API of Margo

See merge request !2
parents b7805db7 1dade6a2
......@@ -51,7 +51,7 @@ int bake_client_finalize(bake_client_t client);
*
* @param client client managing the provider handle
* @param addr address of the provider
* @param mplex_id multiplex id of the provider
* @param provider_id id of the provider
* @param handle resulting handle
*
* @return 0 on success, -1 on failure
......@@ -59,7 +59,7 @@ int bake_client_finalize(bake_client_t client);
int bake_provider_handle_create(
bake_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
uint16_t provider_id,
bake_provider_handle_t* handle);
/**
......
......@@ -16,7 +16,7 @@ extern "C" {
#endif
#define BAKE_ABT_POOL_DEFAULT ABT_POOL_NULL
#define BAKE_MPLEX_ID_DEFAULT 0
#define BAKE_PROVIDER_ID_DEFAULT 0
#define BAKE_PROVIDER_IGNORE NULL
typedef struct bake_server_context_t* bake_provider_t;
......@@ -41,7 +41,7 @@ int bake_makepool(
* Initializes a BAKE provider.
*
* @param[in] mid Margo instance identifier
* @param[in] mplex_id Multiplex id
* @param[in] provider_id provider id
* @param[in] pool Pool on which to run the RPC handlers
* @param[in] target_name path to PMEM backend file
* @param[out] provider resulting provider
......@@ -49,7 +49,7 @@ int bake_makepool(
*/
int bake_provider_register(
margo_instance_id mid,
uint8_t mplex_id,
uint16_t provider_id,
ABT_pool pool,
bake_provider_t* provider);
......
......@@ -39,7 +39,7 @@ struct bake_client
struct bake_provider_handle {
struct bake_client* client;
hg_addr_t addr;
uint8_t mplex_id;
uint16_t provider_id;
uint64_t refcount;
uint64_t eager_limit;
};
......@@ -152,14 +152,7 @@ int bake_probe(
if(hret != HG_SUCCESS) return -1;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
......@@ -194,7 +187,7 @@ int bake_probe(
int bake_provider_handle_create(
bake_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
uint16_t provider_id,
bake_provider_handle_t* handle)
{
if(client == BAKE_CLIENT_NULL) return -1;
......@@ -211,7 +204,7 @@ int bake_provider_handle_create(
}
provider->client = client;
provider->mplex_id = mplex_id;
provider->provider_id = provider_id;
provider->refcount = 1;
provider->eager_limit = BAKE_DEFAULT_EAGER_LIMIT;
......@@ -279,12 +272,11 @@ static int bake_eager_write(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_eager_write_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
......@@ -335,7 +327,6 @@ int bake_write(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_write_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
{
......@@ -343,7 +334,7 @@ int bake_write(
return(-1);
}
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_bulk_free(in.bulk_handle);
......@@ -391,12 +382,11 @@ int bake_proxy_write(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_write_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
......@@ -439,9 +429,7 @@ int bake_create(
return(-1);
}
margo_set_target_id(handle, provider->mplex_id);
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
......@@ -479,12 +467,11 @@ int bake_persist(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_persist_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
......@@ -543,9 +530,7 @@ int bake_create_write_persist(
return(-1);
}
margo_set_target_id(handle, provider->mplex_id);
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_bulk_free(in.bulk_handle);
......@@ -598,12 +583,11 @@ int bake_create_write_persist_proxy(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_create_write_persist_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
......@@ -641,12 +625,11 @@ int bake_get_size(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_get_size_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
......@@ -675,12 +658,11 @@ int bake_noop(bake_provider_handle_t provider)
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_noop_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, NULL);
hret = margo_provider_forward(provider->provider_id, handle, NULL);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
......@@ -710,12 +692,11 @@ static int bake_eager_read(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_eager_read_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
......@@ -767,7 +748,6 @@ int bake_read(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_read_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
{
......@@ -775,7 +755,7 @@ int bake_read(
return(-1);
}
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_bulk_free(in.bulk_handle);
......@@ -823,12 +803,11 @@ int bake_proxy_read(
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_read_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
......
......@@ -77,7 +77,7 @@ int bake_makepool(
int bake_provider_register(
margo_instance_id mid,
uint8_t mplex_id,
uint16_t provider_id,
ABT_pool abt_pool,
bake_provider_t* provider)
{
......@@ -87,9 +87,9 @@ int bake_provider_register(
{
hg_id_t id;
hg_bool_t flag;
margo_registered_name_mplex(mid, "bake_probe_rpc", mplex_id, &id, &flag);
margo_provider_registered_name(mid, "bake_probe_rpc", provider_id, &id, &flag);
if(flag == HG_TRUE) {
fprintf(stderr, "bake_provider_register(): a provider with the same mplex id (%d) already exists\n", mplex_id);
fprintf(stderr, "bake_provider_register(): a provider with the same id (%d) already exists\n", provider_id);
return -1;
}
}
......@@ -101,45 +101,45 @@ int bake_provider_register(
/* register RPCs */
hg_id_t rpc_id;
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_create_rpc",
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_rpc",
bake_create_in_t, bake_create_out_t,
bake_create_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_write_rpc",
bake_create_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_write_rpc",
bake_write_in_t, bake_write_out_t,
bake_write_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_eager_write_rpc",
bake_write_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_write_rpc",
bake_eager_write_in_t, bake_eager_write_out_t,
bake_eager_write_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_eager_read_rpc",
bake_eager_write_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_eager_read_rpc",
bake_eager_read_in_t, bake_eager_read_out_t,
bake_eager_read_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_persist_rpc",
bake_eager_read_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_persist_rpc",
bake_persist_in_t, bake_persist_out_t,
bake_persist_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_create_write_persist_rpc",
bake_persist_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_create_write_persist_rpc",
bake_create_write_persist_in_t, bake_create_write_persist_out_t,
bake_create_write_persist_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_get_size_rpc",
bake_create_write_persist_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_get_size_rpc",
bake_get_size_in_t, bake_get_size_out_t,
bake_get_size_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_read_rpc",
bake_get_size_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_read_rpc",
bake_read_in_t, bake_read_out_t,
bake_read_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_probe_rpc",
bake_read_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_probe_rpc",
bake_probe_in_t, bake_probe_out_t, bake_probe_ult,
mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_noop_rpc",
void, void, bake_noop_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "bake_noop_rpc",
void, void, bake_noop_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &bake_server_finalize_cb, tmp_svr_ctx);
......@@ -263,7 +263,7 @@ static void bake_create_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, info->id, info->target_id);
margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error: BAKE create could not find provider\n");
goto respond_with_error;
......@@ -323,8 +323,7 @@ static void bake_write_ult(hg_handle_t handle)
mid = margo_hg_handle_get_instance(handle);
assert(mid);
hgi = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
if(!svr_ctx) {
out.ret = -1;
margo_respond(handle, &out);
......@@ -427,8 +426,7 @@ static void bake_eager_write_ult(hg_handle_t handle)
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, info->id, info->target_id);
bake_provider_t svr_ctx = margo_registered_data(mid, info->id);
if(!svr_ctx) {
out.ret = -1;
margo_respond(handle, &out);
......@@ -483,8 +481,7 @@ static void bake_persist_ult(hg_handle_t handle)
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, info->id, info->target_id);
bake_provider_t svr_ctx = margo_registered_data(mid, info->id);
if(!svr_ctx) {
out.ret = -1;
margo_respond(handle, &out);
......@@ -545,8 +542,7 @@ static void bake_create_write_persist_ult(hg_handle_t handle)
mid = margo_hg_handle_get_instance(handle);
assert(mid);
hgi = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
if(!svr_ctx) {
fprintf(stderr, "Error: BAKE create_write_persist could not find provider\n");
out.ret = -1;
......@@ -676,8 +672,7 @@ static void bake_get_size_ult(hg_handle_t handle)
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* hgi = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
if(!svr_ctx) {
out.ret = -1;
margo_respond(handle, &out);
......@@ -713,8 +708,7 @@ static void bake_noop_ult(hg_handle_t handle)
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* hgi = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
bake_provider_t svr_ctx = margo_registered_data(mid, hgi->id);
margo_respond(handle, NULL);
margo_destroy(handle);
......@@ -742,7 +736,7 @@ static void bake_read_ult(hg_handle_t handle)
assert(mid);
hgi = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
margo_registered_data(mid, hgi->id);
if(!svr_ctx) {
out.ret = -1;
margo_respond(handle, &out);
......@@ -847,7 +841,7 @@ static void bake_eager_read_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* hgi = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
margo_registered_data(mid, hgi->id);
if(!svr_ctx) {
out.ret = -1;
margo_respond(handle, &out);
......@@ -899,7 +893,7 @@ static void bake_probe_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* hgi = margo_get_info(handle);
bake_provider_t svr_ctx =
margo_registered_data_mplex(mid, hgi->id, hgi->target_id);
margo_registered_data(mid, hgi->id);
if(!svr_ctx) {
out.ret = -1;
margo_respond(handle, &out);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment