Commit 698508dd authored by Rob Latham

separate notion of "service" and "database"

Split up "context" (the instance of the sds-keyval service) and
"database" (a specific collection of keys and values).

Furthermore, teach 'group' about this split.  Now a 'group' is a
collection of one context and one or more databases.

fixes #5 and #6
fully fixes #7
parent 598af617
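To make the API change easier to follow before reading the hunks, here is a minimal sketch of the client-side call pattern after this split. It assumes the public header is `sds-keyval.h` and uses placeholder values for the server address, database name, and key/value buffers; the signatures themselves come from the diff below.

```c
#include <assert.h>
#include <stdint.h>
#include "sds-keyval.h"   /* public header name assumed */

/* one kv_context_t per client process, one kv_database_t per opened database */
static void example_client(margo_instance_id mid, const char *server_addr_str)
{
    kv_context_t *ctx = kv_client_register(mid);   /* registers the RPC ids with margo */
    assert(ctx != NULL);

    /* kv_open() now returns a database handle instead of an hg_return_t */
    kv_database_t *db = kv_open(ctx, server_addr_str, "kvdb.0");
    assert(db != NULL);

    uint64_t key = 42, value = 7;
    hg_size_t vsize = sizeof(value);
    kv_put(db, &key, sizeof(key), &value, vsize);   /* operations now take the database */
    kv_get(db, &key, sizeof(key), &value, &vsize);

    kv_close(db);                /* per-database teardown */
    kv_client_deregister(ctx);   /* per-context teardown  */
}
```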
......@@ -23,10 +23,10 @@ extern "C" {
typedef int kv_id;
/* 'Context' describes operations available to keyval clients */
/* do we need one for server, one for client? */
typedef struct kv_context_s {
margo_instance_id mid;
hg_addr_t svr_addr;
hg_id_t put_id;
hg_id_t bulk_put_id;
hg_id_t get_id;
......@@ -35,13 +35,23 @@ typedef struct kv_context_s {
hg_id_t close_id;
hg_id_t bench_id;
hg_id_t shutdown_id;
hg_handle_t put_handle;
kv_id kv;
} kv_context_t;
/* 'Database' contains server-specific information: the instantiation of a
* particular keyval service; the handles used to send information back and
* forth */
typedef struct kv_database_s {
margo_instance_id mid; /* bulk xfer needs to create bulk handles */
hg_addr_t svr_addr;
hg_handle_t close_handle;
hg_handle_t put_handle;
hg_handle_t bulk_put_handle;
hg_handle_t get_handle;
hg_handle_t bulk_get_handle;
hg_handle_t shutdown_handle;
kv_id kv;
} kv_context_t;
hg_handle_t bench_handle;
} kv_database_t;
#define MAX_RPC_MESSAGE_SIZE 4000 // in bytes
......
......@@ -41,17 +41,22 @@ kv_context_t *kv_client_register(const margo_instance_id mid) {
return context;
}
hg_return_t kv_open(kv_context_t *context, const char *server_addr, const char *db_name) {
kv_database_t * kv_open(kv_context_t *context,
const char *server_addr, const char *db_name)
{
hg_return_t ret = HG_SUCCESS;
hg_handle_t handle;
open_in_t open_in;
open_out_t open_out;
kv_database_t *db = calloc(1, sizeof(*db));
db->mid = context->mid;
printf("kv-client: kv_open, server_addr %s\n", server_addr);
ret = margo_addr_lookup(context->mid, server_addr, &(context->svr_addr));
ret = margo_addr_lookup(context->mid, server_addr, &(db->svr_addr));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
ret = margo_create(context->mid, db->svr_addr,
context->open_id, &handle);
assert(ret == HG_SUCCESS);
......@@ -70,31 +75,39 @@ hg_return_t kv_open(kv_context_t *context, const char *server_addr, const char *
* BAKE has a handle-caching mechanism we should consult.
* should margo be caching handles? */
ret = margo_create(context->mid, context->svr_addr,
context->put_id, &(context->put_handle));
ret = margo_create(context->mid, db->svr_addr,
context->put_id, &(db->put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->bulk_put_id, &(db->bulk_put_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bulk_put_id, &(context->bulk_put_handle));
ret = margo_create(context->mid, db->svr_addr,
context->get_id, &(db->get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->get_id, &(context->get_handle));
ret = margo_create(context->mid, db->svr_addr,
context->bulk_get_id, &(db->bulk_get_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->bulk_get_id, &(context->bulk_get_handle));
ret = margo_create(context->mid, db->svr_addr,
context->shutdown_id, &(db->shutdown_handle));
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, context->svr_addr,
context->shutdown_id, &(context->shutdown_handle));
ret = margo_create(context->mid, db->svr_addr,
context->close_id, &(db->close_handle) );
assert(ret == HG_SUCCESS);
ret = margo_create(context->mid, db->svr_addr,
context->bench_id, &(db->bench_handle));
assert(ret == HG_SUCCESS);
margo_free_output(handle, &open_out);
margo_destroy(handle);
return HG_SUCCESS;
return db;
}
/* we gave types in the open call. Will need to maintain in 'context' the
* size. */
hg_return_t kv_put(kv_context_t *context,
hg_return_t kv_put(kv_database_t *db,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize) {
hg_return_t ret;
......@@ -119,16 +132,16 @@ hg_return_t kv_put(kv_context_t *context,
pin.pi.vsize = vsize;
st2 = ABT_get_wtime();
ret = margo_forward(context->put_handle, &pin);
ret = margo_forward(db->put_handle, &pin);
et2 = ABT_get_wtime();
printf("kv_put forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, vsize);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->put_handle, &pout);
ret = margo_get_output(db->put_handle, &pout);
assert(ret == HG_SUCCESS);
ret = pout.ret;
margo_free_output(context->put_handle, &pout);
margo_free_output(db->put_handle, &pout);
}
else {
// use bulk transfer method to move value
......@@ -146,23 +159,23 @@ hg_return_t kv_put(kv_context_t *context,
bpin.bulk.vsize = vsize;
st2 = ABT_get_wtime();
ret = margo_bulk_create(context->mid, 1, &value, &bpin.bulk.vsize,
ret = margo_bulk_create(db->mid, 1, &value, &bpin.bulk.vsize,
HG_BULK_READ_ONLY, &bpin.bulk.handle);
et2 = ABT_get_wtime();
printf("kv_put bulk create time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
st2 = ABT_get_wtime();
ret = margo_forward(context->bulk_put_handle, &bpin);
ret = margo_forward(db->bulk_put_handle, &bpin);
et2 = ABT_get_wtime();
printf("kv_put bulk forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, vsize);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bulk_put_handle, &bpout);
ret = margo_get_output(db->bulk_put_handle, &bpout);
assert(ret == HG_SUCCESS);
ret = bpout.ret; // make sure the server side says all is OK
margo_free_output(context->bulk_put_handle, &bpout);
margo_free_output(db->bulk_put_handle, &bpout);
}
et1 = ABT_get_wtime();
printf("kv_put time: %f microseconds\n", (et1-st1)*1000000);
......@@ -171,7 +184,7 @@ hg_return_t kv_put(kv_context_t *context,
}
// vsize is in/out
hg_return_t kv_get(kv_context_t *context,
hg_return_t kv_get(kv_database_t *db,
void *key, hg_size_t ksize,
void *value, hg_size_t *vsize)
{
......@@ -198,12 +211,12 @@ hg_return_t kv_get(kv_context_t *context,
gin.gi.vsize = size;
st2 = ABT_get_wtime();
ret = margo_forward(context->get_handle, &gin);
ret = margo_forward(db->get_handle, &gin);
et2 = ABT_get_wtime();
printf("kv_get forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, size);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->get_handle, &gout);
ret = margo_get_output(db->get_handle, &gout);
assert(ret == HG_SUCCESS);
ret = gout.go.ret;
......@@ -217,7 +230,7 @@ hg_return_t kv_get(kv_context_t *context,
memcpy(value, gout.go.value, gout.go.vsize);
}
margo_free_output(context->get_handle, &gout);
margo_free_output(db->get_handle, &gout);
}
else {
bulk_get_in_t bgin;
......@@ -228,19 +241,19 @@ hg_return_t kv_get(kv_context_t *context,
bgin.bulk.vsize = size;
st2 = ABT_get_wtime();
ret = margo_bulk_create(context->mid, 1, &value, &bgin.bulk.vsize,
ret = margo_bulk_create(db->mid, 1, &value, &bgin.bulk.vsize,
HG_BULK_WRITE_ONLY, &bgin.bulk.handle);
et2 = ABT_get_wtime();
printf("kv_get bulk create time: %f microseconds\n", (et2-st2)*1000000);
assert(ret == HG_SUCCESS);
st2 = ABT_get_wtime();
ret = margo_forward(context->bulk_get_handle, &bgin);
ret = margo_forward(db->bulk_get_handle, &bgin);
et2 = ABT_get_wtime();
printf("kv_get bulk forward time: %f microseconds, vsize = %lu\n", (et2-st2)*1000000, size);
assert(ret == HG_SUCCESS);
ret = margo_get_output(context->bulk_get_handle, &bgout);
ret = margo_get_output(db->bulk_get_handle, &bgout);
assert(ret == HG_SUCCESS);
ret = bgout.ret; // make sure the server side says all is OK
......@@ -251,7 +264,7 @@ hg_return_t kv_get(kv_context_t *context,
*/
*vsize = (hg_size_t)bgout.size;
margo_free_output(context->bulk_get_handle, &bgout);
margo_free_output(db->bulk_get_handle, &bgout);
}
et1 = ABT_get_wtime();
printf("kv_get time: %f microseconds\n", (et1-st1)*1000000);
......@@ -259,45 +272,43 @@ hg_return_t kv_get(kv_context_t *context,
return ret;
}
hg_return_t kv_close(kv_context_t *context)
hg_return_t kv_close(kv_database_t *db)
{
hg_return_t ret;
hg_handle_t handle;
close_out_t close_out;
ret = margo_create(context->mid, context->svr_addr,
context->close_id, &handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(handle, NULL);
ret = margo_forward(db->close_handle, NULL);
assert(ret == HG_SUCCESS);
ret = margo_get_output(handle, &close_out);
ret = margo_get_output(db->close_handle, &close_out);
assert(ret == HG_SUCCESS);
assert(close_out.ret == HG_SUCCESS);
margo_free_output(handle, &close_out);
margo_destroy(handle);
margo_free_output(db->close_handle, &close_out);
margo_destroy(db->put_handle);
margo_destroy(db->get_handle);
margo_destroy(db->bulk_put_handle);
margo_destroy(db->bulk_get_handle);
margo_destroy(db->shutdown_handle);
ret = margo_addr_free(db->mid, db->svr_addr);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
}
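With this change, destroying the Mercury handles and freeing the cached server address happen in kv_close() rather than in kv_client_deregister(); the commit does not show the kv_database_t struct itself being freed. A hedged sketch of the resulting division of teardown labor (the free(db) is an assumption about caller ownership, not something this diff establishes):

```c
#include <stdlib.h>
#include "sds-keyval.h"   /* public header name assumed */

/* hypothetical helper reflecting the new split: per-database cleanup in
 * kv_close(), per-context cleanup in kv_client_deregister() */
static void example_teardown(kv_context_t *ctx, kv_database_t *db)
{
    kv_close(db);              /* close RPC, destroy per-db handles, free svr_addr */
    free(db);                  /* assumed caller-owned; not freed by this commit   */
    kv_client_deregister(ctx); /* frees the shared kv_context_t                    */
}
```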
bench_result_t *kv_benchmark(kv_context_t *context, int32_t count) {
bench_result_t *kv_benchmark(kv_database_t *db, int32_t count) {
hg_return_t ret;
hg_handle_t handle;
bench_in_t bench_in;
bench_out_t bench_out;
bench_result_t *result = NULL;
ret = margo_create(context->mid, context->svr_addr,
context->bench_id, &handle);
assert(ret == HG_SUCCESS);
bench_in.count = count;
ret = margo_forward(handle, &bench_in);
ret = margo_forward(db->bench_handle, &bench_in);
assert(ret == HG_SUCCESS);
ret = margo_get_output(handle, &bench_out);
ret = margo_get_output(db->bench_handle, &bench_out);
assert(ret == HG_SUCCESS);
result = malloc(sizeof(bench_result_t));
......@@ -306,23 +317,12 @@ bench_result_t *kv_benchmark(kv_context_t *context, int32_t count) {
result->read_time = bench_out.result.read_time;
result->overhead = bench_out.result.overhead;
margo_free_output(handle, &bench_out);
margo_destroy(handle);
margo_free_output(db->bench_handle, &bench_out);
return result;
}
hg_return_t kv_client_deregister(kv_context_t *context) {
hg_return_t ret;
margo_destroy(context->put_handle);
margo_destroy(context->get_handle);
margo_destroy(context->bulk_put_handle);
margo_destroy(context->bulk_get_handle);
margo_destroy(context->shutdown_handle);
ret = margo_addr_free(context->mid, context->svr_addr);
assert(ret == HG_SUCCESS);
free(context);
......@@ -330,10 +330,10 @@ hg_return_t kv_client_deregister(kv_context_t *context) {
}
// only one client calls shutdown
hg_return_t kv_client_signal_shutdown(kv_context_t *context) {
hg_return_t kv_client_signal_shutdown(kv_database_t *db) {
hg_return_t ret;
ret = margo_forward(context->shutdown_handle, NULL);
ret = margo_forward(db->shutdown_handle, NULL);
assert(ret == HG_SUCCESS);
return HG_SUCCESS;
......
#include "keyval-internal.h"
#include "sds-keyval-group.h"
#include "kv-config.h"
......@@ -10,10 +11,9 @@
unsigned long server_indexes[CH_MAX_REPLICATION];
kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
kv_group_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid)
{
kvgroup_context_t *context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t));
memset(context, 0, sizeof(kvgroup_context_t));
kv_group_t *group = (kv_group_t*)calloc(1, sizeof(*group));
int sret = ssg_init(mid);
assert(sret == SSG_SUCCESS);
......@@ -22,13 +22,13 @@ kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t
assert(sret == SSG_SUCCESS);
/* update kv_group_t with MID, GID */
context->mid = mid;
context->gid = gid;
group->mid = mid;
group->gid = gid;
return context;
return group;
}
hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name)
hg_return_t kvgroup_open(kv_group_t *group, const char *db_name)
{
hg_size_t addr_str_sz = 128;
char addr_str[addr_str_sz];
......@@ -41,96 +41,97 @@ hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name)
std::string separator("/");
// register and open a connection with each kv-server in the group
hg_size_t gsize = ssg_get_group_size(context->gid);
context->gsize = gsize;
context->kv_context = (kv_context_t**)malloc(gsize*sizeof(kv_context_t*));
hg_size_t gsize = ssg_get_group_size(group->gid);
group->gsize = gsize;
/* one 'context' for sds-keyval itself; one 'database' per server */
group->kv_context = (kv_context_t *)calloc(1,sizeof(kv_context_t));
group->db = (kv_database_t **) calloc(gsize, sizeof(kv_database_t *));
// register this client context with Margo
group->kv_context = kv_client_register(group->mid);
assert(group->kv_context != NULL);
for (hg_size_t i=0; i<gsize; i++) {
// register this client context with Margo
context->kv_context[i] = kv_client_register(context->mid);
assert(context->kv_context[i] != NULL);
hg_addr_t server_addr = ssg_get_addr(context->gid, i);
hg_addr_t server_addr = ssg_get_addr(group->gid, i);
// turn server_addr into string
ret = margo_addr_to_string(context->mid, addr_str, &addr_str_sz, server_addr);
ret = margo_addr_to_string(group->mid, addr_str,
&addr_str_sz, server_addr);
assert(ret == HG_SUCCESS);
margo_addr_free(context->mid, server_addr);
margo_addr_free(group->mid, server_addr);
std::string server_dbname = basepath + separator + std::string("kvdb.") + std::to_string(i)
+ separator + name; // each session uses unique db name
// open client connection with this server
std::cout << "request open of " << server_dbname << " from server " << addr_str << std::endl;
ret = kv_open(context->kv_context[i], addr_str, server_dbname.c_str());
group->db[i] = kv_open(group->kv_context, addr_str, server_dbname.c_str());
assert(group->db[i] != NULL);
}
// initialize consistent hash using "hash_lookup3" with gsize servers, each with 4 virtual nodes for now
context->ch_instance = ch_placement_initialize("hash_lookup3", gsize, 4, 0);
group->ch_instance = ch_placement_initialize("hash_lookup3", gsize, 4, 0);
return HG_SUCCESS;
}
// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
hg_return_t kvgroup_put(kvgroup_context_t *context, uint64_t oid,
hg_return_t kvgroup_put(kv_group_t * group, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize)
{
// not using any replication for now (is this right?)
ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
kv_context_t *kv_context = context->kv_context[server_indexes[0]];
ch_placement_find_closest(group->ch_instance, oid, 1, server_indexes);
std::cout << "kvgroup_put: key=" << oid << ", server_index=" << server_indexes[0] << std::endl;
return kv_put(kv_context, key, ksize, value, vsize);
return kv_put(group->db[server_indexes[0]], key, ksize, value, vsize);
}
// oid is unique associated with key
// in ParSplice key is already a uint64_t hashed value
// vsize is in/out
hg_return_t kvgroup_get(kvgroup_context_t *context, uint64_t oid,
hg_return_t kvgroup_get(kv_group_t *group, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t *vsize)
{
// not using any replication for now (is this right?)
ch_placement_find_closest(context->ch_instance, oid, 1, server_indexes);
kv_context_t *kv_context = context->kv_context[server_indexes[0]];
ch_placement_find_closest(group->ch_instance, oid, 1, server_indexes);
std::cout << "kvgroup_get: key=" << oid << ", server_index=" << server_indexes[0] << std::endl;
return kv_get(kv_context, key, ksize, value, vsize);
return kv_get(group->db[server_indexes[0]], key, ksize, value, vsize);
}
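kvgroup_put() and kvgroup_get() both route through ch-placement: the object id is hashed onto one of the gsize servers and the corresponding kv_database_t is used. A hedged sketch of that routing as a standalone helper (kvgroup_route is hypothetical, not part of this commit; the include names are assumptions, mirroring what the group sources pull in):

```c
#include <stdint.h>
#include "ch-placement.h"        /* include name assumed */
#include "sds-keyval-group.h"

/* hypothetical helper mirroring the routing inside kvgroup_put/kvgroup_get */
static kv_database_t *kvgroup_route(kv_group_t *group, uint64_t oid)
{
    unsigned long idx[CH_MAX_REPLICATION];
    /* replication factor 1, matching the "not using any replication" note above */
    ch_placement_find_closest(group->ch_instance, oid, 1, idx);
    return group->db[idx[0]];
}
```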
hg_return_t kvgroup_close(kvgroup_context_t *context)
hg_return_t kvgroup_close(kv_group_t * group)
{
hg_return_t ret;
for (hg_size_t i=0; i<context->gsize; i++) {
ret = kv_close(context->kv_context[i]);
for (hg_size_t i=0; i<group->gsize; i++) {
ret = kv_close(group->db[i]);
assert(ret == HG_SUCCESS);
}
return HG_SUCCESS;
}
hg_return_t kvgroup_client_deregister(kvgroup_context_t *context)
hg_return_t kvgroup_client_deregister(kv_group_t * group)
{
hg_return_t ret;
for (hg_size_t i=0; i<context->gsize; i++) {
ret = kv_client_deregister(context->kv_context[i]);
assert(ret == HG_SUCCESS);
}
ch_placement_finalize(context->ch_instance);
ssg_group_detach(context->gid);
ret = kv_client_deregister(group->kv_context);
ch_placement_finalize(group->ch_instance);
ssg_group_detach(group->gid);
ssg_finalize();
margo_diag_dump(context->mid, "-", 0);
margo_diag_dump(group->mid, "-", 0);
//margo_finalize(group->mid); // workaround since we core dump here
ssg_group_id_free(context->gid);
free(context->kv_context);
free(context);
return HG_SUCCESS;
ssg_group_id_free(group->gid);
free(group->kv_context);
free(group->db);
free(group);
return ret;
}
// only one client calls shutdown
hg_return_t kvgroup_client_signal_shutdown(kvgroup_context_t *context)
hg_return_t kvgroup_client_signal_shutdown(kv_group_t *group)
{
hg_return_t ret;
for (hg_size_t i=0; i<context->gsize; i++) {
ret = kv_client_signal_shutdown(context->kv_context[i]);
for (hg_size_t i=0; i<group->gsize; i++) {
ret = kv_client_signal_shutdown(group->db[i]);
assert(ret == HG_SUCCESS);
}
return HG_SUCCESS;
......
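Putting the group-client changes together, a minimal sketch of the new flow (the db name and values are placeholders; obtaining mid and the SSG group id from the server is elided):

```c
#include <assert.h>
#include <stdint.h>
#include "sds-keyval-group.h"

static void example_group_client(margo_instance_id mid, ssg_group_id_t gid)
{
    kv_group_t *group = kvgroup_client_register(mid, gid);

    hg_return_t ret = kvgroup_open(group, "parsplice");  /* placeholder db name */
    assert(ret == HG_SUCCESS);

    uint64_t oid = 12345;                 /* routing key for ch-placement */
    uint64_t key = oid, value = 7;
    hg_size_t vsize = sizeof(value);
    kvgroup_put(group, oid, &key, sizeof(key), &value, vsize);
    kvgroup_get(group, oid, &key, sizeof(key), &value, &vsize);

    kvgroup_close(group);                 /* closes every member database          */
    kvgroup_client_deregister(group);     /* frees context, dbs, and ch_instance   */
}
```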
......@@ -19,17 +19,17 @@ static void group_update_cb(ssg_membership_update_t update, void *cb_dat)
}
/* this is a collective operation */
kvgroup_context_t *kvgroup_server_register(margo_instance_id mid, const char *ssg_name, MPI_Comm ssg_comm)
kv_group_t *kvgroup_server_register(margo_instance_id mid,
const char *ssg_name, MPI_Comm ssg_comm)
{
kvgroup_context_t *context = (kvgroup_context_t*)malloc(sizeof(kvgroup_context_t));
memset(context, 0, sizeof(kvgroup_context_t));
kv_group_t * group = (kv_group_t *)calloc(1, sizeof(kv_group_t));
/* update kv_group_t with MID */
context->mid = mid;
group->mid = mid;
context->kv_context = (kv_context_t**)malloc(sizeof(kv_context_t*)); // just 1
context->kv_context[0] = kv_server_register(mid);
assert(context->kv_context[0] != NULL);
group->db = (kv_database_t **)malloc(sizeof(kv_database_t*)); // just 1
group->kv_context = kv_server_register(mid);
assert(group->kv_context != NULL);
int sret = ssg_init(mid);
assert(sret == SSG_SUCCESS);
......@@ -40,24 +40,24 @@ kvgroup_context_t *kvgroup_server_register(margo_instance_id mid, const char *ss
assert(gid != SSG_GROUP_ID_NULL);
/* update kv_group_t with GID */
context->gid = gid;
group->gid = gid;
return context;
return group;
}
hg_return_t kvgroup_server_deregister(kvgroup_context_t *context)
hg_return_t kvgroup_server_deregister(kv_group_t *group)
{
hg_return_t ret = kv_server_deregister(context->kv_context[0]);
ssg_group_destroy(context->gid);
free(context->kv_context);
free(context);
hg_return_t ret = kv_server_deregister(group->kv_context);
ssg_group_destroy(group->gid);
free(group->kv_context);
free(group);
std::cout << "GROUP_SERVER: deregistered" << std::endl;
return ret;
}
hg_return_t kvgroup_server_wait_for_shutdown(kvgroup_context_t *context)
hg_return_t kvgroup_server_wait_for_shutdown(kv_group_t *group)
{
hg_return_t ret = kv_server_wait_for_shutdown(context->kv_context[0]);
hg_return_t ret = kv_server_wait_for_shutdown(group->kv_context);
return ret;
}
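On the server side the flow is unchanged apart from the kv_group_t type; a minimal sketch (the SSG group name is a placeholder, and margo/MPI setup and teardown are elided):

```c
#include <mpi.h>
#include "sds-keyval-group.h"

static void example_group_server(margo_instance_id mid, MPI_Comm comm)
{
    /* collective across comm: registers the keyval server and creates the SSG group */
    kv_group_t *group = kvgroup_server_register(mid, "kv-server-group", comm);

    /* hand the SSG group id to clients */
    kvgroup_server_send_gid(group->gid, comm);

    kvgroup_server_wait_for_shutdown(group);
    kvgroup_server_deregister(group);
}
```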
......
......@@ -15,13 +15,14 @@
extern "C" {
#endif
typedef struct kvgroup_context_s {
kv_context_t **kv_context;
typedef struct kv_group_s {
kv_context_t *kv_context;
kv_database_t **db;
margo_instance_id mid;
ssg_group_id_t gid; // SSG ID
hg_size_t gsize; // size of SSG
struct ch_placement_instance *ch_instance;
} kvgroup_context_t;
} kv_group_t;
// helper routine for stripping protocol part of address string
// stored in ssg_group_id_t data structure
......@@ -47,22 +48,22 @@ static inline char *kvgroup_protocol(ssg_group_id_t gid) {
return protocol;
}
kvgroup_context_t *kvgroup_server_register(margo_instance_id mid,
kv_group_t *kvgroup_server_register(margo_instance_id mid,
const char *ssg_name, MPI_Comm ssg_comm);
hg_return_t kvgroup_server_deregister(kvgroup_context_t *context);
hg_return_t kvgroup_server_wait_for_shutdown(kvgroup_context_t *context);
hg_return_t kvgroup_server_deregister(kv_group_t *group);
hg_return_t kvgroup_server_wait_for_shutdown(kv_group_t *group);
kvgroup_context_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid);
hg_return_t kvgroup_open(kvgroup_context_t *context, const char *db_name);
hg_return_t kvgroup_put(kvgroup_context_t *context, uint64_t oid,
kv_group_t *kvgroup_client_register(margo_instance_id mid, ssg_group_id_t gid);
hg_return_t kvgroup_open(kv_group_t *group, const char *db_name);
hg_return_t kvgroup_put(kv_group_t *group, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t vsize);
hg_return_t kvgroup_get(kvgroup_context_t *context, uint64_t oid,
hg_return_t kvgroup_get(kv_group_t *group, uint64_t oid,
void *key, hg_size_t ksize,
void *value, hg_size_t *vsize);
hg_return_t kvgroup_close(kvgroup_context_t *context);
hg_return_t kvgroup_client_deregister(kvgroup_context_t *context);
hg_return_t kvgroup_client_signal_shutdown(kvgroup_context_t *context);
hg_return_t kvgroup_close(kv_group_t *group);
hg_return_t kvgroup_client_deregister(kv_group_t *group);
hg_return_t kvgroup_client_signal_shutdown(kv_group_t *group);
void kvgroup_server_send_gid(ssg_group_id_t gid, MPI_Comm comm);