Commit 3565dbab authored by Shane Snyder's avatar Shane Snyder

port mobject to new SSG version

parent 0454fdc0
......@@ -24,7 +24,8 @@ int mobject_store_aio_write_op_operate(
hg_return_t ret;
// XXX pick other servers using ch-placement
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0);
ssg_member_id_t svr_id = ssg_get_group_member_id_from_rank(io->cluster->gid, 0);
hg_addr_t svr_addr = ssg_get_group_member_addr(io->cluster->gid, svr_id);
mobject_provider_handle_t mph;
mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
......@@ -46,7 +47,8 @@ int mobject_store_aio_read_op_operate(mobject_store_read_op_t read_op,
hg_return_t ret;
// XXX pick other servers using ch-placement
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0);
ssg_member_id_t svr_id = ssg_get_group_member_id_from_rank(io->cluster->gid, 0);
hg_addr_t svr_addr = ssg_get_group_member_addr(io->cluster->gid, svr_id);
mobject_provider_handle_t mph;
mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
......
......@@ -85,7 +85,6 @@ int mobject_store_connect(mobject_store_t cluster)
if (!svr_addr_str)
{
fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
......@@ -100,7 +99,6 @@ int mobject_store_connect(mobject_store_t cluster)
{
fprintf(stderr, "Error: Unable to initialize margo\n");
free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
......@@ -114,20 +112,18 @@ int mobject_store_connect(mobject_store_t cluster)
fprintf(stderr, "Error: Unable to initialize SSG\n");
margo_finalize(cluster_handle->mid);
free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
/* attach to the cluster group */
ret = ssg_group_attach(cluster_handle->gid);
/* observe the cluster group */
ret = ssg_group_observe(cluster_handle->gid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n");
fprintf(stderr, "Error: Unable to observe the mobject cluster group\n");
ssg_finalize();
margo_finalize(cluster_handle->mid);
free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
......@@ -138,10 +134,10 @@ int mobject_store_connect(mobject_store_t cluster)
if(gsize == 0)
{
fprintf(stderr, "Error: Unable to get SSG group size\n");
ssg_group_unobserve(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
......@@ -152,10 +148,10 @@ int mobject_store_connect(mobject_store_t cluster)
if(!cluster_handle->ch_instance)
{
fprintf(stderr, "Error: Unable to initialize ch-placement instance\n");
ssg_group_unobserve(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
free(svr_addr_str);
ssg_group_id_free(cluster_handle->gid);
free(cluster_handle);
return -1;
}
......@@ -165,6 +161,10 @@ int mobject_store_connect(mobject_store_t cluster)
if(ret != 0)
{
fprintf(stderr, "Error: Unable to create a mobject client\n");
ssg_group_unobserve(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
free(svr_addr_str);
free(cluster_handle);
return -1;
}
......@@ -199,10 +199,9 @@ void mobject_store_shutdown(mobject_store_t cluster)
}
mobject_client_finalize(cluster_handle->mobject_clt);
ssg_group_detach(cluster_handle->gid);
ssg_group_unobserve(cluster_handle->gid);
ssg_finalize();
margo_finalize(cluster_handle->mid);
ssg_group_id_free(cluster_handle->gid);
ch_placement_finalize(cluster_handle->ch_instance);
free(cluster_handle);
......@@ -334,13 +333,13 @@ int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
{
mobject_provider_handle_t mph = MOBJECT_PROVIDER_HANDLE_NULL;
uint64_t oid_hash = sdbm_hash(oid);
unsigned long server_idx;
ch_placement_find_closest(io->cluster->ch_instance, oid_hash, 1, &server_idx);
// XXX multiple providers may be in the same node (with distinct mplex ids)
hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, server_idx);
unsigned long server_rank;
ch_placement_find_closest(io->cluster->ch_instance, oid_hash, 1, &server_rank);
ssg_member_id_t svr_id = ssg_get_group_member_id_from_rank(io->cluster->gid, server_rank);
hg_addr_t svr_addr = ssg_get_group_member_addr(io->cluster->gid, svr_id);
// TODO for now multiplex id is hard-coded as 1
// XXX multiple providers may be in the same node (with distinct mplex ids)
int r = mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
if(r != 0) return r;
......@@ -412,10 +411,12 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
{
mobject_provider_handle_t mph = MOBJECT_PROVIDER_HANDLE_NULL;
uint64_t oid_hash = sdbm_hash(oid);
unsigned long server_idx;
ch_placement_find_closest(ioctx->cluster->ch_instance, oid_hash, 1, &server_idx);
unsigned long server_rank;
ch_placement_find_closest(ioctx->cluster->ch_instance, oid_hash, 1, &server_rank);
ssg_member_id_t svr_id = ssg_get_group_member_id_from_rank(ioctx->cluster->gid, server_rank);
hg_addr_t svr_addr = ssg_get_group_member_addr(ioctx->cluster->gid, svr_id);
// XXX multiple providers may be in the same node (with distinct mplex ids)
hg_addr_t svr_addr = ssg_get_addr(ioctx->cluster->gid, server_idx);
// TODO for now multiplex id is hard-coded as 1
int r = mobject_provider_handle_create(ioctx->cluster->mobject_clt, svr_addr, 1, &mph);
if(r != 0) return r;
......@@ -429,9 +430,11 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle)
{
hg_addr_t svr_addr;
ssg_member_id_t svr_id;
/* get the address of the first server */
svr_addr = ssg_get_addr(cluster_handle->gid, 0);
svr_id = ssg_get_group_member_id_from_rank(cluster_handle->gid, 0);
svr_addr = ssg_get_group_member_addr(cluster_handle->gid, svr_id);
if (svr_addr == HG_ADDR_NULL)
{
fprintf(stderr, "Error: Unable to obtain address for mobject server\n");
......
......@@ -49,6 +49,7 @@ int main(int argc, char *argv[])
int group_size;
int (*send_op_ptr)(margo_instance_id, hg_addr_t);
int i;
ssg_member_id_t server_id;
hg_addr_t server_addr;
int ret;
......@@ -66,20 +67,6 @@ int main(int argc, char *argv[])
return -1;
}
ret = ssg_group_id_load(server_gid_file, &server_gid);
if (ret != 0)
{
fprintf(stderr, "Error: Unable to load mobject server group ID\n");
return -1;
}
server_addr_str = ssg_group_id_get_addr_str(server_gid);
if (!server_addr_str)
{
fprintf(stderr, "Error: Unable to get mobject server group address string\n");
return -1;
}
/* we only need to get the proto portion of the address to initialize margo */
for(i=0; i<24 && server_addr_str[i] != '\0' && server_addr_str[i] != ':'; i++)
proto[i] = server_addr_str[i];
......@@ -88,7 +75,6 @@ int main(int argc, char *argv[])
if (mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: Unable to initialize margo\n");
free(server_addr_str);
return(-1);
}
......@@ -104,15 +90,32 @@ int main(int argc, char *argv[])
{
fprintf(stderr, "Error: Unable to initialize SSG\n");
margo_finalize(mid);
free(server_addr_str);
return(-1);
}
/* attach to server group to get all server addresses */
ret = ssg_group_attach(server_gid);
ret = ssg_group_id_load(server_gid_file, &server_gid);
if (ret != 0)
{
fprintf(stderr, "Error: Unable to load mobject server group ID\n");
ssg_finalize();
margo_finalize(mid);
return -1;
}
server_addr_str = ssg_group_id_get_addr_str(server_gid);
if (!server_addr_str)
{
fprintf(stderr, "Error: Unable to get mobject server group address string\n");
ssg_finalize();
margo_finalize(mid);
return -1;
}
/* observe server group to get all server addresses */
ret = ssg_group_observe(server_gid);
if (ret != SSG_SUCCESS)
{
fprintf(stderr, "Error: Unable to attach to SSG server group\n");
fprintf(stderr, "Error: Unable to observe SSG server group\n");
ssg_finalize();
margo_finalize(mid);
free(server_addr_str);
......@@ -124,7 +127,7 @@ int main(int argc, char *argv[])
if (group_size == 0)
{
fprintf(stderr, "Error: Unable to determine SSG server group size\n");
ssg_group_detach(server_gid);
ssg_group_unobserve(server_gid);
ssg_finalize();
margo_finalize(mid);
free(server_addr_str);
......@@ -134,11 +137,12 @@ int main(int argc, char *argv[])
for (i = 0 ; i < group_size; i++)
{
server_addr = ssg_get_addr(server_gid, i);
server_id = ssg_get_group_member_id_from_rank(server_gid, i);
server_addr = ssg_get_group_member_addr(server_gid, server_id);
if (server_addr == HG_ADDR_NULL)
{
fprintf(stderr, "Error: NULL address given for group member %d\n", i);
ssg_group_detach(server_gid);
ssg_group_unobserve(server_gid);
ssg_finalize();
margo_finalize(mid);
free(server_addr_str);
......@@ -148,7 +152,7 @@ int main(int argc, char *argv[])
ret = send_op_ptr(mid, server_addr);
}
ssg_group_detach(server_gid);
ssg_group_unobserve(server_gid);
ssg_finalize();
margo_finalize(mid);
free(server_addr_str);
......
......@@ -113,7 +113,6 @@ static void parse_args(int argc, char **argv, mobject_server_options *opts)
static void finalize_ssg_cb(void* data);
static void finalize_bake_client_cb(void* data);
static void finalize_sdskv_client_cb(void* data);
static void finalized_ssg_group_cb(void* data);
int main(int argc, char *argv[])
{
......@@ -202,8 +201,8 @@ int main(int argc, char *argv[])
/* SSG group creation */
ssg_group_id_t gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD, NULL, NULL);
ASSERT(gid != SSG_GROUP_ID_NULL, "ssg_group_create_mpi() failed (ret = %s)","SSG_GROUP_ID_NULL");
margo_push_finalize_callback(mid, &finalized_ssg_group_cb, (void*)&gid);
ASSERT(gid != SSG_GROUP_ID_INVALID, "ssg_group_create_mpi() failed (ret = %s)","SSG_GROUP_ID_NULL");
margo_push_finalize_callback(mid, &finalize_ssg_cb, (void*)&gid);
/* Mobject provider initialization */
mobject_provider_t mobject_prov;
......@@ -232,6 +231,7 @@ static void finalize_ssg_cb(void* data)
{
ssg_group_id_t* gid = (ssg_group_id_t*)data;
ssg_group_destroy(*gid);
ssg_finalize();
}
static void finalize_bake_client_cb(void* data)
......@@ -247,9 +247,3 @@ static void finalize_sdskv_client_cb(void* data)
sdskv_provider_handle_release(clt_data->provider_handle);
sdskv_client_finalize(clt_data->client);
}
static void finalized_ssg_group_cb(void* data)
{
ssg_group_id_t gid = *((ssg_group_id_t*)data);
ssg_group_destroy(gid);
}
......@@ -48,7 +48,7 @@ int mobject_provider_register(
mobject_provider_t* provider)
{
mobject_provider_t srv_ctx;
int my_id;
int my_rank;
int ret;
/* check if a provider with the same multiplex id already exists */
......@@ -74,10 +74,10 @@ int mobject_provider_register(
ABT_mutex_create(&srv_ctx->stats_mutex);
srv_ctx->gid = gid;
my_id = ssg_get_group_self_id(srv_ctx->gid);
my_rank = ssg_get_group_self_rank(srv_ctx->gid);
/* one proccess writes cluster connect info to file for clients to find later */
if (my_id == 0)
if (my_rank == 0)
{
ret = ssg_group_id_store(cluster_file, srv_ctx->gid);
if (ret != 0)
......@@ -293,24 +293,24 @@ DEFINE_MARGO_RPC_HANDLER(mobject_server_clean_ult)
static hg_return_t mobject_server_stat_ult(hg_handle_t h)
{
hg_return_t ret;
int my_rank;
ssg_member_id_t my_id;
char my_hostname[256] = {0};
const struct hg_info* info = margo_get_info(h);
margo_instance_id mid = margo_hg_handle_get_instance(h);
struct mobject_server_context *srv_ctx = margo_registered_data(mid, info->id);
my_rank = ssg_get_group_self_id(srv_ctx->gid);
my_id = ssg_get_self_id(mid);
gethostname(my_hostname, sizeof(my_hostname));
ABT_mutex_lock(srv_ctx->stats_mutex);
fprintf(stderr,
"Server %d (host: %s):\n" \
"Server %lu (host: %s):\n" \
"\tSegments allocated: %u\n" \
"\tTotal segment size: %lu bytes\n" \
"\tTotal segment write time: %.4lf s\n" \
"\tTotal segment write b/w: %.4lf MiB/s\n", \
my_rank, my_hostname, srv_ctx->segs,
my_id, my_hostname, srv_ctx->segs,
srv_ctx->total_seg_size, srv_ctx->total_seg_wr_duration,
(srv_ctx->total_seg_size / (1024.0 * 1024.0 ) / srv_ctx->total_seg_wr_duration));
ABT_mutex_unlock(srv_ctx->stats_mutex);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment