Commit b753fa81 authored by Matthieu Dorier's avatar Matthieu Dorier

added provider handles and now bake_target_id_t is simply a uuid

parent 94b56161
......@@ -16,10 +16,10 @@ extern "C" {
#endif
#define BAKE_CLIENT_NULL ((bake_client_t)NULL)
#define BAKE_TARGET_ID_NULL ((bake_target_id_t)NULL)
#define BAKE_PROVIDER_HANDLE_NULL ((bake_provider_handle_t)NULL)
typedef struct bake_client* bake_client_t;
typedef struct bake_target* bake_target_id_t;
typedef struct bake_provider_handle* bake_provider_handle_t;
/**
* Creates a BAKE client attached to the given margo instance.
......@@ -46,6 +46,16 @@ int bake_client_init(margo_instance_id mid, bake_client_t* client);
*/
int bake_client_finalize(bake_client_t client);
int bake_provider_handle_create(
bake_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
bake_provider_handle_t* handle);
int bake_provider_handle_ref_incr(bake_provider_handle_t handle);
int bake_provider_handle_release(bake_provider_handle_t handle);
/**
* Obtains identifying information for a BAKE target through the provided
* remote mercury address and multiplex id.
......@@ -56,11 +66,11 @@ int bake_client_finalize(bake_client_t client);
* @param [out] bti BAKE target identifier
* @returns 0 on success, -1 on failure
*/
int bake_probe_instance(
bake_client_t client,
hg_addr_t dest_addr,
uint8_t mplex_id,
bake_target_id_t *bti);
int bake_probe(
bake_provider_handle_t provider,
uint64_t max_targets,
bake_target_id_t* bti,
uint64_t* num_targets);
/**
* Creates a bounded-size BAKE data region. The resulting region can be
......@@ -74,9 +84,10 @@ int bake_probe_instance(
* @returns 0 on success, -1 on failure
*/
int bake_create(
bake_target_id_t bti,
uint64_t region_size,
bake_region_id_t *rid);
bake_provider_handle_t provider,
bake_target_id_t bti,
uint64_t region_size,
bake_region_id_t *rid);
/**
* Writes into a BAKE region that was previously created with bake_create().
......@@ -94,11 +105,11 @@ int bake_create(
* @returns 0 on success, -1 on failure
*/
int bake_write(
bake_target_id_t bti,
bake_region_id_t rid,
uint64_t region_offset,
void const *buf,
uint64_t buf_size);
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
void const *buf,
uint64_t buf_size);
/**
* Writes data into a previously created BAKE region like bake_write(),
......@@ -114,13 +125,13 @@ int bake_write(
* @returns 0 on success, -1 on failure
*/
int bake_proxy_write(
bake_target_id_t bti,
bake_region_id_t rid,
uint64_t region_offset,
hg_bulk_t remote_bulk,
uint64_t remote_offset,
const char* remote_addr,
uint64_t size);
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
hg_bulk_t remote_bulk,
uint64_t remote_offset,
const char* remote_addr,
uint64_t size);
/**
* Persists a BAKE region. The region is considered immutable at this point
......@@ -131,8 +142,8 @@ int bake_proxy_write(
* @returns 0 on success, -1 on failure
*/
int bake_persist(
bake_target_id_t bti,
bake_region_id_t rid);
bake_provider_handle_t provider,
bake_region_id_t rid);
/**
* Creates a bounded-size BAKE region, writes data into it, and persists
......@@ -147,12 +158,13 @@ int bake_persist(
* @returns 0 on success, -1 on failure
*/
int bake_create_write_persist(
bake_target_id_t bti,
uint64_t region_size,
uint64_t region_offset,
void const *buf,
uint64_t buf_size,
bake_region_id_t *rid);
bake_provider_handle_t provider,
bake_target_id_t bti,
uint64_t region_size,
uint64_t region_offset,
void const *buf,
uint64_t buf_size,
bake_region_id_t *rid);
/**
*
......@@ -167,14 +179,15 @@ int bake_create_write_persist(
* @returns 0 on success, -1 on failure
*/
int bake_create_write_persist_proxy(
bake_target_id_t bti,
uint64_t region_size,
uint64_t region_offset,
hg_bulk_t remote_bulk,
uint64_t remote_offset,
const char* remote_addr,
uint64_t size,
bake_region_id_t *rid);
bake_provider_handle_t provider,
bake_target_id_t bti,
uint64_t region_size,
uint64_t region_offset,
hg_bulk_t remote_bulk,
uint64_t remote_offset,
const char* remote_addr,
uint64_t size,
bake_region_id_t *rid);
/**
* Checks the size of an existing BAKE region.
......@@ -185,9 +198,9 @@ int bake_create_write_persist_proxy(
* @returns 0 on success, -1 on failure
*/
int bake_get_size(
bake_target_id_t bti,
bake_region_id_t rid,
uint64_t *size);
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t *size);
/**
* Reads from a BAKE region that was previously persisted with bake_persist().
......@@ -203,11 +216,11 @@ int bake_get_size(
* @returns 0 on success, -1 on failure
*/
int bake_read(
bake_target_id_t bti,
bake_region_id_t rid,
uint64_t region_offset,
void *buf,
uint64_t buf_size);
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
void *buf,
uint64_t buf_size);
/**
* Reads data from a previously persisted BAKE region like bake_read(),
......@@ -223,22 +236,13 @@ int bake_read(
* @returns 0 on success, -1 on failure
*/
int bake_proxy_read(
bake_target_id_t bti,
bake_region_id_t rid,
uint64_t region_offset,
hg_bulk_t remote_bulk,
uint64_t remote_offset,
const char* remote_addr,
uint64_t size);
/**
* Releases local resources associated with access to a BAKE target;
* does not modify the target in any way.
*
* @param [in] bti BAKE target_identifier
*/
void bake_target_id_release(
bake_target_id_t bti);
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
hg_bulk_t remote_bulk,
uint64_t remote_offset,
const char* remote_addr,
uint64_t size);
/**
* Shuts down a remote BAKE service (given an address).
......@@ -249,7 +253,7 @@ void bake_target_id_release(
* @returns 0 on success, -1 on failure
*/
int bake_shutdown_service(
bake_client_t client, hg_addr_t addr);
bake_client_t client, hg_addr_t addr);
/**
* Issues a BAKE no-op operation.
......@@ -257,8 +261,7 @@ int bake_shutdown_service(
* @param [in] bti BAKE target identifier
* @returns 0 on success, -1 on failure
*/
int bake_noop(
bake_target_id_t bti);
int bake_noop(bake_provider_handle_t provider);
#ifdef __cplusplus
}
......
......@@ -16,7 +16,7 @@ extern "C" {
typedef struct {
uuid_t id;
} bake_uuid_t;
} bake_target_id_t;
/**
* Persistent, opaque identifier for a region within a BAKE target.
*/
......
......@@ -35,11 +35,11 @@ struct bake_client
hg_id_t bake_noop_id;
};
struct bake_target {
struct bake_provider_handle {
struct bake_client* client;
bake_uuid_t pool_id;
hg_addr_t dest;
uint8_t mplex_id;
hg_addr_t addr;
uint8_t mplex_id;
uint64_t refcount;
};
static int bake_client_register(bake_client_t client, margo_instance_id mid)
......@@ -102,88 +102,102 @@ int bake_client_finalize(bake_client_t client)
return 0;
}
int bake_probe_instance(
bake_client_t client,
hg_addr_t dest_addr,
uint8_t mplex_id,
bake_target_id_t *bti)
int bake_probe(
bake_provider_handle_t provider,
uint64_t max_targets,
bake_target_id_t *bti,
uint64_t* num_targets)
{
hg_return_t hret;
int ret;
bake_probe_out_t out;
hg_handle_t handle;
struct bake_target *new_target;
new_target = calloc(1, sizeof(*new_target));
if(!new_target)
return(-1);
new_target->client = client;
new_target->mplex_id = mplex_id;
hret = margo_addr_dup(client->mid, dest_addr, &new_target->dest);
if(hret != HG_SUCCESS)
{
free(new_target);
return(-1);
}
/* create handle */
hret = margo_create(client->mid, new_target->dest,
client->bake_probe_id, &handle);
margo_set_target_id(handle, mplex_id);
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->bake_probe_id,
&handle);
if(hret != HG_SUCCESS)
{
margo_addr_free(client->mid, new_target->dest);
free(new_target);
return(-1);
if(hret != HG_SUCCESS) return -1;
hret = margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return -1;
}
hret = margo_forward(handle, NULL);
if(hret != HG_SUCCESS)
{
if(hret != HG_SUCCESS) {
margo_destroy(handle);
margo_addr_free(client->mid, new_target->dest);
free(new_target);
return(-1);
return -1;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
if(hret != HG_SUCCESS) {
margo_destroy(handle);
margo_addr_free(client->mid, new_target->dest);
free(new_target);
return(-1);
return -1;
}
ret = out.ret;
new_target->pool_id = out.pool_id;
margo_free_output(handle, &out);
margo_destroy(handle);
if(ret != 0)
{
margo_addr_free(client->mid, new_target->dest);
free(new_target);
} else {
*bti = new_target;
if(ret == HG_SUCCESS) {
*bti = out.bti;
}
return(ret);
return ret;
}
void bake_target_id_release(
bake_target_id_t bti)
int bake_provider_handle_create(
bake_client_t client,
hg_addr_t addr,
uint8_t mplex_id,
bake_provider_handle_t* handle)
{
margo_addr_free(bti->client->mid, bti->dest);
free(bti);
if(client == BAKE_CLIENT_NULL) return -1;
bake_provider_handle_t provider =
(bake_provider_handle_t)calloc(1, sizeof(*provider));
if(!provider) return -1;
hg_return_t ret = margo_addr_dup(client->mid, addr, &(provider->addr));
if(ret != HG_SUCCESS) {
free(provider);
return -1;
}
provider->client = client;
provider->mplex_id = mplex_id;
provider->refcount = 1;
return;
*handle = provider;
return 0;
}
int bake_provider_handle_ref_incr(bake_provider_handle_t handle)
{
if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
handle->refcount += 1;
return 0;
}
int bake_provider_handle_release(bake_provider_handle_t handle)
{
if(handle == BAKE_PROVIDER_HANDLE_NULL) return -1;
handle->refcount -= 1;
if(handle->refcount == 0) {
margo_addr_free(handle->client->mid, handle->addr);
free(handle);
}
return 0;
}
int bake_shutdown_service(bake_client_t client, hg_addr_t addr)
{
hg_return_t hret;
......@@ -207,7 +221,7 @@ int bake_shutdown_service(bake_client_t client, hg_addr_t addr)
}
static int bake_eager_write(
bake_target_id_t bti,
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
void const *buf,
......@@ -219,15 +233,14 @@ static int bake_eager_write(
bake_eager_write_out_t out;
int ret;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.size = buf_size;
in.buffer = (char*)buf;
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_eager_write_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_eager_write_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -255,7 +268,7 @@ static int bake_eager_write(
}
int bake_write(
bake_target_id_t bti,
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
void const *buf,
......@@ -268,23 +281,22 @@ int bake_write(
int ret;
if(buf_size <= BAKE_EAGER_LIMIT)
return(bake_eager_write(bti, rid, region_offset, buf, buf_size));
return(bake_eager_write(provider, rid, region_offset, buf, buf_size));
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
hret = margo_bulk_create(bti->client->mid, 1, (void**)(&buf), &buf_size,
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_write_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_write_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
{
......@@ -317,7 +329,7 @@ int bake_write(
}
int bake_proxy_write(
bake_target_id_t bti,
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
hg_bulk_t remote_bulk,
......@@ -331,7 +343,6 @@ int bake_proxy_write(
bake_write_out_t out;
int ret;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_handle = remote_bulk;
......@@ -339,9 +350,9 @@ int bake_proxy_write(
in.bulk_size = size;
in.remote_addr_str = (char*)remote_addr;
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_write_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_write_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -368,6 +379,7 @@ int bake_proxy_write(
}
int bake_create(
bake_provider_handle_t provider,
bake_target_id_t bti,
uint64_t region_size,
bake_region_id_t *rid)
......@@ -378,17 +390,17 @@ int bake_create(
bake_create_out_t out;
int ret = 0;
in.pool_id = bti->pool_id;
in.bti = bti;
in.region_size = region_size;
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_create_id, &handle);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_create_id, &handle);
if(hret != HG_SUCCESS) {
return(-1);
}
margo_set_target_id(handle, bti->mplex_id);
margo_set_target_id(handle, provider->mplex_id);
hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS)
......@@ -414,7 +426,7 @@ int bake_create(
int bake_persist(
bake_target_id_t bti,
bake_provider_handle_t provider,
bake_region_id_t rid)
{
hg_return_t hret;
......@@ -423,12 +435,11 @@ int bake_persist(
bake_persist_out_t out;
int ret;
in.pool_id = bti->pool_id;
in.rid = rid;
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_persist_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_persist_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -455,6 +466,7 @@ int bake_persist(
}
int bake_create_write_persist(
bake_provider_handle_t provider,
bake_target_id_t bti,
uint64_t region_size,
uint64_t region_offset,
......@@ -470,21 +482,21 @@ int bake_create_write_persist(
/* XXX eager path? */
in.pool_id = bti->pool_id;
in.bti = bti;
in.region_size = region_size;
in.region_offset = region_offset;
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy write */
hret = margo_bulk_create(bti->client->mid, 1, (void**)(&buf), &buf_size,
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_create_write_persist_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_create_write_persist_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
{
......@@ -519,6 +531,7 @@ int bake_create_write_persist(
}
int bake_create_write_persist_proxy(
bake_provider_handle_t provider,
bake_target_id_t bti,
uint64_t region_size,
uint64_t region_offset,
......@@ -534,7 +547,7 @@ int bake_create_write_persist_proxy(
bake_create_write_persist_out_t out;
int ret;
in.pool_id = bti->pool_id;
in.bti = bti;
in.region_size = region_size;
in.region_offset = region_offset;
in.bulk_handle = remote_bulk;
......@@ -542,9 +555,9 @@ int bake_create_write_persist_proxy(
in.bulk_size = size;
in.remote_addr_str = (char*)remote_addr;
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_create_write_persist_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_create_write_persist_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -573,7 +586,7 @@ int bake_create_write_persist_proxy(
}
int bake_get_size(
bake_target_id_t bti,
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t *region_size)
{
......@@ -583,12 +596,11 @@ int bake_get_size(
bake_get_size_out_t out;
int ret;
in.pool_id = bti->pool_id;
in.rid = rid;
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_get_size_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_get_size_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -615,15 +627,14 @@ int bake_get_size(
return(ret);
}
int bake_noop(
bake_target_id_t bti)
int bake_noop(bake_provider_handle_t provider)
{
hg_return_t hret;
hg_handle_t handle;
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_noop_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_noop_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -640,7 +651,7 @@ int bake_noop(
}
static int bake_eager_read(
bake_target_id_t bti,
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
void *buf,
......@@ -652,14 +663,13 @@ static int bake_eager_read(
bake_eager_read_out_t out;
int ret;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.size = buf_size;
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_eager_read_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_eager_read_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
return(-1);
......@@ -688,7 +698,7 @@ static int bake_eager_read(
}
int bake_read(
bake_target_id_t bti,
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
void *buf,
......@@ -701,23 +711,22 @@ int bake_read(
int ret;
if(buf_size <= BAKE_EAGER_LIMIT)
return(bake_eager_read(bti, rid, region_offset, buf, buf_size));
return(bake_eager_read(provider, rid, region_offset, buf, buf_size));
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.remote_addr_str = NULL; /* set remote_addr to NULL to disable proxy read */
hret = margo_bulk_create(bti->client->mid, 1, (void**)(&buf), &buf_size,
hret = margo_bulk_create(provider->client->mid, 1, (void**)(&buf), &buf_size,
HG_BULK_WRITE_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
return(-1);
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_read_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_read_id, &handle);
margo_set_target_id(handle, provider->mplex_id);
if(hret != HG_SUCCESS)
{
......@@ -750,7 +759,7 @@ int bake_read(
}
int bake_proxy_read(
bake_target_id_t bti,
bake_provider_handle_t provider,
bake_region_id_t rid,
uint64_t region_offset,
hg_bulk_t remote_bulk,
......@@ -764,7 +773,6 @@ int bake_proxy_read(
bake_read_out_t out;
int ret;
in.pool_id = bti->pool_id;
in.rid = rid;
in.region_offset = region_offset;
in.bulk_handle = remote_bulk;
......@@ -772,9 +780,9 @@ int bake_proxy_read(
in.bulk_size = size;
in.remote_addr_str = (char*)remote_addr;
hret = margo_create(bti->client->mid, bti->dest,
bti->client->bake_read_id, &handle);
margo_set_target_id(handle, bti->mplex_id);
hret = margo_create(provider->client->mid, provider->addr,
provider->client->bake_read_id, &handle);