Commit 7341eef1 authored by Shane Snyder's avatar Shane Snyder

finish porting new SSG changes

parent 72cc9484
......@@ -29,14 +29,27 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
{
struct mobject_store_handle *cluster_handle;
char *cluster_file;
int num_group_addrs = SSG_ALL_MEMBERS;
int ret;
(void)id; /* XXX: id unused in mobject */
/* initialize ssg */
/* XXX: we need to think about how to do this once per-client... clients could connect to mult. clusters */
ret = ssg_init();
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to initialize SSG\n");
return -1;
}
/* allocate a new cluster handle and set some fields */
cluster_handle = (struct mobject_store_handle*)calloc(1,sizeof(*cluster_handle));
if (!cluster_handle)
{
ssg_finalize();
return -1;
}
/* use env variable to determine how to connect to the cluster */
/* NOTE: this is the _only_ method for specifying a cluster for now... */
......@@ -45,15 +58,17 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
{
fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n",
MOBJECT_CLUSTER_FILE_ENV);
ssg_finalize();
free(cluster_handle);
return -1;
}
ret = ssg_group_id_load(cluster_file, &cluster_handle->gid);
ret = ssg_group_id_load(cluster_file, &num_group_addrs, &cluster_handle->gid);
if (ret != 0)
{
fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n",
cluster_file);
ssg_finalize();
free(cluster_handle);
return -1;
}
......@@ -81,10 +96,11 @@ int mobject_store_connect(mobject_store_t cluster)
/* figure out protocol to connect with using address information
* associated with the SSG group ID
*/
svr_addr_str = ssg_group_id_get_addr_str(cluster_handle->gid);
svr_addr_str = ssg_group_id_get_addr_str(cluster_handle->gid, 0);
if (!svr_addr_str)
{
fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
ssg_finalize();
free(cluster_handle);
return -1;
}
......@@ -98,31 +114,20 @@ int mobject_store_connect(mobject_store_t cluster)
if (mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: Unable to initialize margo\n");
ssg_finalize();
free(svr_addr_str);
free(cluster_handle);
return -1;
}
cluster_handle->mid = mid;
/* initialize ssg */
/* XXX: we need to think about how to do this once per-client... clients could connect to mult. clusters */
ret = ssg_init(cluster_handle->mid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to initialize SSG\n");
margo_finalize(cluster_handle->mid);
free(svr_addr_str);
free(cluster_handle);
return -1;
}
/* observe the cluster group */
ret = ssg_group_observe(cluster_handle->gid);
ret = ssg_group_observe(mid, cluster_handle->gid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to observe the mobject cluster group\n");
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_finalize();
free(svr_addr_str);
free(cluster_handle);
return -1;
......@@ -135,8 +140,8 @@ int mobject_store_connect(mobject_store_t cluster)
{
fprintf(stderr, "Error: Unable to get SSG group size\n");
ssg_group_unobserve(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_finalize();
free(svr_addr_str);
free(cluster_handle);
return -1;
......@@ -149,8 +154,8 @@ int mobject_store_connect(mobject_store_t cluster)
{
fprintf(stderr, "Error: Unable to initialize ch-placement instance\n");
ssg_group_unobserve(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_finalize();
free(svr_addr_str);
free(cluster_handle);
return -1;
......@@ -162,8 +167,8 @@ int mobject_store_connect(mobject_store_t cluster)
{
fprintf(stderr, "Error: Unable to create a mobject client\n");
ssg_group_unobserve(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_finalize();
free(svr_addr_str);
free(cluster_handle);
return -1;
......@@ -200,8 +205,8 @@ void mobject_store_shutdown(mobject_store_t cluster)
mobject_client_finalize(cluster_handle->mobject_clt);
ssg_group_unobserve(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_finalize();
ch_placement_finalize(cluster_handle->ch_instance);
free(cluster_handle);
......
......@@ -44,6 +44,7 @@ int main(int argc, char *argv[])
char *server_op;
char *server_addr_str;
ssg_group_id_t server_gid;
int num_addrs;
margo_instance_id mid;
char proto[24] = {0};
int group_size;
......@@ -67,57 +68,58 @@ int main(int argc, char *argv[])
return -1;
}
/* we only need to get the proto portion of the address to initialize margo */
for(i=0; i<24 && server_addr_str[i] != '\0' && server_addr_str[i] != ':'; i++)
proto[i] = server_addr_str[i];
mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1);
if (mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: Unable to initialize margo\n");
return(-1);
}
/* register RPC handlers for controlling mobject server groups */
mobject_server_clean_rpc_id = MARGO_REGISTER(
mid, "mobject_server_clean", void, void, NULL);
mobject_server_stat_rpc_id = MARGO_REGISTER(
mid, "mobject_server_stat", void, void, NULL);
/* SSG initialization */
ret = ssg_init(mid);
/* NOTE: initialized before Margo so we can pull relevant SSG info out of GID file */
ret = ssg_init();
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to initialize SSG\n");
margo_finalize(mid);
return(-1);
}
ret = ssg_group_id_load(server_gid_file, &server_gid);
if (ret != 0)
num_addrs = 1;
ret = ssg_group_id_load(server_gid_file, &num_addrs, &server_gid);
if (ret != 0 || num_addrs < 1)
{
fprintf(stderr, "Error: Unable to load mobject server group ID\n");
ssg_finalize();
margo_finalize(mid);
return -1;
}
server_addr_str = ssg_group_id_get_addr_str(server_gid);
server_addr_str = ssg_group_id_get_addr_str(server_gid, 0);
if (!server_addr_str)
{
fprintf(stderr, "Error: Unable to get mobject server group address string\n");
ssg_finalize();
margo_finalize(mid);
return -1;
}
/* we only need to get the proto portion of the address to initialize margo */
for(i=0; i<24 && server_addr_str[i] != '\0' && server_addr_str[i] != ':'; i++)
proto[i] = server_addr_str[i];
mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1);
if (mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: Unable to initialize margo\n");
ssg_finalize();
free(server_addr_str);
return(-1);
}
/* register RPC handlers for controlling mobject server groups */
mobject_server_clean_rpc_id = MARGO_REGISTER(
mid, "mobject_server_clean", void, void, NULL);
mobject_server_stat_rpc_id = MARGO_REGISTER(
mid, "mobject_server_stat", void, void, NULL);
/* observe server group to get all server addresses */
ret = ssg_group_observe(server_gid);
ret = ssg_group_observe(mid, server_gid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to observe SSG server group\n");
ssg_finalize();
margo_finalize(mid);
ssg_finalize();
free(server_addr_str);
return(-1);
......@@ -128,8 +130,8 @@ int main(int argc, char *argv[])
{
fprintf(stderr, "Error: Unable to determine SSG server group size\n");
ssg_group_unobserve(server_gid);
ssg_finalize();
margo_finalize(mid);
ssg_finalize();
free(server_addr_str);
return(-1);
......@@ -143,8 +145,8 @@ int main(int argc, char *argv[])
{
fprintf(stderr, "Error: NULL address given for group member %d\n", i);
ssg_group_unobserve(server_gid);
ssg_finalize();
margo_finalize(mid);
ssg_finalize();
free(server_addr_str);
return(-1);
}
......@@ -153,8 +155,8 @@ int main(int argc, char *argv[])
}
ssg_group_unobserve(server_gid);
ssg_finalize();
margo_finalize(mid);
ssg_finalize();
free(server_addr_str);
return 0;
......
......@@ -125,6 +125,7 @@ int main(int argc, char *argv[])
.disable_pipelining = 0, /* use pipelining by default */
};
margo_instance_id mid;
ssg_group_config_t group_config = SSG_GROUP_CONFIG_INITIALIZER;
int ret;
parse_args(argc, argv, &server_opts);
......@@ -145,7 +146,7 @@ int main(int argc, char *argv[])
margo_enable_remote_shutdown(mid);
/* SSG initialization */
ret = ssg_init(mid);
ret = ssg_init();
ASSERT(ret == 0, "ssg_init() failed (ret = %d)\n", ret);
hg_addr_t self_addr;
......@@ -200,7 +201,8 @@ int main(int argc, char *argv[])
margo_push_finalize_callback(mid, &finalize_sdskv_client_cb, (void*)&sdskv_clt_data);
/* SSG group creation */
ssg_group_id_t gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD, NULL, NULL);
group_config.swim_period_length_ms = 10000; /* 10-second period length ... */
ssg_group_id_t gid = ssg_group_create_mpi(mid, MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD, &group_config, NULL, NULL);
ASSERT(gid != SSG_GROUP_ID_INVALID, "ssg_group_create_mpi() failed (ret = %s)","SSG_GROUP_ID_NULL");
margo_push_finalize_callback(mid, &finalize_ssg_cb, (void*)&gid);
......
......@@ -79,7 +79,7 @@ int mobject_provider_register(
/* one proccess writes cluster connect info to file for clients to find later */
if (my_rank == 0)
{
ret = ssg_group_id_store(cluster_file, srv_ctx->gid);
ret = ssg_group_id_store(cluster_file, srv_ctx->gid, SSG_ALL_MEMBERS);
if (ret != 0)
{
fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment