Commit 3394c550 authored by Philip Carns's avatar Philip Carns

refactor mercury init to share across targets

parent b0bf341a
Pipeline #976 skipped
......@@ -4,46 +4,55 @@
* See COPYRIGHT in top-level directory.
*/
#include <assert.h>
#include <bake-bulk.h>
#include <margo.h>
#include "bake-bulk-rpc.h"
/* Refers to a single Mercury/Margo initialization, for now this is shared by
* all remote targets. In the future we probably need to support multiple in
* case we run atop more than one transport at a time.
*/
struct hg_instance
{
margo_instance_id mid;
hg_class_t *hg_class;
hg_context_t *hg_context;
int refct;
hg_id_t bake_bulk_shutdown_id;
hg_id_t bake_bulk_create_id;
hg_id_t bake_bulk_write_id;
hg_id_t bake_bulk_persist_id;
hg_id_t bake_bulk_get_size_id;
hg_id_t bake_bulk_read_id;
};
/* Refers to an instance connected to a specific target */
struct bake_instance
{
bake_target_id_t bti; /* persistent identifier for this target */
hg_addr_t dest; /* remote Mercury address */
/* TODO: stuff will probably be split out into an hg instance */
margo_instance_id mid; /* Margo instance */
hg_class_t *hg_class;
hg_context_t *hg_context;
};
/* TODO: replace later, hard coded global instance */
struct bake_instance g_binst;
struct bake_instance g_binst = {
.bti = 0,
.dest = HG_ADDR_NULL,
};
/* TODO: these IDs are probably not global in the long run either */
static hg_id_t g_bake_bulk_shutdown_id;
static hg_id_t g_bake_bulk_create_id;
static hg_id_t g_bake_bulk_write_id;
static hg_id_t g_bake_bulk_persist_id;
static hg_id_t g_bake_bulk_get_size_id;
static hg_id_t g_bake_bulk_read_id;
struct hg_instance g_hginst = {
.mid = MARGO_INSTANCE_NULL,
.hg_class = NULL,
.hg_context = NULL,
.refct = 0,
};
int bake_probe_instance(
const char *mercury_dest,
bake_target_id_t *bti)
static int hg_instance_init(const char *mercury_dest)
{
*bti = 0; /* TODO: use a real id eventually, just a placeholder for now */
hg_return_t hret;
memset(&g_binst, 0, sizeof(g_binst));
/* TODO: eventually we will not init HG on every target probe, probably
* separate step so hg instance can be shared. Simple test case is to
* build a single client application that communicates with two distinct
* targets (on different servers) simultaneously.
*/
/* have we already started a Mercury instance? */
if(g_hginst.refct > 0)
return(0);
/* boilerplate HG initialization steps */
/***************************************/
......@@ -51,68 +60,96 @@ int bake_probe_instance(
* na_listen flag is false); but we pass in the *target* server address
* here to make sure that Mercury starts up the correct transport
*/
g_binst.hg_class = HG_Init(mercury_dest, HG_FALSE);
if(!g_binst.hg_class)
g_hginst.hg_class = HG_Init(mercury_dest, HG_FALSE);
if(!g_hginst.hg_class)
{
return(-1);
}
g_binst.hg_context = HG_Context_create(g_binst.hg_class);
if(!g_binst.hg_context)
g_hginst.hg_context = HG_Context_create(g_hginst.hg_class);
if(!g_hginst.hg_context)
{
HG_Finalize(g_binst.hg_class);
HG_Finalize(g_hginst.hg_class);
return(-1);
}
/* register RPCs */
g_bake_bulk_shutdown_id =
MERCURY_REGISTER(g_binst.hg_class,
g_hginst.bake_bulk_shutdown_id =
MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_shutdown_rpc", void, void,
NULL);
g_bake_bulk_create_id =
MERCURY_REGISTER(g_binst.hg_class,
g_hginst.bake_bulk_create_id =
MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_create_rpc",
bake_bulk_create_in_t,
bake_bulk_create_out_t,
NULL);
g_bake_bulk_write_id =
MERCURY_REGISTER(g_binst.hg_class,
g_hginst.bake_bulk_write_id =
MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_write_rpc",
bake_bulk_write_in_t,
bake_bulk_write_out_t,
NULL);
g_bake_bulk_persist_id =
MERCURY_REGISTER(g_binst.hg_class,
g_hginst.bake_bulk_persist_id =
MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_persist_rpc",
bake_bulk_persist_in_t,
bake_bulk_persist_out_t,
NULL);
g_bake_bulk_get_size_id =
MERCURY_REGISTER(g_binst.hg_class,
g_hginst.bake_bulk_get_size_id =
MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_get_size_rpc",
bake_bulk_get_size_in_t,
bake_bulk_get_size_out_t,
NULL);
g_bake_bulk_read_id =
MERCURY_REGISTER(g_binst.hg_class,
g_hginst.bake_bulk_read_id =
MERCURY_REGISTER(g_hginst.hg_class,
"bake_bulk_read_rpc",
bake_bulk_read_in_t,
bake_bulk_read_out_t,
NULL);
g_binst.mid = margo_init(0, 0, g_binst.hg_context);
if(!g_binst.mid)
g_hginst.mid = margo_init(0, 0, g_hginst.hg_context);
if(!g_hginst.mid)
{
HG_Context_destroy(g_binst.hg_context);
HG_Finalize(g_binst.hg_class);
HG_Context_destroy(g_hginst.hg_context);
HG_Finalize(g_hginst.hg_class);
return(-1);
}
g_hginst.refct = 1;
return(0);
}
void hg_instance_finalize(void)
{
g_hginst.refct--;
assert(g_hginst.refct > -1);
if(g_hginst.refct == 0)
{
margo_finalize(g_hginst.mid);
HG_Context_destroy(g_hginst.hg_context);
HG_Finalize(g_hginst.hg_class);
}
}
int bake_probe_instance(
const char *mercury_dest,
bake_target_id_t *bti)
{
hg_return_t hret;
*bti = 0; /* TODO: use a real id eventually, just a placeholder for now */
int ret;
ret = hg_instance_init(mercury_dest);
if(ret < 0)
return(ret);
hret = margo_addr_lookup(g_binst.mid, mercury_dest, &g_binst.dest);
hret = margo_addr_lookup(g_hginst.mid, mercury_dest, &g_binst.dest);
if(hret != HG_SUCCESS)
{
margo_finalize(g_binst.mid);
HG_Context_destroy(g_binst.hg_context);
HG_Finalize(g_binst.hg_class);
hg_instance_finalize();
return(-1);
}
......@@ -122,10 +159,8 @@ int bake_probe_instance(
void bake_release_instance(
bake_target_id_t bti)
{
HG_Addr_free(g_binst.hg_class, g_binst.dest);
margo_finalize(g_binst.mid);
HG_Context_destroy(g_binst.hg_context);
HG_Finalize(g_binst.hg_class);
HG_Addr_free(g_hginst.hg_class, g_binst.dest);
hg_instance_finalize();
memset(&g_binst, 0, sizeof(g_binst));
return;
}
......@@ -136,14 +171,14 @@ int bake_shutdown_service(bake_target_id_t bti)
hg_handle_t handle;
/* create handle */
hret = HG_Create(g_binst.hg_context, g_binst.dest,
g_bake_bulk_shutdown_id, &handle);
hret = HG_Create(g_hginst.hg_context, g_binst.dest,
g_hginst.bake_bulk_shutdown_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
hret = margo_forward(g_binst.mid, handle, NULL);
hret = margo_forward(g_hginst.mid, handle, NULL);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
......@@ -171,7 +206,7 @@ int bake_bulk_write(
in.rid = rid;
in.region_offset = region_offset;
hret = HG_Bulk_create(g_binst.hg_class, 1, (void**)(&buf), &buf_size,
hret = HG_Bulk_create(g_hginst.hg_class, 1, (void**)(&buf), &buf_size,
HG_BULK_READ_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
{
......@@ -179,15 +214,15 @@ int bake_bulk_write(
}
/* create handle */
hret = HG_Create(g_binst.hg_context, g_binst.dest,
g_bake_bulk_write_id, &handle);
hret = HG_Create(g_hginst.hg_context, g_binst.dest,
g_hginst.bake_bulk_write_id, &handle);
if(hret != HG_SUCCESS)
{
HG_Bulk_free(in.bulk_handle);
return(-1);
}
hret = margo_forward(g_binst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
......@@ -226,14 +261,14 @@ int bake_bulk_create(
in.region_size = region_size;
/* create handle */
hret = HG_Create(g_binst.hg_context, g_binst.dest,
g_bake_bulk_create_id, &handle);
hret = HG_Create(g_hginst.hg_context, g_binst.dest,
g_hginst.bake_bulk_create_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
hret = margo_forward(g_binst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
......@@ -270,14 +305,14 @@ int bake_bulk_persist(
in.rid = rid;
/* create handle */
hret = HG_Create(g_binst.hg_context, g_binst.dest,
g_bake_bulk_persist_id, &handle);
hret = HG_Create(g_hginst.hg_context, g_binst.dest,
g_hginst.bake_bulk_persist_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
hret = margo_forward(g_binst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
......@@ -313,14 +348,14 @@ int bake_bulk_get_size(
in.rid = rid;
/* create handle */
hret = HG_Create(g_binst.hg_context, g_binst.dest,
g_bake_bulk_get_size_id, &handle);
hret = HG_Create(g_hginst.hg_context, g_binst.dest,
g_hginst.bake_bulk_get_size_id, &handle);
if(hret != HG_SUCCESS)
{
return(-1);
}
hret = margo_forward(g_binst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
......@@ -359,7 +394,7 @@ int bake_bulk_read(
in.rid = rid;
in.region_offset = region_offset;
hret = HG_Bulk_create(g_binst.hg_class, 1, (void**)(&buf), &buf_size,
hret = HG_Bulk_create(g_hginst.hg_class, 1, (void**)(&buf), &buf_size,
HG_BULK_WRITE_ONLY, &in.bulk_handle);
if(hret != HG_SUCCESS)
{
......@@ -367,15 +402,15 @@ int bake_bulk_read(
}
/* create handle */
hret = HG_Create(g_binst.hg_context, g_binst.dest,
g_bake_bulk_read_id, &handle);
hret = HG_Create(g_hginst.hg_context, g_binst.dest,
g_hginst.bake_bulk_read_id, &handle);
if(hret != HG_SUCCESS)
{
HG_Bulk_free(in.bulk_handle);
return(-1);
}
hret = margo_forward(g_binst.mid, handle, &in);
hret = margo_forward(g_hginst.mid, handle, &in);
if(hret != HG_SUCCESS)
{
HG_Destroy(handle);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment