Commit 132e2957 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added server daemon

parent 2b1919c1
......@@ -5,6 +5,13 @@ CLIENT_LiBS=@CLIENT_LIBS@
AM_CPPFLAGS = -I${srcdir}/src
bin_PROGRAMS = src/sdskv-server-daemon
src_sdskv_server_daemon_SOURCES = src/sdskv-server-daemon.c
src_sdskv_server_daemon_DEPENDENCIES = lib/libkvclient.la
src_sdskv_server_daemon_LDFLAGS = -Llib -lsdskv-server
src_sdskv_server_daemon_LDADD = ${LIBS} -lsdskv-server ${SERVER_LIBS}
lib_LTLIBRARIES = lib/libkvclient.la \
lib/libkvserver.la \
lib/libkvgroupclient.la \
......
......@@ -539,3 +539,7 @@ int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
// TODO
}
int sdskv_shutdown_service(sdskv_client_t client, hg_addr_t addr)
{
return margo_shutdown_remote_instance(client->mid, addr);
}
......@@ -73,6 +73,17 @@ int sdskv_list_keys_with_prefix(sdskv_provider_handle_t provider,
// keys for each key
hg_size_t* max_keys); // maximum number of keys requested
/**
* Shuts down a remote SDSKV service (given an address).
* This will shutdown all the providers on the target address.
*
* @param [in] client SDSKV client
* @param [in] addr address of the server
* @returns 0 on success, -1 on failure
*/
int sdskv_shutdown_service(
sdskv_client_t client, hg_addr_t addr);
#if defined(__cplusplus)
}
#endif
......
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <sdskv-server.h>
typedef enum {
MODE_DATABASES = 0,
MODE_PROVIDERS = 1
} kv_mplex_mode_t;
struct options
{
char *listen_addr_str;
unsigned num_db;
char **db_names;
sdskv_db_type_t *db_types;
char *host_file;
kv_mplex_mode_t mplex_mode;
};
static void usage(int argc, char **argv)
{
fprintf(stderr, "Usage: sdskv-server-daemon [OPTIONS] <listen_addr> <db name 1>[:bwt|:bdb|:ldb] <db name 2>[:bwt|:bdb|:ldb] ...\n");
fprintf(stderr, " listen_addr is the Mercury address to listen on\n");
fprintf(stderr, " db name X are the names of the databases\n");
fprintf(stderr, " [-f filename] to write the server address to a file\n");
fprintf(stderr, " [-m mode] multiplexing mode (providers or databases) for managing multiple databases (default is databases)\n");
fprintf(stderr, "Example: ./sdskv-server-daemon tcp://localhost:1234 foo:bdb bar\n");
return;
}
static sdskv_db_type_t parse_db_type(const char* db_fullname) {
const char* column = strstr(db_fullname, ":");
if(column == NULL) return KVDB_BWTREE;
const char* db_type = column + 1;
if(strcmp(db_type, "bwt") == 0) {
return KVDB_BWTREE;
} else if(strcmp(db_type, "bdb") == 0) {
return KVDB_BERKELEYDB;
} else if(strcmp(db_type, "ldb") == 0) {
return KVDB_LEVELDB;
}
fprintf(stderr, "Unknown database type \"%s\"\n", db_type);
exit(-1);
}
static void parse_args(int argc, char **argv, struct options *opts)
{
int opt;
memset(opts, 0, sizeof(*opts));
/* get options */
while((opt = getopt(argc, argv, "f:m:")) != -1)
{
switch(opt)
{
case 'f':
opts->host_file = optarg;
break;
case 'm':
if(0 == strcmp(optarg, "databases"))
opts->mplex_mode = MODE_DATABASES;
else if(0 == strcmp(optarg, "providers"))
opts->mplex_mode = MODE_PROVIDERS;
else {
fprintf(stderr, "Unrecognized multiplexing mode \"%s\"\n", optarg);
exit(EXIT_FAILURE);
}
break;
default:
usage(argc, argv);
exit(EXIT_FAILURE);
}
}
/* get required arguments after options */
if((argc - optind) < 2)
{
usage(argc, argv);
exit(EXIT_FAILURE);
}
opts->num_db = argc - optind - 1;
opts->listen_addr_str = argv[optind++];
opts->db_names = calloc(opts->num_db, sizeof(char*));
opts->db_types = calloc(opts->num_db, sizeof(sdskv_db_type_t));
int i;
for(i=0; i < opts->num_db; i++) {
opts->db_names[i] = argv[optind++];
opts->db_types[i] = parse_db_type(opts->db_names[i]);
}
return;
}
int main(int argc, char **argv)
{
struct options opts;
margo_instance_id mid;
int ret;
parse_args(argc, argv, &opts);
/* start margo */
/* use the main xstream for driving progress and executing rpc handlers */
mid = margo_init(opts.listen_addr_str, MARGO_SERVER_MODE, 0, -1);
if(mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: margo_init()\n");
return(-1);
}
margo_enable_remote_shutdown(mid);
if(opts.host_file)
{
/* write the server address to file if requested */
FILE *fp;
hg_addr_t self_addr;
char self_addr_str[128];
hg_size_t self_addr_str_sz = 128;
hg_return_t hret;
/* figure out what address this server is listening on */
hret = margo_addr_self(mid, &self_addr);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_addr_self()\n");
margo_finalize(mid);
return(-1);
}
hret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_sz, self_addr);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_addr_to_string()\n");
margo_addr_free(mid, self_addr);
margo_finalize(mid);
return(-1);
}
margo_addr_free(mid, self_addr);
fp = fopen(opts.host_file, "w");
if(!fp)
{
perror("fopen");
margo_finalize(mid);
return(-1);
}
fprintf(fp, "%s", self_addr_str);
fclose(fp);
}
/* initialize the SDSKV server */
if(opts.mplex_mode == MODE_PROVIDERS) {
int i;
for(i=0; i< opts.num_db; i++) {
sdskv_provider_t provider;
ret = sdskv_provider_register(mid, i+1,
SDSKV_ABT_POOL_DEFAULT,
&provider);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_provider_register()\n");
margo_finalize(mid);
return(-1);
}
sdskv_db_type_t db_type = KVDB_BWTREE; // TODO get from argv
sdskv_database_id_t db_id;
ret = sdskv_provider_add_database(provider, opts.db_names[i], db_type, &db_id);
if(ret != 0)
{
fprintf(stderr, "Error: bake_provider_add_database()\n");
margo_finalize(mid);
return(-1);
}
printf("Provider %d managing new database at multiplex id %d\n", i, i+1);
}
} else {
int i;
sdskv_provider_t provider;
ret = sdskv_provider_register(mid, 1,
SDSKV_ABT_POOL_DEFAULT,
&provider);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_provider_register()\n");
margo_finalize(mid);
return(-1);
}
for(i=0; i < opts.num_db; i++) {
sdskv_db_type_t db_type = KVDB_BWTREE; // TODO get from argv
sdskv_database_id_t db_id;
ret = sdskv_provider_add_database(provider, opts.db_names[i], db_type, &db_id);
if(ret != 0)
{
fprintf(stderr, "Error: sdskv_provider_add_database()\n");
margo_finalize(mid);
return(-1);
}
printf("Provider 0 managing new database at multiplex id %d\n", 1);
}
}
/* suspend until the BAKE server gets a shutdown signal from the client */
margo_wait_for_finalize(mid);
free(opts.db_names);
return(0);
}
......@@ -48,7 +48,7 @@ extern "C" int sdskv_provider_register(
}
/* allocate the resulting structure */
tmp_svr_ctx = (sdskv_provider_t)calloc(1,sizeof(*tmp_svr_ctx));
tmp_svr_ctx = new sdskv_server_context_t;
if(!tmp_svr_ctx)
return(-1);
......@@ -620,7 +620,6 @@ static void sdskv_list_ult(hg_handle_t handle)
margo_destroy(handle);
return;
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_list_ult)
......@@ -631,7 +630,7 @@ static void sdskv_server_finalize_cb(void *data)
sdskv_provider_remove_all_databases(svr_ctx);
free(svr_ctx);
delete svr_ctx;
return;
}
......
......@@ -8,7 +8,7 @@
#define __SDSKV_SERVER_H
#include <margo.h>
#include "sdskv-common.h"
#include <sdskv-common.h>
#ifdef __cplusplus
extern "C" {
......@@ -36,7 +36,7 @@ int sdskv_provider_register(
*
* @return 0 on success, -1 on failure
*/
int sdskv_provider_add_storage_target(
int sdskv_provider_add_database(
sdskv_provider_t provider,
const char* db_name,
sdskv_db_type_t db_type,
......@@ -50,7 +50,7 @@ int sdskv_provider_add_storage_target(
*
* @return 0 on success, -1 on failure
*/
int sdskv_provider_remove_storage_target(
int sdskv_provider_remove_database(
sdskv_provider_t provider,
sdskv_database_id_t db_id);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment