Commit 44f34ae1 authored by Matthieu Dorier's avatar Matthieu Dorier

added threading options and the possibility to use multiple providers and targets:

parent f7108125
This diff is collapsed.
...@@ -5,19 +5,19 @@ ...@@ -5,19 +5,19 @@
namespace hepnos { namespace hepnos {
struct ConnectionInfoGenerator::Impl { struct ConnectionInfoGenerator::Impl {
std::string m_addr; // address of this process std::string m_addr; // address of this process
uint16_t m_bake_id; // provider ids for BAKE uint16_t m_num_bake_providers; // number of BAKE provider ids
uint16_t m_sdskv_id; // provider ids for SDSKV uint16_t m_num_sdskv_providers; // number of SDSKV provider ids
}; };
ConnectionInfoGenerator::ConnectionInfoGenerator( ConnectionInfoGenerator::ConnectionInfoGenerator(
const std::string& address, const std::string& address,
uint16_t sdskv_provider_id, uint16_t sdskv_providers,
uint16_t bake_provider_id) uint16_t bake_providers)
: m_impl(std::make_unique<Impl>()) { : m_impl(std::make_unique<Impl>()) {
m_impl->m_addr = address; m_impl->m_addr = address;
m_impl->m_bake_id = bake_provider_id; m_impl->m_num_bake_providers = bake_providers;
m_impl->m_sdskv_id = sdskv_provider_id; m_impl->m_num_sdskv_providers = sdskv_providers;
} }
ConnectionInfoGenerator::~ConnectionInfoGenerator() {} ConnectionInfoGenerator::~ConnectionInfoGenerator() {}
...@@ -42,7 +42,7 @@ void ConnectionInfoGenerator::generateFile(MPI_Comm comm, const std::string& fil ...@@ -42,7 +42,7 @@ void ConnectionInfoGenerator::generateFile(MPI_Comm comm, const std::string& fil
// Exchange bake providers info // Exchange bake providers info
std::vector<uint16_t> bake_pr_ids_buf(size); std::vector<uint16_t> bake_pr_ids_buf(size);
MPI_Gather(&(m_impl->m_bake_id), MPI_Gather(&(m_impl->m_num_bake_providers),
1, MPI_UNSIGNED_SHORT, 1, MPI_UNSIGNED_SHORT,
bake_pr_ids_buf.data(), bake_pr_ids_buf.data(),
1, MPI_UNSIGNED_SHORT, 1, MPI_UNSIGNED_SHORT,
...@@ -50,7 +50,7 @@ void ConnectionInfoGenerator::generateFile(MPI_Comm comm, const std::string& fil ...@@ -50,7 +50,7 @@ void ConnectionInfoGenerator::generateFile(MPI_Comm comm, const std::string& fil
// Exchange sdskv providers info // Exchange sdskv providers info
std::vector<uint16_t> sdskv_pr_ids_buf(size); std::vector<uint16_t> sdskv_pr_ids_buf(size);
MPI_Gather(&(m_impl->m_sdskv_id), MPI_Gather(&(m_impl->m_num_sdskv_providers),
1, MPI_UNSIGNED_SHORT, 1, MPI_UNSIGNED_SHORT,
sdskv_pr_ids_buf.data(), sdskv_pr_ids_buf.data(),
1, MPI_UNSIGNED_SHORT, 1, MPI_UNSIGNED_SHORT,
......
...@@ -17,8 +17,7 @@ private: ...@@ -17,8 +17,7 @@ private:
public: public:
ConnectionInfoGenerator(const std::string& address, ConnectionInfoGenerator(const std::string& address,
uint16_t sdskv_provider_id, uint16_t num_sdskv_providers, uint16_t num_bake_providers);
uint16_t bake_provider_id);
ConnectionInfoGenerator(const ConnectionInfoGenerator&) = delete; ConnectionInfoGenerator(const ConnectionInfoGenerator&) = delete;
ConnectionInfoGenerator(ConnectionInfoGenerator&&) = delete; ConnectionInfoGenerator(ConnectionInfoGenerator&&) = delete;
ConnectionInfoGenerator& operator=(const ConnectionInfoGenerator&) = delete; ConnectionInfoGenerator& operator=(const ConnectionInfoGenerator&) = delete;
......
...@@ -40,7 +40,7 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn ...@@ -40,7 +40,7 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn
} }
/* Margo initialization */ /* Margo initialization */
mid = margo_init(config->getAddress().c_str(), MARGO_SERVER_MODE, 0, -1); mid = margo_init(config->getAddress().c_str(), MARGO_SERVER_MODE, 0, config->getNumThreads()-1);
if (mid == MARGO_INSTANCE_NULL) if (mid == MARGO_INSTANCE_NULL)
{ {
std::cerr << "Error: unable to initialize margo" << std::endl; std::cerr << "Error: unable to initialize margo" << std::endl;
...@@ -57,54 +57,59 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn ...@@ -57,54 +57,59 @@ void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* conn
hg_size_t self_addr_str_size = 128; hg_size_t self_addr_str_size = 128;
margo_addr_to_string(mid, self_addr_str, &self_addr_str_size, self_addr); margo_addr_to_string(mid, self_addr_str, &self_addr_str_size, self_addr);
uint16_t bake_provider_id = 0;
if(config->hasStorage()) { if(config->hasStorage()) {
/* Bake provider initialization */ for(auto bake_provider_id = 0; bake_provider_id < config->getNumStorageProviders(); bake_provider_id++) {
bake_provider_id = 1; // XXX we can make that come from the config file /* create provider */
const char* bake_target_name = config->getStoragePath().c_str(); bake_provider_t bake_prov;
size_t bake_target_size = config->getStorageSize()*(1024*1024); ret = bake_provider_register(mid, bake_provider_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
/* create the bake target if it does not exist */ ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
if(-1 == access(bake_target_name, F_OK)) { /* create databases */
ret = bake_makepool(bake_target_name, bake_target_size, 0664); for(unsigned i=0; i < config->getNumStorageTargets(); i++) {
ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret); auto bake_target_name = config->getStoragePath(rank, bake_provider_id, i);
size_t bake_target_size = config->getStorageSize()*(1024*1024);
if(-1 == access(bake_target_name.c_str(), F_OK)) {
ret = bake_makepool(bake_target_name.c_str(), bake_target_size, 0664);
ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
}
bake_target_id_t bake_tid;
ret = bake_provider_add_storage_target(bake_prov, bake_target_name.c_str(), &bake_tid);
ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
bake_target_name.c_str(), ret);
}
} }
bake_provider_t bake_prov;
bake_target_id_t bake_tid;
ret = bake_provider_register(mid, bake_provider_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
bake_target_name, ret);
} }
uint8_t sdskv_provider_id = 0;
if(config->hasDatabase()) { if(config->hasDatabase()) {
/* SDSKV provider initialization */ /* SDSKV provider initialization */
sdskv_provider_id = 2; // XXX we can make that come from the config file for(auto sdskv_provider_id = 0; sdskv_provider_id < config->getNumDatabaseProviders(); sdskv_provider_id++) {
sdskv_provider_t sdskv_prov; sdskv_provider_t sdskv_prov;
ret = sdskv_provider_register(mid, sdskv_provider_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov); ret = sdskv_provider_register(mid, sdskv_provider_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret); ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret);
/* creating the database */ for(unsigned i=0 ; i < config->getNumDatabaseTargets(); i++) {
const char* db_path = config->getDatabasePath().c_str(); auto db_path = config->getDatabasePath(rank, sdskv_provider_id, i);
const char* db_name = config->getDatabaseName().c_str(); auto db_name = config->getDatabaseName(rank, sdskv_provider_id, i);
sdskv_db_type_t db_type; sdskv_db_type_t db_type;
if(config->getDatabaseType() == "map") db_type = KVDB_MAP; if(config->getDatabaseType() == "map") db_type = KVDB_MAP;
if(config->getDatabaseType() == "ldb") db_type = KVDB_LEVELDB; if(config->getDatabaseType() == "ldb") db_type = KVDB_LEVELDB;
if(config->getDatabaseType() == "bdb") db_type = KVDB_BERKELEYDB; if(config->getDatabaseType() == "bdb") db_type = KVDB_BERKELEYDB;
sdskv_database_id_t db_id; sdskv_database_id_t db_id;
sdskv_config_t config; sdskv_config_t config;
std::memset(&config, 0, sizeof(config)); std::memset(&config, 0, sizeof(config));
config.db_name = db_name; config.db_name = db_name.c_str();
config.db_path = db_path; config.db_path = db_path.c_str();
config.db_type = db_type; config.db_type = db_type;
ret = sdskv_provider_attach_database(sdskv_prov, &config, &db_id); ret = sdskv_provider_attach_database(sdskv_prov, &config, &db_id);
ASSERT(ret == 0, "sdskv_provider_attach_database() failed (ret = %d)\n", ret); ASSERT(ret == 0, "sdskv_provider_attach_database() failed (ret = %d)\n", ret);
}
}
} }
margo_addr_free(mid, self_addr); margo_addr_free(mid, self_addr);
hepnos::ConnectionInfoGenerator fileGen(self_addr_str, sdskv_provider_id, bake_provider_id); hepnos::ConnectionInfoGenerator fileGen(self_addr_str,
config->getNumDatabaseProviders(),
config->getNumStorageProviders());
fileGen.generateFile(MPI_COMM_WORLD, connection_file); fileGen.generateFile(MPI_COMM_WORLD, connection_file);
margo_wait_for_finalize(mid); margo_wait_for_finalize(mid);
......
#include <cmath>
#include <iomanip>
#include "ServiceConfig.hpp" #include "ServiceConfig.hpp"
#include "hepnos/Exception.hpp" #include "hepnos/Exception.hpp"
#include <yaml-cpp/yaml.h> #include <yaml-cpp/yaml.h>
...@@ -7,41 +9,68 @@ namespace hepnos { ...@@ -7,41 +9,68 @@ namespace hepnos {
struct ServiceConfig::Impl { struct ServiceConfig::Impl {
std::string m_address; std::string m_address;
uint32_t m_numRanks;
bool m_hasDatabase; bool m_hasDatabase;
std::string m_databasePath; std::string m_databasePath;
std::string m_databaseName; std::string m_databaseName;
std::string m_databaseType; std::string m_databaseType;
uint32_t m_databaseProviders = 1;
uint32_t m_databaseTargets = 1;
bool m_hasStorage; bool m_hasStorage;
std::string m_storagePath; std::string m_storagePath;
size_t m_storageSize; size_t m_storageSize;
uint32_t m_storageProviders = 1;
uint32_t m_storageTargets = 1;
uint32_t m_numThreads = 1;
}; };
static YAML::Node loadAndValidate(const std::string& filename); static YAML::Node loadAndValidate(const std::string& filename);
static std::string insertRankIn(const std::string& str, int rank); static std::string formatString(const std::string& str,
int rank, int provider, int target,
int maxRank, int maxProvider, int maxTarget);
ServiceConfig::ServiceConfig(const std::string& filename, int rank) ServiceConfig::ServiceConfig(const std::string& filename, int rank, int numRanks)
: m_impl(std::make_unique<Impl>()) { : m_impl(std::make_unique<Impl>()) {
m_impl->m_numRanks = numRanks;
YAML::Node config = loadAndValidate(filename); YAML::Node config = loadAndValidate(filename);
YAML::Node address = config["address"]; YAML::Node address = config["address"];
YAML::Node threads = config["threads"];
YAML::Node db_node = config["database"]; YAML::Node db_node = config["database"];
YAML::Node storage_node = config["storage"]; YAML::Node storage_node = config["storage"];
if(threads) {
m_impl->m_numThreads = threads.as<uint32_t>();
}
if(m_impl->m_numThreads == 0) m_impl->m_numThreads = 1;
m_impl->m_address = address.as<std::string>(); m_impl->m_address = address.as<std::string>();
if(!db_node) { if(!db_node) {
m_impl->m_hasDatabase = false; m_impl->m_hasDatabase = false;
} else { } else {
m_impl->m_hasDatabase = true; m_impl->m_hasDatabase = true;
m_impl->m_databasePath = insertRankIn(db_node["path"].as<std::string>(), rank); m_impl->m_databasePath = db_node["path"].as<std::string>();
m_impl->m_databaseName = db_node["name"].as<std::string>(); m_impl->m_databaseName = db_node["name"].as<std::string>();
m_impl->m_databaseType = db_node["type"].as<std::string>(); m_impl->m_databaseType = db_node["type"].as<std::string>();
if(db_node["providers"]) {
m_impl->m_databaseProviders = db_node["providers"].as<uint32_t>();
}
if(db_node["targets"]) {
m_impl->m_databaseTargets = db_node["targets"].as<uint32_t>();
}
} }
if(!storage_node) { if(!storage_node) {
m_impl->m_hasStorage = false; m_impl->m_hasStorage = false;
} else { } else {
m_impl->m_hasStorage = true; m_impl->m_hasStorage = true;
m_impl->m_storagePath = insertRankIn(storage_node["path"].as<std::string>(), rank); m_impl->m_storagePath = storage_node["path"].as<std::string>();
m_impl->m_storageSize = storage_node["size"].as<size_t>(); m_impl->m_storageSize = storage_node["size"].as<size_t>();
if(storage_node["providers"]) {
m_impl->m_storageProviders = storage_node["providers"].as<uint32_t>();
}
if(storage_node["targets"]) {
m_impl->m_storageTargets = storage_node["targets"].as<uint32_t>();
}
} }
} }
...@@ -55,11 +84,25 @@ bool ServiceConfig::hasDatabase() const { ...@@ -55,11 +84,25 @@ bool ServiceConfig::hasDatabase() const {
return m_impl->m_hasDatabase; return m_impl->m_hasDatabase;
} }
const std::string& ServiceConfig::getDatabasePath() const { std::string ServiceConfig::getDatabasePath(int rank, int provider, int target) const {
int maxRank = m_impl->m_numRanks - 1;
int maxProvider = getNumDatabaseProviders() - 1;
int maxTarget = getNumDatabaseTargets() - 1;
return formatString(m_impl->m_databasePath, rank, provider, target, maxRank, maxProvider, maxTarget);
}
std::string ServiceConfig::getDatabaseName(int rank, int provider, int target) const {
int maxRank = m_impl->m_numRanks - 1;
int maxProvider = getNumDatabaseProviders() - 1;
int maxTarget = getNumDatabaseTargets() - 1;
return formatString(m_impl->m_databaseName, rank, provider, target, maxRank, maxProvider, maxTarget);
}
const std::string& ServiceConfig::getDatabasePathTemplate() const {
return m_impl->m_databasePath; return m_impl->m_databasePath;
} }
const std::string& ServiceConfig::getDatabaseName() const { const std::string& ServiceConfig::getDatabaseNameTemplate() const {
return m_impl->m_databaseName; return m_impl->m_databaseName;
} }
...@@ -67,11 +110,26 @@ const std::string& ServiceConfig::getDatabaseType() const { ...@@ -67,11 +110,26 @@ const std::string& ServiceConfig::getDatabaseType() const {
return m_impl->m_databaseType; return m_impl->m_databaseType;
} }
uint32_t ServiceConfig::getNumDatabaseProviders() const {
return m_impl->m_databaseProviders;
}
uint32_t ServiceConfig::getNumDatabaseTargets() const {
return m_impl->m_databaseTargets;
}
bool ServiceConfig::hasStorage() const { bool ServiceConfig::hasStorage() const {
return m_impl->m_hasStorage; return m_impl->m_hasStorage;
} }
const std::string& ServiceConfig::getStoragePath() const { std::string ServiceConfig::getStoragePath(int rank, int provider, int target) const {
int maxRank = m_impl->m_numRanks - 1;
int maxProvider = getNumStorageProviders() - 1;
int maxTarget = getNumStorageTargets() - 1;
return formatString(m_impl->m_storagePath, rank, provider, target, maxRank, maxProvider, maxTarget);
}
const std::string& ServiceConfig::getStoragePathTemplate() const {
return m_impl->m_storagePath; return m_impl->m_storagePath;
} }
...@@ -79,6 +137,18 @@ size_t ServiceConfig::getStorageSize() const { ...@@ -79,6 +137,18 @@ size_t ServiceConfig::getStorageSize() const {
return m_impl->m_storageSize; return m_impl->m_storageSize;
} }
uint32_t ServiceConfig::getNumStorageProviders() const {
return m_impl->m_storageProviders;
}
uint32_t ServiceConfig::getNumStorageTargets() const {
return m_impl->m_storageTargets;
}
uint32_t ServiceConfig::getNumThreads() const {
return m_impl->m_numThreads;
}
static YAML::Node loadAndValidate(const std::string& filename) { static YAML::Node loadAndValidate(const std::string& filename) {
YAML::Node config = YAML::LoadFile(filename); YAML::Node config = YAML::LoadFile(filename);
if(!config["address"]) { if(!config["address"]) {
...@@ -113,12 +183,20 @@ static YAML::Node loadAndValidate(const std::string& filename) { ...@@ -113,12 +183,20 @@ static YAML::Node loadAndValidate(const std::string& filename) {
return config; return config;
} }
static std::string insertRankIn(const std::string& str, int rank) { static std::string formatString(const std::string& str,
size_t index = 0; int rank, int provider, int target,
int maxRank=0, int maxProvider=0, int maxTarget=0) {
std::string result = str; std::string result = str;
std::stringstream ssrank; std::stringstream ssrank;
ssrank << rank; ssrank << std::setw(log10(maxRank+1)+1) << std::setfill('0') << rank;
std::stringstream ssprovider;
ssprovider << std::setw(log10(maxProvider+1)+1) << std::setfill('0') << provider;
std::stringstream sstarget;
sstarget << std::setw(log10(maxTarget+1)+1) << std::setfill('0') << target;
std::string srank = ssrank.str(); std::string srank = ssrank.str();
std::string sprovider = ssprovider.str();
std::string starget = sstarget.str();
size_t index = 0;
while (true) { while (true) {
index = result.find("$RANK", index); index = result.find("$RANK", index);
if (index == std::string::npos) break; if (index == std::string::npos) break;
...@@ -129,6 +207,28 @@ static std::string insertRankIn(const std::string& str, int rank) { ...@@ -129,6 +207,28 @@ static std::string insertRankIn(const std::string& str, int rank) {
} }
index += 5; index += 5;
} }
index = 0;
while (true) {
index = result.find("$PROVIDER", index);
if (index == std::string::npos) break;
if(rank >= 0) {
result.replace(index, 9, sprovider.c_str());
} else {
result.replace(index, 9, "");
}
index += 9;
}
index = 0;
while (true) {
index = result.find("$TARGET", index);
if (index == std::string::npos) break;
if(rank >= 0) {
result.replace(index, 7, starget.c_str());
} else {
result.replace(index, 7, "");
}
index += 7;
}
return result; return result;
} }
......
...@@ -15,7 +15,7 @@ private: ...@@ -15,7 +15,7 @@ private:
public: public:
ServiceConfig(const std::string& filename, int rank=-1); ServiceConfig(const std::string& filename, int rank=0, int numRanks=1);
ServiceConfig(const ServiceConfig&) = delete; ServiceConfig(const ServiceConfig&) = delete;
ServiceConfig(ServiceConfig&&) = delete; ServiceConfig(ServiceConfig&&) = delete;
...@@ -25,12 +25,20 @@ public: ...@@ -25,12 +25,20 @@ public:
const std::string& getAddress() const; const std::string& getAddress() const;
bool hasDatabase() const; bool hasDatabase() const;
const std::string& getDatabasePath() const; std::string getDatabasePath(int rank=0, int provider=0, int target=0) const;
const std::string& getDatabaseName() const; const std::string& getDatabasePathTemplate() const;
std::string getDatabaseName(int rank=0, int provider=0, int target=0) const;
const std::string& getDatabaseNameTemplate() const;
const std::string& getDatabaseType() const; const std::string& getDatabaseType() const;
uint32_t getNumDatabaseProviders() const;
uint32_t getNumDatabaseTargets() const;
bool hasStorage() const; bool hasStorage() const;
const std::string& getStoragePath() const; std::string getStoragePath(int rank=0, int provider=0, int target=0) const;
const std::string& getStoragePathTemplate() const;
size_t getStorageSize() const; size_t getStorageSize() const;
uint32_t getNumStorageProviders() const;
uint32_t getNumStorageTargets() const;
uint32_t getNumThreads() const;
}; };
} }
......
--- ---
address: tcp:// address: tcp://
threads: 4
database: database:
name: hepnosdb name: hepnosdb.$RANK.$PROVIDER.$TARGET
path: XXX/$RANK path: XXX/$RANK_$PROVIDER_$TARGET
type: bdb type: bdb
providers: 2
targets: 2
storage: storage:
path: /dev/shm/hepnos.$RANK.dat path: /dev/shm/hepnos.$RANK.$PROVIDER.$TARGET.dat
size: 50 size: 50
providers: 2
targets: 2
...@@ -30,5 +30,5 @@ wait ...@@ -30,5 +30,5 @@ wait
# cleanup # cleanup
rm -rf $TEST_DIR rm -rf $TEST_DIR
rm -rf /dev/shm/hepnos.dat rm -rf /dev/shm/hepnos.*.dat
exit 0 exit 0
...@@ -43,5 +43,5 @@ wait ...@@ -43,5 +43,5 @@ wait
############## ##############
# cleanup # cleanup
rm -rf $TEST_DIR rm -rf $TEST_DIR
rm -rf /dev/shm/hepnos.dat rm -rf /dev/shm/hepnos.*.dat
exit 0 exit 0
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment