Commit 1d24041d authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Got test-mpi working using bulk transfer. These are all fixups related to that testing.

parent e12321ad
......@@ -19,21 +19,29 @@ noinst_HEADERS = src/BwTree/src/bwtree.h \
check_PROGRAMS = test/test-client \
test/test-server \
test/bench-client
test/bench-client \
test/test-mpi
test_bench_client_SOURCES = test/bench-client.cc
test_bench_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_bench_client_LDADD = ${LIBS} -lkvclient
TESTS = test/test-client \
test/test-server
test/test-server \
test/bench-client \
test/test-mpi
test_test_client_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_client_LDADD = ${LIBS} -lkvclient
test_test_server_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_server_LDADD = ${LIBS} -lkvserver
test_test_server_LDADD = ${LIBS} -lkvserver -lstdc++
test_test_mpi_SOURCES = test/test-mpi.cc
test_test_mpi_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src
test_test_mpi_LDADD = ${LIBS} -lkvclient -lkvserver
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/kv-server.pc \
maint/kv-client.pc
......@@ -28,9 +28,18 @@ kv_context *kv_client_register(char *addr_str) {
context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, NULL);
context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, NULL);
context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
bulk_put_in_t, bulk_put_out_t, NULL);
context->get_id = MARGO_REGISTER(context->mid, "get",
get_in_t, get_out_t, NULL);
context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
bulk_get_in_t, bulk_get_out_t, NULL);
context->open_id = MARGO_REGISTER(context->mid, "open",
open_in_t, open_out_t, NULL);
......@@ -68,19 +77,19 @@ int kv_open(kv_context *context, char * server, char *name,
* BAKE has a handle-caching mechanism we should consult.
* should margo be caching handles? */
ret = margo_create(context->mid, context->svr_addr,
context->put_id, &(context->put_handle) );
context->put_id, &(context->put_handle) );
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->put_id, &(context->bulk_put_handle) );
context->bulk_put_id, &(context->bulk_put_handle) );
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->get_id, &(context->get_handle) );
context->get_id, &(context->get_handle) );
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->get_id, &(context->bulk_get_handle) );
context->bulk_get_id, &(context->bulk_get_handle) );
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &(context->bench_handle) );
context->bench_id, &(context->bench_handle) );
margo_free_output(handle, &open_out);
margo_destroy(handle);
......@@ -156,7 +165,7 @@ int kv_bulk_get(kv_context *context, void *key, void *data, hg_size_t data_size)
ret = HG_Get_output(context->bulk_get_handle, &bgout);
assert(ret == HG_SUCCESS);
assert(bgout.ret == HG_SUCCESS); // make sure the server side says all is OK
HG_Free_output(context->get_handle, &bgout);
HG_Free_output(context->bulk_get_handle, &bgout);
return HG_SUCCESS;
}
......@@ -169,7 +178,7 @@ int kv_close(kv_context *context)
put_out_t close_out;
ret = margo_create(context->mid, context->svr_addr,
context->close_id, &handle);
context->close_id, &handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(handle, &close_in);
assert(ret == HG_SUCCESS);
......@@ -179,8 +188,11 @@ int kv_close(kv_context *context)
HG_Destroy(context->put_handle);
HG_Destroy(context->get_handle);
HG_Destroy(context->bulk_put_handle);
HG_Destroy(context->bulk_get_handle);
HG_Destroy(context->bench_handle);
HG_Destroy(handle);
return ret;
return HG_SUCCESS;
}
bench_result *kv_benchmark(kv_context *context, int count) {
......@@ -217,8 +229,14 @@ bench_result *kv_benchmark(kv_context *context, int count) {
}
int kv_client_deregister(kv_context *context) {
margo_addr_free(context->mid, context->svr_addr);
margo_finalize(context->mid);
free(context);
return 0;
int ret;
ret = kv_close(context);
assert(ret == HG_SUCCESS);
ret = margo_addr_free(context->mid, context->svr_addr);
assert(ret == HG_SUCCESS);
margo_finalize(context->mid);
free(context);
return HG_SUCCESS;
}
......@@ -20,6 +20,7 @@ typedef int kv_id;
typedef enum {
KV_INT,
KV_UINT,
KV_FLOAT,
KV_DOUBLE,
KV_STRING,
......@@ -31,7 +32,9 @@ typedef struct kv_context_s {
margo_instance_id mid;
hg_addr_t svr_addr;
hg_id_t put_id;
hg_id_t bulk_put_id;
hg_id_t get_id;
hg_id_t bulk_get_id;
hg_id_t open_id;
hg_id_t close_id;
hg_id_t bench_id;
......@@ -128,8 +131,10 @@ int kv_server_deregister(kv_context *context);
int kv_open(kv_context *context, char *server, char *name,
kv_type keytype, kv_type valtype);
int kv_put(kv_context *context, void *key, void *value);
int kv_bulk_put(kv_context *context, void *key, void *data, hg_size_t data_size);
int kv_get(kv_context *context, void *key, void *value);
bench_result *kv_benchmark(kv_context *context, int count);
int kv_bulk_get(kv_context *context, void *key, void *data, hg_size_t data_size);
int kv_close(kv_context *context);
......
......@@ -3,7 +3,7 @@
int main(int argc, char **argv) {
int ret;
kv_context * context = kv_client_register(argc, argv);
kv_context * context = kv_client_register(NULL);
/* open */
ret = kv_open(context, argv[1], "booger", KV_INT, KV_INT);
......
[eth2]
# use this example for TCP
transport = tcp
interface = eth0 # switch this to eth2 or to an external hostname for non-localhost use
0 /home/dor/sds-keyval/test/test-server
1 /home/dor/sds-keyval/test/test-client tcp://192.168.100.49:52345
/*
* Copyright (c) 2017, Los Alamos National Security, LLC.
* All rights reserved.
*
*/
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <mpi.h>
#include <margo.h>
#include <mercury.h>
#include <abt.h>
#include "sds-keyval.h"
#include <vector>
#define DIE_IF(cond_expr, err_fmt, ...) \
do { \
if (cond_expr) { \
fprintf(stderr, "ERROR at %s:%d (" #cond_expr "): " \
err_fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
exit(1); \
} \
} while(0)
static void usage()
{
fprintf(stderr, "Usage: test-mpi <addr>\n");
}
int main(int argc, char *argv[])
{
hg_class_t *hgcl = NULL;
hg_context_t *hgctx = NULL;
int sleep_time = 0;
char client_addr_str[128];
char server_addr_str[128];
hg_size_t server_addr_str_sz = 128;
hg_handle_t handle = HG_HANDLE_NULL;
hg_return_t hret;
hg_addr_t server_addr;
int ret;
int rank;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (rank == 0) {
// kv-server
kv_context *context = kv_server_register(argv[1]);
hgctx = margo_get_context(context->mid);
hgcl = HG_Context_get_class(hgctx);
// get server address
hret = HG_Addr_self(hgcl, &server_addr);
DIE_IF(hret != HG_SUCCESS, "HG_Addr_self");
hret = HG_Addr_to_string(hgcl, server_addr_str, &server_addr_str_sz, server_addr);
DIE_IF(hret != HG_SUCCESS, "HG_Addr_to_string");
HG_Addr_free(hgcl, server_addr);
printf("server (rank %d): server add_str: %s\n", rank, server_addr_str);
// broadcast (send) server address to all clients
MPI_Bcast(server_addr_str, 128, MPI_BYTE, 0, MPI_COMM_WORLD);
// process requests until finalized
margo_wait_for_finalize(context->mid);
// now shutdown
printf("rank %d: server deregistering\n", rank);
kv_server_deregister(context);
printf("rank %d: server deregistered\n", rank);
}
else {
// broadcast (recv) server address
MPI_Bcast(server_addr_str, 128, MPI_BYTE, 0, MPI_COMM_WORLD);
printf("client (rank %d): server add_str: %s\n", rank, server_addr_str);
// kv-client
sprintf(client_addr_str, "cci+tcp://localhost:5345%d", rank);
printf("client (rank %d): client add_str: %s\n", rank, client_addr_str);
kv_context *context = kv_client_register(client_addr_str);
sleep(2*rank);
// open specified "DB" (pass in the server's address)
char *db = "kv-test-db";
ret = kv_open(context, server_addr_str, db, KV_UINT, KV_BULK);
// put
uint64_t key = rank;
int put_val = rank;
std::vector<char> put_data;
put_data.resize(sizeof(put_val));
memcpy(put_data.data(), &put_val, sizeof(put_val));
ret = kv_bulk_put(context, (void*)&key, (void*)put_data.data(), put_data.size());
// get
int get_val;
std::vector<char> get_data;
get_data.resize(sizeof(get_val));
ret = kv_bulk_get(context, (void*)&key, (void*)get_data.data(), get_data.size());
memcpy(&get_val, get_data.data(), sizeof(get_val));
printf("key: %lu in: %d out: %d\n", key, put_val, get_val);
// close
ret = kv_close(context);
printf("rank %d: client deregistering\n", rank);
kv_client_deregister(context);
printf("rank %d: client deregistered\n", rank);
}
MPI_Finalize();
printf("rank %d: finalized\n", rank);
return 0;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment