Commit 4d389cc8 authored by Shane Snyder's avatar Shane Snyder
Browse files

Merge branch 'mobject-connect-dev' into io-chain-rpc-dev

parents 39f4dbe2 54a4f337
...@@ -31,7 +31,9 @@ hg_id_t mobject_shutdown_rpc_id; ...@@ -31,7 +31,9 @@ hg_id_t mobject_shutdown_rpc_id;
typedef struct mobject_store_handle typedef struct mobject_store_handle
{ {
margo_instance_id mid;
ssg_group_id_t gid; ssg_group_id_t gid;
int connected;
} mobject_store_handle_t; } mobject_store_handle_t;
...@@ -55,6 +57,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) ...@@ -55,6 +57,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
{ {
fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n", fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n",
MOBJECT_CLUSTER_FILE_ENV); MOBJECT_CLUSTER_FILE_ENV);
free(cluster_handle);
return -1; return -1;
} }
...@@ -63,6 +66,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) ...@@ -63,6 +66,7 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
{ {
fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n", fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n",
cluster_file); cluster_file);
free(cluster_handle);
return -1; return -1;
} }
...@@ -74,7 +78,69 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id) ...@@ -74,7 +78,69 @@ int mobject_store_create(mobject_store_t *cluster, const char * const id)
int mobject_store_connect(mobject_store_t cluster) int mobject_store_connect(mobject_store_t cluster)
{ {
/* TODO ssg attach to mobject cluster group id */ mobject_store_handle_t *cluster_handle = (mobject_store_handle_t *)cluster;
char *srv_addr;
char proto[24] = {0};
int i;
int ret;
if (cluster_handle->connected)
return 0;
/* figure out protocol to connect with using address information
* associated with the SSG group ID
*/
srv_addr = ssg_group_id_get_addr_str(cluster_handle->gid);
if (!srv_addr)
{
fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
/* we only need to the proto portion of the address to initialize */
for(i=0; i<24 && srv_addr[i] != '\0' && srv_addr[i] != ':'; i++)
proto[i] = srv_addr[i];
/* intialize margo */
/* XXX: probably want to expose some way of tweaking threading parameters */
cluster_handle->mid = margo_init(proto, MARGO_CLIENT_MODE, 0, -1);
if (cluster_handle->mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: Unable to initialize margo\n");
free(srv_addr);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
/* initialize ssg */
ret = ssg_init(cluster_handle->mid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to initialize SSG\n");
margo_finalize(cluster_handle->mid);
free(srv_addr);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
/* attach to the cluster group */
ret = ssg_group_attach(cluster_handle->gid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n");
ssg_finalize();
margo_finalize(cluster_handle->mid);
free(srv_addr);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
cluster_handle->connected = 1;
free(srv_addr);
return 0; return 0;
} }
...@@ -85,8 +151,13 @@ void mobject_store_shutdown(mobject_store_t cluster) ...@@ -85,8 +151,13 @@ void mobject_store_shutdown(mobject_store_t cluster)
(mobject_store_handle_t *)cluster; (mobject_store_handle_t *)cluster;
assert(cluster_handle != NULL); assert(cluster_handle != NULL);
/* TODO ssg detatch from mobject cluster group id. free gid? */ if (!cluster_handle->connected)
return;
ssg_group_detach(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle); free(cluster_handle);
return; return;
......
...@@ -27,14 +27,14 @@ typedef struct mobject_server_context ...@@ -27,14 +27,14 @@ typedef struct mobject_server_context
static int mobject_server_register(mobject_server_context_t *srv_ctx); static int mobject_server_register(mobject_server_context_t *srv_ctx);
DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult) DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult) DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
/* mobject RPC IDs */ /* mobject RPC IDs */
static hg_id_t mobject_shutdown_rpc_id;
static hg_id_t mobject_write_op_rpc_id; static hg_id_t mobject_write_op_rpc_id;
static hg_id_t mobject_read_op_rpc_id; static hg_id_t mobject_read_op_rpc_id;
static hg_id_t mobject_shutdown_rpc_id;
/* XXX one global mobject server state struct */ /* XXX one global mobject server state struct */
static mobject_server_context_t *g_srv_ctx = NULL; static mobject_server_context_t *g_srv_ctx = NULL;
...@@ -124,18 +124,17 @@ void mobject_server_shutdown(margo_instance_id mid) ...@@ -124,18 +124,17 @@ void mobject_server_shutdown(margo_instance_id mid)
static int mobject_server_register(mobject_server_context_t *srv_ctx) static int mobject_server_register(mobject_server_context_t *srv_ctx)
{ {
int ret=0; int ret=0;
margo_instance_id mid = srv_ctx->mid; margo_instance_id mid = srv_ctx->mid;
mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
void, void, mobject_shutdown_ult);
mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op", mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
write_op_in_t, write_op_out_t, mobject_write_op_ult); write_op_in_t, write_op_out_t, mobject_write_op_ult);
mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op", mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op",
read_op_in_t, read_op_out_t, mobject_read_op_ult) read_op_in_t, read_op_out_t, mobject_read_op_ult)
mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
void, void, mobject_shutdown_ult);
#if 0 #if 0
bake_server_register(mid, pool_info); bake_server_register(mid, pool_info);
metadata = kv_server_register(mid); metadata = kv_server_register(mid);
...@@ -144,14 +143,6 @@ static int mobject_server_register(mobject_server_context_t *srv_ctx) ...@@ -144,14 +143,6 @@ static int mobject_server_register(mobject_server_context_t *srv_ctx)
return ret; return ret;
} }
static void mobject_shutdown_ult(hg_handle_t handle)
{
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
static hg_return_t mobject_write_op_ult(hg_handle_t h) static hg_return_t mobject_write_op_ult(hg_handle_t h)
{ {
hg_return_t ret; hg_return_t ret;
...@@ -223,3 +214,11 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h) ...@@ -223,3 +214,11 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
return ret; return ret;
} }
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult) DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)
static void mobject_shutdown_ult(hg_handle_t handle)
{
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
...@@ -23,6 +23,9 @@ int main(int argc, char *argv[]) ...@@ -23,6 +23,9 @@ int main(int argc, char *argv[])
ret = rados_create(&cluster, "admin"); ret = rados_create(&cluster, "admin");
assert(ret == 0); assert(ret == 0);
ret = rados_connect(cluster);
assert(ret == 0);
rados_shutdown(cluster); rados_shutdown(cluster);
return(0); return(0);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment