Commit 9116f766 authored by Matthieu Dorier's avatar Matthieu Dorier

supporting provider id API

parent dc59a4c2
...@@ -58,7 +58,7 @@ extern "C" { ...@@ -58,7 +58,7 @@ extern "C" {
* *
* @param client client managing the provider handle * @param client client managing the provider handle
* @param addr address of the provider * @param addr address of the provider
* @param mplex_id multiplex id of the provider * @param provider_id multiplex id of the provider
* @param handle resulting handle * @param handle resulting handle
* *
* @return 0 on success, -1 on failure * @return 0 on success, -1 on failure
...@@ -66,7 +66,7 @@ extern "C" { ...@@ -66,7 +66,7 @@ extern "C" {
int mobject_provider_handle_create( int mobject_provider_handle_create(
mobject_client_t client, mobject_client_t client,
hg_addr_t addr, hg_addr_t addr,
uint8_t mplex_id, uint16_t provider_id,
mobject_provider_handle_t* handle); mobject_provider_handle_t* handle);
/** /**
......
...@@ -27,7 +27,7 @@ typedef struct mobject_server_context* mobject_provider_t; ...@@ -27,7 +27,7 @@ typedef struct mobject_server_context* mobject_provider_t;
* Start a mobject server instance * Start a mobject server instance
* *
* @param[in] mid margo instance id * @param[in] mid margo instance id
* @param[in] mplex_id multiplex id of the provider * @param[in] provider_id id of the provider
* @param[in] pool Argobots pool for the provider * @param[in] pool Argobots pool for the provider
* @param[in] bake_ph Bake provider handle to use to write/read data * @param[in] bake_ph Bake provider handle to use to write/read data
* @param[in] sdskv_ph SDSKV provider handle to use to access metadata * @param[in] sdskv_ph SDSKV provider handle to use to access metadata
...@@ -39,7 +39,7 @@ typedef struct mobject_server_context* mobject_provider_t; ...@@ -39,7 +39,7 @@ typedef struct mobject_server_context* mobject_provider_t;
*/ */
int mobject_provider_register( int mobject_provider_register(
margo_instance_id mid, margo_instance_id mid,
uint8_t mplex_id, uint16_t provider_id,
ABT_pool pool, ABT_pool pool,
bake_provider_handle_t bake_ph, bake_provider_handle_t bake_ph,
sdskv_provider_handle_t sdskv_ph, sdskv_provider_handle_t sdskv_ph,
......
...@@ -46,10 +46,8 @@ int mobject_aio_write_op_operate( ...@@ -46,10 +46,8 @@ int mobject_aio_write_op_operate(
return -1; return -1;
} }
margo_set_target_id(h, mph->mplex_id);
margo_request mreq; margo_request mreq;
ret = margo_iforward(h, &in, &mreq); ret = margo_provider_iforward(mph->provider_id, h, &in, &mreq);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n"); fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");
margo_destroy(h); margo_destroy(h);
...@@ -97,10 +95,8 @@ int mobject_aio_read_op_operate( ...@@ -97,10 +95,8 @@ int mobject_aio_read_op_operate(
return -1; return -1;
} }
margo_set_target_id(h, mph->mplex_id);
margo_request mreq; margo_request mreq;
ret = margo_iforward(h, &in, &mreq); ret = margo_provider_iforward(mph->provider_id, h, &in, &mreq);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n"); fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");
margo_destroy(h); margo_destroy(h);
......
...@@ -30,7 +30,7 @@ struct mobject_client { ...@@ -30,7 +30,7 @@ struct mobject_client {
struct mobject_provider_handle { struct mobject_provider_handle {
mobject_client_t client; mobject_client_t client;
hg_addr_t addr; hg_addr_t addr;
uint8_t mplex_id; uint16_t provider_id;
uint64_t refcount; uint64_t refcount;
}; };
......
...@@ -14,31 +14,13 @@ ...@@ -14,31 +14,13 @@
#include <ssg.h> #include <ssg.h>
#include "mobject-client.h" #include "mobject-client.h"
#include "src/client/mobject-client-impl.h"
#include "src/io-chain/prepare-write-op.h" #include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h" #include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h" #include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h" #include "src/rpc-types/read-op.h"
#include "src/util/log.h" #include "src/util/log.h"
struct mobject_client {
margo_instance_id mid;
char* client_addr;
hg_id_t mobject_write_op_rpc_id;
hg_id_t mobject_read_op_rpc_id;
uint64_t num_provider_handles;
};
struct mobject_provider_handle {
mobject_client_t client;
hg_addr_t addr;
uint8_t mplex_id;
uint64_t refcount;
};
static int mobject_client_register(mobject_client_t client, margo_instance_id mid) static int mobject_client_register(mobject_client_t client, margo_instance_id mid)
{ {
int ret; int ret;
...@@ -110,7 +92,7 @@ int mobject_client_finalize(mobject_client_t client) ...@@ -110,7 +92,7 @@ int mobject_client_finalize(mobject_client_t client)
int mobject_provider_handle_create( int mobject_provider_handle_create(
mobject_client_t client, mobject_client_t client,
hg_addr_t addr, hg_addr_t addr,
uint8_t mplex_id, uint16_t provider_id,
mobject_provider_handle_t* handle) mobject_provider_handle_t* handle)
{ {
if(client == MOBJECT_CLIENT_NULL) return -1; if(client == MOBJECT_CLIENT_NULL) return -1;
...@@ -126,9 +108,9 @@ int mobject_provider_handle_create( ...@@ -126,9 +108,9 @@ int mobject_provider_handle_create(
return -1; return -1;
} }
provider->client = client; provider->client = client;
provider->mplex_id = mplex_id; provider->provider_id = provider_id;
provider->refcount = 1; provider->refcount = 1;
client->num_provider_handles += 1; client->num_provider_handles += 1;
...@@ -192,9 +174,7 @@ int mobject_write_op_operate( ...@@ -192,9 +174,7 @@ int mobject_write_op_operate(
return -1; return -1;
} }
margo_set_target_id(h, mph->mplex_id); ret = margo_provider_forward(mph->provider_id, h, &in);
ret = margo_forward(h, &in);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_write_op_operate()\n"); fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_write_op_operate()\n");
margo_destroy(h); margo_destroy(h);
...@@ -247,9 +227,7 @@ int mobject_read_op_operate( ...@@ -247,9 +227,7 @@ int mobject_read_op_operate(
return -1; return -1;
} }
margo_set_target_id(h, mph->mplex_id); ret = margo_provider_forward(mph->provider_id, h, &in);
ret = margo_forward(h, &in);
if(ret != HG_SUCCESS) { if(ret != HG_SUCCESS) {
fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_read_op_operate()\n"); fprintf(stderr, "[MOBJECT] margo_forward() failed in mobject_read_op_operate()\n");
margo_destroy(h); margo_destroy(h);
......
...@@ -20,7 +20,7 @@ struct mobject_server_context ...@@ -20,7 +20,7 @@ struct mobject_server_context
{ {
/* margo, bake, sds-keyval, ssg state */ /* margo, bake, sds-keyval, ssg state */
margo_instance_id mid; margo_instance_id mid;
uint8_t mplex_id; uint16_t provider_id;
ABT_pool pool; ABT_pool pool;
/* ssg-related data */ /* ssg-related data */
ssg_group_id_t gid; ssg_group_id_t gid;
......
...@@ -36,7 +36,7 @@ static void mobject_finalize_cb(void* data); ...@@ -36,7 +36,7 @@ static void mobject_finalize_cb(void* data);
int mobject_provider_register( int mobject_provider_register(
margo_instance_id mid, margo_instance_id mid,
uint8_t mplex_id, uint16_t provider_id,
ABT_pool pool, ABT_pool pool,
bake_provider_handle_t bake_ph, bake_provider_handle_t bake_ph,
sdskv_provider_handle_t sdskv_ph, sdskv_provider_handle_t sdskv_ph,
...@@ -52,9 +52,9 @@ int mobject_provider_register( ...@@ -52,9 +52,9 @@ int mobject_provider_register(
{ {
hg_id_t id; hg_id_t id;
hg_bool_t flag; hg_bool_t flag;
margo_registered_name_mplex(mid, "mobject_write_op", mplex_id, &id, &flag); margo_provider_registered_name(mid, "mobject_write_op", provider_id, &id, &flag);
if(flag == HG_TRUE) { if(flag == HG_TRUE) {
fprintf(stderr, "mobject_provider_register(): a provider with the same mplex id (%d) already exists\n", mplex_id); fprintf(stderr, "mobject_provider_register(): a provider with the same id (%d) already exists\n", provider_id);
return -1; return -1;
} }
} }
...@@ -64,7 +64,7 @@ int mobject_provider_register( ...@@ -64,7 +64,7 @@ int mobject_provider_register(
if (!srv_ctx) if (!srv_ctx)
return -1; return -1;
srv_ctx->mid = mid; srv_ctx->mid = mid;
srv_ctx->mplex_id = mplex_id; srv_ctx->provider_id = provider_id;
srv_ctx->pool = pool; srv_ctx->pool = pool;
srv_ctx->ref_count = 1; srv_ctx->ref_count = 1;
...@@ -135,15 +135,15 @@ int mobject_provider_register( ...@@ -135,15 +135,15 @@ int mobject_provider_register(
hg_id_t rpc_id; hg_id_t rpc_id;
rpc_id = MARGO_REGISTER_MPLEX(mid, "mobject_write_op", rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_write_op",
write_op_in_t, write_op_out_t, mobject_write_op_ult, write_op_in_t, write_op_out_t, mobject_write_op_ult,
mplex_id, pool); provider_id, pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, srv_ctx, NULL); margo_register_data(mid, rpc_id, srv_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "mobject_read_op", rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_read_op",
read_op_in_t, read_op_out_t, mobject_read_op_ult, read_op_in_t, read_op_out_t, mobject_read_op_ult,
mplex_id, pool); provider_id, pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, srv_ctx, NULL); margo_register_data(mid, rpc_id, srv_ctx, NULL);
margo_push_finalize_callback(mid, mobject_finalize_cb, (void*)srv_ctx); margo_push_finalize_callback(mid, mobject_finalize_cb, (void*)srv_ctx);
...@@ -170,7 +170,7 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h) ...@@ -170,7 +170,7 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h)
vargs.object_name = in.object_name; vargs.object_name = in.object_name;
vargs.oid = 0; vargs.oid = 0;
vargs.pool_name = in.pool_name; vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data_mplex(mid, info->id, info->target_id); vargs.srv_ctx = margo_registered_data(mid, info->id);
if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR; if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR;
vargs.client_addr_str = in.client_addr; vargs.client_addr_str = in.client_addr;
vargs.client_addr = info->addr; vargs.client_addr = info->addr;
...@@ -223,7 +223,7 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h) ...@@ -223,7 +223,7 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
vargs.object_name = in.object_name; vargs.object_name = in.object_name;
vargs.oid = 0; vargs.oid = 0;
vargs.pool_name = in.pool_name; vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data_mplex(mid, info->id, info->target_id); vargs.srv_ctx = margo_registered_data(mid, info->id);
if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR; if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR;
vargs.client_addr_str = in.client_addr; vargs.client_addr_str = in.client_addr;
vargs.client_addr = info->addr; vargs.client_addr = info->addr;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment