Commit 262c744e authored by Matthieu Dorier

started converting configuration file to use 5 databases instead of 1

parent 11f84b91
......@@ -25,19 +25,22 @@ namespace hepnos {
// DataStoreImpl implementation
////////////////////////////////////////////////////////////////////////////////////////////
// Groups a set of SDSKV database handles with the ch-placement instance
// that is used to pick one of them (by index into dbs) from a hash.
struct DistributedDBInfo {
    // database handles; indexed by the value produced by ch_placement_find_closest
    std::vector<sdskv::database> dbs;
    // ch-placement instance; stays null until it is initialized
    struct ch_placement_instance* chi{nullptr};
};
class DataStoreImpl {
public:
margo_instance_id m_mid; // Margo instance
std::unordered_map<std::string,hg_addr_t> m_addrs; // Addresses used by the service
sdskv::client m_sdskv_client; // SDSKV client
std::vector<sdskv::database> m_databases; // list of SDSKV databases
struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV
DistributedDBInfo m_databases; // list of SDSKV databases
const DataStore::iterator m_end; // iterator for the end() of the DataStore
DataStoreImpl()
: m_mid(MARGO_INSTANCE_NULL)
, m_chi_sdskv(nullptr)
, m_end() {}
void init(const std::string& configFile) {
......@@ -61,9 +64,12 @@ class DataStoreImpl {
throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
}
// create list of sdskv provider handles
YAML::Node sdskv = config["hepnos"]["providers"]["sdskv"];
for(YAML::const_iterator it = sdskv.begin(); it != sdskv.end(); it++) {
std::string str_addr = it->first.as<std::string>();
YAML::Node databases = config["hepnos"]["databases"];
YAML::Node dataset_db = databases["datasets"];
for(YAML::const_iterator address_it = dataset_db.begin(); address_it != dataset_db.end(); address_it++) {
std::string str_addr = address_it->first.as<std::string>();
YAML::Node providers = address_it->second;
// lookup the address
hg_addr_t addr;
if(m_addrs.count(str_addr) != 0) {
addr = m_addrs[str_addr];
......@@ -76,33 +82,29 @@ class DataStoreImpl {
}
m_addrs[str_addr] = addr;
}
// get the number of providers
uint16_t num_providers = it->second.as<uint16_t>();
for(uint16_t provider_id = 0 ; provider_id < num_providers; provider_id++) {
std::vector<sdskv::database> dbs;
try {
sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
dbs = m_sdskv_client.open(ph);
} catch(sdskv::exception& ex) {
cleanup();
throw Exception("Could not open databases (SDSKV error="+std::to_string(ex.error())+")");
// iterate over providers for this address
for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
// get the provider id
uint16_t provider_id = provider_it->first.as<uint16_t>();
// create provider handle
sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
// get the database ids
YAML::Node databases = provider_it->second;
// iterate over databases for this provider
for(unsigned i=0; i < databases.size(); i++) {
m_databases.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
}
if(dbs.size() == 0) {
continue;
}
for(auto& db : dbs)
m_databases.push_back(db);
}
}
} // for each provider
} // for each address
// initialize ch-placement for the SDSKV providers
m_chi_sdskv = ch_placement_initialize("hash_lookup3", m_databases.size(), 4, 0);
m_databases.chi = ch_placement_initialize("hash_lookup3", m_databases.dbs.size(), 4, 0);
}
void cleanup() {
m_databases.clear();
m_databases.dbs.clear();
m_sdskv_client = sdskv::client();
if(m_chi_sdskv)
ch_placement_finalize(m_chi_sdskv);
if(m_databases.chi)
ch_placement_finalize(m_databases.chi);
for(auto& addr : m_addrs) {
margo_addr_free(m_mid, addr.second);
}
......@@ -127,27 +129,26 @@ class DataStoreImpl {
if(!protoNode) {
throw Exception("\"protocol\" entry not found in \"client\" section");
}
// hepnos entry has providers entry
auto providersNode = hepnosNode["providers"];
if(!providersNode) {
throw Exception("\"providers\" entry not found in \"hepnos\" section");
}
// provider entry has sdskv entry
auto sdskvNode = providersNode["sdskv"];
if(!sdskvNode) {
throw Exception("\"sdskv\" entry not found in \"providers\" section");
}
// sdskv entry is not empty
if(sdskvNode.size() == 0) {
throw Exception("No provider found in \"sdskv\" section");
// the "hepnos" section must contain a "databases" entry
auto databasesNode = hepnosNode["databases"];
if(!databasesNode) {
    // the message names the key exactly as it is looked up above
    throw Exception("\"databases\" entry not found in \"hepnos\" section");
}
// for each sdskv entry
for(auto it = sdskvNode.begin(); it != sdskvNode.end(); it++) {
if(it->second.IsScalar()) continue; // one provider id given
else {
throw Exception("Invalid value type for provider in \"sdskv\" section");
if(!databasesNode.IsMap()) {
throw Exception("\"databases\" entry should be a map");
} /*
for(auto provider_it = databasesNode.begin(); provider_it != databasesNode.end(); provider_it++) {
// provider entry should be a sequence
if(!provider.IsSequence()) {
throw Exception("provider entry should be a sequence");
}
for(auto db : provider) {
if(!db.IsScalar()) {
throw Exception("database id should be a scalar");
}
}
}
*/
}
public:
......@@ -181,7 +182,7 @@ class DataStoreImpl {
// use the complete name for final objects (level 0)
name_hash = hashString(key);
}
ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_db_idx);
ch_placement_find_closest(m_databases.chi, name_hash, 1, &sdskv_db_idx);
return sdskv_db_idx;
}
......@@ -193,7 +194,7 @@ class DataStoreImpl {
// find out which DB to access
long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
// make corresponding datastore entry
auto& db = m_databases[sdskv_db_idx];
auto& db = m_databases.dbs[sdskv_db_idx];
// read the value
if(data.size() == 0)
data.resize(2048); // eagerly allocate 2KB
......@@ -216,7 +217,7 @@ class DataStoreImpl {
// find out which DB to access
long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
// make corresponding datastore entry
auto& db = m_databases[sdskv_db_idx];
auto& db = m_databases.dbs[sdskv_db_idx];
// read the value
try {
db.get(key.data(), key.size(), value, vsize);
......@@ -239,7 +240,7 @@ class DataStoreImpl {
// find out which DB to access
long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
// make corresponding datastore entry
auto& db = m_databases[sdskv_db_idx];
auto& db = m_databases.dbs[sdskv_db_idx];
try {
return db.exists(key);
} catch(sdskv::exception& ex) {
......@@ -255,7 +256,7 @@ class DataStoreImpl {
// find out which DB to access
long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
// Create the product id
const auto& db = m_databases[sdskv_db_idx];
const auto& db = m_databases.dbs[sdskv_db_idx];
try {
db.put(key.data(), key.size(), data, data_size);
} catch(sdskv::exception& ex) {
......@@ -272,7 +273,7 @@ class DataStoreImpl {
const std::vector<std::string>& keys,
const std::vector<std::string>& values) {
// Create the product id
const auto& db = m_databases[db_index];
const auto& db = m_databases.dbs[db_index];
try {
db.put_multi(keys, values);
} catch(sdskv::exception& ex) {
......@@ -288,11 +289,11 @@ class DataStoreImpl {
// hash the name to get the provider id
long unsigned db_idx = 0;
uint64_t h = hashString(containerName);
ch_placement_find_closest(m_chi_sdskv, h, 1, &db_idx);
ch_placement_find_closest(m_databases.chi, h, 1, &db_idx);
// make an entry for the lower bound
auto lb_entry = buildKey(level, containerName, lower);
// get provider and database
const auto& db = m_databases[db_idx];
const auto& db = m_databases.dbs[db_idx];
// ignore keys that don't have the same level or the same prefix
std::string prefix(2+containerName.size(), '\0');
prefix[0] = level;
......
......@@ -4,27 +4,16 @@
namespace hepnos {
struct ConnectionInfoGenerator::Impl {
std::string m_addr; // address of this process
uint16_t m_num_bake_providers; // number of BAKE provider ids
uint16_t m_num_sdskv_providers; // number of SDSKV provider ids
};
ConnectionInfoGenerator::ConnectionInfoGenerator(
const std::string& address,
uint16_t sdskv_providers,
uint16_t bake_providers)
: m_impl(std::make_unique<Impl>()) {
m_impl->m_addr = address;
m_impl->m_num_bake_providers = bake_providers;
m_impl->m_num_sdskv_providers = sdskv_providers;
}
const std::string& address,
const ServiceConfig& config)
: address(address), serviceConfig(config) {}
ConnectionInfoGenerator::~ConnectionInfoGenerator() {}
void ConnectionInfoGenerator::generateFile(MPI_Comm comm, const std::string& filename) const {
int rank, size;
auto addr_cpy = m_impl->m_addr;
auto addr_cpy = address;
addr_cpy.resize(1024,'\0');
const char* addr = addr_cpy.c_str();
......@@ -40,21 +29,28 @@ void ConnectionInfoGenerator::generateFile(MPI_Comm comm, const std::string& fil
std::vector<char> addresses_buf(1024*size);
MPI_Gather(addr, 1024, MPI_BYTE, addresses_buf.data(), 1024, MPI_BYTE, 0, comm);
// Exchange bake providers info
std::vector<uint16_t> bake_pr_ids_buf(size);
MPI_Gather(&(m_impl->m_num_bake_providers),
1, MPI_UNSIGNED_SHORT,
bake_pr_ids_buf.data(),
1, MPI_UNSIGNED_SHORT,
0, comm);
std::vector<sdskv_database_id_t> local_ids;
std::vector<sdskv_database_id_t> all_db_ids;
for(auto& p : serviceConfig.datasetProviders)
for(auto& db : p.databases)
local_ids.push_back(db.id);
for(auto& p : serviceConfig.runProviders)
for(auto& db : p.databases)
local_ids.push_back(db.id);
for(auto& p : serviceConfig.subrunProviders)
for(auto& db : p.databases)
local_ids.push_back(db.id);
for(auto& p : serviceConfig.eventProviders)
for(auto& db : p.databases)
local_ids.push_back(db.id);
for(auto& p : serviceConfig.productProviders)
for(auto& db : p.databases)
local_ids.push_back(db.id);
// Exchange database ids
all_db_ids.resize(local_ids.size()*size);
MPI_Gather(local_ids.data(), sizeof(sdskv_database_id_t)*local_ids.size(), MPI_BYTE,
all_db_ids.data(), sizeof(sdskv_database_id_t)*local_ids.size(), MPI_BYTE, 0, comm);
// Exchange sdskv providers info
std::vector<uint16_t> sdskv_pr_ids_buf(size);
MPI_Gather(&(m_impl->m_num_sdskv_providers),
1, MPI_UNSIGNED_SHORT,
sdskv_pr_ids_buf.data(),
1, MPI_UNSIGNED_SHORT,
0, comm);
// After this line, the rest is executed only by rank 0
if(rank != 0) return;
......@@ -66,14 +62,61 @@ void ConnectionInfoGenerator::generateFile(MPI_Comm comm, const std::string& fil
YAML::Node config;
config["hepnos"]["client"]["protocol"] = proto;
YAML::Node providers = config["hepnos"]["providers"];
for(unsigned int i=0; i < size; i++) {
const auto& provider_addr = addresses[i];
if(sdskv_pr_ids_buf[i]) {
providers["sdskv"][provider_addr] = sdskv_pr_ids_buf[i];
YAML::Node databases = config["hepnos"]["databases"];
YAML::Node datasets = databases["datasets"];
YAML::Node runs = databases["runs"];
YAML::Node subruns = databases["subruns"];
YAML::Node events = databases["events"];
YAML::Node products = databases["products"];
// for all the server nodes...
for(unsigned i=0; i < size; i++) {
YAML::Node datasetProviders = datasets[addresses[i]];
YAML::Node runProviders = runs[addresses[i]];
YAML::Node subrunProviders = subruns[addresses[i]];
YAML::Node eventProviders = events[addresses[i]];
YAML::Node productProviders = products[addresses[i]];
int db_per_node = local_ids.size();
int j=0;
for(auto& p : serviceConfig.datasetProviders) {
auto provider = datasetProviders[std::to_string(p.provider_id)];
for(auto& db : p.databases) {
auto id = all_db_ids[i*db_per_node+j];
provider.push_back(id);
j += 1;
}
}
for(auto& p : serviceConfig.runProviders) {
auto provider = runProviders[std::to_string(p.provider_id)];
for(auto& db : p.databases) {
auto id = all_db_ids[i*db_per_node+j];
provider.push_back(id);
j += 1;
}
}
for(auto& p : serviceConfig.subrunProviders) {
auto provider = subrunProviders[std::to_string(p.provider_id)];
for(auto& db : p.databases) {
auto id = all_db_ids[i*db_per_node+j];
provider.push_back(id);
j += 1;
}
}
for(auto& p : serviceConfig.eventProviders) {
auto provider = eventProviders[std::to_string(p.provider_id)];
for(auto& db : p.databases) {
auto id = all_db_ids[i*db_per_node+j];
provider.push_back(id);
j += 1;
}
}
if(bake_pr_ids_buf[i]) {
providers["bake"][provider_addr] = bake_pr_ids_buf[i];
for(auto& p : serviceConfig.productProviders) {
auto provider = productProviders[std::to_string(p.provider_id)];
for(auto& db : p.databases) {
auto id = all_db_ids[i*db_per_node+j];
provider.push_back(id);
j += 1;
}
}
}
......
......@@ -4,6 +4,7 @@
#include <string>
#include <memory>
#include <mpi.h>
#include "ServiceConfig.hpp"
namespace hepnos {
......@@ -11,13 +12,14 @@ class ConnectionInfoGenerator {
private:
class Impl;
std::unique_ptr<Impl> m_impl;
const std::string& address;
const ServiceConfig& serviceConfig;
public:
ConnectionInfoGenerator(const std::string& address,
uint16_t num_sdskv_providers, uint16_t num_bake_providers);
const ServiceConfig& config);
ConnectionInfoGenerator(const ConnectionInfoGenerator&) = delete;
ConnectionInfoGenerator(ConnectionInfoGenerator&&) = delete;
ConnectionInfoGenerator& operator=(const ConnectionInfoGenerator&) = delete;
......
......@@ -20,17 +20,21 @@ namespace tl = thallium;
// ASSERT(cond, msg, ...): when cond is false, prints "[file:line] " followed by
// the printf-style message msg (a string literal) to stderr and calls exit(-1).
// At least one variadic argument must follow msg.
// The body is wrapped in do { } while(0) so the macro expands to exactly one
// statement and can safely be used as the branch of an if/else; callers must
// terminate the invocation with a semicolon.
#define ASSERT(cond, msg, ...) \
    do { \
        if(!(cond)) { \
            fprintf(stderr, "[%s:%d] " msg, __FILE__, __LINE__, __VA_ARGS__); \
            exit(-1); \
        } \
    } while(0)
static void createProviderAndDatabases(tl::engine& engine, hepnos::ProviderConfig& provider_config);
void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* connection_file)
{
int ret;
int rank;
int size;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &size);
/* load configuration */
std::unique_ptr<hepnos::ServiceConfig> config;
try {
config = std::make_unique<hepnos::ServiceConfig>(config_file, rank);
config = std::make_unique<hepnos::ServiceConfig>(config_file, rank, size);
} catch(const std::exception& e) {
std::cerr << "Error: when reading configuration:" << std::endl;
std::cerr << " " << e.what() << std::endl;
......@@ -42,9 +46,9 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn
std::unique_ptr<tl::engine> engine;
try {
engine = std::make_unique<tl::engine>(
config->getAddress(),
config->address,
THALLIUM_SERVER_MODE,
false, config->getNumThreads()-1);
false, config->numThreads-1);
} catch(std::exception& ex) {
std::cerr << "Error: unable to initialize thallium" << std::endl;
......@@ -56,37 +60,37 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn
engine->enable_remote_shutdown();
auto self_addr_str = static_cast<std::string>(engine->self());
if(config->hasDatabase()) {
/* SDSKV provider initialization */
for(auto sdskv_provider_id = 0; sdskv_provider_id < config->getNumDatabaseProviders(); sdskv_provider_id++) {
sdskv::provider* provider = sdskv::provider::create(
engine->get_margo_instance(), sdskv_provider_id, SDSKV_ABT_POOL_DEFAULT);
for(unsigned i=0 ; i < config->getNumDatabaseTargets(); i++) {
auto db_path = config->getDatabasePath(rank, sdskv_provider_id, i);
auto db_name = config->getDatabaseName(rank, sdskv_provider_id, i);
sdskv_db_type_t db_type;
if(config->getDatabaseType() == "null") db_type = KVDB_NULL;
if(config->getDatabaseType() == "map") db_type = KVDB_MAP;
if(config->getDatabaseType() == "ldb") db_type = KVDB_LEVELDB;
if(config->getDatabaseType() == "bdb") db_type = KVDB_BERKELEYDB;
sdskv_database_id_t db_id;
sdskv_config_t config;
std::memset(&config, 0, sizeof(config));
config.db_name = db_name.c_str();
config.db_path = db_path.c_str();
config.db_type = db_type;
config.db_no_overwrite = 1;
db_id = provider->attach_database(config);
}
}
}
/* SDSKV providers initialization */
for(auto& provider_config : config->datasetProviders)
createProviderAndDatabases(*engine, provider_config);
for(auto& provider_config : config->runProviders)
createProviderAndDatabases(*engine, provider_config);
for(auto& provider_config : config->subrunProviders)
createProviderAndDatabases(*engine, provider_config);
for(auto& provider_config : config->eventProviders)
createProviderAndDatabases(*engine, provider_config);
for(auto& provider_config : config->productProviders)
createProviderAndDatabases(*engine, provider_config);
hepnos::ConnectionInfoGenerator fileGen(self_addr_str,
config->getNumDatabaseProviders(),
config->getNumStorageProviders());
hepnos::ConnectionInfoGenerator fileGen(self_addr_str, *config);
fileGen.generateFile(MPI_COMM_WORLD, connection_file);
engine->wait_for_finalize();
}
/**
 * Creates one SDSKV provider on the engine's Margo instance, using the
 * provider id found in provider_config, then attaches every database
 * described in provider_config.databases to that provider.
 *
 * The id returned by each attach_database() call is written back into the
 * corresponding database entry of provider_config.
 */
static void createProviderAndDatabases(tl::engine& engine, hepnos::ProviderConfig& provider_config) {
    const auto mid = engine.get_margo_instance();
    sdskv::provider* sdskv_provider =
        sdskv::provider::create(mid, provider_config.provider_id, SDSKV_ABT_POOL_DEFAULT);

    for(auto& entry : provider_config.databases) {
        // start from an all-zero configuration and fill in only the fields we set
        sdskv_config_t attach_args;
        std::memset(&attach_args, 0, sizeof(attach_args));
        attach_args.db_name         = entry.name.c_str();
        attach_args.db_path         = entry.path.c_str();
        attach_args.db_type         = entry.type;
        attach_args.db_no_overwrite = 1;
        // remember the id assigned to this database
        entry.id = sdskv_provider->attach_database(attach_args);
    }
}
This diff is collapsed.
......@@ -3,18 +3,36 @@
#include <string>
#include <memory>
#include <sdskv-common.h>
#include <yaml-cpp/yaml.h>
namespace hepnos {
class ServiceConfig {
// Description of a single SDSKV database.
struct DataBaseConfig {
    std::string         name;  // database name
    std::string         path;  // database path
    sdskv_db_type_t     type;  // SDSKV backend type
    sdskv_database_id_t id{0}; // database id; 0 until one is assigned
};
// Description of one provider: its provider id and the databases it holds.
struct ProviderConfig {
    // provider id; defaults to 0 so a default-constructed ProviderConfig
    // never carries an indeterminate value (mirrors DataBaseConfig::id)
    uint16_t provider_id = 0;
    // configuration of each database attached to this provider
    std::vector<DataBaseConfig> databases;
};
private:
class Impl;
std::unique_ptr<Impl> m_impl;
class ServiceConfig {
public:
std::string address;
uint32_t numRanks = 1;
uint32_t numThreads = 1;
std::vector<ProviderConfig> datasetProviders;
std::vector<ProviderConfig> runProviders;
std::vector<ProviderConfig> subrunProviders;
std::vector<ProviderConfig> eventProviders;
std::vector<ProviderConfig> productProviders;
ServiceConfig(const std::string& filename, int rank=0, int numRanks=1);
ServiceConfig(const ServiceConfig&) = delete;
......@@ -23,22 +41,6 @@ public:
ServiceConfig& operator=(ServiceConfig&&) = delete;
~ServiceConfig();
const std::string& getAddress() const;
bool hasDatabase() const;
std::string getDatabasePath(int rank=0, int provider=0, int target=0) const;
const std::string& getDatabasePathTemplate() const;
std::string getDatabaseName(int rank=0, int provider=0, int target=0) const;
const std::string& getDatabaseNameTemplate() const;
const std::string& getDatabaseType() const;
uint32_t getNumDatabaseProviders() const;
uint32_t getNumDatabaseTargets() const;
bool hasStorage() const;
std::string getStoragePath(int rank=0, int provider=0, int target=0) const;
const std::string& getStoragePathTemplate() const;
size_t getStorageSize() const;
uint32_t getNumStorageProviders() const;
uint32_t getNumStorageTargets() const;
uint32_t getNumThreads() const;
};
}
......
---
address: na+sm://
threads: 4
database:
name: hepnosdb.$RANK.$PROVIDER.$TARGET
path: XXX/$RANK_$PROVIDER_$TARGET
type: bdb
providers: 2
targets: 2
databases:
datasets:
name: hepnos-datasets.$RANK.$PROVIDER.$TARGET
path: XXX/$RANK
type: bdb
targets: 2
providers: 2
runs:
name: hepnos-runs.$RANK.$PROVIDER.$TARGET
path: XXX/$RANK
type: bdb
targets: 2
providers: 2
subruns:
name: hepnos-subruns.$RANK.$PROVIDER.$TARGET
path: XXX/$RANK
type: bdb
targets: 2
providers: 2
events:
name: hepnos-events.$RANK.$PROVIDER.$TARGET
path: XXX/$RANK
type: bdb
targets: 2
providers: 2
products:
name: hepnos-products.$RANK.$PROVIDER.$TARGET
path: XXX/$RANK
type: bdb
targets: 2
providers: 2
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment