Commit e69d45b9 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

adapted to new provider-id API

parent cb2df8fa
......@@ -39,7 +39,7 @@ int sdskv_client_finalize(sdskv_client_t client);
*
* @param[in] client SDSKV client responsible for the provider handle
* @param[in] addr Mercury address of the provider
* @param[in] mplex_id Multiplex id of the provider
* @param[in] provider_id id of the provider
* @param[in] handle provider handle
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
......@@ -47,7 +47,7 @@ int sdskv_client_finalize(sdskv_client_t client);
int sdskv_provider_handle_create(
sdskv_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
uint16_t provider_id,
sdskv_provider_handle_t* handle);
/**
......
......@@ -15,7 +15,7 @@ extern "C" {
#endif
#define SDSKV_ABT_POOL_DEFAULT ABT_POOL_NULL
#define SDSKV_MPLEX_ID_DEFAULT 0
#define SDSKV_PROVIDER_ID_DEFAULT 0
#define SDSKV_PROVIDER_IGNORE NULL
#define SDSKV_COMPARE_DEFAULT NULL
......@@ -26,7 +26,7 @@ typedef int (*sdskv_compare_fn)(const void*, size_t, const void*, size_t);
* @brief Creates a new provider.
*
* @param[in] mid Margo instance
* @param[in] mplex_id multiplex id
* @param[in] provider_id provider id
* @param[in] pool Argobots pool
* @param[out] provider provider handle
*
......@@ -34,7 +34,7 @@ typedef int (*sdskv_compare_fn)(const void*, size_t, const void*, size_t);
*/
int sdskv_provider_register(
margo_instance_id mid,
uint8_t mplex_id,
uint16_t provider_id,
ABT_pool pool,
sdskv_provider_t* provider);
......
......@@ -22,7 +22,7 @@ struct sdskv_client {
struct sdskv_provider_handle {
sdskv_client_t client;
hg_addr_t addr;
uint8_t mplex_id;
uint8_t provider_id;
uint64_t refcount;
};
......@@ -100,7 +100,7 @@ int sdskv_client_finalize(sdskv_client_t client)
int sdskv_provider_handle_create(
sdskv_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
uint16_t provider_id,
sdskv_provider_handle_t* handle)
{
if(client == SDSKV_CLIENT_NULL)
......@@ -117,9 +117,9 @@ int sdskv_provider_handle_create(
return SDSKV_ERR_MERCURY;
}
provider->client = client;
provider->mplex_id = mplex_id;
provider->refcount = 1;
provider->client = client;
provider->provider_id = provider_id;
provider->refcount = 1;
client->num_provider_handles += 1;
......@@ -166,16 +166,9 @@ int sdskv_open(
&handle);
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
in.name = (char*)db_name;
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
......@@ -229,15 +222,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
return SDSKV_ERR_MERCURY;
}
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_set_target_id() failed in sdskv_put()\n");
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_put()\n");
margo_destroy(handle);
......@@ -284,16 +269,7 @@ int sdskv_put(sdskv_provider_handle_t provider,
return SDSKV_ERR_MERCURY;
}
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_set_target_id() failed in sdskv_put()\n");
margo_bulk_free(in.handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
fprintf(stderr,"[SDSKV] margo_forward() failed in sdskv_put()\n");
margo_bulk_free(in.handle);
......@@ -350,14 +326,7 @@ int sdskv_get(sdskv_provider_handle_t provider,
&handle);
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
......@@ -403,14 +372,7 @@ int sdskv_get(sdskv_provider_handle_t provider,
return SDSKV_ERR_MERCURY;
}
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.handle);
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
margo_bulk_free(in.handle);
margo_destroy(handle);
......@@ -458,13 +420,7 @@ int sdskv_length(sdskv_provider_handle_t provider,
&handle);
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
......@@ -507,13 +463,7 @@ int sdskv_erase(sdskv_provider_handle_t provider,
&handle);
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
......@@ -617,15 +567,8 @@ int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
goto finish;
}
/* set target id */
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* forward to provider */
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
ret = SDSKV_ERR_MERCURY;
goto finish;
......@@ -773,15 +716,8 @@ int sdskv_list_keyvals_with_prefix(sdskv_provider_handle_t provider,
goto finish;
}
/* set target id */
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
ret = SDSKV_ERR_MERCURY;
goto finish;
}
/* forward to provider */
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
ret = SDSKV_ERR_MERCURY;
goto finish;
......
......@@ -33,7 +33,7 @@ static void sdskv_server_finalize_cb(void *data);
extern "C" int sdskv_provider_register(
margo_instance_id mid,
uint8_t mplex_id,
uint16_t provider_id,
ABT_pool abt_pool,
sdskv_provider_t* provider)
{
......@@ -43,9 +43,9 @@ extern "C" int sdskv_provider_register(
{
hg_id_t id;
hg_bool_t flag;
margo_registered_name_mplex(mid, "sdskv_put_rpc", mplex_id, &id, &flag);
margo_provider_registered_name(mid, "sdskv_put_rpc", provider_id, &id, &flag);
if(flag == HG_TRUE) {
fprintf(stderr, "sdskv_provider_register(): a provider with the same mplex id (%d) already exists\n", mplex_id);
fprintf(stderr, "sdskv_provider_register(): a provider with the same provider id (%d) already exists\n", provider_id);
return SDSKV_ERR_MERCURY;
}
}
......@@ -57,42 +57,42 @@ extern "C" int sdskv_provider_register(
/* register RPCs */
hg_id_t rpc_id;
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_put_rpc",
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_put_rpc",
put_in_t, put_out_t,
sdskv_put_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_bulk_put_rpc",
sdskv_put_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_bulk_put_rpc",
bulk_put_in_t, bulk_put_out_t,
sdskv_bulk_put_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_get_rpc",
sdskv_bulk_put_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_get_rpc",
get_in_t, get_out_t,
sdskv_get_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_length_rpc",
sdskv_get_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_length_rpc",
length_in_t, length_out_t,
sdskv_length_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_bulk_get_rpc",
sdskv_length_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_bulk_get_rpc",
bulk_get_in_t, bulk_get_out_t,
sdskv_bulk_get_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_open_rpc",
sdskv_bulk_get_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_open_rpc",
open_in_t, open_out_t,
sdskv_open_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_list_keys_rpc",
sdskv_open_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_list_keys_rpc",
list_keys_in_t, list_keys_out_t,
sdskv_list_keys_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_list_keyvals_rpc",
sdskv_list_keys_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_list_keyvals_rpc",
list_keyvals_in_t, list_keyvals_out_t,
sdskv_list_keyvals_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "sdskv_erase_rpc",
sdskv_list_keyvals_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_erase_rpc",
erase_in_t, erase_out_t,
sdskv_erase_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
sdskv_erase_ult, provider_id, abt_pool);
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
/* install the bake server finalize callback */
margo_push_finalize_callback(mid, &sdskv_server_finalize_cb, tmp_svr_ctx);
......@@ -184,7 +184,7 @@ static void sdskv_put_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
......@@ -236,7 +236,7 @@ static void sdskv_length_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
......@@ -292,7 +292,7 @@ static void sdskv_get_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
......@@ -360,7 +360,7 @@ static void sdskv_open_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
......@@ -408,7 +408,7 @@ static void sdskv_bulk_put_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
......@@ -495,7 +495,7 @@ static void sdskv_bulk_get_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
......@@ -586,7 +586,7 @@ static void sdskv_erase_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error: SDSKV could not find provider\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
......@@ -645,7 +645,7 @@ static void sdskv_list_keys_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
std::cerr << "Error: SDSKV list_keys could not find provider" << std::endl;
out.ret = SDSKV_ERR_UNKNOWN_PR;
......@@ -799,7 +799,7 @@ static void sdskv_list_keyvals_ult(hg_handle_t handle)
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data_mplex(mid, info->id, info->target_id);
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
std::cerr << "Error: SDSKV list_keyvals could not find provider" << std::endl;
out.ret = SDSKV_ERR_UNKNOWN_PR;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment