Commit 9b87991f authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Build/test fixups. Note that there are 2 workarounds in place so...

Build/test fixups. Note that there are 2 workarounds in place so test-mpi-group terminates cleanly: the ssg_finalize in the kv-server shutdown_handler and the commented out margo_finalize in the kvgroup_client_deregister function.
parent 366359a6
ACLOCAL_AMFLAGS="-Im4" ACLOCAL_AMFLAGS="-Im4"
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/sds-keyval.pc
bin_PROGRAMS = test/bench-client \ bin_PROGRAMS = test/bench-client \
test/test-client \ test/test-client \
test/test-server test/test-server
test_bench_client_SOURCES = test/bench-client.cc test_bench_client_SOURCES = test/bench-client.cc
test_bench_client_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof test_bench_client_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_bench_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src test_bench_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_bench_client_DEPENDENCIES = libkvclient.la test_bench_client_DEPENDENCIES = lib/libkvclient.la
test_bench_client_LDADD = -lkvclient test_bench_client_LDFLAGS = -Llib -lkvclient
lib_LTLIBRARIES = libkvclient.la \ lib_LTLIBRARIES = lib/libkvclient.la \
libkvserver.la lib/libkvserver.la \
lib/libkvgroupclient.la \
lib/libkvgroupserver.la
lib_LTGLIBRARIES = libkvgroupclient.la \ lib_libkvclient_la_SOURCES = src/kv-client.c
libkvgroupserver.la
libkvclient_la_SOURCES = src/kv-client.c lib_libkvgroupclient_la_SOURCES = src/kvgroup-client.cc
lib_libkvgroupclient_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
lib_libkvgroupclient_la_CPPFLAGS = ${CPPFLAGS}
libkvgroupclient_la_SOURCES = src/kvgroup-client.cc lib_libkvserver_la_SOURCES = src/kv-server.cc \
libkvgroupclient_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
libkvgroupclient_la_CPPFLAGS = ${CPPFLAGS}
libkvserver_la_SOURCES = src/kv-server.cc \
src/datastore.cc \ src/datastore.cc \
src/BwTree/src/bwtree.cpp src/BwTree/src/bwtree.cpp
libkvserver_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
libkvserver_la_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src/BwTree/src
include_HEADERS = src/sds-keyval.h \ include_HEADERS = src/sds-keyval.h \
src/sds-keyval-group.h src/sds-keyval-group.h
...@@ -34,15 +34,18 @@ noinst_HEADERS = src/BwTree/src/bwtree.h \ ...@@ -34,15 +34,18 @@ noinst_HEADERS = src/BwTree/src/bwtree.h \
src/BwTree/src/atomic_stack.h\ src/BwTree/src/atomic_stack.h\
src/BwTree/src/bloom_filter.h \ src/BwTree/src/bloom_filter.h \
src/BwTree/src/sorted_small_set.h src/BwTree/src/sorted_small_set.h
libkvgroupserver_la_SOURCES = src/kvgroup-server.cc lib_libkvserver_la_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src/BwTree/src
libkvgroupserver_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
libkvgroupserver_la_CPPFLAGS = ${CPPFLAGS} lib_libkvgroupserver_la_SOURCES = src/kvgroup-server.cc
lib_libkvgroupserver_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
lib_libkvgroupserver_la_CPPFLAGS = ${CPPFLAGS}
check_PROGRAMS = test/test-client \ check_PROGRAMS = test/test-client \
test/test-server \ test/test-server \
test/bench-client \ test/bench-client \
test/test-mpi test/test-mpi \
test/test-mpi-group
test_bench_client_SOURCES = test/bench-client.cc test_bench_client_SOURCES = test/bench-client.cc
test_bench_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src test_bench_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
...@@ -51,26 +54,35 @@ test_bench_client_LDADD = ${LIBS} -lkvclient ...@@ -51,26 +54,35 @@ test_bench_client_LDADD = ${LIBS} -lkvclient
TESTS = test/test-client \ TESTS = test/test-client \
test/test-server \ test/test-server \
test/bench-client \ test/bench-client \
test/test-mpi test/test-mpi \
test/test-mpi-group
test_test_client_SOURCES = test/test-client.cc test_test_client_SOURCES = test/test-client.cc
test_test_client_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof test_test_client_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src test_test_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_client_DEPENDENCIES = libkvclient.la test_test_client_DEPENDENCIES = lib/libkvclient.la
test_test_client_LDADD = -lkvclient test_test_client_LDFLAGS = -Llib -lkvclient
test_test_server_SOURCES = test/test-server.cc test_test_server_SOURCES = test/test-server.cc
test_test_server_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof test_test_server_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_server_DEPENDENCIES = libkvserver.la test_test_server_DEPENDENCIES = lib/libkvserver.la
test_test_server_LDADD = -lkvserver -ldb -ldb_cxx -ldb_stl -lleveldb -lsnappy -lboost_filesystem -lboost_system test_test_server_LDFLAGS = -Llib -lkvserver -lssg -lleveldb -ldb_cxx -lboost_filesystem -lboost_system
test_test_mpi_SOURCES = test/test-mpi.cc test_test_mpi_SOURCES = test/test-mpi.cc
test_test_mpi_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof test_test_mpi_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_mpi_DEPENDENCIES = ${lib_LTLIBRARIES}
test_test_mpi_LDADD = -lkvclient -lkvserver -ldb -ldb_cxx -ldb_stl -lleveldb -lsnappy -lboost_filesystem -lboost_system
test_test_mpi_DEPENDENCIES = lib/libkvserver.la lib/libkvclient.la
test_test_mpi_LDFLAGS = -Llib -lkvclient -lkvserver -lssg -lleveldb -ldb_cxx -lboost_filesystem -lboost_system
test_test_mpi_group_SOURCES = test/test-mpi-group.cc
test_test_mpi_group_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
test_test_mpi_group_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_mpi_group_DEPENDENCIES = lib/libkvgroupserver.la lib/libkvgroupclient.la lib/libkvserver.la lib/libkvclient.la
test_test_mpi_group_LDFLAGS = -Llib -lkvgroupserver -lkvgroupclient -lkvclient -lkvserver -lssg -lch-placement -lleveldb -ldb_cxx -lboost_filesystem -lboost_system
pkgconfigdir = $(libdir)/pkgconfig pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/kv-server.pc \ pkgconfig_DATA = maint/kv-server.pc \
......
...@@ -28,12 +28,6 @@ if test "x$PKG_CONFIG" == "x"; then ...@@ -28,12 +28,6 @@ if test "x$PKG_CONFIG" == "x"; then
AC_MSG_ERROR([Could not find pkg-config utility!]) AC_MSG_ERROR([Could not find pkg-config utility!])
fi fi
PKG_CHECK_MODULES([MARGO],[margo],[],
AC_MSG_ERROR([Could not find working margo installation!]) )
LIBS="$MARGO_LIBS $LIBS"
CPPFLAGS="$MARGO_CFLAGS $CPPFLAGS"
CFLAGS="$MARGO_CFLAGS $CFLAGS"
# Checks for typedefs, structures, and compiler characteristics. # Checks for typedefs, structures, and compiler characteristics.
AC_C_INLINE AC_C_INLINE
AC_TYPE_INT64_T AC_TYPE_INT64_T
......
...@@ -6,7 +6,6 @@ ...@@ -6,7 +6,6 @@
#include <assert.h> #include <assert.h>
// pass in Margo instance ID // pass in Margo instance ID
kv_context_t *kv_client_register(const margo_instance_id mid) { kv_context_t *kv_client_register(const margo_instance_id mid) {
hg_return_t ret; hg_return_t ret;
...@@ -39,6 +38,7 @@ kv_context_t *kv_client_register(const margo_instance_id mid) { ...@@ -39,6 +38,7 @@ kv_context_t *kv_client_register(const margo_instance_id mid) {
context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown", context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
void, void, NULL); void, void, NULL);
return context; return context;
} }
......
...@@ -5,12 +5,13 @@ ...@@ -5,12 +5,13 @@
#include <margo.h> #include <margo.h>
#include <abt-snoozer.h> #include <abt-snoozer.h>
#include <abt.h> #include <abt.h>
#include <assert.h> #include <ssg.h>
//#include <random> //#include <random>
#include <stdlib.h> #include <stdlib.h>
#include <time.h> #include <time.h>
#include <iostream> #include <iostream>
#include <assert.h>
// since this is global, we're assuming this server instance will manage a single DB // since this is global, we're assuming this server instance will manage a single DB
AbstractDataStore *datastore = NULL; AbstractDataStore *datastore = NULL;
...@@ -86,7 +87,7 @@ static hg_return_t put_handler(hg_handle_t handle) ...@@ -86,7 +87,7 @@ static hg_return_t put_handler(hg_handle_t handle)
hg_return_t ret; hg_return_t ret;
put_in_t pin; put_in_t pin;
put_out_t pout; put_out_t pout;
double st1, et1, st2, et2; double st1, et1;
st1 = ABT_get_wtime(); st1 = ABT_get_wtime();
ret = margo_get_input(handle, &pin); ret = margo_get_input(handle, &pin);
...@@ -136,7 +137,7 @@ static hg_return_t bulk_put_handler(hg_handle_t handle) ...@@ -136,7 +137,7 @@ static hg_return_t bulk_put_handler(hg_handle_t handle)
hg_bulk_t bulk_handle; hg_bulk_t bulk_handle;
const struct hg_info *hgi; const struct hg_info *hgi;
margo_instance_id mid; margo_instance_id mid;
double st1, et1, st2, et2; double st1, et1;
st1 = ABT_get_wtime(); st1 = ABT_get_wtime();
ret = margo_get_input(handle, &bpin); ret = margo_get_input(handle, &bpin);
...@@ -204,7 +205,7 @@ static hg_return_t get_handler(hg_handle_t handle) ...@@ -204,7 +205,7 @@ static hg_return_t get_handler(hg_handle_t handle)
hg_return_t ret; hg_return_t ret;
get_in_t gin; get_in_t gin;
get_out_t gout; get_out_t gout;
double st1, et1, st2, et2; double st1, et1;
st1 = ABT_get_wtime(); st1 = ABT_get_wtime();
ret = margo_get_input(handle, &gin); ret = margo_get_input(handle, &gin);
...@@ -265,7 +266,7 @@ static hg_return_t bulk_get_handler(hg_handle_t handle) ...@@ -265,7 +266,7 @@ static hg_return_t bulk_get_handler(hg_handle_t handle)
hg_bulk_t bulk_handle; hg_bulk_t bulk_handle;
const struct hg_info *hgi; const struct hg_info *hgi;
margo_instance_id mid; margo_instance_id mid;
double st1, et1, st2, et2; double st1, et1;
st1 = ABT_get_wtime(); st1 = ABT_get_wtime();
ret = margo_get_input(handle, &bgin); ret = margo_get_input(handle, &bgin);
...@@ -355,6 +356,7 @@ static void shutdown_handler(hg_handle_t handle) ...@@ -355,6 +356,7 @@ static void shutdown_handler(hg_handle_t handle)
* RPC executes, so there is no need to send any * RPC executes, so there is no need to send any
* extra signal to notify it. * extra signal to notify it.
*/ */
ssg_finalize(); // ignore return and should be a no-op?
margo_finalize(mid); margo_finalize(mid);
std::cout << "SERVER: margo finalized" << std::endl; std::cout << "SERVER: margo finalized" << std::endl;
......
...@@ -9,7 +9,7 @@ unsigned long server_indexes[CH_MAX_REPLICATION]; ...@@ -9,7 +9,7 @@ unsigned long server_indexes[CH_MAX_REPLICATION];
kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid) kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
{ {
kvgroup_context_t context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t)); kvgroup_context_t *context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t));
memset(context, 0, sizeof(kvgroup_context_t)); memset(context, 0, sizeof(kvgroup_context_t));
int sret = ssg_init(mid); int sret = ssg_init(mid);
...@@ -27,25 +27,27 @@ kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t ...@@ -27,25 +27,27 @@ kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t
hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name) hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name)
{ {
int addr_str_sz = 128; hg_size_t addr_str_sz = 128;
char addr_str[addr_str_sz]; char addr_str[addr_str_sz];
hg_return_t ret = HG_SUCCESS; hg_return_t ret = HG_SUCCESS;
// register and open a connection with each kv-server in the group // register and open a connection with each kv-server in the group
hg_size_t gsize = ssg_get_group_size(context->gid); hg_size_t gsize = ssg_get_group_size(context->gid);
context->gize = gsize; context->gsize = gsize;
context->kv_context = (kv_context_t*)malloc(gsize*sizeof(context->kv_context)); context->kv_context = (kv_context_t**)malloc(gsize*sizeof(kv_context_t*));
for (hg_size_t i=0; i<gsize; i++) { for (hg_size_t i=0; i<gsize; i++) {
// register this client context with Margo // register this client context with Margo
context->kv_context[i] = kv_client_register(context->mid); context->kv_context[i] = kv_client_register(context->mid);
assert(context->kv_context[i] != NULL); assert(context->kv_context[i] != NULL);
hg_addr_t server_addr = ssg_get_addr(gid, i); hg_addr_t server_addr = ssg_get_addr(context->gid, i);
std::string dbname(db_name);
dbname += std::string(".") + std::string(i); // each session uses unique db name
// turn server_addr into string // turn server_addr into string
ret = margo_addr_to_string(context->mid, addr_str, &addr_str_sz, server_addr); ret = margo_addr_to_string(context->mid, addr_str, &addr_str_sz, server_addr);
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
margo_addr_free(context->mid, server_addr);
std::string dbname(db_name);
dbname += std::string(".") + std::to_string(i); // each session uses unique db name
// open client connection with this server // open client connection with this server
std::cout << "request open of " << dbname << " from server " << addr_str << std::endl;
ret = kv_open(context->kv_context[i], addr_str, dbname.c_str()); ret = kv_open(context->kv_context[i], addr_str, dbname.c_str());
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
} }
...@@ -53,7 +55,7 @@ hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name) ...@@ -53,7 +55,7 @@ hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name)
// initialize consistent hash using "hash_lookup3" with gsize servers each with 1 virtual node for now // initialize consistent hash using "hash_lookup3" with gsize servers each with 1 virtual node for now
context->ch_instance = ch_placement_initialize("hash_lookup3", gsize, 4, 0); context->ch_instance = ch_placement_initialize("hash_lookup3", gsize, 4, 0);
return HG_SUCCESS: return HG_SUCCESS;
} }
// oid is unique associated with key // oid is unique associated with key
...@@ -65,7 +67,7 @@ hg_return_t kvgroup_put(kvgroup_context_t *context, uint64_t oid, ...@@ -65,7 +67,7 @@ hg_return_t kvgroup_put(kvgroup_context_t *context, uint64_t oid,
// not using any replication for now (is this right?) // not using any replication for now (is this right?)
ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes); ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
kv_context_t kv_context = context->kv_context[server_indexes[0]]; kv_context_t *kv_context = context->kv_context[server_indexes[0]];
return kv_put(kv_context, key, ksize, value, vsize); return kv_put(kv_context, key, ksize, value, vsize);
} }
...@@ -79,7 +81,7 @@ hg_return_t kvgroup_get(kvgroup_context_t *context, uint64_t oid, ...@@ -79,7 +81,7 @@ hg_return_t kvgroup_get(kvgroup_context_t *context, uint64_t oid,
{ {
// not using any replication for now (is this right?) // not using any replication for now (is this right?)
ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes); ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
kv_context_t kv_context = context->kv_context[server_indexes[0]]; kv_context_t *kv_context = context->kv_context[server_indexes[0]];
return kv_get(kv_context, key, ksize, value, vsize); return kv_get(kv_context, key, ksize, value, vsize);
} }
...@@ -104,8 +106,12 @@ hg_return_t kvgroup_client_deregister(kvgroup_context_t *context) ...@@ -104,8 +106,12 @@ hg_return_t kvgroup_client_deregister(kvgroup_context_t *context)
ch_placement_finalize(context->ch_instance); ch_placement_finalize(context->ch_instance);
ssg_group_detach(context->gid); ssg_group_detach(context->gid);
ssg_finalize(); ssg_finalize();
margo_finalize(context->mid); margo_diag_dump(context->mid, "-", 0);
//margo_finalize(context->mid); // workaround since we core dump here
ssg_group_id_free(context->gid);
free(context->kv_context);
free(context); free(context);
return HG_SUCCESS;
} }
// only one client calls shutdown // only one client calls shutdown
...@@ -113,7 +119,7 @@ hg_return_t kvgroup_client_signal_shutdown(kvgroup_context_t *context) ...@@ -113,7 +119,7 @@ hg_return_t kvgroup_client_signal_shutdown(kvgroup_context_t *context)
{ {
hg_return_t ret; hg_return_t ret;
for (hg_size_t i=0; i<context->gsize; i++) { for (hg_size_t i=0; i<context->gsize; i++) {
ret = kv_client_signal_shutudown(context->kv_context[i]); ret = kv_client_signal_shutdown(context->kv_context[i]);
assert(ret == HG_SUCCESS); assert(ret == HG_SUCCESS);
} }
return HG_SUCCESS; return HG_SUCCESS;
......
...@@ -11,24 +11,25 @@ static void group_update_cb(ssg_membership_update_t update, void *cb_dat) ...@@ -11,24 +11,25 @@ static void group_update_cb(ssg_membership_update_t update, void *cb_dat)
int my_world_rank = *(int *)cb_dat; int my_world_rank = *(int *)cb_dat;
if (update.type == SSG_MEMBER_ADD) if (update.type == SSG_MEMBER_ADD)
printf("%d SSG update: ADD member %"PRIu64"\n", my_world_rank, update.member); printf("%d SSG update: ADD member %lu\n", my_world_rank, update.member);
else if (update.type == SSG_MEMBER_REMOVE) else if (update.type == SSG_MEMBER_REMOVE)
printf("%d SSG update: REMOVE member %"PRIu64"\n", my_world_rank, update.member); printf("%d SSG update: REMOVE member %lu\n", my_world_rank, update.member);
return; return;
} }
/* this is a collective operation */ /* this is a collective operation */
kvgroup_context_t *kvgroup_server_register(margo_instance_id mid, const char *ssg_name, MPI_comm ssg_comm) kvgroup_context_t *kvgroup_server_register(margo_instance_id mid, const char *ssg_name, MPI_Comm ssg_comm)
{ {
kvgroup_context_t context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t)); kvgroup_context_t *context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t));
memset(context, 0, sizeof(kvgroup_context_t)); memset(context, 0, sizeof(kvgroup_context_t));
/* update kvgroup_context_t with MID */ /* update kvgroup_context_t with MID */
context->mid = mid; context->mid = mid;
context->kv_context = kv_server_register(mid); context->kv_context = (kv_context_t**)malloc(sizeof(kv_context_t*)); // just 1
assert(context->kv_context != NULL); context->kv_context[0] = kv_server_register(mid);
assert(context->kv_context[0] != NULL);
int sret = ssg_init(mid); int sret = ssg_init(mid);
assert(sret == SSG_SUCCESS); assert(sret == SSG_SUCCESS);
...@@ -46,8 +47,9 @@ kvgroup_context_t *kvgroup_server_register(margo_instance_id mid, const char *ss ...@@ -46,8 +47,9 @@ kvgroup_context_t *kvgroup_server_register(margo_instance_id mid, const char *ss
hg_return_t kvgroup_server_deregister(kvgroup_context_t *context) hg_return_t kvgroup_server_deregister(kvgroup_context_t *context)
{ {
hg_return_t ret = kv_server_deregister(context->kv_context); hg_return_t ret = kv_server_deregister(context->kv_context[0]);
ssg_group_destroy(context->gid); ssg_group_destroy(context->gid);
free(context->kv_context);
free(context); free(context);
std::cout << "GROUP_SERVER: deregistered" << std::endl; std::cout << "GROUP_SERVER: deregistered" << std::endl;
return ret; return ret;
...@@ -55,7 +57,7 @@ hg_return_t kvgroup_server_deregister(kvgroup_context_t *context) ...@@ -55,7 +57,7 @@ hg_return_t kvgroup_server_deregister(kvgroup_context_t *context)
hg_return_t kvgroup_server_wait_for_shutdown(kvgroup_context_t *context) hg_return_t kvgroup_server_wait_for_shutdown(kvgroup_context_t *context)
{ {
hg_return_t ret = kv_server_wait_for_shutdown(context->kv_context); hg_return_t ret = kv_server_wait_for_shutdown(context->kv_context[0]);
return ret; return ret;
} }
......
...@@ -15,14 +15,14 @@ extern "C" { ...@@ -15,14 +15,14 @@ extern "C" {
#endif #endif
typedef struct kvgroup_context_s { typedef struct kvgroup_context_s {
kv_context_t *kv_context; kv_context_t **kv_context;
margo_instance_id mid; margo_instance_id mid;
ssg_group_id_t gid; // SSG ID ssg_group_id_t gid; // SSG ID
hg_size_t gsize; // size of SSG hg_size_t gsize; // size of SSG
struct ch_placement_instance *ch_instance; struct ch_placement_instance *ch_instance;
} kvgroup_context_t; } kvgroup_context_t;
static char *kvgroup_protocol(ssg_group_id_t gid) { static inline char *kvgroup_protocol(ssg_group_id_t gid) {
char *addr_str; char *addr_str;
int psize = 24; int psize = 24;
...@@ -36,14 +36,14 @@ static char *kvgroup_protocol(ssg_group_id_t gid) { ...@@ -36,14 +36,14 @@ static char *kvgroup_protocol(ssg_group_id_t gid) {
assert(addr_str != NULL); assert(addr_str != NULL);
/* we only need to the proto portion of the address to initialize */ /* we only need to the proto portion of the address to initialize */
for(int i=0; i<proto_size && addr_str[i] != '\0' && addr_str[i] != ':'; i++) for(int i=0; i<psize && addr_str[i] != '\0' && addr_str[i] != ':'; i++)
protocol[i] = addr_str[i]; protocol[i] = addr_str[i];
return protocol; return protocol;
} }
kvgroup_context_t *kvgroup_server_register(margo_instance_id mid, kvgroup_context_t *kvgroup_server_register(margo_instance_id mid,
const char *ssg_name, MPI_comm ssg_comm); const char *ssg_name, MPI_Comm ssg_comm);
hg_return_t kvgroup_server_deregister(kvgroup_context_t *context); hg_return_t kvgroup_server_deregister(kvgroup_context_t *context);
hg_return_t kvgroup_server_wait_for_shutdown(kvgroup_context_t *context); hg_return_t kvgroup_server_wait_for_shutdown(kvgroup_context_t *context);
......
...@@ -313,14 +313,14 @@ hg_return_t kv_close(kv_context_t *context); ...@@ -313,14 +313,14 @@ hg_return_t kv_close(kv_context_t *context);
// benchmark routine // benchmark routine
bench_result_t *kv_benchmark(kv_context_t *context, int32_t count); bench_result_t *kv_benchmark(kv_context_t *context, int32_t count);
static char *kv_protocol(char *addr_str) { static inline char *kv_protocol(char *addr_str) {
int psize = 24; int psize = 24;
char *protocol = (char*)malloc(psize); char *protocol = (char*)malloc(psize);
memset(protocol, 0, psize); memset(protocol, 0, psize);
/* we only need to the proto portion of the address to initialize */ /* we only need to the proto portion of the address to initialize */
for(int i=0; i<proto_size && addr_str[i] != '\0' && addr_str[i] != ':'; i++) for(int i=0; i<psize && addr_str[i] != '\0' && addr_str[i] != ':'; i++)
protocol[i] = addr_str[i]; protocol[i] = addr_str[i];
return protocol; return protocol;
...@@ -329,17 +329,18 @@ static char *kv_protocol(char *addr_str) { ...@@ -329,17 +329,18 @@ static char *kv_protocol(char *addr_str) {
// pass in address string and mode // pass in address string and mode
// string can be just the protocol (e.g. "ofi+tcp") // string can be just the protocol (e.g. "ofi+tcp")
// mode can be MARGO_CLIENT_MODE or MARGO_SERVER_MODE // mode can be MARGO_CLIENT_MODE or MARGO_SERVER_MODE
static margo_instance_id kv_margo_init(const char *addr_str, int mode) static inline margo_instance_id kv_margo_init(const char *addr_str, int mode)
{ {
margo_instance_id mid; margo_instance_id mid;
assert(addr_str != NULL); // better pass something! assert(addr_str != NULL); // better pass something!
assert(mode == MARGO_CLIENT_MODE || mode == MARGO_SERVER_MODE); assert(mode == MARGO_CLIENT_MODE || mode == MARGO_SERVER_MODE);
mid = margo_init(addr_str, mode, 0, -1); mid = margo_init(addr_str, mode, 0, -1);
assert(mid != MARGO_INSTANCE_NULL); assert(mid != MARGO_INSTANCE_NULL);
margo_diag_start(mid); // initialize diagnostic support
return mid; return mid;
} }
static void kv_margo_finalize(mid) static inline void kv_margo_finalize(margo_instance_id mid)
{ {
margo_finalize(mid); margo_finalize(mid);
} }
......
...@@ -44,7 +44,7 @@ int main(int argc, char *argv[]) ...@@ -44,7 +44,7 @@ int main(int argc, char *argv[])
MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks); MPI_Comm_size(MPI_COMM_WORLD, &nranks);
assert(nranks >= (num_servers+2)); // insist on at least 2 clients assert(nranks >= (num_servers+1)); // insist on at least 1 clients
MPI_Comm clientComm, ssgComm; MPI_Comm clientComm, ssgComm;
...@@ -83,9 +83,9 @@ int main(int argc, char *argv[]) ...@@ -83,9 +83,9 @@ int main(int argc, char *argv[])
ssg_group_id_serialize(context->gid, &serialized_gid, &gid_size); ssg_group_id_serialize(context->gid, &serialized_gid, &gid_size);
assert(serialized_gid != NULL && gid_size != 0); assert(serialized_gid != NULL && gid_size != 0);
// send size first // send size first
MPI_Bcast(&gid_size, 1, M