Commit f37f6c7d authored by David Rich's avatar David Rich Committed by Rob Latham
Browse files

First cut at adding distributed support. Implemented as separate client/server...

First cut at adding distributed support. Implemented as separate client/server libs for now. Required some tweaking of the keyval API (see sds-keyval.h). May not compile yet.
parent 3be8f07c
......@@ -12,20 +12,32 @@ test_bench_client_LDADD = -lkvclient
lib_LTLIBRARIES = libkvclient.la \
libkvserver.la
lib_LTGLIBRARIES = libkvgroupclient.la \
libkvgroupserver.la
libkvclient_la_SOURCES = src/kv-client.c
libkvgroupclient_la_SOURCES = src/kvgroup-client.cc
libkvgroupclient_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
libkvgroupclient_la_CPPFLAGS = ${CPPFLAGS}
libkvserver_la_SOURCES = src/kv-server.cc \
src/datastore.cc \
src/BwTree/src/bwtree.cpp
libkvserver_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
libkvserver_la_CPPFLAGS = ${CPPFLAGS} -I${srcdir}/src/BwTree/src
include_HEADERS = src/sds-keyval.h
include_HEADERS = src/sds-keyval.h \
src/sds-keyval-group.h
noinst_HEADERS = src/BwTree/src/bwtree.h \
src/BwTree/src/atomic_stack.h\
src/BwTree/src/bloom_filter.h \
src/BwTree/src/sorted_small_set.h
libkvgroupserver_la_SOURCES = src/kvgroup-server.cc
libkvgroupserver_la_CXXFLAGS = -pthread -std=c++11 -g -Wall -mcx16 -Wno-invalid-offsetof
libkvgroupserver_la_CPPFLAGS = ${CPPFLAGS}
check_PROGRAMS = test/test-client \
test/test-server \
......
......@@ -7,48 +7,39 @@
#include <assert.h>
// pass in NULL pointer to get default behavior
kv_context_t *kv_client_register(const char *addr_str) {
hg_return_t ret;
kv_context_t * context;
context = (kv_context_t*)malloc(sizeof(kv_context_t));
memset(context, 0, sizeof(kv_context_t));
/* client side: no custom xstreams */
if (!addr_str) {
context->mid = margo_init("ofi+tcp://",
MARGO_CLIENT_MODE, 0, -1);
}
else {
context->mid = margo_init(addr_str,
MARGO_CLIENT_MODE, 0, -1);
}
// pass in Margo instance ID
kv_context_t *kv_client_register(const margo_instance_id mid) {
hg_return_t ret;
kv_context_t *context;
context = (kv_context_t*)malloc(sizeof(kv_context_t));
memset(context, 0, sizeof(kv_context_t));
context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, NULL);
context->mid = mid;
context->put_id = MARGO_REGISTER(context->mid, "put",
put_in_t, put_out_t, NULL);
context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
bulk_put_in_t, bulk_put_out_t, NULL);
context->bulk_put_id = MARGO_REGISTER(context->mid, "bulk_put",
bulk_put_in_t, bulk_put_out_t, NULL);
context->get_id = MARGO_REGISTER(context->mid, "get",
get_in_t, get_out_t, NULL);
context->get_id = MARGO_REGISTER(context->mid, "get",
get_in_t, get_out_t, NULL);
context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
bulk_get_in_t, bulk_get_out_t, NULL);
context->bulk_get_id = MARGO_REGISTER(context->mid, "bulk_get",
bulk_get_in_t, bulk_get_out_t, NULL);
context->open_id = MARGO_REGISTER(context->mid, "open",
open_in_t, open_out_t, NULL);
context->open_id = MARGO_REGISTER(context->mid, "open",
open_in_t, open_out_t, NULL);
context->close_id = MARGO_REGISTER(context->mid, "close",
void, close_out_t, NULL);
context->close_id = MARGO_REGISTER(context->mid, "close",
void, close_out_t, NULL);
context->bench_id = MARGO_REGISTER(context->mid, "bench",
bench_in_t, bench_out_t, NULL);
context->bench_id = MARGO_REGISTER(context->mid, "bench",
bench_in_t, bench_out_t, NULL);
context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
void, void, NULL);
return context;
context->shutdown_id = MARGO_REGISTER(context->mid, "shutdown",
void, void, NULL);
return context;
}
hg_return_t kv_open(kv_context_t *context, const char *server_addr, const char *db_name) {
......@@ -327,17 +318,15 @@ hg_return_t kv_client_deregister(kv_context_t *context) {
margo_destroy(context->bulk_get_handle);
margo_destroy(context->shutdown_handle);
assert(ret == HG_SUCCESS);
ret = margo_addr_free(context->mid, context->svr_addr);
assert(ret == HG_SUCCESS);
margo_finalize(context->mid);
free(context);
return HG_SUCCESS;
}
// only one client calls shutdown
hg_return_t kv_client_signal_shutdown(kv_context_t *context) {
hg_return_t ret;
......
......@@ -13,7 +13,7 @@
#include <iostream>
// since this is global, we're assuming this server instance will manage a single DB
AbstractDataStore *datastore = NULL; // created by caller, passed into kv_server_register
AbstractDataStore *datastore = NULL;
std::string db_name;
static hg_return_t open_handler(hg_handle_t handle)
......@@ -471,26 +471,19 @@ static hg_return_t bench_handler(hg_handle_t handle)
DEFINE_MARGO_RPC_HANDLER(bench_handler)
#endif
kv_context *kv_server_register(margo_instance_id mid);
kv_context_t *kv_server_register(const margo_instance_id mid)
{
hg_return_t ret;
hg_addr_t addr_self;
char addr_self_string[128];
hg_size_t addr_self_string_sz = 128;
kv_context_t *context;
kv_context_t *context = NULL;
/* sds keyval server init */
context = (kv_context_t*)malloc(sizeof(kv_context_t));
memset(context, 0, sizeof(kv_context_t));
if (!addr_str) {
context->mid = margo_init("ofi+tcp://",
MARGO_SERVER_MODE, 0, -1);
}
else {
context->mid = margo_init(addr_str,
MARGO_SERVER_MODE, 0, -1);
}
assert(context->mid);
context->mid = mid;
/* figure out what address this server is listening on */
ret = margo_addr_self(context->mid, &addr_self);
......
#include "sds-keyval-group.h"
#include <stdlib.h>
#include <time.h>
#include <assert.h>
#include <iostream>
unsigned long server_indexes[CH_MAX_REPLICATION];
kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
{
kvgroup_context_t context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t));
memset(context, 0, sizeof(kvgroup_context_t));
int sret = ssg_init(mid);
assert(sret == SSG_SUCCESS);
sret = ssg_group_attach(gid);
assert(sret == SSG_SUCCESS);
/* update kvgroup_context_t with MID, GID */
context->mid = mid;
context->gid = gid;
return context;
}
hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name)
{
int addr_str_sz = 128;
char addr_str[addr_str_sz];
hg_return_t ret = HG_SUCCESS;
// register and open a connection with each kv-server in the group
hg_size_t gsize = ssg_get_group_size(context->gid);
context->gize = gsize;
context->kv_context = (kv_context_t*)malloc(gsize*sizeof(context->kv_context));
for (hg_size_t i=0; i<gsize; i++) {
// register this client context with Margo
context->kv_context[i] = kv_client_register(context->mid);
assert(context->kv_context[i] != NULL);
hg_addr_t server_addr = ssg_get_addr(gid, i);
std::string dbname(db_name);
dbname += std::string(".") + std::string(i); // each session uses unique db name
// turn server_addr into string
ret = margo_addr_to_string(context->mid, addr_str, &addr_str_sz, server_addr);
assert(ret == HG_SUCCESS);
// open client connection with this server
ret = kv_open(context->kv_context[i], addr_str, dbname.c_str());
assert(ret == HG_SUCCESS);
}
// initialize consistent hash using "hash_lookup3" with gsize servers each with 1 virtual node for now
context->ch_instance = ch_placement_initialize("hash_lookup3", gsize, 4, 0);
return HG_SUCCESS:
}
// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
hg_return_t kvgroup_put(kvgroup_context_t *context, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize)
{
// not using any replication for now (is this right?)
ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
kv_context_t kv_context = context->kv_context[server_indexes[0]];
return kv_put(kv_context, key, ksize, value, vsize);
}
// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
// vsize is in/out
hg_return_t kvgroup_get(kvgroup_context_t *context, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t *vsize)
{
// not using any replication for now (is this right?)
ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
kv_context_t kv_context = context->kv_context[server_indexes[0]];
return kv_get(kv_context, key, ksize, value, vsize);
}
hg_return_t kvgroup_close(kvgroup_context_t *context)
{
hg_return_t ret;
for (hg_size_t i=0; i<context->gsize; i++) {
ret = kv_close(context->kv_context[i]);
assert(ret == HG_SUCCESS);
}
return HG_SUCCESS;
}
hg_return_t kvgroup_client_deregister(kvgroup_context_t *context)
{
hg_return_t ret;
for (hg_size_t i=0; i<context->gsize; i++) {
ret = kv_client_deregister(context->kv_context[i]);
assert(ret == HG_SUCCESS);
}
ch_placement_finalize(context->ch_instance);
ssg_group_detach(context->gid);
ssg_finalize();
margo_finalize(context->mid);
free(context);
}
// only one client calls shutdown
hg_return_t kvgroup_client_signal_shutdown(kvgroup_context_t *context)
{
hg_return_t ret;
for (hg_size_t i=0; i<context->gsize; i++) {
ret = kv_client_signal_shutudown(context->kv_context[i]);
assert(ret == HG_SUCCESS);
}
return HG_SUCCESS;
}
#include "sds-keyval-group.h"
#include <stdlib.h>
#include <time.h>
#include <assert.h>
#include <iostream>
/* a stub for now */
static void group_update_cb(ssg_membership_update_t update, void *cb_dat)
{
int my_world_rank = *(int *)cb_dat;
if (update.type == SSG_MEMBER_ADD)
printf("%d SSG update: ADD member %"PRIu64"\n", my_world_rank, update.member);
else if (update.type == SSG_MEMBER_REMOVE)
printf("%d SSG update: REMOVE member %"PRIu64"\n", my_world_rank, update.member);
return;
}
/* this is a collective operation */
kvgroup_context_t *kvgroup_server_register(margo_instance_id mid, const char *ssg_name, MPI_comm ssg_comm)
{
kvgroup_context_t context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t));
memset(context, 0, sizeof(kvgroup_context_t));
/* update kvgroup_context_t with MID */
context->mid = mid;
context->kv_context = kv_server_register(mid);
assert(context->kv_context != NULL);
int sret = ssg_init(mid);
assert(sret == SSG_SUCCESS);
int my_world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &my_world_rank);
ssg_group_id_t gid = ssg_group_create_mpi(ssg_name, ssg_comm, &group_update_cb, &my_world_rank);
assert(gid != SSG_GROUP_ID_NULL);
/* update kvgroup_context_t with GID */
context->gid = gid;
return context;
}
hg_return_t kvgroup_server_deregister(kvgroup_context_t *context)
{
hg_return_t ret = kv_server_deregister(context->kv_context);
ssg_group_destroy(context->gid);
free(context);
std::cout << "GROUP_SERVER: deregistered" << std::endl;
return ret;
}
hg_return_t kvgroup_server_wait_for_shutdown(kvgroup_context_t *context)
{
hg_return_t ret = kv_server_wait_for_shutdown(context->kv_context);
return ret;
}
#include "sds-keyval.h"
/* initial implementation requires MPI */
#include <mpi.h>
#include <ssg.h>
#include <ssg-mpi.h>
#include "ch-placement.h"
#ifndef sds_keyval_group_h
#define sds_keyval_group_h
#if defined(__cplusplus)
extern "C" {
#endif
typedef struct kvgroup_context_s {
kv_context_t *kv_context;
margo_instance_id mid;
ssg_group_id_t gid; // SSG ID
hg_size_t gsize; // size of SSG
struct ch_placement_instance *ch_instance;
} kvgroup_context_t;
static char *kvgroup_protocol(ssg_group_id_t gid) {
char *addr_str;
int psize = 24;
char *protocol = (char*)malloc(psize);
memset(protocol, 0, psize);
/* figure out protocol to connect with using address information
* associated with the SSG group ID
*/
addr_str = ssg_group_id_get_addr_str(gid);
assert(addr_str != NULL);
/* we only need to the proto portion of the address to initialize */
for(int i=0; i<proto_size && addr_str[i] != '\0' && addr_str[i] != ':'; i++)
protocol[i] = addr_str[i];
return protocol;
}
kvgroup_context_t *kvgroup_server_register(margo_instance_id mid,
const char *ssg_name, MPI_comm ssg_comm);
hg_return_t kvgroup_server_deregister(kvgroup_context_t *context);
hg_return_t kvgroup_server_wait_for_shutdown(kvgroup_context_t *context);
kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid);
hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name);
hg_return_t kvgroup_put(kvgroup_context_t *context, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize);
hg_return_t kvgroup_get(kvgroup_context_t *context, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t *vsize);
hg_return_t kvgroup_close(kvgroup_context_t *context);
hg_return_t kvgroup_client_deregister(kvgroup_context_t *context);
hg_return_t kvgroup_client_signal_shutdown(kvgroup_context_t *context);
#if defined(__cplusplus)
}
#endif
#endif // sds_keyval_group_h
......@@ -293,8 +293,8 @@ MERCURY_GEN_PROC(bench_out_t, ((bench_result)(result)) )
// kv-client API
kv_context_t *kv_client_register(const char *addr_str);
kv_context_t *kv_server_register(const char *addr_str);
kv_context_t *kv_client_register(const margo_instance_id mid);
kv_context_t *kv_server_register(const margo_instance_id mid);
// both the same: should probably move to common?
hg_return_t kv_client_deregister(kv_context_t *context);
......@@ -313,6 +313,37 @@ hg_return_t kv_close(kv_context_t *context);
// benchmark routine
bench_result_t *kv_benchmark(kv_context_t *context, int32_t count);
static char *kv_protocol(char *addr_str) {
int psize = 24;
char *protocol = (char*)malloc(psize);
memset(protocol, 0, psize);
/* we only need to the proto portion of the address to initialize */
for(int i=0; i<proto_size && addr_str[i] != '\0' && addr_str[i] != ':'; i++)
protocol[i] = addr_str[i];
return protocol;
}
// pass in address string and mode
// string can be just the protocol (e.g. "ofi+tcp")
// mode can be MARGO_CLIENT_MODE or MARGO_SERVER_MODE
static margo_instance_id kv_margo_init(const char *addr_str, int mode)
{
margo_instance_id mid;
assert(addr_str != NULL); // better pass something!
assert(mode == MARGO_CLIENT_MODE || mode == MARGO_SERVER_MODE);
mid = margo_init(addr_str, mode, 0, -1);
assert(mid != MARGO_INSTANCE_NULL);
return mid;
}
static void kv_margo_finalize(mid)
{
margo_finalize(mid);
}
#if defined(__cplusplus)
}
#endif
......
......@@ -67,10 +67,15 @@ int main(int argc, char **argv)
bench_result_t rpc;
bench_result_t *server;
kv_context_t *context;
assert(argc == 3);
size_t items = atoi(argv[1]);
char *server_addr_str = argv[2];
context = kv_client_register(NULL);
ret = kv_open(context, argv[2], "db/testdb");
margo_instance_id mid = kv_margo_init(kv_protocol(server_addr_str), MARGO_CLIENT_MODE);
context = kv_client_register(mid);
ret = kv_open(context, server_addr_str, "db/testdb");
assert(ret == HG_SUCCESS);
RandomInsertSpeedTest(context, items, &rpc);
......@@ -93,4 +98,6 @@ int main(int argc, char **argv)
/* cleanup */
ret = kv_client_deregister(context);
assert(ret == HG_SUCCESS);
kv_margo_finalize(mid);
}
......@@ -6,10 +6,15 @@
int main(int argc, char **argv) {
hg_return_t ret;
kv_context_t * context = kv_client_register(NULL);
assert(argc == 2);
char *server_addr_str = argv[1];
margo_instance_id mid = kv_margo_init(kv_protocol(server_addr_str), MARGO_CLIENT_MODE);
kv_context_t *context = kv_client_register(mid);
/* open */
ret = kv_open(context, argv[1], "db/booger");
ret = kv_open(context, server_addr_str, "db/booger");
assert(ret == HG_SUCCESS);
/* put */
......@@ -83,4 +88,6 @@ int main(int argc, char **argv) {
/* cleanup */
ret = kv_client_deregister(context);
assert(ret == HG_SUCCESS);
kv_margo_finalize(mid);
}
......@@ -33,26 +33,31 @@ int main(int argc, char *argv[])
{
int rank;
assert(argc == 2);
char *addr_str = argv[1];
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm clientComm;
if (rank == 0) {
char server_addr_str[128];
hg_size_t server_addr_str_sz = 128;
hg_size_t addr_str_sz = 128;
char server_addr_str[addr_str_sz];
hg_addr_t server_addr;
hg_return_t hret;
MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, rank, &clientComm);
// kv-server
kv_context_t *context = kv_server_register(argv[1]);
margo_instance_id mid = kv_margo_init(kv_protocol(addr_str), MARGO_SERVER_MODE);
kv_context_t *context = kv_server_register(mid);
hret = margo_addr_self(context->mid, &server_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_self");
// get server address
hret = margo_addr_to_string(context->mid, server_addr_str, &server_addr_str_sz, server_addr);
hret = margo_addr_to_string(context->mid, server_addr_str, &addr_str_sz, server_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_to_string");
margo_addr_free(context->mid, server_addr);
......@@ -66,12 +71,13 @@ int main(int argc, char *argv[])
// now finish cleaning up
kv_server_deregister(context);
printf("rank %d: server deregistered\n", rank);
kv_margo_finalize(mid);
}
else {
char server_addr_str[128];
char client_addr_str_in[128];
char client_addr_str_out[128];
hg_size_t client_addr_str_sz = 128;
hg_size_t addr_str_sz = 128;
char server_addr_str[addr_str_sz];
char client_addr_str[addr_str_sz];
hg_addr_t client_addr;
hg_return_t hret;
......@@ -82,17 +88,17 @@ int main(int argc, char *argv[])
printf("client (rank %d): server addr_str: %s\n", rank, server_addr_str);
// kv-client
//sprintf(client_addr_str_in, "cci+tcp://534%02d", rank);
sprintf(client_addr_str_in, "ofi+tcp://");
kv_context_t *context = kv_client_register(client_addr_str_in);
margo_instance_id mid = kv_margo_init(kv_protocol(server_addr_str), MARGO_CLIENT_MODE);
kv_context_t *context = kv_client_register(mid);
hret = margo_addr_self(context->mid, &client_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_self");
// get client address
hret = margo_addr_to_string(context->mid, client_addr_str_out, &client_addr_str_sz, client_addr);
hret = margo_addr_to_string(context->mid, client_addr_str, &addr_str_sz, client_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_to_string");
margo_addr_free(context->mid, client_addr);
printf("client (rank %d): client addr_str: %s\n", rank, client_addr_str_out);
printf("client (rank %d): client addr_str: %s\n", rank, client_addr_str);
// open specified "DB" (pass in the server's address)
const char *db = "db/minima_store";
......@@ -156,6 +162,8 @@ int main(int argc, char *argv[])
// now finish cleaning up
kv_client_deregister(context);
printf("rank %d: client deregistered\n", rank);
kv_margo_finalize(mid);
}
MPI_Finalize();
......
......@@ -2,7 +2,12 @@
#include <assert.h>
int main(int argc, char **argv) {
kv_context_t *context = kv_server_register(argv[1]);
assert(argc == 2);
char *addr_str = argv[1];
margo_instance_id mid = kv_margo_init(kv_protocol(addr_str), MARGO_SERVER_MODE);
kv_context_t *context = kv_server_register(mid);