Commit f1e3831a authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-config' into 'master'

Dev config

See merge request sds/HEPnOS!1
parents 08bad35b 537d3ff4
......@@ -20,14 +20,14 @@
void usage(void)
{
fprintf(stderr, "Usage: hepnos-daemon <addr> <config>\n");
fprintf(stderr, " <addr> the Mercury address to listen on (e.g. tcp://)\n");
fprintf(stderr, " <config> path to the YAML file to generate for clients\n");
fprintf(stderr, " <config> path to the YAML file containing the service configuration\n");
fprintf(stderr, " <connection> path to the YAML file to generate for clients\n");
exit(-1);
}
int main(int argc, char *argv[])
{
char* listen_addr;
char* connection_file;
char* config_file;
int rank;
......@@ -42,10 +42,10 @@ int main(int argc, char *argv[])
exit(0);
}
listen_addr = argv[1];
config_file = argv[2];
config_file = argv[1];
connection_file = argv[2];
hepnos_run_service(MPI_COMM_WORLD, listen_addr, config_file);
hepnos_run_service(MPI_COMM_WORLD, config_file, connection_file);
MPI_Finalize();
}
......
......@@ -7,7 +7,7 @@
extern "C" {
#endif
void hepnos_run_service(MPI_Comm comm, const char* listen_addr, const char* config_file);
void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* connection_file);
#ifdef __cplusplus
}
......
......@@ -6,7 +6,9 @@ set(hepnos-src DataStore.cpp
SubRun.cpp
Event.cpp)
set(hepnos-service-src service/HEPnOSService.cpp)
set(hepnos-service-src service/HEPnOSService.cpp
service/ServiceConfig.cpp
service/ConnectionInfoGenerator.cpp)
# load package helper for generating cmake CONFIG packages
include (CMakePackageConfigHelpers)
......@@ -19,7 +21,7 @@ set (hepnos-pkg "share/cmake/hepnos")
#
set (HEPNOS_VERSION_MAJOR 0)
set (HEPNOS_VERSION_MINOR 1)
set (HEPNOS_VERSION_PATCH 0)
set (HEPNOS_VERSION_PATCH 2)
set (hepnos-vers "${HEPNOS_VERSION_MAJOR}.${HEPNOS_VERSION_MINOR}")
set (HEPNOS_VERSION "${hepnos-vers}.${HEPNOS_VERSION_PATCH}")
......@@ -44,6 +46,8 @@ target_include_directories (hepnos-service PUBLIC $<INSTALL_INTERFACE:include>)
# local include's BEFORE, in case old incompatable .h files in prefix/include
target_include_directories (hepnos-service BEFORE PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>)
target_include_directories (hepnos-service BEFORE PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/service>)
# for shared libs, establish the lib version
set_target_properties (hepnos-service
......
......@@ -163,7 +163,7 @@ DataSet DataStore::createDataSet(const std::string& name) {
void DataStore::shutdown() {
for(auto addr : m_impl->m_addrs) {
margo_shutdown_remote_instance(m_impl->m_mid, addr);
margo_shutdown_remote_instance(m_impl->m_mid, addr.second);
}
}
......
......@@ -8,6 +8,7 @@
#include <vector>
#include <unordered_set>
#include <unordered_map>
#include <functional>
#include <iostream>
#include <yaml-cpp/yaml.h>
......@@ -15,6 +16,7 @@
#include <bake-client.h>
#include <ch-placement.h>
#include "KeyTypes.hpp"
#include "ValueTypes.hpp"
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
......@@ -28,16 +30,17 @@ namespace hepnos {
class DataStore::Impl {
public:
margo_instance_id m_mid; // Margo instance
std::unordered_set<hg_addr_t> m_addrs; // Addresses used by the service
sdskv_client_t m_sdskv_client; // SDSKV client
bake_client_t m_bake_client; // BAKE client
std::vector<sdskv_provider_handle_t> m_sdskv_ph; // list of SDSKV provider handlers
std::vector<sdskv_database_id_t> m_sdskv_db; // list of SDSKV database ids
struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV
std::vector<bake_provider_handle_t> m_bake_ph; // list of BAKE provider handlers
struct ch_placement_instance* m_chi_bake; // ch-placement instance for BAKE
const DataStore::iterator m_end; // iterator for the end() of the DataStore
margo_instance_id m_mid; // Margo instance
std::unordered_map<std::string,hg_addr_t> m_addrs; // Addresses used by the service
sdskv_client_t m_sdskv_client; // SDSKV client
bake_client_t m_bake_client; // BAKE client
std::vector<sdskv_provider_handle_t> m_sdskv_ph; // list of SDSKV provider handlers
std::vector<sdskv_database_id_t> m_sdskv_db; // list of SDSKV database ids
struct ch_placement_instance* m_chi_sdskv; // ch-placement instance for SDSKV
std::vector<bake_provider_handle_t> m_bake_ph; // list of BAKE provider handlers
std::vector<bake_target_id_t> m_bake_targets; // list of BAKE target ids
struct ch_placement_instance* m_chi_bake; // ch-placement instance for BAKE
const DataStore::iterator m_end; // iterator for the end() of the DataStore
Impl(DataStore* parent)
: m_mid(MARGO_INSTANCE_NULL)
......@@ -77,18 +80,22 @@ class DataStore::Impl {
for(YAML::const_iterator it = sdskv.begin(); it != sdskv.end(); it++) {
std::string str_addr = it->first.as<std::string>();
hg_addr_t addr;
hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
if(hret != HG_SUCCESS) {
margo_addr_free(m_mid,addr);
cleanup();
throw Exception("margo_addr_lookup failed");
if(m_addrs.count(str_addr) != 0) {
addr = m_addrs[str_addr];
} else {
hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
if(hret != HG_SUCCESS) {
margo_addr_free(m_mid,addr);
cleanup();
throw Exception("margo_addr_lookup failed");
}
m_addrs[str_addr] = addr;
}
m_addrs.insert(addr);
// get the provider id(s)
if(it->second.IsScalar()) {
uint16_t provider_id = it->second.as<uint16_t>();
sdskv_provider_handle_t ph;
ret = sdskv_provider_handle_create(m_sdskv_client, addr, provider_id, &ph);
margo_addr_free(m_mid, addr);
if(ret != SDSKV_SUCCESS) {
cleanup();
throw Exception("sdskv_provider_handle_create failed");
......@@ -99,7 +106,6 @@ class DataStore::Impl {
uint16_t provider_id = pid->second.as<uint16_t>();
sdskv_provider_handle_t ph;
ret = sdskv_provider_handle_create(m_sdskv_client, addr, provider_id, &ph);
margo_addr_free(m_mid, addr);
if(ret != SDSKV_SUCCESS) {
cleanup();
throw Exception("sdskv_provider_handle_create failed");
......@@ -120,40 +126,60 @@ class DataStore::Impl {
}
// initialize ch-placement for the SDSKV providers
m_chi_sdskv = ch_placement_initialize("hash_lookup3", m_sdskv_ph.size(), 4, 0);
// get list of bake provider handles
YAML::Node bake = config["hepnos"]["providers"]["bake"];
for(YAML::const_iterator it = bake.begin(); it != bake.end(); it++) {
std::string str_addr = it->first.as<std::string>();
hg_addr_t addr;
hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
if(hret != HG_SUCCESS) {
margo_addr_free(m_mid, addr);
cleanup();
throw Exception("margo_addr_lookup failed");
}
m_addrs.insert(addr);
if(it->second.IsScalar()) {
uint16_t provider_id = it->second.as<uint16_t>();
bake_provider_handle_t ph;
ret = bake_provider_handle_create(m_bake_client, addr, provider_id, &ph);
margo_addr_free(m_mid, addr);
if(ret != 0) {
cleanup();
throw Exception("bake_provider_handle_create failed");
if(bake) {
for(YAML::const_iterator it = bake.begin(); it != bake.end(); it++) {
// get the address of a bake provider
std::string str_addr = it->first.as<std::string>();
hg_addr_t addr;
if(m_addrs.count(str_addr) != 0) {
addr = m_addrs[str_addr];
} else {
// lookup the address
hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
if(hret != HG_SUCCESS) {
margo_addr_free(m_mid, addr);
cleanup();
throw Exception("margo_addr_lookup failed");
}
m_addrs[str_addr] = addr;
}
m_bake_ph.push_back(ph);
} else if(it->second.IsSequence()) {
for(YAML::const_iterator pid = it->second.begin(); pid != it->second.end(); pid++) {
uint16_t provider_id = pid->second.as<uint16_t>();
if(it->second.IsScalar()) {
uint16_t provider_id = it->second.as<uint16_t>();
bake_provider_handle_t ph;
ret = bake_provider_handle_create(m_bake_client, addr, provider_id, &ph);
margo_addr_free(m_mid, addr);
if(ret != 0) {
cleanup();
throw Exception("bake_provider_handle_create failed");
}
m_bake_ph.push_back(ph);
} else if(it->second.IsSequence()) {
for(YAML::const_iterator pid = it->second.begin(); pid != it->second.end(); pid++) {
uint16_t provider_id = pid->second.as<uint16_t>();
bake_provider_handle_t ph;
ret = bake_provider_handle_create(m_bake_client, addr, provider_id, &ph);
if(ret != 0) {
cleanup();
throw Exception("bake_provider_handle_create failed");
}
m_bake_ph.push_back(ph);
}
} // if(it->second.IsSequence())
} // for loop
// find out the bake targets at each bake provider
for(auto& bake_ph : m_bake_ph) {
bake_target_id_t bti;
uint64_t num_targets = 0;
ret = bake_probe(bake_ph, 1, &bti, &num_targets);
if(ret != BAKE_SUCCESS) {
throw Exception("bake_probe failed to retrieve targets");
}
if(num_targets != 1) {
throw Exception("bake_prove returned no target");
}
m_bake_targets.push_back(bti);
}
}
// initialize ch-placement for the bake providers
......@@ -175,45 +201,63 @@ class DataStore::Impl {
ch_placement_finalize(m_chi_sdskv);
if(m_chi_bake)
ch_placement_finalize(m_chi_bake);
for(auto& addr : m_addrs) {
margo_addr_free(m_mid, addr.second);
}
if(m_mid) margo_finalize(m_mid);
}
private:
static void checkConfig(YAML::Node& config) {
// config file starts with hepnos entry
auto hepnosNode = config["hepnos"];
if(!hepnosNode) {
throw Exception("\"hepnos\" entry not found in YAML file");
}
// hepnos entry has client entry
auto clientNode = hepnosNode["client"];
if(!clientNode) {
throw Exception("\"client\" entry not found in \"hepnos\" section");
}
// client entry has protocol entry
auto protoNode = clientNode["protocol"];
if(!protoNode) {
throw Exception("\"protocol\" entry not found in \"client\" section");
}
// hepnos entry has providers entry
auto providersNode = hepnosNode["providers"];
if(!providersNode) {
throw Exception("\"providers\" entry not found in \"hepnos\" section");
}
// provider entry has sdskv entry
auto sdskvNode = providersNode["sdskv"];
if(!sdskvNode) {
throw Exception("\"sdskv\" entry not found in \"providers\" section");
}
// sdskv entry is not empty
if(sdskvNode.size() == 0) {
throw Exception("No provider found in \"sdskv\" section");
}
// for each sdskv entry
for(auto it = sdskvNode.begin(); it != sdskvNode.end(); it++) {
if(it->second.IsScalar()) continue; // one provider id given
if(it->second.IsSequence()) { // array of provider ids given
// the sequence is not empty
if(it->second.size() == 0) {
throw Exception("Empty array of provider ids encountered in \"sdskv\" section");
}
// all objects in the sequence are scalar and appear only once
std::unordered_set<uint16_t> ids;
for(auto pid = it->second.begin(); pid != it->second.end(); pid++) {
if(!pid->second.IsScalar()) {
throw Exception("Non-scalar provider id encountered in \"sdskv\" section");
}
uint16_t pid_int = pid->as<uint16_t>();
if(ids.count(pid_int) != 0) {
throw Exception("Provider id encountered twice in \"sdskv\" section");
}
ids.insert(pid_int);
}
} else {
throw Exception("Invalid value type for provider in \"sdskv\" section");
......@@ -230,10 +274,16 @@ class DataStore::Impl {
if(it->second.size() == 0) {
throw Exception("No provider found in \"bake\" section");
}
std::unordered_set<uint16_t> ids;
for(auto pid = it->second.begin(); pid != it->second.end(); pid++) {
if(!pid->second.IsScalar()) {
throw Exception("Non-scalar provider id encountered in \"bake\" section");
}
uint16_t pid_int = pid->as<uint16_t>();
if(ids.count(pid_int) != 0) {
throw Exception("Provider id encountered twice in \"bake\" section");
}
ids.insert(pid_int);
}
} else {
throw Exception("Invalid value type for provider in \"bake\" section");
......@@ -252,33 +302,68 @@ class DataStore::Impl {
ss << containerName << "/";
ss << objectName;
// hash the name to get the provider id
long unsigned provider_idx = 0;
long unsigned sdskv_provider_idx = 0;
uint64_t name_hash;
if(level != 0) {
uint64_t h = std::hash<std::string>()(containerName);
ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx);
name_hash = std::hash<std::string>()(containerName);
} else {
// use the complete name for final objects (level 0)
uint64_t h = std::hash<std::string>()(ss.str());
ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx);
name_hash = std::hash<std::string>()(ss.str());
}
ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_provider_idx);
// make corresponding datastore entry
DataStoreEntryPtr entry = make_datastore_entry(level, ss.str());
auto ph = m_sdskv_ph[provider_idx];
auto db_id = m_sdskv_db[provider_idx];
// find the size of the value, as a way to check if the key exists
hg_size_t vsize;
ret = sdskv_length(ph, db_id, entry->raw(), entry->length(), &vsize);
if(ret == SDSKV_ERR_UNKNOWN_KEY) {
return false;
}
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_length");
}
auto sdskv_ph = m_sdskv_ph[sdskv_provider_idx];
auto db_id = m_sdskv_db[sdskv_provider_idx];
// read the value
data.resize(vsize);
ret = sdskv_get(ph, db_id, entry->raw(), entry->length(), data.data(), &vsize);
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_get");
if(level != 0 || m_bake_ph.empty()) { // read directly from sdskv
// find the size of the value, as a way to check if the key exists
hg_size_t vsize;
ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize);
if(ret == SDSKV_ERR_UNKNOWN_KEY) {
return false;
}
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_length");
}
data.resize(vsize);
ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), &vsize);
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_get");
}
} else { // read from BAKE
// first get the key/val from sdskv
DataStoreValue rid_info;
hg_size_t vsize = sizeof(rid_info);
ret = sdskv_get(sdskv_ph, db_id, entry->raw(), entry->length(), (void*)(&rid_info), &vsize);
if(ret == SDSKV_ERR_UNKNOWN_KEY) {
return false;
}
if(ret != SDSKV_SUCCESS) {
throw Exception("Error occured when calling sdskv_get");
}
if(vsize != sizeof(rid_info)) {
throw Exception("Call to sdskv_get returned a value of unexpected size");
}
// now read the data from bake
data.resize(rid_info.getDataSize());
if(data.size() == 0) return true;
long unsigned bake_provider_idx = 0;
ch_placement_find_closest(m_chi_bake, name_hash, 1, &bake_provider_idx);
auto bake_ph = m_bake_ph[bake_provider_idx];
auto target = m_bake_targets[bake_provider_idx];
uint64_t bytes_read = 0;
ret = bake_read(bake_ph, rid_info.getBakeRegionID(), 0, data.data(), data.size(), &bytes_read);
if(ret != BAKE_SUCCESS) {
throw Exception("Couldn't read region from BAKE");
}
if(bytes_read != rid_info.getDataSize()) {
throw Exception("Bytes read from BAKE did not match expected object size");
}
}
return true;
}
......@@ -291,29 +376,52 @@ class DataStore::Impl {
ss << containerName << "/";
ss << objectName;
// hash the name to get the provider id
long unsigned provider_idx = 0;
long unsigned sdskv_provider_idx = 0;
uint64_t name_hash;
if(level != 0) {
uint64_t h = std::hash<std::string>()(containerName);
ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx);
name_hash = std::hash<std::string>()(containerName);
} else {
// use the complete name for final objects (level 0)
uint64_t h = std::hash<std::string>()(ss.str());
ch_placement_find_closest(m_chi_sdskv, h, 1, &provider_idx);
name_hash = std::hash<std::string>()(ss.str());
}
// make corresponding datastore entry
ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_provider_idx);
// make corresponding datastore entry key
DataStoreEntryPtr entry = make_datastore_entry(level, ss.str());
auto ph = m_sdskv_ph[provider_idx];
auto db_id = m_sdskv_db[provider_idx];
auto sdskv_ph = m_sdskv_ph[sdskv_provider_idx];
auto db_id = m_sdskv_db[sdskv_provider_idx];
// check if the key exists
hg_size_t vsize;
int ret = sdskv_length(ph, db_id, entry->raw(), entry->length(), &vsize);
int ret = sdskv_length(sdskv_ph, db_id, entry->raw(), entry->length(), &vsize);
if(ret == HG_SUCCESS) return false; // key already exists
if(ret != SDSKV_ERR_UNKNOWN_KEY) { // there was a problem with sdskv
throw Exception("Could not check if key exists in SDSKV (sdskv_length error)");
}
ret = sdskv_put(ph, db_id, entry->raw(), entry->length(), data.data(), data.size());
if(ret != SDSKV_SUCCESS) {
throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)");
// if it's not a last-level data entry (data product), store in sdskeyval
if(level != 0 || m_bake_ph.empty()) {
ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), data.data(), data.size());
if(ret != SDSKV_SUCCESS) {
throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)");
}
} else { // store data in bake
long unsigned bake_provider_idx = 0;
ch_placement_find_closest(m_chi_bake, name_hash, 1, &bake_provider_idx);
auto bake_ph = m_bake_ph[bake_provider_idx];
auto target = m_bake_targets[bake_provider_idx];
bake_region_id_t rid;
ret = bake_create_write_persist(bake_ph, target, data.data(), data.size(), &rid);
if(ret != BAKE_SUCCESS) {
throw Exception("Could not create bake region (bake_create_write_persist error)");
}
// create Value to put in SDSKV
DataStoreValue value(data.size(), bake_provider_idx, rid);
ret = sdskv_put(sdskv_ph, db_id, entry->raw(), entry->length(), (void*)(&value), sizeof(value));
if(ret != SDSKV_SUCCESS) {
ret = bake_remove(bake_ph, rid);
if(ret != BAKE_SUCCESS) {
throw Exception("Dude, not only did SDSKV fail to put the key, but I couldn't cleanup BAKE. Is it Friday 13?");
}
throw Exception("Could not put key/value pair in SDSKV (sdskv_put error)");
}
}
return true;
}
......
/*
* (C) 2018 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __PRIVATE_VALUE_TYPES_H
#define __PRIVATE_VALUE_TYPES_H
#include <cstring>
#include <cstdlib>
#include <cstdint>
#include <memory>
namespace hepnos {
class DataStoreValue {
size_t m_object_size;
uint64_t m_server_id;
bake_region_id_t m_region_id;
public:
DataStoreValue()
: m_object_size(0), m_server_id(0) {}
DataStoreValue(size_t object_size, uint64_t bake_server_id, const bake_region_id_t& region_id)
: m_object_size(object_size), m_server_id(bake_server_id), m_region_id(region_id) {}
size_t getDataSize() const {
return m_object_size;
}
const bake_region_id_t& getBakeRegionID() const {
return m_region_id;
}
const uint64_t& getBakeServerID() const {
return m_server_id;
}
};
}
#endif
#include <fstream>
#include <yaml-cpp/yaml.h>
#include "ConnectionInfoGenerator.hpp"
namespace hepnos {
struct ConnectionInfoGenerator::Impl {
std::string m_addr; // address of this process
uint16_t m_bake_id; // provider ids for BAKE
uint16_t m_sdskv_id; // provider ids for SDSKV
};
ConnectionInfoGenerator::ConnectionInfoGenerator(
const std::string& address,
uint16_t sdskv_provider_id,
uint16_t bake_provider_id)
: m_impl(std::make_unique<Impl>()) {
m_impl->m_addr = address;
m_impl->m_bake_id = bake_provider_id;
m_impl->m_sdskv_id = sdskv_provider_id;
}
ConnectionInfoGenerator::~ConnectionInfoGenerator() {}
void ConnectionInfoGenerator::generateFile(MPI_Comm comm, const std::string& filename) const {
int rank, size;
const char* addr = m_impl->m_addr.c_str();
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &size);
unsigned j=0;
while(addr[j] != '\0' && addr[j] != ':') j++;
std::string proto(addr, j);
// Exchange addresses
std::vector<char> addresses_buf(128*size);
MPI_Gather(addr, 128, MPI_BYTE, addresses_buf.data(), 128, MPI_BYTE, 0, comm);
// Exchange bake providers info
std::vector<uint16_t> bake_pr_ids_buf(size);
MPI_Gather(&(m_impl->m_bake_id),
1, MPI_UNSIGNED_SHORT,
bake_pr_ids_buf.data(),
1, MPI_UNSIGNED_SHORT,
0, comm);
// Exchange sdskv providers info
std::vector<uint16_t> sdskv_pr_ids_buf(size);
MPI_Gather(&(m_impl->m_sdskv_id),
1, MPI_UNSIGNED_SHORT,
sdskv_pr_ids_buf.data(),
1, MPI_UNSIGNED_SHORT,
0, comm);
// After this line, the rest is executed only by rank 0
if(rank != 0) return;
std::vector<std::string> addresses;
for(unsigned i=0; i < size; i++) {
addresses.emplace_back(&addresses_buf[128*i]);
}
YAML::Node config;
config["hepnos"]["client"]["protocol"] = proto;
YAML::Node providers = config["hepnos"]["providers"];
for(unsigned int i=0; i < size; i++) {
const auto& provider_addr = addresses[i];
if(sdskv_pr_ids_buf[i]) {
providers["sdskv"][provider_addr] = sdskv_pr_ids_buf[i];
}
if(bake_pr_ids_buf[i]) {
providers["bake"][provider_addr] = bake_pr_ids_buf[i];
}
}
std::ofstream fout(filename);
fout << config;
}
}
#ifndef __HEPNOS_CONNECTION_INFO_GENERATOR_H
#define __HEPNOS_CONNECTION_INFO_GENERATOR_H
#include <string>
#include <memory>
#include <mpi.h>
namespace hepnos {
class ConnectionInfoGenerator {
private:
class Impl;
std::unique_ptr<Impl> m_impl;
public:
ConnectionInfoGenerator(const std::string& address,
uint16_t sdskv_provider_id,
uint16_t bake_provider_id);
ConnectionInfoGenerator(const ConnectionInfoGenerator&) = delete;
ConnectionInfoGenerator(ConnectionInfoGenerator&&) = delete;
ConnectionInfoGenerator& operator=(const ConnectionInfoGenerator&) = delete;
ConnectionInfoGenerator& operator=(ConnectionInfoGenerator&&) = delete;
~ConnectionInfoGenerator();
void generateFile(MPI_Comm comm, const std::string& filename) const;
};
}
#endif
......@@ -12,14 +12,15 @@
#include <margo.h>
#include <bake-server.h>
#include <sdskv-server.h>
#include <yaml-cpp/yaml.h>
#include "ServiceConfig.hpp"
#include "ConnectionInfoGenerator.hpp"
#include "hepnos-service.h"
#define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } }
static void generate_config_file(MPI_Comm comm, const char* addr, const char* config_file);
//static void generate_connection_file(MPI_Comm comm, const char* addr, const char* filename);
void hepnos_run_service(MPI_Comm comm, const char* listen_addr, const char* config_file)
void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* connection_file)
{
margo_instance_id mid;
int ret;
......@@ -27,11 +28,24 @@ void hepnos_run_service(MPI_Comm comm, const char* listen_addr, const char* conf
MPI_Comm_rank(comm, &rank);
/* load configuration */
std::unique_ptr<hepnos::ServiceConfig> config;
try {
config = std::make_unique<hepnos::ServiceConfig>(config_file, rank);
} catch(const std::exception& e) {
std::cerr << "Error: when reading configuration:" << std::endl;
std::cerr << " " << e.what() << std::endl;
std::cerr << "Aborting." << std::endl;
MPI_Abort(MPI_COMM_WORLD, -1);
return;
}
/* Margo initialization */
mid = margo_init(listen_addr, MARGO_SERVER_MODE, 0, -1);
mid = margo_init(config->getAddress().c_str(), MARGO_SERVER_MODE, 0, -1);
if (mid == MARGO_INSTANCE_NULL)
{
fprintf(stderr, "Error: Unable to initialize margo\n");
std::cerr << "Error: unable to initialize margo" << std::endl;
std::cerr << "Aborting." << std::endl;
MPI_Abort(MPI_COMM_WORLD, -1);
return;
}
......@@ -44,43 +58,55 @@ void hepnos_run_service(MPI_Comm comm, const char* listen_addr, const char* conf
hg_size_t self_addr_str_size = 128;
margo_addr_to_string(mid, self_addr_str, &self_addr_str_size, self_addr);
/* Bake provider initialization */
uint16_t bake_mplex_id = 1;
char bake_target_name[128];
sprintf(bake_target_name, "/dev/shm/hepnos.%d.dat", rank);
/* create the bake target if it does not exist */
if(-1 == access(bake_target_name, F_OK)) {
// XXX creating a pool of 10MB - this should come from a config file