Commit 74f82923 authored by Matthieu Dorier's avatar Matthieu Dorier

rewrote the server daemon to use the C++ interface

parent e3b2c817
...@@ -23,7 +23,7 @@ check_PROGRAMS = test/sdskv-open-test \ ...@@ -23,7 +23,7 @@ check_PROGRAMS = test/sdskv-open-test \
test/sdskv-cxx-test \ test/sdskv-cxx-test \
test/sdskv-custom-server-daemon test/sdskv-custom-server-daemon
bin_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.c bin_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.cc
bin_sdskv_server_daemon_DEPENDENCIES = lib/libsdskv-server.la bin_sdskv_server_daemon_DEPENDENCIES = lib/libsdskv-server.la
bin_sdskv_server_daemon_LDFLAGS = -Llib -lsdskv-server bin_sdskv_server_daemon_LDFLAGS = -Llib -lsdskv-server
bin_sdskv_server_daemon_LDADD = ${LIBS} -lsdskv-server ${SERVER_LIBS} bin_sdskv_server_daemon_LDADD = ${LIBS} -lsdskv-server ${SERVER_LIBS}
...@@ -161,7 +161,7 @@ test_sdskv_custom_cmp_test_SOURCES = test/sdskv-custom-cmp-test.cc ...@@ -161,7 +161,7 @@ test_sdskv_custom_cmp_test_SOURCES = test/sdskv-custom-cmp-test.cc
test_sdskv_custom_cmp_test_DEPENDENCIES = lib/libsdskv-client.la test_sdskv_custom_cmp_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_custom_cmp_test_LDFLAGS = -Llib -lsdskv-client test_sdskv_custom_cmp_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_custom_server_daemon_SOURCES = test/sdskv-custom-server-daemon.c test_sdskv_custom_server_daemon_SOURCES = test/sdskv-custom-server-daemon.cc
test_sdskv_custom_server_daemon_DEPENDENCIES = lib/libsdskv-server.la test_sdskv_custom_server_daemon_DEPENDENCIES = lib/libsdskv-server.la
test_sdskv_custom_server_daemon_LDFLAGS = -Llib -lsdskv-server test_sdskv_custom_server_daemon_LDFLAGS = -Llib -lsdskv-server
test_sdskv_custom_server_daemon_LDADD = ${LIBS} -lsdskv-server ${SERVER_LIBS} test_sdskv_custom_server_daemon_LDADD = ${LIBS} -lsdskv-server ${SERVER_LIBS}
......
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <sdskv-server.hpp>
typedef enum {
MODE_DATABASES = 0,
MODE_PROVIDERS = 1
} kv_mplex_mode_t;
struct options
{
char *listen_addr_str;
unsigned num_db;
char **db_names;
sdskv_db_type_t *db_types;
char *host_file;
kv_mplex_mode_t mplex_mode;
};
static void usage(int argc, char **argv)
{
fprintf(stderr, "Usage: sdskv-server-daemon [OPTIONS] <listen_addr> <db name 1>[:map|:bwt|:bdb|:ldb] <db name 2>[:map|:bwt|:bdb|:ldb] ...\n");
fprintf(stderr, " listen_addr is the Mercury address to listen on\n");
fprintf(stderr, " db name X are the names of the databases\n");
fprintf(stderr, " [-f filename] to write the server address to a file\n");
fprintf(stderr, " [-m mode] multiplexing mode (providers or databases) for managing multiple databases (default is databases)\n");
fprintf(stderr, "Example: ./sdskv-server-daemon tcp://localhost:1234 foo:bdb bar\n");
return;
}
static sdskv_db_type_t parse_db_type(char* db_fullname) {
char* column = strstr(db_fullname, ":");
if(column == NULL) {
return KVDB_MAP;
}
*column = '\0';
char* db_type = column + 1;
if(strcmp(db_type, "map") == 0) {
return KVDB_MAP;
} else if(strcmp(db_type, "bwt") == 0) {
return KVDB_BWTREE;
} else if(strcmp(db_type, "bdb") == 0) {
return KVDB_BERKELEYDB;
} else if(strcmp(db_type, "ldb") == 0) {
return KVDB_LEVELDB;
}
fprintf(stderr, "Unknown database type \"%s\"\n", db_type);
exit(-1);
}
static void parse_args(int argc, char **argv, struct options *opts)
{
int opt;
memset(opts, 0, sizeof(*opts));
/* get options */
while((opt = getopt(argc, argv, "f:m:")) != -1)
{
switch(opt)
{
case 'f':
opts->host_file = optarg;
break;
case 'm':
if(0 == strcmp(optarg, "databases"))
opts->mplex_mode = MODE_DATABASES;
else if(0 == strcmp(optarg, "providers"))
opts->mplex_mode = MODE_PROVIDERS;
else {
fprintf(stderr, "Unrecognized multiplexing mode \"%s\"\n", optarg);
exit(EXIT_FAILURE);
}
break;
default:
usage(argc, argv);
exit(EXIT_FAILURE);
}
}
/* get required arguments after options */
if((argc - optind) < 2)
{
usage(argc, argv);
exit(EXIT_FAILURE);
}
opts->num_db = argc - optind - 1;
opts->listen_addr_str = argv[optind++];
opts->db_names = calloc(opts->num_db, sizeof(char*));
opts->db_types = calloc(opts->num_db, sizeof(sdskv_db_type_t));
int i;
for(i=0; i < opts->num_db; i++) {
opts->db_names[i] = argv[optind++];
opts->db_types[i] = parse_db_type(opts->db_names[i]);
}
return;
}
int main(int argc, char **argv)
{
struct options opts;
margo_instance_id mid;
int ret;
parse_args(argc, argv, &opts);
/* start margo */
/* use the main xstream for driving progress and executing rpc handlers */
mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
if(mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
margo_enable_remote_shutdown(mid);
if(opts.host_file)
{
/* write the server address to file if requested */
FILE *fp;
hg_addr_t self_addr;
char self_addr_str[128];
hg_size_t self_addr_str_sz = 128;
hg_return_t hret;
/* figure out what address this server is listening on */
hret = margo_addr_self(mid, &self_addr);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_addr_self()\n");
margo_finalize(mid);
return(-1);
}
hret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_sz, self_addr);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_addr_to_string()\n");
margo_addr_free(mid, self_addr);
margo_finalize(mid);
return(-1);
}
margo_addr_free(mid, self_addr);
fp = fopen(opts.host_file, "w");
if(!fp)
{
perror("fopen");
margo_finalize(mid);
return(-1);
}
fprintf(fp, "%s", self_addr_str);
fclose(fp);
}
/* initialize the SDSKV server */
if(opts.mplex_mode == MODE_PROVIDERS) {
int i;
for(i=0; i< opts.num_db; i++) {
sdskv::provider* provider = sdskv::provider::create(mid, i+1, SDSKV_ABT_POOL_DEFAULT);
sdskv_database_id_t db_id;
sdskv_config_t db_config = {
.db_name = opts.db_names[i],
.db_path = "",
.db_type = opts.db_types[i],
.db_comp_fn_name = SDSKV_COMPARE_DEFAULT,
.db_no_overwrite = 0
};
db_id = provider->attach_database(db_config);
printf("Provider %d managing database \"%s\" at multiplex id %d\n", i, opts.db_names[i], i+1);
}
} else {
int i;
sdskv::provider* provider = sdskv::provider::create(mid, 1, SDSKV_ABT_POOL_DEFAULT);
for(i=0; i < opts.num_db; i++) {
sdskv_database_id_t db_id;
sdskv_config_t db_config = {
.db_name = opts.db_names[i],
.db_path = "",
.db_type = opts.db_types[i],
.db_comp_fn_name = SDSKV_COMPARE_DEFAULT,
.db_no_overwrite = 0
};
db_id = provider->attach_database(db_config);
printf("Provider 0 managing database \"%s\" at multiplex id %d\n", opts.db_names[i] , 1);
}
}
/* suspend until the BAKE server gets a shutdown signal from the client */
margo_wait_for_finalize(mid);
free(opts.db_names);
return(0);
}
...@@ -20,6 +20,8 @@ static std::string gen_random_string(size_t len); ...@@ -20,6 +20,8 @@ static std::string gen_random_string(size_t len);
static int put_get_erase_test(sdskv::database& DB, uint32_t num_keys); static int put_get_erase_test(sdskv::database& DB, uint32_t num_keys);
static int put_get_erase_multi_test(sdskv::database& DB, uint32_t num_keys); static int put_get_erase_multi_test(sdskv::database& DB, uint32_t num_keys);
static int list_keys_test(sdskv::database& DB, uint32_t num_keys);
static int list_keyvals_test(sdskv::database& DB, uint32_t num_keys);
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
...@@ -74,6 +76,7 @@ int main(int argc, char *argv[]) ...@@ -74,6 +76,7 @@ int main(int argc, char *argv[])
/* Put get erase test */ /* Put get erase test */
put_get_erase_test(DB, num_keys); put_get_erase_test(DB, num_keys);
put_get_erase_multi_test(DB, num_keys); put_get_erase_multi_test(DB, num_keys);
list_keys_test(DB, num_keys);
/* shutdown the server */ /* shutdown the server */
kvcl.shutdown(svr_addr); kvcl.shutdown(svr_addr);
...@@ -200,3 +203,99 @@ static int put_get_erase_multi_test(sdskv::database& DB, uint32_t num_keys) { ...@@ -200,3 +203,99 @@ static int put_get_erase_multi_test(sdskv::database& DB, uint32_t num_keys) {
return 0; return 0;
} }
static int list_keys_test(sdskv::database& DB, uint32_t num_keys) {
/* **** put keys ***** */
std::vector<std::string> keys;
std::map<std::string, std::string> reference;
size_t max_value_size = 24;
for(unsigned i=0; i < num_keys; i++) {
auto k = gen_random_string(16);
// half of the entries will be put using bulk
auto v = gen_random_string(3+i*(max_value_size-3)/num_keys);
DB.put(k, v);
std::cout << "Inserted " << k << "\t ===> " << v << std::endl;
reference[k] = v;
keys.push_back(k);
}
std::cout << "Successfuly inserted " << num_keys << " keys" << std::endl;
std::string start_key;
std::vector<std::string> keys_out(3);
auto it = reference.begin();
do {
DB.list_keys(start_key, keys_out);
if(keys_out.size() == 0) break;
start_key = keys_out[keys_out.size()-1];
for(auto& k : keys_out) {
if(k != it->first) {
std::cerr << "Error: key " << k << " != " << it->first << std::endl;
throw std::runtime_error("list_keys error");
} else {
std::cerr << "Key OK: key " << k << " == " << it->first << std::endl;
}
++it;
}
} while(keys_out.size() == 3);
/* erase keys */
for(unsigned i=0; i < num_keys; i++) {
DB.erase(keys[i]);
}
return 0;
}
static int list_keyvals_test(sdskv::database& DB, uint32_t num_keys) {
/* **** put keys ***** */
std::vector<std::string> keys;
std::map<std::string, std::string> reference;
size_t max_value_size = 24;
for(unsigned i=0; i < num_keys; i++) {
auto k = gen_random_string(16);
// half of the entries will be put using bulk
auto v = gen_random_string(3+i*(max_value_size-3)/num_keys);
DB.put(k, v);
std::cout << "Inserted " << k << "\t ===> " << v << std::endl;
reference[k] = v;
keys.push_back(k);
}
std::cout << "Successfuly inserted " << num_keys << " keys" << std::endl;
std::string start_key;
std::vector<std::string> keys_out(3);
std::vector<std::string> vals_out(3);
auto it = reference.begin();
do {
DB.list_keyvals(start_key, keys_out, vals_out);
if(keys_out.size() == 0) break;
start_key = keys_out[keys_out.size()-1];
for(unsigned i = 0; i < keys_out.size(); i++) {
auto& k = keys_out[i];
auto& v = vals_out[i];
if(k != it->first) {
std::cerr << "Error: key " << k << " != " << it->first << std::endl;
throw std::runtime_error("list_keys error");
} else {
std::cerr << "Key OK: key " << k << " == " << it->first << std::endl;
}
if(v != it->second) {
std::cerr << "Error: value " << v << " != " << it->second << std::endl;
throw std::runtime_error("list_keys error");
} else {
std::cerr << "Val OK: val " << v << " == " << it->second << std::endl;
}
++it;
}
} while(keys_out.size() == 3);
/* erase keys */
for(unsigned i=0; i < num_keys; i++) {
DB.erase(keys[i]);
}
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment