Commit 4926f777 authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

Use margo API for everything rather than mixing margo and HG. Probably doesn't matter.

parent b4cdbb8c
......@@ -65,14 +65,14 @@ int kv_open(kv_context *context, char * server, char *name,
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->open_id, &handle);
context->open_id, &handle);
assert(ret == HG_SUCCESS);
open_in.name = name;
ret = margo_forward(handle, &open_in);
assert(ret == HG_SUCCESS);
ret = HG_Get_output(handle, &open_out);
ret = margo_get_output(handle, &open_out);
assert(ret == HG_SUCCESS);
ret = open_out.ret;
assert(ret == HG_SUCCESS);
......@@ -100,9 +100,11 @@ int kv_open(kv_context *context, char * server, char *name,
ret = margo_create(context->mid, context->svr_addr,
context->shutdown_id, &(context->shutdown_handle) );
assert(ret == HG_SUCCESS);
margo_free_output(handle, &open_out);
margo_destroy(handle);
return ret;
return HG_SUCCESS;
}
/* we gave types in the open call. Will need to maintain in 'context' the
......@@ -116,9 +118,9 @@ int kv_put(kv_context *context, void *key, void *value) {
put_in.value = *(int*)value;
ret = margo_forward(context->put_handle, &put_in);
assert(ret == HG_SUCCESS);
ret = HG_Get_output(context->put_handle, &put_out);
ret = margo_get_output(context->put_handle, &put_out);
assert(ret == HG_SUCCESS);
HG_Free_output(context->put_handle, &put_out);
margo_free_output(context->put_handle, &put_out);
return ret;
}
......@@ -134,10 +136,10 @@ int kv_bulk_put(kv_context *context, void *key, void *data, hg_size_t data_size)
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bulk_put_handle, &bpin);
assert(ret == HG_SUCCESS);
ret = HG_Get_output(context->bulk_put_handle, &bpout);
ret = margo_get_output(context->bulk_put_handle, &bpout);
assert(ret == HG_SUCCESS);
assert(bpout.ret == HG_SUCCESS); // make sure the server side says all is OK
HG_Free_output(context->bulk_put_handle, &bpout);
margo_free_output(context->bulk_put_handle, &bpout);
return HG_SUCCESS;
}
......@@ -151,10 +153,10 @@ int kv_get(kv_context *context, void *key, void *value)
get_in.key = *(int*)key;
ret = margo_forward(context->get_handle, &get_in);
assert(ret == HG_SUCCESS);
ret = HG_Get_output(context->get_handle, &get_out);
ret = margo_get_output(context->get_handle, &get_out);
*(int*) value = get_out.value;
assert(ret == HG_SUCCESS);
HG_Free_output(context->get_handle, &get_out);
margo_free_output(context->get_handle, &get_out);
return ret;
}
......@@ -171,10 +173,10 @@ int kv_bulk_get(kv_context *context, void *key, void *data, hg_size_t data_size)
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bulk_get_handle, &bgin);
assert(ret == HG_SUCCESS);
ret = HG_Get_output(context->bulk_get_handle, &bgout);
ret = margo_get_output(context->bulk_get_handle, &bgout);
assert(ret == HG_SUCCESS);
assert(bgout.ret == HG_SUCCESS); // make sure the server side says all is OK
HG_Free_output(context->bulk_get_handle, &bgout);
margo_free_output(context->bulk_get_handle, &bgout);
return HG_SUCCESS;
}
......@@ -191,10 +193,11 @@ int kv_close(kv_context *context)
assert(ret == HG_SUCCESS);
ret = margo_forward(handle, &close_in);
assert(ret == HG_SUCCESS);
ret = HG_Get_output(handle, &close_out);
ret = margo_get_output(handle, &close_out);
assert(ret == HG_SUCCESS);
HG_Free_output(handle, &close_out);
HG_Destroy(handle);
margo_free_output(handle, &close_out);
margo_destroy(handle);
return HG_SUCCESS;
}
......@@ -212,11 +215,14 @@ bench_result *kv_benchmark(kv_context *context, int count) {
bench_in.count = count;
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &handle);
context->bench_id, &handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(context->bench_handle, &bench_in);
assert(ret == HG_SUCCESS);
ret = HG_Get_output(context->bench_handle, &bench_out);
ret = margo_get_output(context->bench_handle, &bench_out);
margo_free_output(handle, &bench_out);
margo_destroy(handle);
result = malloc(sizeof(bench_result));
result->nkeys = bench_out.result.nkeys;
......@@ -234,12 +240,12 @@ bench_result *kv_benchmark(kv_context *context, int count) {
int kv_client_deregister(kv_context *context) {
int ret;
HG_Destroy(context->put_handle);
HG_Destroy(context->get_handle);
HG_Destroy(context->bulk_put_handle);
HG_Destroy(context->bulk_get_handle);
HG_Destroy(context->bench_handle);
HG_Destroy(context->shutdown_handle);
margo_destroy(context->put_handle);
margo_destroy(context->get_handle);
margo_destroy(context->bulk_put_handle);
margo_destroy(context->bulk_get_handle);
margo_destroy(context->bench_handle);
margo_destroy(context->shutdown_handle);
assert(ret == HG_SUCCESS);
ret = margo_addr_free(context->mid, context->svr_addr);
......
......@@ -162,13 +162,13 @@ BwTree<uint64_t, std::vector<char>,
const char *my_db = "kv-test-db";
static hg_return_t open_handler(hg_handle_t h)
static hg_return_t open_handler(hg_handle_t handle)
{
hg_return_t ret;
open_in_t in;
open_out_t out;
ret = margo_get_input(h, &in);
ret = margo_get_input(handle, &in);
if (strcmp(in.name, my_db) == 0) {
if (!TREE) {
......@@ -201,64 +201,62 @@ static hg_return_t open_handler(hg_handle_t h)
* away with sloppy casting. Not sure how to do the same with a C++
* template. */
// this works
ret = HG_Respond(h, NULL, NULL, &out);
// but this did not?
//mid = margo_hg_class_to_instance(info->hg_class);
//ret = margo_respond(mid, h, &out);
ret = margo_respond(handle, &out);
assert(ret == HG_SUCCESS);
HG_Free_input(h, &in);
HG_Destroy(h);
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(open_handler)
static hg_return_t close_handler(hg_handle_t h)
static hg_return_t close_handler(hg_handle_t handle)
{
hg_return_t ret;
close_in_t in;
close_out_t out;
ret = HG_Get_input(h, &in);
ret = margo_get_input(handle, &in);
assert(ret == HG_SUCCESS);
ret = HG_Respond(h, NULL, NULL, &out);
ret = margo_respond(handle, &out);
assert(ret == HG_SUCCESS);
HG_Free_input(h, &in);
HG_Destroy(h);
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(close_handler)
static hg_return_t put_handler(hg_handle_t h)
static hg_return_t put_handler(hg_handle_t handle)
{
hg_return_t ret;
put_in_t in;
put_out_t out;
ret = HG_Get_input(h, &in);
ret = margo_get_input(handle, &in);
assert(ret == HG_SUCCESS);
std::vector<char> data;
data.resize(sizeof(in.value));
memcpy(data.data(), &in.value, sizeof(in.value));
TREE->Insert(in.key, data);
assert(ret == HG_SUCCESS);
ret = HG_Respond(h, NULL, NULL, &out);
ret = margo_respond(handle, &out);
assert(ret == HG_SUCCESS);
HG_Free_input(h, &in);
HG_Destroy(h);
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(put_handler)
static hg_return_t bulk_put_handler(hg_handle_t h)
static hg_return_t bulk_put_handler(hg_handle_t handle)
{
hg_return_t ret;
bulk_put_in_t bpin;
......@@ -267,11 +265,12 @@ static hg_return_t bulk_put_handler(hg_handle_t h)
const struct hg_info *hgi;
margo_instance_id mid;
ret = HG_Get_input(h, &bpin);
ret = margo_get_input(handle, &bpin);
assert(ret == HG_SUCCESS);
printf("SERVER: BULK PUT key = %lu size = %lu\n", bpin.key, bpin.size);
/* get handle info and margo instance */
hgi = margo_get_info(h);
hgi = margo_get_info(handle);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
......@@ -296,25 +295,25 @@ static hg_return_t bulk_put_handler(hg_handle_t h)
}
bpout.ret = ret;
ret = HG_Respond(h, NULL, NULL, &bpout);
ret = margo_respond(handle, &bpout);
assert(ret == HG_SUCCESS);
HG_Free_input(h, &bpin);
margo_free_input(handle, &bpin);
margo_bulk_free(bulk_handle);
HG_Destroy(h);
margo_destroy(handle);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(bulk_put_handler)
static hg_return_t get_handler(hg_handle_t h)
static hg_return_t get_handler(hg_handle_t handle)
{
hg_return_t ret;
get_in_t in;
get_out_t out;
ret = HG_Get_input(h, &in);
ret = margo_get_input(handle, &in);
assert(ret == HG_SUCCESS);
/*void GetValue (const KeyType &search_key, std::vector< ValueType > &value_list) */
......@@ -341,17 +340,17 @@ static hg_return_t get_handler(hg_handle_t h)
out.ret = HG_OTHER_ERROR;
}
ret = HG_Respond(h, NULL, NULL, &out);
ret = margo_respond(handle, &out);
assert(ret == HG_SUCCESS);
HG_Free_input(h, &in);
HG_Destroy(h);
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
DEFINE_MARGO_RPC_HANDLER(get_handler)
static hg_return_t bulk_get_handler(hg_handle_t h)
static hg_return_t bulk_get_handler(hg_handle_t handle)
{
hg_return_t ret;
bulk_get_in_t bgin;
......@@ -360,7 +359,7 @@ static hg_return_t bulk_get_handler(hg_handle_t h)
const struct hg_info *hgi;
margo_instance_id mid;
ret = HG_Get_input(h, &bgin);
ret = margo_get_input(handle, &bgin);
assert(ret == HG_SUCCESS);
/* void GetValue (const KeyType &search_key, std::vector< ValueType > &value_list) */
......@@ -376,7 +375,7 @@ static hg_return_t bulk_get_handler(hg_handle_t h)
bgout.size = data.size();
if (bgout.size <= bgin.size) {
/* get handle info and margo instance */
hgi = margo_get_info(h);
hgi = margo_get_info(handle);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
......@@ -406,12 +405,12 @@ static hg_return_t bulk_get_handler(hg_handle_t h)
bgout.ret = HG_OTHER_ERROR;
}
ret = HG_Respond(h, NULL, NULL, &bgout);
ret = margo_respond(handle, &bgout);
assert(ret == HG_SUCCESS);
HG_Free_input(h, &bgin);
margo_free_input(handle, &bgin);
margo_bulk_free(bulk_handle);
HG_Destroy(h);
margo_destroy(handle);
return HG_SUCCESS;
}
......@@ -528,15 +527,14 @@ static void RandomInsertSpeedTest(size_t key_num, bench_result *results)
return;
}
static hg_return_t bench_handler(hg_handle_t h)
static hg_return_t bench_handler(hg_handle_t handle)
{
hg_return_t ret = HG_SUCCESS;
bench_in_t bench_in;
bench_out_t bench_out;
bench_result random_insert;
ret = HG_Get_input(h, &bench_in);
ret = margo_get_input(handle, &bench_in);
assert(ret == HG_SUCCESS);
printf("benchmarking %d keys\n", bench_in.count);
RandomInsertSpeedTest(bench_in.count, &random_insert);
......@@ -545,10 +543,10 @@ static hg_return_t bench_handler(hg_handle_t h)
bench_out.result.read_time = random_insert.read_time;
bench_out.result.overhead = random_insert.overhead;
ret = HG_Respond(h, NULL, NULL, &bench_out);
ret = margo_respond(handle, NULL);
HG_Free_input(h, &bench_in);
HG_Destroy(h);
margo_free_input(handle, &bench_in);
margo_destroy(handle);
return ret;
}
DEFINE_MARGO_RPC_HANDLER(bench_handler)
......
......@@ -34,40 +34,33 @@ static void usage()
int main(int argc, char *argv[])
{
hg_class_t *hgcl = NULL;
hg_context_t *hgctx = NULL;
int sleep_time = 0;
char client_addr_str[128];
char server_addr_str[128];
hg_size_t server_addr_str_sz = 128;
hg_handle_t handle = HG_HANDLE_NULL;
hg_return_t hret;
hg_addr_t server_addr;
int ret;
int rank;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm clientComm;
if (rank == 0) {
char server_addr_str[128];
hg_size_t server_addr_str_sz = 128;
hg_addr_t server_addr;
hg_return_t hret;
MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, rank, &clientComm);
// kv-server
kv_context *context = kv_server_register(argv[1]);
hgctx = margo_get_context(context->mid);
hgcl = HG_Context_get_class(hgctx);
hret = margo_addr_self(context->mid, &server_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_self");
// get server address
hret = HG_Addr_self(hgcl, &server_addr);
DIE_IF(hret != HG_SUCCESS, "HG_Addr_self");
hret = HG_Addr_to_string(hgcl, server_addr_str, &server_addr_str_sz, server_addr);
DIE_IF(hret != HG_SUCCESS, "HG_Addr_to_string");
HG_Addr_free(hgcl, server_addr);
printf("server (rank %d): server addr_str: %s\n", rank, server_addr_str);
hret = margo_addr_to_string(context->mid, server_addr_str, &server_addr_str_sz, server_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_to_string");
margo_addr_free(context->mid, server_addr);
// broadcast (send) server address to all clients
printf("server (rank %d): server addr_str: %s\n", rank, server_addr_str);
MPI_Bcast(server_addr_str, 128, MPI_BYTE, 0, MPI_COMM_WORLD);
// process requests until finalized
......@@ -78,19 +71,26 @@ int main(int argc, char *argv[])
printf("rank %d: server deregistered\n", rank);
}
else {
int sleep_time = 0;
char server_addr_str[128];
char client_addr_str[128];
hg_return_t hret;
MPI_Comm_split(MPI_COMM_WORLD, 1, rank, &clientComm);
// broadcast (recv) server address
MPI_Bcast(server_addr_str, 128, MPI_BYTE, 0, MPI_COMM_WORLD);
printf("client (rank %d): server addr_str: %s\n", rank, server_addr_str);
// kv-client
sprintf(client_addr_str, "cci+tcp://localhost:534%02d", rank);
sprintf(client_addr_str, "cci+tcp://534%02d", rank);
printf("client (rank %d): client addr_str: %s\n", rank, client_addr_str);
kv_context *context = kv_client_register(client_addr_str);
// open specified "DB" (pass in the server's address)
const char *db = "kv-test-db";
ret = kv_open(context, server_addr_str, (char*)db, KV_UINT, KV_BULK);
hret = kv_open(context, server_addr_str, (char*)db, KV_UINT, KV_BULK);
DIE_IF(hret != HG_SUCCESS, "kv_open");
// put
uint64_t key = rank;
......@@ -98,7 +98,8 @@ int main(int argc, char *argv[])
std::vector<char> put_data;
put_data.resize(sizeof(put_val));
memcpy(put_data.data(), &put_val, sizeof(put_val));
ret = kv_bulk_put(context, (void*)&key, (void*)put_data.data(), put_data.size());
hret = kv_bulk_put(context, (void*)&key, (void*)put_data.data(), put_data.size());
DIE_IF(hret != HG_SUCCESS, "kv_bulk_put");
sleep(2);
......@@ -106,12 +107,14 @@ int main(int argc, char *argv[])
int get_val;
std::vector<char> get_data;
get_data.resize(sizeof(get_val));
ret = kv_bulk_get(context, (void*)&key, (void*)get_data.data(), get_data.size());
hret = kv_bulk_get(context, (void*)&key, (void*)get_data.data(), get_data.size());
DIE_IF(hret != HG_SUCCESS, "kv_bulk_get");
memcpy(&get_val, get_data.data(), sizeof(get_val));
printf("key: %lu in: %d out: %d\n", key, put_val, get_val);
// close
ret = kv_close(context);
hret = kv_close(context);
DIE_IF(hret != HG_SUCCESS, "kv_close");
// once all clients are done, one client can signal server
MPI_Barrier(clientComm);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment