Commit d54d59f9 authored by Shane Snyder's avatar Shane Snyder

port bake-bulk to new margo api changes

parent 9da77dd1
...@@ -10,15 +10,13 @@ ...@@ -10,15 +10,13 @@
#include "uthash.h" #include "uthash.h"
#include "bake-bulk-rpc.h" #include "bake-bulk-rpc.h"
/* Refers to a single Mercury/Margo initialization, for now this is shared by /* Refers to a single Margo initialization, for now this is shared by
* all remote targets. In the future we probably need to support multiple in * all remote targets. In the future we probably need to support multiple in
* case we run atop more than one transport at a time. * case we run atop more than one transport at a time.
*/ */
struct hg_instance struct bake_margo_instance
{ {
margo_instance_id mid; margo_instance_id mid;
hg_class_t *hg_class;
hg_context_t *hg_context;
int refct; int refct;
hg_id_t bake_bulk_probe_id; hg_id_t bake_bulk_probe_id;
...@@ -52,10 +50,8 @@ struct bake_instance ...@@ -52,10 +50,8 @@ struct bake_instance
struct bake_instance *instance_hash = NULL; struct bake_instance *instance_hash = NULL;
struct hg_instance g_hginst = { struct bake_margo_instance g_margo_inst = {
.mid = MARGO_INSTANCE_NULL, .mid = MARGO_INSTANCE_NULL,
.hg_class = NULL,
.hg_context = NULL,
.refct = 0, .refct = 0,
}; };
...@@ -68,120 +64,101 @@ static int bake_bulk_eager_read( ...@@ -68,120 +64,101 @@ static int bake_bulk_eager_read(
static struct bake_handle_cache_el *get_handle(struct bake_instance *instance, hg_id_t id); static struct bake_handle_cache_el *get_handle(struct bake_instance *instance, hg_id_t id);
static void put_handle(struct bake_instance *instance, struct bake_handle_cache_el *el); static void put_handle(struct bake_instance *instance, struct bake_handle_cache_el *el);
static int hg_instance_init(const char *mercury_dest) static int bake_margo_instance_init(const char *mercury_dest)
{ {
char hg_na[64] = {0}; char hg_na[64] = {0};
int i; int i;
/* have we already started a Mercury instance? */ /* have we already started a Margo instance? */
if(g_hginst.refct > 0) if(g_margo_inst.refct > 0)
{ {
g_hginst.refct++; g_margo_inst.refct++;
return(0); return(0);
} }
/* boilerplate HG initialization steps */ /* initialize Margo using the transport portion of the destination
/***************************************/
/* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present) * address (i.e., the part before the first : character if present)
*/ */
for(i=0; (i<63 && mercury_dest[i] != '\0' && mercury_dest[i] != ':'); i++) for(i=0; (i<63 && mercury_dest[i] != '\0' && mercury_dest[i] != ':'); i++)
hg_na[i] = mercury_dest[i]; hg_na[i] = mercury_dest[i];
g_hginst.hg_class = HG_Init(hg_na, HG_FALSE); g_margo_inst.mid = margo_init(hg_na, MARGO_CLIENT_MODE, 0, 0);
if(!g_hginst.hg_class) if(g_margo_inst.mid == MARGO_INSTANCE_NULL)
{
return(-1);
}
g_hginst.hg_context = HG_Context_create(g_hginst.hg_class);
if(!g_hginst.hg_context)
{ {
HG_Finalize(g_hginst.hg_class);
return(-1); return(-1);
} }
g_margo_inst.refct = 1;
/* register RPCs */ /* register RPCs */
g_hginst.bake_bulk_probe_id = g_margo_inst.bake_bulk_probe_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_probe_rpc", void, bake_bulk_probe_out_t, "bake_bulk_probe_rpc", void, bake_bulk_probe_out_t,
NULL); NULL);
g_hginst.bake_bulk_shutdown_id = g_margo_inst.bake_bulk_shutdown_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_shutdown_rpc", void, void, "bake_bulk_shutdown_rpc", void, void,
NULL); NULL);
g_hginst.bake_bulk_create_id = g_margo_inst.bake_bulk_create_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_create_rpc", "bake_bulk_create_rpc",
bake_bulk_create_in_t, bake_bulk_create_in_t,
bake_bulk_create_out_t, bake_bulk_create_out_t,
NULL); NULL);
g_hginst.bake_bulk_write_id = g_margo_inst.bake_bulk_write_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_write_rpc", "bake_bulk_write_rpc",
bake_bulk_write_in_t, bake_bulk_write_in_t,
bake_bulk_write_out_t, bake_bulk_write_out_t,
NULL); NULL);
g_hginst.bake_bulk_eager_write_id = g_margo_inst.bake_bulk_eager_write_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_eager_write_rpc", "bake_bulk_eager_write_rpc",
bake_bulk_eager_write_in_t, bake_bulk_eager_write_in_t,
bake_bulk_eager_write_out_t, bake_bulk_eager_write_out_t,
NULL); NULL);
g_hginst.bake_bulk_eager_read_id = g_margo_inst.bake_bulk_eager_read_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_eager_read_rpc", "bake_bulk_eager_read_rpc",
bake_bulk_eager_read_in_t, bake_bulk_eager_read_in_t,
bake_bulk_eager_read_out_t, bake_bulk_eager_read_out_t,
NULL); NULL);
g_hginst.bake_bulk_persist_id = g_margo_inst.bake_bulk_persist_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_persist_rpc", "bake_bulk_persist_rpc",
bake_bulk_persist_in_t, bake_bulk_persist_in_t,
bake_bulk_persist_out_t, bake_bulk_persist_out_t,
NULL); NULL);
g_hginst.bake_bulk_get_size_id = g_margo_inst.bake_bulk_get_size_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_get_size_rpc", "bake_bulk_get_size_rpc",
bake_bulk_get_size_in_t, bake_bulk_get_size_in_t,
bake_bulk_get_size_out_t, bake_bulk_get_size_out_t,
NULL); NULL);
g_hginst.bake_bulk_read_id = g_margo_inst.bake_bulk_read_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_read_rpc", "bake_bulk_read_rpc",
bake_bulk_read_in_t, bake_bulk_read_in_t,
bake_bulk_read_out_t, bake_bulk_read_out_t,
NULL); NULL);
g_hginst.bake_bulk_noop_id = g_margo_inst.bake_bulk_noop_id =
MERCURY_REGISTER(g_hginst.hg_class, MARGO_REGISTER(g_margo_inst.mid,
"bake_bulk_noop_rpc", "bake_bulk_noop_rpc",
void, void,
void, void,
NULL); NULL);
g_hginst.mid = margo_init(0, 0, g_hginst.hg_context);
if(!g_hginst.mid)
{
HG_Context_destroy(g_hginst.hg_context);
HG_Finalize(g_hginst.hg_class);
return(-1);
}
g_hginst.refct = 1;
return(0); return(0);
} }
void hg_instance_finalize(void) void bake_margo_instance_finalize(void)
{ {
g_hginst.refct--; g_margo_inst.refct--;
assert(g_hginst.refct > -1); assert(g_margo_inst.refct > -1);
if(g_hginst.refct == 0) if(g_margo_inst.refct == 0)
{ {
margo_finalize(g_hginst.mid); margo_finalize(g_margo_inst.mid);
HG_Context_destroy(g_hginst.hg_context);
HG_Finalize(g_hginst.hg_class);
} }
} }
...@@ -195,50 +172,50 @@ int bake_probe_instance( ...@@ -195,50 +172,50 @@ int bake_probe_instance(
hg_handle_t handle; hg_handle_t handle;
struct bake_instance *new_instance; struct bake_instance *new_instance;
ret = hg_instance_init(mercury_dest); ret = bake_margo_instance_init(mercury_dest);
if(ret < 0) if(ret < 0)
return(ret); return(ret);
new_instance = calloc(1, sizeof(*new_instance)); new_instance = calloc(1, sizeof(*new_instance));
if(!new_instance) if(!new_instance)
{ {
hg_instance_finalize(); bake_margo_instance_finalize();
return(-1); return(-1);
} }
hret = margo_addr_lookup(g_hginst.mid, mercury_dest, &new_instance->dest); hret = margo_addr_lookup(g_margo_inst.mid, mercury_dest, &new_instance->dest);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
free(new_instance); free(new_instance);
hg_instance_finalize(); bake_margo_instance_finalize();
return(-1); return(-1);
} }
/* create handle */ /* create handle */
hret = HG_Create(g_hginst.hg_context, new_instance->dest, hret = margo_create(g_margo_inst.mid, new_instance->dest,
g_hginst.bake_bulk_probe_id, &handle); g_margo_inst.bake_bulk_probe_id, &handle);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
free(new_instance); free(new_instance);
hg_instance_finalize(); bake_margo_instance_finalize();
return(-1); return(-1);
} }
hret = margo_forward(g_hginst.mid, handle, NULL); hret = margo_forward(handle, NULL);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
free(new_instance); free(new_instance);
HG_Destroy(handle); margo_destroy(handle);
hg_instance_finalize(); bake_margo_instance_finalize();
return(-1); return(-1);
} }
hret = HG_Get_output(handle, &out); hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
free(new_instance); free(new_instance);
HG_Destroy(handle); margo_destroy(handle);
hg_instance_finalize(); bake_margo_instance_finalize();
return(-1); return(-1);
} }
...@@ -246,13 +223,13 @@ int bake_probe_instance( ...@@ -246,13 +223,13 @@ int bake_probe_instance(
*bti = out.bti; *bti = out.bti;
new_instance->bti = out.bti; new_instance->bti = out.bti;
HG_Free_output(handle, &out); margo_free_output(handle, &out);
HG_Destroy(handle); margo_destroy(handle);
if(ret != 0) if(ret != 0)
{ {
free(new_instance); free(new_instance);
hg_instance_finalize(); bake_margo_instance_finalize();
} }
else else
{ {
...@@ -273,9 +250,9 @@ void bake_release_instance( ...@@ -273,9 +250,9 @@ void bake_release_instance(
return; return;
HASH_DELETE(hh, instance_hash, instance); HASH_DELETE(hh, instance_hash, instance);
HG_Addr_free(g_hginst.hg_class, instance->dest); margo_addr_free(g_margo_inst.mid, instance->dest);
free(instance); free(instance);
hg_instance_finalize(); bake_margo_instance_finalize();
return; return;
} }
...@@ -290,10 +267,10 @@ int bake_shutdown_service(bake_target_id_t bti) ...@@ -290,10 +267,10 @@ int bake_shutdown_service(bake_target_id_t bti)
if(!instance) if(!instance)
return(-1); return(-1);
el = get_handle(instance, g_hginst.bake_bulk_shutdown_id); el = get_handle(instance, g_margo_inst.bake_bulk_shutdown_id);
assert(el); assert(el);
hret = margo_forward(g_hginst.mid, el->handle, NULL); hret = margo_forward(el->handle, NULL);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
...@@ -328,17 +305,17 @@ static int bake_bulk_eager_write( ...@@ -328,17 +305,17 @@ static int bake_bulk_eager_write(
in.size = buf_size; in.size = buf_size;
in.buffer = (char*)buf; in.buffer = (char*)buf;
el = get_handle(instance, g_hginst.bake_bulk_eager_write_id); el = get_handle(instance, g_margo_inst.bake_bulk_eager_write_id);
assert(el); assert(el);
hret = margo_forward(g_hginst.mid, el->handle, &in); hret = margo_forward(el->handle, &in);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
return(-1); return(-1);
} }
hret = HG_Get_output(el->handle, &out); hret = margo_get_output(el->handle, &out);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
...@@ -347,7 +324,7 @@ static int bake_bulk_eager_write( ...@@ -347,7 +324,7 @@ static int bake_bulk_eager_write(
ret = out.ret; ret = out.ret;
HG_Free_output(el->handle, &out); margo_free_output(el->handle, &out);
put_handle(instance, el); put_handle(instance, el);
return(ret); return(ret);
...@@ -382,36 +359,36 @@ int bake_bulk_write( ...@@ -382,36 +359,36 @@ int bake_bulk_write(
in.rid = rid; in.rid = rid;
in.region_offset = region_offset; in.region_offset = region_offset;
hret = HG_Bulk_create(g_hginst.hg_class, 1, (void**)(&buf), &buf_size, hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle); HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
return(-1); return(-1);
} }
el = get_handle(instance, g_hginst.bake_bulk_write_id); el = get_handle(instance, g_margo_inst.bake_bulk_write_id);
assert(el); assert(el);
hret = margo_forward(g_hginst.mid, el->handle, &in); hret = margo_forward(el->handle, &in);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
HG_Bulk_free(in.bulk_handle); margo_bulk_free(in.bulk_handle);
put_handle(instance, el); put_handle(instance, el);
return(-1); return(-1);
} }
hret = HG_Get_output(el->handle, &out); hret = margo_get_output(el->handle, &out);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
HG_Bulk_free(in.bulk_handle); margo_bulk_free(in.bulk_handle);
put_handle(instance, el); put_handle(instance, el);
return(-1); return(-1);
} }
ret = out.ret; ret = out.ret;
HG_Free_output(el->handle, &out); margo_free_output(el->handle, &out);
HG_Bulk_free(in.bulk_handle); margo_bulk_free(in.bulk_handle);
put_handle(instance, el); put_handle(instance, el);
return(ret); return(ret);
} }
...@@ -435,17 +412,17 @@ int bake_bulk_create( ...@@ -435,17 +412,17 @@ int bake_bulk_create(
in.bti = bti; in.bti = bti;
in.region_size = region_size; in.region_size = region_size;
el = get_handle(instance, g_hginst.bake_bulk_create_id); el = get_handle(instance, g_margo_inst.bake_bulk_create_id);
assert(el); assert(el);
hret = margo_forward(g_hginst.mid, el->handle, &in); hret = margo_forward(el->handle, &in);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
return(-1); return(-1);
} }
hret = HG_Get_output(el->handle, &out); hret = margo_get_output(el->handle, &out);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
...@@ -455,7 +432,7 @@ int bake_bulk_create( ...@@ -455,7 +432,7 @@ int bake_bulk_create(
ret = out.ret; ret = out.ret;
*rid = out.rid; *rid = out.rid;
HG_Free_output(el->handle, &out); margo_free_output(el->handle, &out);
put_handle(instance, el); put_handle(instance, el);
return(ret); return(ret);
} }
...@@ -479,17 +456,17 @@ int bake_bulk_persist( ...@@ -479,17 +456,17 @@ int bake_bulk_persist(
in.bti = bti; in.bti = bti;
in.rid = rid; in.rid = rid;
el = get_handle(instance, g_hginst.bake_bulk_persist_id); el = get_handle(instance, g_margo_inst.bake_bulk_persist_id);
assert(el); assert(el);
hret = margo_forward(g_hginst.mid, el->handle, &in); hret = margo_forward(el->handle, &in);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
return(-1); return(-1);
} }
hret = HG_Get_output(el->handle, &out); hret = margo_get_output(el->handle, &out);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
...@@ -498,7 +475,7 @@ int bake_bulk_persist( ...@@ -498,7 +475,7 @@ int bake_bulk_persist(
ret = out.ret; ret = out.ret;
HG_Free_output(el->handle, &out); margo_free_output(el->handle, &out);
put_handle(instance, el); put_handle(instance, el);
return(ret); return(ret);
} }
...@@ -522,17 +499,17 @@ int bake_bulk_get_size( ...@@ -522,17 +499,17 @@ int bake_bulk_get_size(
in.bti = bti; in.bti = bti;
in.rid = rid; in.rid = rid;
el = get_handle(instance, g_hginst.bake_bulk_get_size_id); el = get_handle(instance, g_margo_inst.bake_bulk_get_size_id);
assert(el); assert(el);
hret = margo_forward(g_hginst.mid, el->handle, &in); hret = margo_forward(el->handle, &in);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
return(-1); return(-1);
} }
hret = HG_Get_output(el->handle, &out); hret = margo_get_output(el->handle, &out);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
...@@ -542,7 +519,7 @@ int bake_bulk_get_size( ...@@ -542,7 +519,7 @@ int bake_bulk_get_size(
ret = out.ret; ret = out.ret;
*region_size = out.size; *region_size = out.size;
HG_Free_output(el->handle, &out); margo_free_output(el->handle, &out);
put_handle(instance, el); put_handle(instance, el);
return(ret); return(ret);
} }
...@@ -558,10 +535,10 @@ int bake_bulk_noop( ...@@ -558,10 +535,10 @@ int bake_bulk_noop(
if(!instance) if(!instance)
return(-1); return(-1);
el = get_handle(instance, g_hginst.bake_bulk_noop_id); el = get_handle(instance, g_margo_inst.bake_bulk_noop_id);
assert(el); assert(el);
hret = margo_forward(g_hginst.mid, el->handle, NULL); hret = margo_forward(el->handle, NULL);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
put_handle(instance, el); put_handle(instance, el);
...@@ -599,36 +576,36 @@ int bake_bulk_read( ...@@ -599,36 +576,36 @@ int bake_bulk_read(
in.rid = rid; in.rid = rid;
in.region_offset = region_offset; in.region_offset = region_offset;
hret = HG_Bulk_create(g_hginst.hg_class, 1, (void**)(&buf), &buf_size, hret = margo_bulk_create(g_margo_inst.mid, 1, (void**)(&buf), &buf_size,
HG_BULK_WRITE_ONLY, &in.bulk_handle); HG_BULK_WRITE_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
return(-1); return(-1);
} }
el = get_handle(instance, g_hginst.bake_bulk_read_id); el = get_handle(instance, g_margo_inst.bake_bulk_read_id);
assert(el); assert(el);
hret = margo_forward(g_hginst.mid, el->handle, &in); hret = margo_forward(el->handle, &in);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
HG_Bulk_free(in.bulk_handle); margo_bulk_free(in.bulk_handle);
put_handle(instance, el); put_handle(instance, el);
return(-1); return(-1);
} }
hret = HG_Get_output(el->handle, &out); hret = margo_get_output(el->handle, &out);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
HG_Bulk_free(in.bulk_handle); margo_bulk_free(in.bulk_handle);
put_handle(instance, el); put_handle(instance, el);
return(-1); return(-1);
} }
ret = out.ret; ret = out.ret;
HG_Free_output(el->handle, &out); margo_free_output(el->handle, &out);
HG_Bulk_free(in.bulk_handle); margo_bulk_free(in.bulk_handle);
put_handle(instance, el); put_handle(instance, el);
return(ret); return(ret);
} }
...@@ -657,17 +634,17 @@ static int bake_bulk_eager_read( ...@@ -657,17 +634,17 @@ static int bake_bulk_eager_read(
in.region_offset = region_offset; in.region_offset = region_offset;
in.size = buf_size; in.size = buf_size;
el = get_handle(instance, g_hginst.bake_bulk_eager_read_id); el = get_handle(instance, g_margo_inst.bake_bulk_eager_read_id);