Commit 17bd4af9 authored by Shane Snyder's avatar Shane Snyder

fully implement cluster create, connect, shutdown

parent f7bdeaef
...@@ -19,7 +19,9 @@ ...@@ -19,7 +19,9 @@
typedef struct mobject_store_handle typedef struct mobject_store_handle
{ {
margo_instance_id mid;
ssg_group_id_t gid; ssg_group_id_t gid;
int connected;
} mobject_store_handle_t; } mobject_store_handle_t;
...@@ -44,6 +46,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) ...@@ -44,6 +46,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
{ {
fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n", fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n",
MOBJECT_CLUSTER_FILE_ENV); MOBJECT_CLUSTER_FILE_ENV);
free(cluster_handle);
return -1; return -1;
} }
...@@ -52,6 +55,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) ...@@ -52,6 +55,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
{ {
fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n", fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n",
cluster_file); cluster_file);
free(cluster_handle);
return -1; return -1;
} }
...@@ -63,7 +67,69 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) ...@@ -63,7 +67,69 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
int mobject_store_connect(mobject_store_t cluster) int mobject_store_connect(mobject_store_t cluster)
{ {
/* TODO ssg attach to mobject cluster group id */ mobject_store_handle_t *cluster_handle = (mobject_store_handle_t *)cluster;
char *srv_addr;
char proto[24] = {0};
int i;
int ret;
if (cluster_handle->connected)
return 0;
/* figure out transport to connect with using address information
* associated with the SSG group ID
*/
srv_addr = ssg_group_id_get_addr_str(cluster_handle->gid);
if (!srv_addr)
{
fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
/* */
for(i=0; i<24 && srv_addr[i] != '\0' && srv_addr[i] != ':'; i++)
proto[i] = srv_addr[i];
/* intialize margo */
/* XXX: probably want to expose some way of tweaking threading parameters */
cluster_handle->mid = margo_init(proto, MARGO_CLIENT_MODE, 0, -1);
if (cluster_handle->mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: Unable to initialize margo\n");
free(srv_addr);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
/* initialize ssg */
ret = ssg_init(cluster_handle->mid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to initialize SSG\n");
margo_finalize(cluster_handle->mid);
free(srv_addr);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
/* attach to the cluster group */
ret = ssg_group_attach(cluster_handle->gid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n");
ssg_finalize();
margo_finalize(cluster_handle->mid);
free(srv_addr);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
cluster_handle->connected = 1;
free(srv_addr);
return 0; return 0;
} }
...@@ -74,8 +140,13 @@ void mobject_store_shutdown(mobject_store_t cluster) ...@@ -74,8 +140,13 @@ void mobject_store_shutdown(mobject_store_t cluster)
(mobject_store_handle_t *)cluster; (mobject_store_handle_t *)cluster;
assert(cluster_handle != NULL); assert(cluster_handle != NULL);
/* TODO ssg detatch from mobject cluster group id. free gid? */ if (!cluster_handle->connected)
return;
ssg_group_detach(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle); free(cluster_handle);
return; return;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment