Commit 4ad2c7d5 authored by Matthieu Dorier's avatar Matthieu Dorier

omap is now managed by sdskv

parent 6f285fdd
......@@ -9,6 +9,7 @@
#include <margo.h>
#include <bake-client.h>
#include <sdskv-client.h>
/* server-side utilities and routines. Clients are looking for either
* libmobject-store.h or librados-mobject-store.h */
......@@ -24,6 +25,7 @@ typedef struct mobject_server_context* mobject_provider_t;
* @param[in] mplex_id multiplex id of the provider
* @param[in] pool Argobots pool for the provider
* @param[in] bake_ph Bake provider handle to use to write/read data
* @param[in] sdskv_ph SDSKV provider handle to use to access metadata
* @param[in] cluster_file file name to write cluster connect info to
* @param[out] provider resulting provider
*
......@@ -34,6 +36,7 @@ int mobject_provider_register(
uint8_t mplex_id,
ABT_pool pool,
bake_provider_handle_t bake_ph,
sdskv_provider_handle_t sdskv_ph,
const char *cluster_file,
mobject_provider_t* provider);
......
......@@ -9,6 +9,7 @@
#include "src/io-chain/read-op-visitor.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/omap-iter/omap-iter-impl.h"
#include "src/server/core/key-types.h"
#include "src/server/core/fake-kv.hpp"
#include "src/server/core/covermap.hpp"
......@@ -137,7 +138,9 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r
{
auto vargs = static_cast<server_visitor_args_t>(u);
const char* object_name = vargs->object_name;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
int ret;
*prval = 0;
// find oid
......@@ -146,26 +149,56 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r
return;
}
oid_t oid = name_map[object_name];
omap_iter_create(iter);
omap_key_t lb;
lb.oid = oid;
lb.key = std::string(start_after);
size_t lb_size = sizeof(omap_key_t)+strlen(start_after);
omap_key_t* lb = (omap_key_t*)calloc(1, lb_size);
lb->oid = oid;
strcpy(lb->key, start_after);
auto it = omap_map.lower_bound(lb);
if(it == omap_map.end()) return;
if(it->first.key == lb.key) it++;
hg_size_t max_keys = 10;
hg_size_t key_len = 128;
std::vector<void*> keys(max_keys);
std::vector<hg_size_t> ksizes(max_keys, key_len);
std::vector<std::vector<char>> buffers(max_keys, std::vector<char>(key_len));
for(auto i=0; i < max_keys; i++) keys[i] = (void*)buffers[i].data();
for(uint64_t i=0; it != omap_map.end() && i < max_return; it++, i++) {
omap_iter_append(*iter, it->first.key.c_str(), nullptr, 0);
}
hg_size_t keys_retrieved = max_keys;
hg_size_t count = 0;
do {
ret = sdskv_list_keys(sdskv_ph, omap_db_id,
(const void*)lb, lb_size,
keys.data(), ksizes.data(),
&keys_retrieved);
if(ret != SDSKV_SUCCESS) {
*prval = -1;
break;
}
const char* k;
for(auto i = 0; i < keys_retrieved && count < max_return; i++, count++) {
// extract the actual key part, without the oid
k = ((omap_key_t*)keys[i])->key;
/* this key is not part of the same object, we should leave the loop */
if(((omap_key_t*)keys[i])->oid != oid) goto out; /* ugly way of leaving the loop, I know ... */
omap_iter_append(*iter, k, nullptr, 0);
}
strcpy(lb->key, k);
lb_size = strlen(k) + sizeof(omap_key_t);
} while(keys_retrieved == max_keys && count < max_return);
out:
free(lb);
}
void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
const char* object_name = vargs->object_name;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
int ret;
*prval = 0;
// find oid
......@@ -175,29 +208,82 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi
}
oid_t oid = name_map[object_name];
hg_size_t max_items = 10;
// TODO make this changeable
hg_size_t key_len = 128 + sizeof(omap_key_t);
hg_size_t val_len = 256;
omap_iter_create(iter);
omap_key_t lb;
lb.oid = oid;
lb.key = std::string(start_after);
auto it = omap_map.lower_bound(lb);
if(it == omap_map.end()) return;
if(it->first.key == lb.key) it++;
/* omap_key_t equivalent of start_key */
hg_size_t lb_size = sizeof(omap_key_t)+strlen(start_after);
omap_key_t* lb = (omap_key_t*)calloc(1, lb_size);
lb->oid = oid;
strcpy(lb->key, start_after);
std::string prefix(filter_prefix);
for(uint64_t i=0; it != omap_map.end() && i < max_return; it++, i++) {
const std::string& k = it->first.key;
if(k.compare(0, prefix.size(), prefix) == 0) {
omap_iter_append(*iter, k.c_str(), &(it->second[0]), it->second.size());
}
/* omap_key_t equivalent of the filter_prefix */
hg_size_t prefix_size = sizeof(omap_key_t)+strlen(filter_prefix);
omap_key_t* prefix = (omap_key_t*)calloc(1, prefix_size);
prefix->oid = oid;
strcpy(prefix->key, filter_prefix);
/* key entry used to navigate the returned keys */
omap_key_t* key = (omap_key_t*)calloc(1, key_len);
key->oid = oid;
/* initialize structures to pass to SDSKV functions */
std::vector<void*> keys(max_items);
std::vector<void*> vals(max_items);
std::vector<hg_size_t> ksizes(max_items, key_len);
std::vector<hg_size_t> vsizes(max_items, val_len);
std::vector<std::vector<char>> key_buffers(max_items, std::vector<char>(key_len));
std::vector<std::vector<char>> val_buffers(max_items, std::vector<char>(val_len));
for(auto i=0; i < max_items; i++) {
keys[i] = (void*)key_buffers[i].data();
vals[i] = (void*)val_buffers[i].data();
}
hg_size_t items_retrieved = max_items;
hg_size_t count = 0;
do {
ret = sdskv_list_keyvals_with_prefix(
sdskv_ph, omap_db_id,
(const void*)lb, lb_size,
(const void*)prefix, prefix_size,
keys.data(), ksizes.data(),
vals.data(), vsizes.data(),
&items_retrieved);
if(ret != SDSKV_SUCCESS) {
*prval = -1;
break;
}
const char* k;
for(auto i = 0; i < items_retrieved && count < max_return; i++, count++) {
// extract the actual key part, without the oid
k = ((omap_key_t*)keys[i])->key;
/* this key is not part of the same object, we should leave the loop */
if(((omap_key_t*)keys[i])->oid != oid) goto out; /* ugly way of leaving the loop, I know ... */
omap_iter_append(*iter, k, (const char*)vals[i], vsizes[i]);
}
strcpy(lb->key, k);
lb_size = strlen(k) + sizeof(omap_key_t);
} while(items_retrieved == max_items && count < max_return);
out:
free(lb);
free(key);
}
void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
const char* object_name = vargs->object_name;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
int ret;
*prval = 0;
// find oid
......@@ -209,15 +295,36 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t
omap_iter_create(iter);
omap_key_t key;
key.oid = oid;
// figure out key sizes
std::vector<hg_size_t> ksizes(num_keys);
hg_size_t max_ksize = 0;
for(auto i=0; i < num_keys; i++) {
hg_size_t s = strlen(keys[i])+1;
if(s > max_ksize) max_ksize = s;
ksizes[i] = s;
}
for(size_t i=0; i<num_keys; i++) {
key.key = keys[i];
auto it = omap_map.find(key);
if(it != omap_map.end()) {
omap_iter_append(*iter, keys[i], &(it->second[0]), it->second.size());
omap_key_t* key = (omap_key_t*)calloc(1, sizeof(omap_key_t)+max_ksize);
key->oid = oid;
for(size_t i=0; i < num_keys; i++) {
strcpy(key->key, keys[i]);
// get length of the value
hg_size_t vsize;
ret = sdskv_length(sdskv_ph, omap_db_id,
(const void*)key, ksizes[i], &vsize);
if(ret != SDSKV_SUCCESS) {
*prval = -1;
break;
}
std::vector<char> value(vsize);
ret = sdskv_get(sdskv_ph, omap_db_id,
(const void*)key, ksizes[i],
(void*)value.data(), &vsize);
if(ret != SDSKV_SUCCESS) {
*prval = -1;
break;
}
omap_iter_append(*iter, keys[i], value.data(), vsize);
}
}
......
......@@ -217,9 +217,20 @@ void write_op_exec_omap_set(void* u, char const* const* keys,
const size_t *lens,
size_t num)
{
int ret;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
for(auto i=0; i<num; i++) {
ret = sdskv_put(sdskv_ph, omap_db_id,
(const void*)keys[i], strlen(keys[i])+1,
(const void*)vals[i], lens[i]);
if(ret != SDSKV_SUCCESS) {
fprintf(stderr, "write_op_exec_omap_set: error in sdskv_put() (ret = %d)\n", ret);
}
}
#if 0
for(auto i=0; i<num; i++) {
std::vector<char> val(vals[i], vals[i]+lens[i]);
omap_key_t omk;
......@@ -227,19 +238,31 @@ void write_op_exec_omap_set(void* u, char const* const* keys,
omk.key = std::string(keys[i]);
omap_map[std::move(omk)] = std::move(val);
}
#endif
}
void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
int ret;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
for(auto i=0; i<num_keys; i++) {
ret = sdskv_erase(sdskv_ph, omap_db_id,
(const void*)keys[i], strlen(keys[i])+1);
if(ret != SDSKV_SUCCESS)
fprintf(stderr, "write_op_exec_omap_rm_keys: error in sdskv_erase() (ret = %d)\n", ret);
}
#if 0
for(auto i=0; i < num_keys; i++) {
omap_key_t omk;
omk.oid = oid;
omk.key = std::string(keys[i]);
omap_map.erase(omk);
}
#endif
}
oid_t get_or_create_oid(const char* object_name)
......
......@@ -3,6 +3,8 @@
#include <string>
#include <bake-client.h>
#include "src/server/core/key-types.h"
/*
typedef uint64_t oid_t;
enum class seg_type_t : std::int32_t {
......@@ -26,7 +28,7 @@ struct omap_key_t {
};
#define SMALL_REGION_THRESHOLD (sizeof(bake_region_id_t))
*/
bool operator<(const segment_key_t& s1, const segment_key_t& s2);
bool operator<(const omap_key_t& k1, const omap_key_t& k2);
......
#ifndef __CORE_KEY_TYPES_H
#define __CORE_KEY_TYPES_H
#include <stdint.h>
#include <bake-client.h>
typedef uint64_t oid_t;
typedef enum seg_type_t {
ZERO = 0,
BAKE_REGION = 1,
SMALL_REGION = 2,
TOMBSTONE = 3
} seg_type_t;
typedef struct segment_key_t {
oid_t oid;
uint32_t type; /* seg_type */
double timestamp;
uint64_t start_index; // first index, included
uint64_t end_index; // end index is not included
} segment_key_t;
typedef struct omap_key_t {
oid_t oid;
char key[1];
} omap_key_t;
#define SMALL_REGION_THRESHOLD (sizeof(bake_region_id_t))
#endif
......@@ -8,8 +8,8 @@
#include <margo.h>
//#include <sds-keyval.h>
#include <bake-server.h>
#include <bake-client.h>
#include <sdskv-client.h>
#include <ssg-mpi.h>
#ifdef __cplusplus
......@@ -27,6 +27,12 @@ struct mobject_server_context
/* bake-related data */
bake_provider_handle_t bake_ph;
bake_target_id_t bake_tid;
/* sdskv-related data */
sdskv_provider_handle_t sdskv_ph;
sdskv_database_id_t oid_db_id;
sdskv_database_id_t name_db_id;
sdskv_database_id_t segment_db_id;
sdskv_database_id_t omap_db_id;
/* other data */
int ref_count;
};
......
......@@ -9,8 +9,11 @@
#include <ssg.h>
#include <bake-client.h>
#include <bake-server.h>
#include <sdskv-client.h>
#include <sdskv-server.h>
#include "mobject-server.h"
#include "src/server/core/key-types.h"
void usage(void)
{
......@@ -25,7 +28,19 @@ typedef struct {
bake_provider_handle_t provider_handle;
} bake_client_data;
typedef struct {
sdskv_client_t client;
sdskv_provider_handle_t provider_handle;
} sdskv_client_data;
static void finalize_bake_client_cb(void* data);
static void finalize_sdskv_client_cb(void* data);
/* comparison functions for SDSKV */
static int oid_map_compare(const void*, size_t, const void*, size_t);
static int name_map_compare(const void*, size_t, const void*, size_t);
static int seg_map_compare(const void*, size_t, const void*, size_t);
static int omap_map_compare(const void*, size_t, const void*, size_t);
int main(int argc, char *argv[])
{
......@@ -60,6 +75,8 @@ int main(int argc, char *argv[])
return -1;
}
/* TODO assert the return value of all the calls below */
/* Get self address */
hg_addr_t self_addr;
margo_addr_self(mid, &self_addr);
......@@ -72,7 +89,6 @@ int main(int argc, char *argv[])
bake_target_id_t bake_tid;
bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
/* TODO check return value of above calls */
/* Bake provider handle initialization from self addr */
bake_client_data bake_clt_data;
......@@ -80,10 +96,34 @@ int main(int argc, char *argv[])
bake_provider_handle_create(bake_clt_data.client, self_addr, bake_mplex_id, &(bake_clt_data.provider_handle));
margo_push_finalize_callback(mid, &finalize_bake_client_cb, (void*)&bake_clt_data);
/* SDSKV provider initialization */
uint8_t sdskv_mplex_id = 1;
sdskv_provider_t sdskv_prov;
sdskv_database_id_t oid_map_id, name_map_id, seg_map_id, omap_map_id;
sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
sdskv_provider_add_database(sdskv_prov, "oid_map", KVDB_MAP, &oid_map_compare, &oid_map_id);
fprintf(stderr, "oid_map at id %ld\n", oid_map_id);
sdskv_provider_add_database(sdskv_prov, "name_map", KVDB_MAP, &name_map_compare, &name_map_id);
fprintf(stderr, "name_map at id %ld\n", name_map_id);
sdskv_provider_add_database(sdskv_prov, "seg_map", KVDB_MAP, &seg_map_compare, &seg_map_id);
fprintf(stderr, "seg_map at id %ld\n", seg_map_id);
sdskv_provider_add_database(sdskv_prov, "omap_map", KVDB_MAP, &omap_map_compare, &omap_map_id);
fprintf(stderr, "omap_map at id %ld\n", omap_map_id);
/* SDSKV provider handle initialization from self addr */
sdskv_client_data sdskv_clt_data;
sdskv_client_init(mid, &(sdskv_clt_data.client));
sdskv_provider_handle_create(sdskv_clt_data.client, self_addr, sdskv_mplex_id, &(sdskv_clt_data.provider_handle));
margo_push_finalize_callback(mid, &finalize_sdskv_client_cb, (void*)&sdskv_clt_data);
/* Mobject provider initialization */
mobject_provider_t mobject_prov;
ret = mobject_provider_register(mid, 1, MOBJECT_ABT_POOL_DEFAULT,
bake_clt_data.provider_handle, cluster_file, &mobject_prov);
ret = mobject_provider_register(mid, 1,
MOBJECT_ABT_POOL_DEFAULT,
bake_clt_data.provider_handle,
sdskv_clt_data.provider_handle,
cluster_file, &mobject_prov);
if (ret != 0)
{
fprintf(stderr, "Error: Unable to initialize mobject provider\n");
......@@ -104,3 +144,63 @@ static void finalize_bake_client_cb(void* data)
bake_provider_handle_release(clt_data->provider_handle);
bake_client_finalize(clt_data->client);
}
static void finalize_sdskv_client_cb(void* data)
{
sdskv_client_data* clt_data = (sdskv_client_data*)data;
sdskv_provider_handle_release(clt_data->provider_handle);
sdskv_client_finalize(clt_data->client);
}
static int oid_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk2)
{
// keys are oid_t (uint64_t)
oid_t x = *((oid_t*)k1);
oid_t y = *((oid_t*)k2);
if(x == y) return 0;
if(x < y) return -1;
return 1;
}
static int name_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk2)
{
// names are strings (const char*)
const char* n1 = (const char*)k1;
const char* n2 = (const char*)k2;
return strcmp(n1,n2);
}
static int seg_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk2)
{
// segments are as follows:
/*struct segment_key_t {
oid_t oid;
seg_type_t type;
double timestamp;
uint64_t start_index;
uint64_t end_index;
};
*/
const segment_key_t* seg1 = (const segment_key_t*)k1;
const segment_key_t* seg2 = (const segment_key_t*)k2;
if(seg1->oid < seg2->oid) return -1;
if(seg1->oid > seg2->oid) return 1;
if(seg1->timestamp < seg2->timestamp) return -1;
if(seg1->timestamp > seg2->timestamp) return 1;
return 0;
}
static int omap_map_compare(const void* k1, size_t sk1, const void* k2, size_t sk2)
{
// omap keys are as follows:
/* struct omap_key_t {
oid_t oid;
char key[1];
};
*/
const omap_key_t* ok1 = (const omap_key_t*)k1;
const omap_key_t* ok2 = (const omap_key_t*)k2;
if(ok1->oid < ok2->oid) return -1;
if(ok1->oid > ok2->oid) return 1;
return strcmp(ok1->key, ok2->key);
}
......@@ -10,9 +10,6 @@
#include <mpi.h>
#include <abt.h>
#include <margo.h>
//#include <sds-keyval.h>
#include <bake-server.h>
#include <bake-client.h>
#include <ssg-mpi.h>
#include "mobject-server.h"
......@@ -42,6 +39,7 @@ int mobject_provider_register(
uint8_t mplex_id,
ABT_pool pool,
bake_provider_handle_t bake_ph,
sdskv_provider_handle_t sdskv_ph,
const char *cluster_file,
mobject_provider_t* provider)
{
......@@ -113,6 +111,41 @@ int mobject_provider_register(
free(srv_ctx);
return -1;
}
/* SDSKV settings initialization */
sdskv_provider_handle_ref_incr(sdskv_ph);
srv_ctx->sdskv_ph = sdskv_ph;
ret = sdskv_open(sdskv_ph, "oid_map", &(srv_ctx->oid_db_id));
if(ret != SDSKV_SUCCESS) {
fprintf(stderr, "Error: unable to open oid_map from SDSKV provider\n");
ssg_group_destroy(srv_ctx->gid);
bake_provider_handle_release(srv_ctx->bake_ph);
sdskv_provider_handle_release(srv_ctx->sdskv_ph);
free(srv_ctx);
}
ret = sdskv_open(sdskv_ph, "name_map", &(srv_ctx->name_db_id));
if(ret != SDSKV_SUCCESS) {
fprintf(stderr, "Error: unable to open name_map from SDSKV provider\n");
bake_provider_handle_release(srv_ctx->bake_ph);
sdskv_provider_handle_release(srv_ctx->sdskv_ph);
ssg_group_destroy(srv_ctx->gid);
free(srv_ctx);
}
ret = sdskv_open(sdskv_ph, "seg_map", &(srv_ctx->segment_db_id));
if(ret != SDSKV_SUCCESS) {
fprintf(stderr, "Error: unable to open seg_map from SDSKV provider\n");
bake_provider_handle_release(srv_ctx->bake_ph);
sdskv_provider_handle_release(srv_ctx->sdskv_ph);
ssg_group_destroy(srv_ctx->gid);
free(srv_ctx);
}
ret = sdskv_open(sdskv_ph, "omap_map", &(srv_ctx->omap_db_id));
if(ret != SDSKV_SUCCESS) {
fprintf(stderr, "Error: unable to open omap_map from SDSKV provider\n");
bake_provider_handle_release(srv_ctx->bake_ph);
sdskv_provider_handle_release(srv_ctx->sdskv_ph);
ssg_group_destroy(srv_ctx->gid);
free(srv_ctx);
}
hg_id_t rpc_id;
......
......@@ -20,6 +20,10 @@ function mobject_test_start_servers()
startwait=${2:-15}
maxtime=${3:-120}
cfile=${4:-/tmp/mobject-connect-cluster.gid}
storage=${5:-/dev/shm/mobject.dat}
rm -rf ${storage}
bake-mkpool -s 50M /dev/shm/mobject.dat
run_to $maxtime mpirun -np $nservers src/server/mobject-server-daemon na+sm:// $cfile &
if [ $? -ne 0 ]; then
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment