Commit 24c764c7 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added functions to list the databases from the client

parent 262e79b0
......@@ -9,6 +9,7 @@ bin_PROGRAMS = bin/sdskv-server-daemon \
bin/sdskv-shutdown
check_PROGRAMS = test/sdskv-open-test \
test/sdskv-list-db-test \
test/sdskv-put-test \
test/sdskv-length-test \
test/sdskv-get-test \
......@@ -100,6 +101,7 @@ noinst_HEADERS = src/bulk.h \
TESTS = test/basic.sh \
test/open-test.sh \
test/list-db-test.sh \
test/put-test.sh \
test/length-test.sh \
test/get-test.sh \
......@@ -118,6 +120,10 @@ test_sdskv_open_test_SOURCES = test/sdskv-open-test.cc
test_sdskv_open_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_open_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_list_db_test_SOURCES = test/sdskv-list-db-test.cc
test_sdskv_list_db_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_list_db_test_LDFLAGS = -Llib -lsdskv-client
test_sdskv_put_test_SOURCES = test/sdskv-put-test.cc
test_sdskv_put_test_DEPENDENCIES = lib/libsdskv-client.la
test_sdskv_put_test_LDFLAGS = -Llib -lsdskv-client
......
......@@ -93,6 +93,36 @@ int sdskv_open(
const char* db_name,
sdskv_database_id_t* db_id);
/**
* @brief Gets the number of databases currently managed by a provider.
*
* @param[in] provider provider handle
* @param[out] num number of databases
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_count_databases(
sdskv_provider_handle_t provider,
size_t* num);
/**
* @brief Lists the databases names and ids.
* The caller is responsible for calling free() on each of the
* entries in db_names.
*
* @param[in] provider provider handle
* @param[inout] count max number of databases to query, actual number returned
* @param[out] db_names database names
* @param[out] db_id database ids
*
* @return SDSKV_SUCCESS or error code defined in sdskv-common.h
*/
int sdskv_list_databases(
sdskv_provider_handle_t provider,
size_t* count,
char** db_name,
sdskv_database_id_t* db_id);
/**
* @brief Puts a key/value pair into the database.
*
......
......@@ -7,7 +7,11 @@ int32_t sdskv_remi_errno;
struct sdskv_client {
margo_instance_id mid;
/* opening and querying databases */
hg_id_t sdskv_open_id;
hg_id_t sdskv_count_databases_id;
hg_id_t sdskv_list_databases_id;
/* accessing database */
hg_id_t sdskv_put_id;
hg_id_t sdskv_put_multi_id;
hg_id_t sdskv_bulk_put_id;
......@@ -19,7 +23,6 @@ struct sdskv_client {
hg_id_t sdskv_length_id;
hg_id_t sdskv_length_multi_id;
hg_id_t sdskv_bulk_get_id;
hg_id_t sdskv_open_id;
hg_id_t sdskv_list_keys_id;
hg_id_t sdskv_list_keyvals_id;
/* migration */
......@@ -51,6 +54,9 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
if(flag == HG_TRUE) { /* RPCs already registered */
margo_registered_name(mid, "sdskv_open_rpc", &client->sdskv_open_id, &flag);
margo_registered_name(mid, "sdskv_count_databases_rpc", &client->sdskv_count_databases_id, &flag);
margo_registered_name(mid, "sdskv_list_databases_rpc", &client->sdskv_list_databases_id, &flag);
margo_registered_name(mid, "sdskv_put_rpc", &client->sdskv_put_id, &flag);
margo_registered_name(mid, "sdskv_put_multi_rpc", &client->sdskv_put_multi_id, &flag);
margo_registered_name(mid, "sdskv_bulk_put_rpc", &client->sdskv_bulk_put_id, &flag);
......@@ -62,7 +68,6 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
margo_registered_name(mid, "sdskv_length_rpc", &client->sdskv_length_id, &flag);
margo_registered_name(mid, "sdskv_length_multi_rpc", &client->sdskv_length_multi_id, &flag);
margo_registered_name(mid, "sdskv_bulk_get_rpc", &client->sdskv_bulk_get_id, &flag);
margo_registered_name(mid, "sdskv_open_rpc", &client->sdskv_open_id, &flag);
margo_registered_name(mid, "sdskv_list_keys_rpc", &client->sdskv_list_keys_id, &flag);
margo_registered_name(mid, "sdskv_list_keyvals_rpc", &client->sdskv_list_keyvals_id, &flag);
margo_registered_name(mid, "sdskv_migrate_keys_rpc", &client->sdskv_migrate_keys_id, &flag);
......@@ -73,6 +78,12 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
} else {
client->sdskv_open_id =
MARGO_REGISTER(mid, "sdskv_open_rpc", open_in_t, open_out_t, NULL);
client->sdskv_count_databases_id =
MARGO_REGISTER(mid, "sdskv_count_databases_rpc", void, count_db_out_t, NULL);
client->sdskv_list_databases_id =
MARGO_REGISTER(mid, "sdskv_list_databases_rpc", list_db_in_t, list_db_out_t, NULL);
client->sdskv_put_id =
MARGO_REGISTER(mid, "sdskv_put_rpc", put_in_t, put_out_t, NULL);
client->sdskv_put_multi_id =
......@@ -95,8 +106,6 @@ static int sdskv_client_register(sdskv_client_t client, margo_instance_id mid)
MARGO_REGISTER(mid, "sdskv_length_multi_rpc", length_multi_in_t, length_multi_out_t, NULL);
client->sdskv_bulk_get_id =
MARGO_REGISTER(mid, "sdskv_bulk_get_rpc", bulk_get_in_t, bulk_get_out_t, NULL);
client->sdskv_open_id =
MARGO_REGISTER(mid, "sdskv_open_rpc", open_in_t, open_out_t, NULL);
client->sdskv_list_keys_id =
MARGO_REGISTER(mid, "sdskv_list_keys_rpc", list_keys_in_t, list_keys_out_t, NULL);
client->sdskv_list_keyvals_id =
......@@ -233,6 +242,95 @@ int sdskv_open(
return ret;
}
int sdskv_count_databases(
sdskv_provider_handle_t provider,
size_t* num)
{
hg_return_t hret;
int ret;
count_db_out_t out;
hg_handle_t handle;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_count_databases_id,
&handle);
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_provider_forward(provider->provider_id, handle, NULL);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
*num = out.count;
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_list_databases(
sdskv_provider_handle_t provider,
size_t* count,
char** db_names,
sdskv_database_id_t* db_ids)
{
hg_return_t hret;
int ret;
list_db_in_t in;
in.count = *count;
list_db_out_t out;
hg_handle_t handle;
/* create handle */
hret = margo_create(
provider->client->mid,
provider->addr,
provider->client->sdskv_list_databases_id,
&handle);
if(hret != HG_SUCCESS) return SDSKV_ERR_MERCURY;
hret = margo_provider_forward(provider->provider_id, handle, &in);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS) {
margo_destroy(handle);
return SDSKV_ERR_MERCURY;
}
ret = out.ret;
if(ret == SDSKV_SUCCESS) {
*count = out.count;
unsigned i;
for(i = 0; i < out.count; i++) {
if(db_names != NULL) db_names[i] = strdup(out.db_names[i]);
if(db_ids != NULL) db_ids[i] = out.db_ids[i];
}
} else {
*count = 0;
}
margo_free_output(handle, &out);
margo_destroy(handle);
return ret;
}
int sdskv_put(sdskv_provider_handle_t provider,
sdskv_database_id_t db_id,
const void *key, hg_size_t ksize,
......
......@@ -53,6 +53,74 @@ MERCURY_GEN_PROC(open_in_t,
((hg_string_t)(name)))
MERCURY_GEN_PROC(open_out_t, ((uint64_t)(db_id)) ((int32_t)(ret)))
// ------------- COUNT DATABASES - //
MERCURY_GEN_PROC(count_db_out_t,\
((uint64_t)(count)) ((int32_t)(ret)))
// ------------- LIST DATABASES -- //
MERCURY_GEN_PROC(list_db_in_t, ((uint64_t)(count)))
typedef struct {
size_t count;
char** db_names;
uint64_t* db_ids;
int32_t ret;
} list_db_out_t;
static hg_return_t hg_proc_list_db_out_t(hg_proc_t proc, void* arg)
{
hg_return_t ret;
list_db_out_t *in = (list_db_out_t*)arg;
unsigned i;
ret = hg_proc_hg_size_t(proc, &in->count);
if(ret != HG_SUCCESS) return ret;
if (in->count) {
switch (hg_proc_get_op(proc)) {
case HG_ENCODE:
for(i=0; i < in->count; i++) {
ret = hg_proc_hg_string_t(proc, &(in->db_names[i]));
if(ret != HG_SUCCESS) return ret;
}
ret = hg_proc_raw(proc, in->db_ids, in->count*sizeof(*(in->db_ids)));
if(ret != HG_SUCCESS) return ret;
ret = hg_proc_int32_t(proc, &(in->ret));
if(ret != HG_SUCCESS) return ret;
break;
case HG_DECODE:
in->db_names = (char**)malloc(in->count*sizeof(char*));
in->db_ids = (uint64_t*)malloc(in->count*sizeof(uint64_t));
for(i=0; i < in->count; i++) {
ret = hg_proc_hg_string_t(proc, &(in->db_names[i]));
if(ret != HG_SUCCESS) {
free(in->db_names);
free(in->db_ids);
return ret;
}
}
ret = hg_proc_raw(proc, in->db_ids, in->count*sizeof(*(in->db_ids)));
if(ret != HG_SUCCESS) {
free(in->db_names);
free(in->db_ids);
return ret;
}
ret = hg_proc_int32_t(proc, &(in->ret));
if(ret != HG_SUCCESS) return ret;
break;
case HG_FREE:
for(i=0; i < in->count; i++) {
hg_proc_hg_string_t(proc, &(in->db_names[i]));
}
free(in->db_names);
free(in->db_ids);
break;
default:
break;
}
}
return HG_SUCCESS;
}
// ------------- PUT ------------- //
MERCURY_GEN_PROC(put_in_t, ((uint64_t)(db_id))\
((kv_data_t)(key))\
......
......@@ -30,6 +30,9 @@ struct sdskv_server_context_t
// operations. There should be something better to avoid locking everything
// but we are going with that for simplicity for now.
hg_id_t sdskv_open_id;
hg_id_t sdskv_count_databases_id;
hg_id_t sdskv_list_databases_id;
hg_id_t sdskv_put_id;
hg_id_t sdskv_put_multi_id;
hg_id_t sdskv_bulk_put_id;
......@@ -41,7 +44,6 @@ struct sdskv_server_context_t
hg_id_t sdskv_length_id;
hg_id_t sdskv_length_multi_id;
hg_id_t sdskv_bulk_get_id;
hg_id_t sdskv_open_id;
hg_id_t sdskv_list_keys_id;
hg_id_t sdskv_list_keyvals_id;
/* migration */
......@@ -64,13 +66,15 @@ inline scoped_call<F> at_exit(F&& f) {
return scoped_call<F>(std::forward<F>(f));
}
DECLARE_MARGO_RPC_HANDLER(sdskv_open_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_count_db_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_db_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_put_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_put_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_length_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_length_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_get_multi_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_open_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_put_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_bulk_get_ult)
DECLARE_MARGO_RPC_HANDLER(sdskv_list_keys_ult)
......@@ -140,6 +144,21 @@ extern "C" int sdskv_provider_register(
/* register RPCs */
hg_id_t rpc_id;
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_open_rpc",
open_in_t, open_out_t,
sdskv_open_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_open_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_count_databases_rpc",
void, count_db_out_t,
sdskv_count_db_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_count_databases_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_list_databases_rpc",
list_db_in_t, list_db_out_t,
sdskv_list_db_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_list_databases_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_put_rpc",
put_in_t, put_out_t,
sdskv_put_ult, provider_id, abt_pool);
......@@ -185,11 +204,6 @@ extern "C" int sdskv_provider_register(
sdskv_bulk_get_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_bulk_get_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_open_rpc",
open_in_t, open_out_t,
sdskv_open_ult, provider_id, abt_pool);
tmp_svr_ctx->sdskv_open_id = rpc_id;
margo_register_data(mid, rpc_id, (void*)tmp_svr_ctx, NULL);
rpc_id = MARGO_REGISTER_PROVIDER(mid, "sdskv_list_keys_rpc",
list_keys_in_t, list_keys_out_t,
sdskv_list_keys_ult, provider_id, abt_pool);
......@@ -418,6 +432,147 @@ extern "C" int sdskv_provider_set_migration_callbacks(
return SDSKV_SUCCESS;
}
static void sdskv_open_ult(hg_handle_t handle)
{
hg_return_t hret;
open_in_t in;
open_out_t out;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error (sdskv_open_ult): SDSKV could not find provider with id\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto it = svr_ctx->name2id.find(std::string(in.name));
if(it == svr_ctx->name2id.end()) {
ABT_rwlock_unlock(svr_ctx->lock);
out.ret = SDSKV_ERR_DB_NAME;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
auto db = it->second;
ABT_rwlock_unlock(svr_ctx->lock);
out.db_id = db;
out.ret = SDSKV_SUCCESS;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_open_ult)
static void sdskv_count_db_ult(hg_handle_t handle)
{
hg_return_t hret;
count_db_out_t out;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error (sdskv_count_db_ult): SDSKV could not find provider with id\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
uint64_t count;
sdskv_provider_count_databases(svr_ctx, &count);
out.count = count;
out.ret = SDSKV_SUCCESS;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_count_db_ult)
static void sdskv_list_db_ult(hg_handle_t handle)
{
hg_return_t hret;
list_db_in_t in;
list_db_out_t out;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error (sdskv_list_db_ult): SDSKV could not find provider with id\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
std::vector<std::string> db_names;
std::vector<uint64_t> db_ids;
out.count = 0;
ABT_rwlock_rdlock(svr_ctx->lock);
unsigned i = 0;
for(const auto& p : svr_ctx->name2id) {
if(i >= in.count)
break;
db_names.push_back(p.first);
db_ids.push_back(p.second);
i += 1;
}
ABT_rwlock_unlock(svr_ctx->lock);
out.count = i;
out.db_names = (char**)malloc(out.count*sizeof(char*));
for(i=0; i < out.count; i++) {
out.db_names[i] = const_cast<char*>(db_names[i].c_str());
}
out.db_ids = &db_ids[0];
out.ret = SDSKV_SUCCESS;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
free(out.db_names);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_list_db_ult)
static void sdskv_put_ult(hg_handle_t handle)
{
hg_return_t hret;
......@@ -944,56 +1099,6 @@ static void sdskv_length_multi_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(sdskv_length_multi_ult)
static void sdskv_open_ult(hg_handle_t handle)
{
hg_return_t hret;
open_in_t in;
open_out_t out;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(mid);
const struct hg_info* info = margo_get_info(handle);
sdskv_provider_t svr_ctx =
(sdskv_provider_t)margo_registered_data(mid, info->id);
if(!svr_ctx) {
fprintf(stderr, "Error (sdskv_open_ult): SDSKV could not find provider with id\n");
out.ret = SDSKV_ERR_UNKNOWN_PR;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
hret = margo_get_input(handle, &in);
if(hret != HG_SUCCESS) {
out.ret = SDSKV_ERR_MERCURY;
margo_respond(handle, &out);
margo_destroy(handle);
return;
}
ABT_rwlock_rdlock(svr_ctx->lock);
auto it = svr_ctx->name2id.find(std::string(in.name));
if(it == svr_ctx->name2id.end()) {
ABT_rwlock_unlock(svr_ctx->lock);
out.ret = SDSKV_ERR_DB_NAME;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
auto db = it->second;
ABT_rwlock_unlock(svr_ctx->lock);
out.db_id = db;
out.ret = SDSKV_SUCCESS;
margo_respond(handle, &out);
margo_free_input(handle, &in);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(sdskv_open_ult)
static void sdskv_bulk_put_ult(hg_handle_t handle)
{
......
#!/bin/bash -x
if [ -z $srcdir ]; then
echo srcdir variable not set.
exit 1
fi
source $srcdir/test/test-util.sh
find_db_name
# start a server with 2 second wait,
# 20s timeout, and my_test_db as database
test_start_server 2 20 $test_db_full
sleep 1
#####################
# tear down
run_to 10 test/sdskv-list-db-test $svr_addr 1
if [ $? -ne 0 ]; then
wait
exit 1
fi
wait
echo cleaning up $TMPBASE
rm -rf $TMPBASE
exit 0
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <vector>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <margo.h>
#include <string>
#include "sdskv-client.h"
int main(int argc, char *argv[])
{
char cli_addr_prefix[64] = {0};
char *sdskv_svr_addr_str;
char *db_name;
margo_instance_id mid;
hg_addr_t svr_addr;
uint8_t mplex_id;
sdskv_client_t kvcl;