Commit e6d39f0f authored by Philip Carns's avatar Philip Carns
Browse files

Merge branch 'master' of xgitlab.cels.anl.gov:sds/ssg

parents 8d161c09 9e4401b7
......@@ -67,6 +67,14 @@ LIBS="$MARGO_LIBS $LIBS"
CPPFLAGS="$MARGO_CFLAGS $CPPFLAGS"
CFLAGS="$MARGO_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([ABT_SNOOZER],[abt-snoozer],
[AC_DEFINE([HAVE_ABT_SNOOZER], [1], [Use ABT-SNOOZER])],
[printf ""])
LIBS="$ABT_SNOOZER_LIBS $LIBS"
CPPFLAGS="$ABT_SNOOZER_CFLAGS $CPPFLAGS"
CFLAGS="$ABT_SNOOZER_CFLAGS $CFLAGS"
AC_ARG_ENABLE([swim-fd],
[ --enable-swim-fd enable SWIM failure detection (default: disabled)],
[if test "x$enable_swim_fd" = "xyes" ; then
......
......@@ -7,6 +7,6 @@ Name: ssg
Description: Scalable Service Groups (SSG) interface for Mercury
Version: 0.1
URL: https://xgitlab.cels.anl.gov/sds/ssg
Requires: mercury
Requires: margo
Libs: -L${libdir} -lssg
Cflags: -I${includedir}
......@@ -51,15 +51,11 @@ static hg_id_t ssg_group_attach_rpc_id;
*/
void ssg_register_rpcs()
{
hg_class_t *hgcl = NULL;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) return;
/* register HG RPCs for SSG */
ssg_group_attach_rpc_id = MERCURY_REGISTER(hgcl, "ssg_group_attach",
ssg_group_attach_rpc_id =
MARGO_REGISTER(ssg_inst->mid, "ssg_group_attach",
ssg_group_attach_request_t, ssg_group_attach_response_t,
ssg_group_attach_recv_ult_handler);
ssg_group_attach_recv_ult);
return;
}
......@@ -74,7 +70,6 @@ int ssg_group_attach_send(
int * group_size,
void ** view_buf)
{
hg_class_t *hgcl = NULL;
hg_addr_t member_addr = HG_ADDR_NULL;
hg_handle_t handle = HG_HANDLE_NULL;
hg_bulk_t bulk_handle = HG_BULK_NULL;
......@@ -89,15 +84,12 @@ int ssg_group_attach_send(
*group_size = 0;
*view_buf = NULL;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) goto fini;
/* lookup the address of the given group member */
hret = margo_addr_lookup(ssg_inst->mid, group_descriptor->addr_str,
&member_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Create(margo_get_context(ssg_inst->mid), member_addr,
hret = margo_create(ssg_inst->mid, member_addr,
ssg_group_attach_rpc_id, &handle);
if (hret != HG_SUCCESS) goto fini;
......@@ -109,17 +101,17 @@ int ssg_group_attach_send(
tmp_view_buf = malloc(tmp_view_buf_size);
if (!tmp_view_buf) goto fini;
hret = HG_Bulk_create(hgcl, 1, &tmp_view_buf, &tmp_view_buf_size,
hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if (hret != HG_SUCCESS) goto fini;
/* send an attach request to the given group member address */
memcpy(&attach_req.group_descriptor, group_descriptor, sizeof(*group_descriptor));
attach_req.bulk_handle = bulk_handle;
hret = margo_forward(ssg_inst->mid, handle, &attach_req);
hret = margo_forward(handle, &attach_req);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Get_output(handle, &attach_resp);
hret = margo_get_output(handle, &attach_resp);
if (hret != HG_SUCCESS) goto fini;
/* if our initial buffer is too small, reallocate to the exact size & reattach */
......@@ -128,24 +120,24 @@ int ssg_group_attach_send(
b = realloc(tmp_view_buf, attach_resp.view_buf_size);
if(!b)
{
HG_Free_output(handle, &attach_resp);
margo_free_output(handle, &attach_resp);
goto fini;
}
tmp_view_buf = b;
tmp_view_buf_size = attach_resp.view_buf_size;
/* free old bulk handle and recreate it */
HG_Bulk_free(bulk_handle);
hret = HG_Bulk_create(hgcl, 1, &tmp_view_buf, &tmp_view_buf_size,
margo_bulk_free(bulk_handle);
hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if (hret != HG_SUCCESS) goto fini;
attach_req.bulk_handle = bulk_handle;
hret = margo_forward(ssg_inst->mid, handle, &attach_req);
hret = margo_forward(handle, &attach_req);
if (hret != HG_SUCCESS) goto fini;
HG_Free_output(handle, &attach_resp);
hret = HG_Get_output(handle, &attach_resp);
margo_free_output(handle, &attach_resp);
hret = margo_get_output(handle, &attach_resp);
if (hret != HG_SUCCESS) goto fini;
}
......@@ -155,7 +147,7 @@ int ssg_group_attach_send(
b = realloc(tmp_view_buf, attach_resp.view_buf_size);
if(!b)
{
HG_Free_output(handle, &attach_resp);
margo_free_output(handle, &attach_resp);
goto fini;
}
tmp_view_buf = b;
......@@ -166,13 +158,13 @@ int ssg_group_attach_send(
*group_size = (int)attach_resp.group_size;
*view_buf = tmp_view_buf;
HG_Free_output(handle, &attach_resp);
margo_free_output(handle, &attach_resp);
tmp_view_buf = NULL;
sret = SSG_SUCCESS;
fini:
if (hgcl && member_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, member_addr);
if (handle != HG_HANDLE_NULL) HG_Destroy(handle);
if (bulk_handle != HG_BULK_NULL) HG_Bulk_free(bulk_handle);
if (member_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, member_addr);
if (handle != HG_HANDLE_NULL) margo_destroy(handle);
if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
free(tmp_view_buf);
return sret;
......@@ -181,7 +173,6 @@ fini:
static void ssg_group_attach_recv_ult(
hg_handle_t handle)
{
hg_class_t *hgcl = NULL;
const struct hg_info *hgi = NULL;
ssg_group_t *g = NULL;
ssg_group_attach_request_t attach_req;
......@@ -195,39 +186,37 @@ static void ssg_group_attach_recv_ult(
if (!ssg_inst) goto fini;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) goto fini;
hgi = HG_Get_info(handle);
hgi = margo_get_info(handle);
if (!hgi) goto fini;
hret = HG_Get_input(handle, &attach_req);
hret = margo_get_input(handle, &attach_req);
if (hret != HG_SUCCESS) goto fini;
view_size_requested = HG_Bulk_get_size(attach_req.bulk_handle);
view_size_requested = margo_bulk_get_size(attach_req.bulk_handle);
/* look for the given group in my local table of groups */
HASH_FIND(hh, ssg_inst->group_table, &attach_req.group_descriptor.name_hash,
sizeof(uint64_t), g);
if (!g)
{
HG_Free_input(handle, &attach_req);
margo_free_input(handle, &attach_req);
goto fini;
}
sret = ssg_group_view_serialize(&g->view, &view_buf, &view_buf_size);
if (sret != SSG_SUCCESS)
{
HG_Free_input(handle, &attach_req);
margo_free_input(handle, &attach_req);
goto fini;
}
if (view_size_requested >= view_buf_size)
{
/* if attacher's buf is large enough, transfer the view */
hret = HG_Bulk_create(hgcl, 1, &view_buf, &view_buf_size, HG_BULK_READ_ONLY,
&bulk_handle);
hret = margo_bulk_create(ssg_inst->mid, 1, &view_buf, &view_buf_size,
HG_BULK_READ_ONLY, &bulk_handle);
if (hret != HG_SUCCESS)
{
HG_Free_input(handle, &attach_req);
margo_free_input(handle, &attach_req);
goto fini;
}
......@@ -235,7 +224,7 @@ static void ssg_group_attach_recv_ult(
attach_req.bulk_handle, 0, bulk_handle, 0, view_buf_size);
if (hret != HG_SUCCESS)
{
HG_Free_input(handle, &attach_req);
margo_free_input(handle, &attach_req);
goto fini;
}
}
......@@ -244,13 +233,13 @@ static void ssg_group_attach_recv_ult(
attach_resp.group_name = g->name;
attach_resp.group_size = (int)g->view.size;
attach_resp.view_buf_size = view_buf_size;
margo_respond(ssg_inst->mid, handle, &attach_resp);
margo_respond(handle, &attach_resp);
HG_Free_input(handle, &attach_req);
margo_free_input(handle, &attach_req);
fini:
free(view_buf); /* TODO: cache this */
if (handle != HG_HANDLE_NULL) HG_Destroy(handle);
if (bulk_handle != HG_BULK_NULL) HG_Bulk_free(bulk_handle);
if (handle != HG_HANDLE_NULL) margo_destroy(handle);
if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
return;
}
......
......@@ -130,7 +130,6 @@ ssg_group_id_t ssg_group_create(
ssg_membership_update_cb update_cb,
void * update_cb_dat)
{
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
char *self_addr_str = NULL;
hg_size_t self_addr_str_size = 0;
......@@ -142,9 +141,6 @@ ssg_group_id_t ssg_group_create(
if (!ssg_inst) return group_id;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) return group_id;
/* generate a unique ID for this group */
tmp_descriptor = ssg_group_descriptor_create(group_name, group_addr_strs[0]);
if (tmp_descriptor == NULL) return group_id;
......@@ -159,13 +155,13 @@ ssg_group_id_t ssg_group_create(
}
/* get my address */
hret = HG_Addr_self(hgcl, &self_addr);
hret = margo_addr_self(ssg_inst->mid, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_str_size, self_addr);
hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_str_size);
if (self_addr_str == NULL) goto fini;
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_str_size, self_addr);
hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
/* allocate an SSG group data structure and initialize some of it */
......@@ -215,7 +211,7 @@ ssg_group_id_t ssg_group_create(
self_addr = HG_ADDR_NULL;
g = NULL;
fini:
if (hgcl && self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr);
free(self_addr_str);
if (g)
{
......@@ -326,7 +322,6 @@ ssg_group_id_t ssg_group_create_mpi(
ssg_membership_update_cb update_cb,
void * update_cb_dat)
{
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
char *self_addr_str = NULL;
hg_size_t self_addr_str_size = 0;
......@@ -341,17 +336,14 @@ ssg_group_id_t ssg_group_create_mpi(
if (!ssg_inst) goto fini;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) goto fini;
/* get my address */
hret = HG_Addr_self(hgcl, &self_addr);
hret = margo_addr_self(ssg_inst->mid, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_str_size, self_addr);
hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_str_size);
if (self_addr_str == NULL) goto fini;
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_str_size, self_addr);
hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str_size_int = (int)self_addr_str_size; /* null char included in call */
......@@ -388,7 +380,7 @@ ssg_group_id_t ssg_group_create_mpi(
fini:
/* cleanup before returning */
if (hgcl && self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr);
free(self_addr_str);
free(sizes);
free(sizes_psum);
......@@ -737,16 +729,11 @@ void ssg_apply_membership_update(
ssg_group_t *g,
ssg_membership_update_t update)
{
hg_class_t *hgcl = NULL;
if(!ssg_inst || !g) return;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) return;
if (update.type == SSG_MEMBER_REMOVE)
{
HG_Addr_free(hgcl, g->view.member_states[update.member].addr);
margo_addr_free(ssg_inst->mid, g->view.member_states[update.member].addr);
free(g->view.member_states[update.member].addr_str);
g->view.member_states[update.member].addr_str = NULL;
g->view.member_states[update.member].is_member = 0;
......@@ -978,7 +965,7 @@ static void ssg_group_view_destroy(
for (i = 0; i < view->size; i++)
{
free(view->member_states[i].addr_str);
HG_Addr_free(margo_get_class(ssg_inst->mid), view->member_states[i].addr);
margo_addr_free(ssg_inst->mid, view->member_states[i].addr);
}
free(view->member_states);
......
......@@ -11,7 +11,6 @@
#include <assert.h>
#include <mercury.h>
#include <abt.h>
#include <margo.h>
#include "ssg.h"
......@@ -72,18 +71,16 @@ static hg_id_t swim_iping_rpc_id;
void swim_register_ping_rpcs(
ssg_group_t *g)
{
hg_class_t *hg_cls = margo_get_class(ssg_inst->mid);
/* register RPC handlers for SWIM pings */
swim_dping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_dping", swim_dping_req_t,
swim_dping_resp_t, swim_dping_recv_ult_handler);
swim_iping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_iping", swim_iping_req_t,
swim_iping_resp_t, swim_iping_recv_ult_handler);
swim_dping_rpc_id = MARGO_REGISTER(ssg_inst->mid, "swim_dping", swim_dping_req_t,
swim_dping_resp_t, swim_dping_recv_ult);
swim_iping_rpc_id = MARGO_REGISTER(ssg_inst->mid, "swim_iping", swim_iping_req_t,
swim_iping_resp_t, swim_iping_recv_ult);
/* register swim context data structure with each RPC type */
/* XXX: this won't work for multiple groups ... */
HG_Register_data(hg_cls, swim_dping_rpc_id, g, NULL);
HG_Register_data(hg_cls, swim_iping_rpc_id, g, NULL);
margo_register_data(ssg_inst->mid, swim_dping_rpc_id, g, NULL);
margo_register_data(ssg_inst->mid, swim_iping_rpc_id, g, NULL);
return;
}
......@@ -137,8 +134,7 @@ static int swim_send_dping(
if(target_addr == HG_ADDR_NULL)
return(ret);
hret = HG_Create(margo_get_context(ssg_inst->mid), target_addr, swim_dping_rpc_id,
&handle);
hret = margo_create(ssg_inst->mid, target_addr, swim_dping_rpc_id, &handle);
if(hret != HG_SUCCESS)
return(ret);
......@@ -148,11 +144,11 @@ static int swim_send_dping(
swim_pack_message(g, &(dping_req.msg));
/* send a direct ping that expires at the end of the protocol period */
hret = margo_forward_timed(ssg_inst->mid, handle, &dping_req,
hret = margo_forward_timed(handle, &dping_req,
swim_ctx->prot_period_len);
if (hret == HG_SUCCESS)
{
hret = HG_Get_output(handle, &dping_resp);
hret = margo_get_output(handle, &dping_resp);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv dping ack from %d\n", (int)dping_resp.msg.source_id);
......@@ -161,7 +157,7 @@ static int swim_send_dping(
/* extract target's membership state from response */
swim_unpack_message(g, &(dping_resp.msg));
HG_Free_output(handle, &dping_resp);
margo_free_output(handle, &dping_resp);
ret = 0;
}
else if(hret != HG_TIMEOUT)
......@@ -170,7 +166,7 @@ static int swim_send_dping(
}
fini:
HG_Destroy(handle);
margo_destroy(handle);
return(ret);
}
......@@ -178,20 +174,17 @@ static void swim_dping_recv_ult(hg_handle_t handle)
{
ssg_group_t *g;
swim_context_t *swim_ctx;
const struct hg_info *info;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
hg_return_t hret;
/* get ssg & swim state */
info = HG_Get_info(handle);
if(info == NULL) goto fini;
g = (ssg_group_t *)HG_Registered_data(info->hg_class, swim_dping_rpc_id);
g = (ssg_group_t *)margo_registered_data(ssg_inst->mid, swim_dping_rpc_id);
assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL);
hret = HG_Get_input(handle, &dping_req);
hret = margo_get_input(handle, &dping_req);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv dping req from %d\n", (int)dping_req.msg.source_id);
......@@ -205,11 +198,11 @@ static void swim_dping_recv_ult(hg_handle_t handle)
SSG_DEBUG(g, "SWIM: send dping ack to %d\n", (int)dping_req.msg.source_id);
/* respond to sender of the dping req */
margo_respond(ssg_inst->mid, handle, &dping_resp);
margo_respond(handle, &dping_resp);
HG_Free_input(handle, &dping_req);
margo_free_input(handle, &dping_req);
fini:
HG_Destroy(handle);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
......@@ -250,8 +243,7 @@ void swim_iping_send_ult(
if(target_addr == HG_ADDR_NULL)
return;
hret = HG_Create(margo_get_context(ssg_inst->mid), target_addr, swim_iping_rpc_id,
&handle);
hret = margo_create(ssg_inst->mid, target_addr, swim_iping_rpc_id, &handle);
if(hret != HG_SUCCESS)
return;
......@@ -269,11 +261,11 @@ void swim_iping_send_ult(
* the dping timeout, which should cause this iping to timeout
* right at the end of the current protocol period.
*/
hret = margo_forward_timed(ssg_inst->mid, handle, &iping_req,
hret = margo_forward_timed(handle, &iping_req,
(swim_ctx->prot_period_len - swim_ctx->dping_timeout));
if (hret == HG_SUCCESS)
{
hret = HG_Get_output(handle, &iping_resp);
hret = margo_get_output(handle, &iping_resp);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv iping ack from %d (target=%d)\n",
......@@ -289,7 +281,7 @@ void swim_iping_send_ult(
if(swim_ctx->ping_target == iping_req.target_id)
swim_ctx->ping_target_acked = 1;
HG_Free_output(handle, &iping_resp);
margo_free_output(handle, &iping_resp);
}
else if(hret != HG_TIMEOUT)
{
......@@ -298,7 +290,7 @@ void swim_iping_send_ult(
}
fini:
HG_Destroy(handle);
margo_destroy(handle);
return;
}
......@@ -306,21 +298,18 @@ static void swim_iping_recv_ult(hg_handle_t handle)
{
ssg_group_t *g;
swim_context_t *swim_ctx;
const struct hg_info *info;
swim_iping_req_t iping_req;
swim_iping_resp_t iping_resp;
hg_return_t hret;
int ret;
/* get the swim state */
info = HG_Get_info(handle);
if(info == NULL) goto fini;
g = (ssg_group_t *)HG_Registered_data(info->hg_class, swim_dping_rpc_id);
g = (ssg_group_t *)margo_registered_data(ssg_inst->mid, swim_dping_rpc_id);
assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL);
hret = HG_Get_input(handle, &iping_req);
hret = margo_get_input(handle, &iping_req);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv iping req from %d (target=%d)\n",
......@@ -342,12 +331,12 @@ static void swim_iping_recv_ult(hg_handle_t handle)
(int)iping_req.msg.source_id, (int)iping_req.target_id);
/* respond to sender of the iping req */
margo_respond(ssg_inst->mid, handle, &iping_resp);
margo_respond(handle, &iping_resp);
}
HG_Free_input(handle, &iping_req);
margo_free_input(handle, &iping_req);
fini:
HG_Destroy(handle);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
......
......@@ -2,10 +2,8 @@ check_PROGRAMS += \
tests/ssg-test-simple \
tests/ssg-test-attach
tests_ssg_test_simple_SOURCES = tests/ssg-test-simple.c
tests_ssg_test_simple_LDADD = src/libssg.la
tests_ssg_test_attach_SOURCES = tests/ssg-test-attach.c
tests_ssg_test_attach_LDADD = src/libssg.la
if SSG_HAVE_MPI
......
......@@ -4,6 +4,8 @@
* See COPYRIGHT in top-level directory.
*/
#include "ssg-config.h"
#include <unistd.h>
#include <stdio.h>
#include <string.h>
......@@ -12,7 +14,9 @@
#include <mpi.h>
#include <margo.h>
#ifdef HAVE_ABT_SNOOZER
#include <abt-snoozer.h>
#endif
#include <mercury.h>
#include <abt.h>
#include <ssg.h>
......@@ -21,6 +25,10 @@
struct options
{
int iterations;
int snoozer_flag_client;
int snoozer_flag_server;
unsigned int mercury_timeout_client;
unsigned int mercury_timeout_server;
char* diag_file_name;
char* na_transport;
};
......@@ -28,8 +36,7 @@ struct options
static void parse_args(int argc, char **argv, struct options *opts);
static void usage(void);
static int run_benchmark(int iterations, hg_id_t id, ssg_member_id_t target,
ssg_group_id_t gid, margo_instance_id mid, hg_context_t *hg_context,
double *measurement_array);
ssg_group_id_t gid, margo_instance_id mid, double *measurement_array);
static void bench_routine_print(const char* op, int size, int iterations,
double* measurement_array);
static int measurement_cmp(const void* a, const void *b);
......@@ -46,11 +53,12 @@ int main(int argc, char **argv)
int nranks;
hg_context_t *hg_context;
hg_class_t *hg_class;
ABT_xstream xstream;
ABT_pool pool;
int ret;
ssg_group_id_t gid;
ssg_member_id_t self;
int rank;
hg_bool_t flag;
double *measurement_array;
int namelen;
char processor_name[MPI_MAX_PROCESSOR_NAME];
......@@ -88,27 +96,56 @@ int main(int argc, char **argv)
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if((rank == 0 && g_opts.snoozer_flag_client) ||
(rank == 1 && g_opts.snoozer_flag_server))
{
#ifdef HAVE_ABT_SNOOZER
/* set primary ES to idle without polling in scheduler */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1);
}
#else
fprintf(stderr, "Error: abt-snoozer scheduler is not supported\n");
return(-1);
#endif
}
/* get main pool for running mercury progress and RPC handlers */
ret = ABT_xstream_self(&xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
/* actually start margo */