Commit 55295b22 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

done enabling probing of multiple targets

parent b753fa81
...@@ -46,24 +46,51 @@ int bake_client_init(margo_instance_id mid, bake_client_t* client); ...@@ -46,24 +46,51 @@ int bake_client_init(margo_instance_id mid, bake_client_t* client);
*/ */
int bake_client_finalize(bake_client_t client); int bake_client_finalize(bake_client_t client);
/**
* Creates a provider handle to point to a particular BAKE provider.
*
* @param client client managing the provider handle
* @param addr address of the provider
* @param mplex_id multiplex id of the provider
* @param handle resulting handle
*
* @return 0 on success, -1 on failure
*/
int bake_provider_handle_create( int bake_provider_handle_create(
bake_client_t client, bake_client_t client,
hg_addr_t addr, hg_addr_t addr,
uint8_t mplex_id, uint8_t mplex_id,
bake_provider_handle_t* handle); bake_provider_handle_t* handle);
/**
* Increment the reference counter of the provider handle
*
* @param handle provider handle
*
* @return 0 on success, -1 on failure
*/
int bake_provider_handle_ref_incr(bake_provider_handle_t handle); int bake_provider_handle_ref_incr(bake_provider_handle_t handle);
/**
* Decrement the reference counter of the provider handle,
* effectively freeing the provider handle when the reference count
* is down to 0.
*
* @param handle provider handle
*
* @return 0 on success, -1 on failure
*/
int bake_provider_handle_release(bake_provider_handle_t handle); int bake_provider_handle_release(bake_provider_handle_t handle);
/** /**
* Obtains identifying information for a BAKE target through the provided * Obtains available BAKE targets from a give provider.
* remote mercury address and multiplex id. * If bake_target_id_t is NULL, max_targets is ignored and the
* function returns the number of targets available in num_targets.
* *
* @param [in] client BAKE client * @param [in] provider provider handle
* @param [in] dest_addr destination Mercury address * @param [in] max_targets maximum number of targets to retrieve
* @param [in] mplex_id multiplex id * @param [out] bti array of BAKE target identifiers with enough space for max_targets
* @param [out] bti BAKE target identifier * @param [out] num_targets number of targets returned (at most max_targets)
* @returns 0 on success, -1 on failure * @returns 0 on success, -1 on failure
*/ */
int bake_probe( int bake_probe(
...@@ -78,6 +105,7 @@ int bake_probe( ...@@ -78,6 +105,7 @@ int bake_probe(
* are complete) with a a BAKE persist operation. The region is not valid * are complete) with a a BAKE persist operation. The region is not valid
* for read access until persisted. * for read access until persisted.
* *
* @param [in] provider provider handle
* @param [in] bti BAKE target identifier * @param [in] bti BAKE target identifier
* @param [in] region_size size of region to be created * @param [in] region_size size of region to be created
* @param [out] rid identifier for new region * @param [out] rid identifier for new region
...@@ -97,7 +125,7 @@ int bake_create( ...@@ -97,7 +125,7 @@ int bake_create(
* Results are undefined if multiple writers (from same process or different * Results are undefined if multiple writers (from same process or different
* processes) perform overlapping writes. * processes) perform overlapping writes.
* *
* @param [in] bti BAKE target identifier * @param [in] provider provider handle
* @param [in] rid identifier for region * @param [in] rid identifier for region
* @param [in] region_offset offset into the target region to write * @param [in] region_offset offset into the target region to write
* @param [in] buf local memory buffer to write * @param [in] buf local memory buffer to write
...@@ -115,7 +143,7 @@ int bake_write( ...@@ -115,7 +143,7 @@ int bake_write(
* Writes data into a previously created BAKE region like bake_write(), * Writes data into a previously created BAKE region like bake_write(),
* except the write is performed on behalf of some remote entity. * except the write is performed on behalf of some remote entity.
* *
* @param [in] bti BAKE target identifier * @param [in] provider provider handle
* @param [in] rid identifier for region * @param [in] rid identifier for region
* @param [in] region_offset offset into the target region to write * @param [in] region_offset offset into the target region to write
* @param [in] remote_bulk bulk_handle for remote data region to write from * @param [in] remote_bulk bulk_handle for remote data region to write from
...@@ -137,7 +165,7 @@ int bake_proxy_write( ...@@ -137,7 +165,7 @@ int bake_proxy_write(
* Persists a BAKE region. The region is considered immutable at this point * Persists a BAKE region. The region is considered immutable at this point
* and reads may be performed on the region. * and reads may be performed on the region.
* *
* @param [in] bti BAKE target identifier * @param [in] provider provider handle
* @param [in] rid identifier for region * @param [in] rid identifier for region
* @returns 0 on success, -1 on failure * @returns 0 on success, -1 on failure
*/ */
...@@ -149,6 +177,7 @@ int bake_persist( ...@@ -149,6 +177,7 @@ int bake_persist(
* Creates a bounded-size BAKE region, writes data into it, and persists * Creates a bounded-size BAKE region, writes data into it, and persists
* the reason all in one call/RPC (and thus 1 RTT). * the reason all in one call/RPC (and thus 1 RTT).
* *
* @param [in] provider provider handle
* @param [in] bti BAKE target identifier * @param [in] bti BAKE target identifier
* @param [in] region_size size of region to be created * @param [in] region_size size of region to be created
* @param [in] region_offset offset into the target region to write * @param [in] region_offset offset into the target region to write
...@@ -168,6 +197,7 @@ int bake_create_write_persist( ...@@ -168,6 +197,7 @@ int bake_create_write_persist(
/** /**
* *
* @param [in] provider provider handle
* @param [in] bti BAKE target identifier * @param [in] bti BAKE target identifier
* @param [in] region_size size of region to be created * @param [in] region_size size of region to be created
* @param [in] region_offset offset into the target region to write * @param [in] region_offset offset into the target region to write
...@@ -192,7 +222,7 @@ int bake_create_write_persist_proxy( ...@@ -192,7 +222,7 @@ int bake_create_write_persist_proxy(
/** /**
* Checks the size of an existing BAKE region. * Checks the size of an existing BAKE region.
* *
* @param [in] bti BAKE target identifier * @param [in] provider provider handle
* @param [in] rid identifier for region * @param [in] rid identifier for region
* @param [out] size size of region * @param [out] size size of region
* @returns 0 on success, -1 on failure * @returns 0 on success, -1 on failure
...@@ -208,7 +238,7 @@ int bake_get_size( ...@@ -208,7 +238,7 @@ int bake_get_size(
* NOTE: for now at least, this call does not support "short" reads. It * NOTE: for now at least, this call does not support "short" reads. It
* either succeeds in reading the requested size or not. * either succeeds in reading the requested size or not.
* *
* @param [in] bti BAKE target identifier * @param [in] provider provider handle
* @param [in] rid region identifier * @param [in] rid region identifier
* @param [in] region_offset offset into the target region to read from * @param [in] region_offset offset into the target region to read from
* @param [in] buf local memory buffer read into * @param [in] buf local memory buffer read into
...@@ -226,7 +256,7 @@ int bake_read( ...@@ -226,7 +256,7 @@ int bake_read(
* Reads data from a previously persisted BAKE region like bake_read(), * Reads data from a previously persisted BAKE region like bake_read(),
* except the read is performed on behalf of some remote entity. * except the read is performed on behalf of some remote entity.
* *
* @param [in] bti BAKE target identifier * @param [in] provider provider handle
* @param [in] rid identifier for region * @param [in] rid identifier for region
* @param [in] region_offset offset into the target region to write * @param [in] region_offset offset into the target region to write
* @param [in] remote_bulk bulk_handle for remote data region to read to * @param [in] remote_bulk bulk_handle for remote data region to read to
...@@ -258,7 +288,7 @@ int bake_shutdown_service( ...@@ -258,7 +288,7 @@ int bake_shutdown_service(
/** /**
* Issues a BAKE no-op operation. * Issues a BAKE no-op operation.
* *
* @param [in] bti BAKE target identifier * @param [in] provider provider handle
* @returns 0 on success, -1 on failure * @returns 0 on success, -1 on failure
*/ */
int bake_noop(bake_provider_handle_t provider); int bake_noop(bake_provider_handle_t provider);
......
...@@ -33,6 +33,8 @@ struct bake_client ...@@ -33,6 +33,8 @@ struct bake_client
hg_id_t bake_get_size_id; hg_id_t bake_get_size_id;
hg_id_t bake_read_id; hg_id_t bake_read_id;
hg_id_t bake_noop_id; hg_id_t bake_noop_id;
uint64_t num_provider_handles;
}; };
struct bake_provider_handle { struct bake_provider_handle {
...@@ -49,7 +51,7 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid) ...@@ -49,7 +51,7 @@ static int bake_client_register(bake_client_t client, margo_instance_id mid)
/* register RPCs */ /* register RPCs */
client->bake_probe_id = client->bake_probe_id =
MARGO_REGISTER(mid, "bake_probe_rpc", MARGO_REGISTER(mid, "bake_probe_rpc",
void, bake_probe_out_t, NULL); bake_probe_in_t, bake_probe_out_t, NULL);
client->bake_shutdown_id = client->bake_shutdown_id =
MARGO_REGISTER(mid, "bake_shutdown_rpc", MARGO_REGISTER(mid, "bake_shutdown_rpc",
void, void, NULL); void, void, NULL);
...@@ -89,6 +91,8 @@ int bake_client_init(margo_instance_id mid, bake_client_t* client) ...@@ -89,6 +91,8 @@ int bake_client_init(margo_instance_id mid, bake_client_t* client)
bake_client_t c = (bake_client_t)calloc(1, sizeof(*c)); bake_client_t c = (bake_client_t)calloc(1, sizeof(*c));
if(!c) return -1; if(!c) return -1;
c->num_provider_handles = 0;
int ret = bake_client_register(c, mid); int ret = bake_client_register(c, mid);
if(ret != 0) return ret; if(ret != 0) return ret;
...@@ -98,6 +102,11 @@ int bake_client_init(margo_instance_id mid, bake_client_t* client) ...@@ -98,6 +102,11 @@ int bake_client_init(margo_instance_id mid, bake_client_t* client)
int bake_client_finalize(bake_client_t client) int bake_client_finalize(bake_client_t client)
{ {
if(client->num_provider_handles != 0) {
fprintf(stderr,
"[BAKE] Warning: %d provider handles not released before bake_client_finalize was called\n",
client->num_provider_handles);
}
free(client); free(client);
return 0; return 0;
} }
...@@ -110,9 +119,13 @@ int bake_probe( ...@@ -110,9 +119,13 @@ int bake_probe(
{ {
hg_return_t hret; hg_return_t hret;
int ret; int ret;
bake_probe_in_t in;
bake_probe_out_t out; bake_probe_out_t out;
hg_handle_t handle; hg_handle_t handle;
if(bti == NULL) max_targets = 0;
in.max_targets = max_targets;
/* create handle */ /* create handle */
hret = margo_create( hret = margo_create(
provider->client->mid, provider->client->mid,
...@@ -129,7 +142,7 @@ int bake_probe( ...@@ -129,7 +142,7 @@ int bake_probe(
return -1; return -1;
} }
hret = margo_forward(handle, NULL); hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) { if(hret != HG_SUCCESS) {
margo_destroy(handle); margo_destroy(handle);
return -1; return -1;
...@@ -143,13 +156,20 @@ int bake_probe( ...@@ -143,13 +156,20 @@ int bake_probe(
ret = out.ret; ret = out.ret;
margo_free_output(handle, &out);
margo_destroy(handle);
if(ret == HG_SUCCESS) { if(ret == HG_SUCCESS) {
*bti = out.bti; if(max_targets == 0) {
*num_targets = out.num_targets;
} else {
uint64_t s = *num_targets > max_targets ? max_targets : *num_targets;
if(s > 0) {
memcpy(bti, out.targets, sizeof(*bti)*s);
}
}
} }
margo_free_output(handle, &out);
margo_destroy(handle);
return ret; return ret;
} }
...@@ -176,6 +196,8 @@ int bake_provider_handle_create( ...@@ -176,6 +196,8 @@ int bake_provider_handle_create(
provider->mplex_id = mplex_id; provider->mplex_id = mplex_id;
provider->refcount = 1; provider->refcount = 1;
client->num_provider_handles += 1;
*handle = provider; *handle = provider;
return 0; return 0;
} }
...@@ -193,6 +215,7 @@ int bake_provider_handle_release(bake_provider_handle_t handle) ...@@ -193,6 +215,7 @@ int bake_provider_handle_release(bake_provider_handle_t handle)
handle->refcount -= 1; handle->refcount -= 1;
if(handle->refcount == 0) { if(handle->refcount == 0) {
margo_addr_free(handle->client->mid, handle->addr); margo_addr_free(handle->client->mid, handle->addr);
handle->client->num_provider_handles -= 1;
free(handle); free(handle);
} }
return 0; return 0;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
/* encoders for BAKE-specific types */ /* encoders for BAKE-specific types */
static inline hg_return_t hg_proc_bake_target_id_t(hg_proc_t proc, bake_target_id_t *bti); static inline hg_return_t hg_proc_bake_target_id_t(hg_proc_t proc, bake_target_id_t *bti);
static inline hg_return_t hg_proc_bake_region_id_t(hg_proc_t proc, bake_region_id_t *rid); static inline hg_return_t hg_proc_bake_region_id_t(hg_proc_t proc, bake_region_id_t *rid);
static inline hg_return_t hg_proc_bake_probe_out_t(hg_proc_t proc, void* out);
/* BAKE shutdown */ /* BAKE shutdown */
DECLARE_MARGO_RPC_HANDLER(bake_shutdown_ult) DECLARE_MARGO_RPC_HANDLER(bake_shutdown_ult)
...@@ -115,9 +115,17 @@ static inline hg_return_t hg_proc_bake_eager_read_out_t(hg_proc_t proc, void *v_ ...@@ -115,9 +115,17 @@ static inline hg_return_t hg_proc_bake_eager_read_out_t(hg_proc_t proc, void *v_
DECLARE_MARGO_RPC_HANDLER(bake_eager_read_ult) DECLARE_MARGO_RPC_HANDLER(bake_eager_read_ult)
/* BAKE probe */ /* BAKE probe */
MERCURY_GEN_PROC(bake_probe_out_t, MERCURY_GEN_PROC(bake_probe_in_t,
((int32_t)(ret))\ ((uint64_t)(max_targets)))
((bake_target_id_t)(bti))) //MERCURY_GEN_PROC(bake_probe_out_t,
// ((int32_t)(ret))\
// ((bake_target_id_t)(bti)))
typedef struct
{
int32_t ret;
uint64_t num_targets;
bake_target_id_t* targets;
} bake_probe_out_t;
DECLARE_MARGO_RPC_HANDLER(bake_probe_ult) DECLARE_MARGO_RPC_HANDLER(bake_probe_ult)
/* BAKE noop */ /* BAKE noop */
...@@ -192,4 +200,23 @@ static inline hg_return_t hg_proc_bake_eager_read_out_t(hg_proc_t proc, void *v_ ...@@ -192,4 +200,23 @@ static inline hg_return_t hg_proc_bake_eager_read_out_t(hg_proc_t proc, void *v_
return(HG_SUCCESS); return(HG_SUCCESS);
} }
static inline hg_return_t hg_proc_bake_probe_out_t(hg_proc_t proc, void* data)
{
bake_probe_out_t* out = (bake_probe_out_t*)data;
void* buf = NULL;
hg_proc_int32_t(proc, &out->ret);
hg_proc_uint64_t(proc, &out->num_targets);
if(out->num_targets)
{
buf = hg_proc_save_ptr(proc, out->num_targets * sizeof(bake_target_id_t));
if(hg_proc_get_op(proc) == HG_ENCODE)
memcpy(buf, out->targets, out->num_targets * sizeof(bake_target_id_t));
if(hg_proc_get_op(proc) == HG_DECODE)
out->targets = buf;
hg_proc_restore_ptr(proc, buf, out->num_targets * sizeof(bake_target_id_t));
}
return HG_SUCCESS;
}
#endif /* __BAKE_RPC */ #endif /* __BAKE_RPC */
...@@ -144,7 +144,7 @@ int bake_provider_register( ...@@ -144,7 +144,7 @@ int bake_provider_register(
bake_read_ult, mplex_id, abt_pool); bake_read_ult, mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL); margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_probe_rpc", rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_probe_rpc",
void, bake_probe_out_t, bake_probe_ult, bake_probe_in_t, bake_probe_out_t, bake_probe_ult,
mplex_id, abt_pool); mplex_id, abt_pool);
margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL); margo_register_data_mplex(mid, rpc_id, mplex_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_noop_rpc", rpc_id = MARGO_REGISTER_MPLEX(mid, "bake_noop_rpc",
...@@ -820,7 +820,10 @@ static void bake_probe_ult(hg_handle_t handle) ...@@ -820,7 +820,10 @@ static void bake_probe_ult(hg_handle_t handle)
} }
out.ret = 0; out.ret = 0;
out.bti = svr_ctx->pmem_root->pool_id; // XXX this is where we should handle multiple targets
bake_target_id_t targets[1] = { svr_ctx->pmem_root->pool_id };
out.targets = targets;
out.num_targets = 1;
margo_respond(handle, &out); margo_respond(handle, &out);
margo_destroy(handle); margo_destroy(handle);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment