Commit 50184891 authored by Matthieu Dorier

Merge branch 'dev-new-pep' into 'master'

Developed new Parallel Event Processing based on Mercury

See merge request !2
parents 61ff41b3 d4ccfc77
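
The heart of this merge is visible in the ParallelEventProcessorImpl hunks below: the MPI_Send/MPI_Recv exchange between consumers and the loader rank is replaced by a thallium (Mercury) RPC, hepnos_pep_req_events, combined with a bulk transfer. A consumer exposes a write-only buffer and invokes the RPC on the loader's provider handle; the loader pushes a batch of event descriptors into that buffer and responds with the count. The stand-alone sketch below condenses that round trip into a single process so it can be compiled and run in isolation; the EventDescriptor payload, the na+sm protocol, the fixed provider id and the pretend four-element queue are illustrative placeholders, not HEPnOS code.

// Condensed, single-process sketch of the RPC + bulk pattern introduced by this merge.
// The payload type, the na+sm protocol, the provider id and the pretend queue are
// illustrative placeholders, not HEPnOS code.
#include <thallium.hpp>
#include <algorithm>
#include <iostream>
#include <utility>
#include <vector>

namespace tl = thallium;

struct EventDescriptor { char data[40]; }; // stand-in for the real descriptor type

class EventLoader : public tl::provider<EventLoader> {
    tl::engine&          m_engine; // engine owned by the caller, as in DataStoreImpl
    tl::remote_procedure m_rpc;

    // Loader side: push up to "max" descriptors into the consumer's exposed buffer,
    // then respond with the number actually sent (0 tells the consumer to move on).
    void reqEvents(const tl::request& req, size_t max, tl::bulk& remote_mem) {
        std::vector<EventDescriptor> to_send(std::min<size_t>(max, 4)); // pretend queue content
        if(!to_send.empty()) {
            std::vector<std::pair<void*, size_t>> segment =
                {{ to_send.data(), to_send.size()*sizeof(EventDescriptor) }};
            auto local_mem = m_engine.expose(segment, tl::bulk_mode::read_only);
            remote_mem.on(req.get_endpoint()) << local_mem; // push into the consumer's memory
        }
        req.respond(static_cast<size_t>(to_send.size()));
    }

  public:
    EventLoader(tl::engine& e, uint16_t provider_id)
    : tl::provider<EventLoader>(e, provider_id)
    , m_engine(e)
    , m_rpc(define("hepnos_pep_req_events", &EventLoader::reqEvents)) {}

    ~EventLoader() { m_rpc.deregister(); }

    tl::remote_procedure& rpc() { return m_rpc; }
};

int main() {
    // Server mode so the process can receive RPCs (clients need this too in the new design).
    tl::engine engine("na+sm", THALLIUM_SERVER_MODE);
    {
        EventLoader loader(engine, 0);

        // Consumer side: expose a write-only buffer and request a batch of descriptors.
        // Here the loader is this same process; in HEPnOS it is another MPI rank.
        size_t max = 16;
        std::vector<EventDescriptor> descriptors(max);
        std::vector<std::pair<void*, size_t>> segment =
            {{ descriptors.data(), descriptors.size()*sizeof(EventDescriptor) }};
        auto local_mem = engine.expose(segment, tl::bulk_mode::write_only);

        tl::provider_handle ph(engine.self(), 0);
        size_t received = loader.rpc().on(ph)(max, local_mem);
        descriptors.resize(received);
        std::cout << "received " << received << " descriptors" << std::endl;
    } // the loader deregisters its RPC before the engine is finalized
    engine.finalize();
    return 0;
}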
......@@ -22,6 +22,7 @@ struct ParallelEventProcessorOptions {
unsigned cacheSize = std::numeric_limits<unsigned>::max(); // cache size of internal prefetcher
unsigned inputBatchSize = 16; // size of batches loaded from HEPnOS
unsigned outputBatchSize = 16; // size of batches sent over MPI to workers
uint16_t providerID = 0; // provider id to use (if multiple ParallelEventProcessor instances are used)
};
struct ParallelEventProcessorStatistics {
......
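
The new providerID field exists so that several ParallelEventProcessor instances sharing one engine can register their request RPC under distinct provider ids. A minimal usage sketch follows; the constructor signature (DataStore, MPI communicator, options) and the header names are inferred from this merge request rather than quoted from the installed headers, so treat them as assumptions.

// Hypothetical usage sketch; constructor signature and headers inferred from this merge request.
#include <mpi.h>
#include <hepnos/DataStore.hpp>
#include <hepnos/ParallelEventProcessor.hpp>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);
    hepnos::DataStore datastore = hepnos::DataStore::connect(argv[1]);

    hepnos::ParallelEventProcessorOptions opt_a, opt_b;
    opt_a.providerID = 0;
    opt_b.providerID = 1; // distinct id avoids clashing RPC registrations on the shared engine

    hepnos::ParallelEventProcessor pep_a(datastore, MPI_COMM_WORLD, opt_a);
    hepnos::ParallelEventProcessor pep_b(datastore, MPI_COMM_WORLD, opt_b);

    // ... call pep_a.process(...) / pep_b.process(...) as usual ...

    MPI_Finalize();
    return 0;
}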
......@@ -55,7 +55,7 @@ void DataStore::shutdown() {
throw Exception("Calling DataStore member function on an invalid DataStore object");
}
for(auto addr : m_impl->m_addrs) {
margo_shutdown_remote_instance(m_impl->m_mid, addr.second);
m_impl->m_engine.shutdown_remote_engine(addr.second);
}
}
......
......@@ -11,6 +11,7 @@
#include <unordered_map>
#include <functional>
#include <iostream>
#include <thallium.hpp>
#include <yaml-cpp/yaml.h>
#include <sdskv-client.hpp>
#include <ch-placement.h>
......@@ -49,20 +50,23 @@ struct DistributedDBInfo {
struct ch_placement_instance* chi = nullptr;
};
using namespace std::string_literals;
namespace tl = thallium;
class DataStoreImpl {
public:
margo_instance_id m_mid; // Margo instance
std::unordered_map<std::string,hg_addr_t> m_addrs; // Addresses used by the service
sdskv::client m_sdskv_client; // SDSKV client
DistributedDBInfo m_dataset_dbs; // list of SDSKV databases for DataSets
DistributedDBInfo m_run_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_subrun_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_event_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_product_dbs; // list of SDSKV databases for Products
tl::engine m_engine; // Thallium engine
bool m_engine_initialized = false;
std::unordered_map<std::string,tl::endpoint> m_addrs; // Addresses used by the service
sdskv::client m_sdskv_client; // SDSKV client
DistributedDBInfo m_dataset_dbs; // list of SDSKV databases for DataSets
DistributedDBInfo m_run_dbs; // list of SDSKV databases for Runs
DistributedDBInfo m_subrun_dbs; // list of SDSKV databases for SubRuns
DistributedDBInfo m_event_dbs; // list of SDSKV databases for Events
DistributedDBInfo m_product_dbs; // list of SDSKV databases for Products
DataStoreImpl()
: m_mid(MARGO_INSTANCE_NULL)
{}
~DataStoreImpl() {
......@@ -76,24 +80,28 @@ class DataStoreImpl {
std::string str_addr = address_it->first.as<std::string>();
YAML::Node providers = address_it->second;
// lookup the address
hg_addr_t addr;
tl::endpoint addr;
if(m_addrs.count(str_addr) != 0) {
addr = m_addrs[str_addr];
} else {
hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
if(hret != HG_SUCCESS) {
margo_addr_free(m_mid,addr);
cleanup();
throw Exception("margo_addr_lookup failed (MARGO error="+std::to_string(hret)+")");
try {
addr = m_engine.lookup(str_addr);
m_addrs[str_addr] = addr;
} catch(const std::exception& ex) {
throw Exception("Address lookup failed: "s + ex.what());
}
m_addrs[str_addr] = addr;
}
// iterate over providers for this address
for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
// get the provider id
uint16_t provider_id = provider_it->first.as<uint16_t>();
// create provider handle
sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
sdskv::provider_handle ph;
try {
ph = sdskv::provider_handle(m_sdskv_client, addr.get_addr(), provider_id);
} catch(const std::exception& ex) {
throw Exception("Could not create SDSKV provider handle: "s + ex.what());
}
// get the database ids
YAML::Node databases = provider_it->second;
// iterate over databases for this provider
......@@ -120,14 +128,16 @@ class DataStoreImpl {
memset(&hg_opt, 0, sizeof(hg_opt));
if(busySpin)
hg_opt.na_init_info.progress_mode = NA_NO_BLOCK;
m_mid = margo_init_opt(proto.c_str(), MARGO_CLIENT_MODE, &hg_opt, use_progress_thread, 0);
if(!m_mid) {
try {
m_engine = tl::engine(proto, THALLIUM_SERVER_MODE, use_progress_thread, -1, &hg_opt);
m_engine_initialized = true;
} catch(const std::exception& ex) {
cleanup();
throw Exception("Could not initialized Margo");
throw Exception("Could not initialized Thallium: "s + ex.what());
}
// initialize SDSKV client
try {
m_sdskv_client = sdskv::client(m_mid);
m_sdskv_client = sdskv::client(m_engine.get_margo_instance());
} catch(sdskv::exception& ex) {
cleanup();
throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
......@@ -157,11 +167,10 @@ class DataStoreImpl {
if(m_run_dbs.chi) ch_placement_finalize(m_run_dbs.chi);
if(m_subrun_dbs.chi) ch_placement_finalize(m_subrun_dbs.chi);
if(m_event_dbs.chi) ch_placement_finalize(m_event_dbs.chi);
if(m_product_dbs.chi) ch_placement_finalize(m_product_dbs.chi);
for(auto& addr : m_addrs) {
margo_addr_free(m_mid, addr.second);
}
if(m_mid) margo_finalize(m_mid);
if(m_product_dbs.chi) ch_placement_finalize(m_product_dbs.chi);
m_addrs.clear();
if(m_engine_initialized) m_engine.finalize();
m_engine_initialized = false;
}
size_t numTargets(const ItemType& type) const {
......
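
For reference, the engine initialization and teardown that this hunk converges on, reduced to a stand-alone sketch: a zeroed hg_init_info, the optional NA_NO_BLOCK busy-spin flag, a server-mode engine (so that peers can send RPCs back to this client), and an explicit finalize. The ofi+tcp protocol string and the option values are illustrative; the constructor overload taking an hg_init_info pointer is the one used in the hunk above.

// Stand-alone sketch of the engine lifecycle used by DataStoreImpl; values are illustrative.
#include <thallium.hpp>
#include <cstring>

namespace tl = thallium;

int main() {
    bool busySpin = false;            // illustrative configuration values
    bool use_progress_thread = true;

    hg_init_info hg_opt;
    std::memset(&hg_opt, 0, sizeof(hg_opt));
    if(busySpin)
        hg_opt.na_init_info.progress_mode = NA_NO_BLOCK; // poll instead of blocking in the NA layer

    // Server mode even for clients, so that peers can send RPCs back to this process.
    tl::engine engine("ofi+tcp", THALLIUM_SERVER_MODE, use_progress_thread, -1, &hg_opt);

    // ... hand engine.get_margo_instance() to sdskv::client, look up providers, do work ...

    engine.finalize();
    return 0;
}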
......@@ -52,6 +52,7 @@ ParallelEventProcessor::ParallelEventProcessor(
m_impl->m_loader_ranks = std::move(loader_ranks);
m_impl->m_targets = std::move(my_targets);
MPI_Barrier(comm);
}
ParallelEventProcessor::ParallelEventProcessor(
......@@ -62,11 +63,7 @@ ParallelEventProcessor::ParallelEventProcessor(
m_impl->m_async = async.m_impl;
}
ParallelEventProcessor::~ParallelEventProcessor() {
if(m_impl) {
MPI_Barrier(m_impl->m_comm);
}
}
ParallelEventProcessor::~ParallelEventProcessor() = default;
void ParallelEventProcessor::process(
const DataSet& dataset,
......
......@@ -12,28 +12,34 @@
#include "ProductCacheImpl.hpp"
#include "hepnos/EventSet.hpp"
#include "hepnos/ParallelEventProcessor.hpp"
#include <thallium/serialization/stl/vector.hpp>
namespace hepnos {
namespace tl = thallium;
struct ParallelEventProcessorImpl {
struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorImpl> {
std::shared_ptr<DataStoreImpl> m_datastore;
std::shared_ptr<AsyncEngineImpl> m_async;
MPI_Comm m_comm;
int m_my_rank;
ParallelEventProcessorOptions m_options;
std::vector<int> m_loader_ranks;
std::vector<int> m_targets;
std::unordered_set<std::string> m_product_keys;
tl::remote_procedure m_req_events_rpc;
std::vector<tl::provider_handle> m_provider_handles;
bool m_loader_running = false;
std::queue<EventDescriptor> m_event_queue;
tl::mutex m_event_queue_mtx;
tl::condition_variable m_event_queue_cv;
int m_num_active_consumers;
tl::managed<tl::xstream> m_mpi_xstream;
tl::eventual<void> m_no_more_consumers;
bool m_is_loader = false;
size_t m_num_processing_ults = 0;
tl::mutex m_processing_ults_mtx;
......@@ -46,14 +52,31 @@ struct ParallelEventProcessorImpl {
std::shared_ptr<DataStoreImpl> ds,
MPI_Comm comm,
const ParallelEventProcessorOptions& options)
: m_datastore(std::move(ds))
: tl::provider<ParallelEventProcessorImpl>(ds->m_engine, options.providerID)
, m_datastore(std::move(ds))
, m_comm(comm)
, m_options(options) {
MPI_Comm_size(comm, &m_num_active_consumers);
m_num_active_consumers -= 1;
, m_options(options)
, m_req_events_rpc(define("hepnos_pep_req_events", &ParallelEventProcessorImpl::requestEventsRPC)) {
int size;
MPI_Comm_rank(comm, &m_my_rank);
MPI_Comm_size(comm, &size);
m_num_active_consumers = size-1;
// exchange addresses
std::string my_addr_str = m_datastore->m_engine.self();
my_addr_str.resize(1024, '\0');
std::vector<char> all_addresses_packed(size*1024);
MPI_Allgather(my_addr_str.data(), 1024, MPI_BYTE, all_addresses_packed.data(), 1024, MPI_BYTE, comm);
m_provider_handles.resize(size);
for(unsigned i=0; i < size; i++) {
unsigned j = (m_my_rank + i) % size;
auto ep = m_datastore->m_engine.lookup(all_addresses_packed.data()+j*1024);
m_provider_handles[j] = tl::provider_handle(std::move(ep), options.providerID);
}
}
~ParallelEventProcessorImpl() {}
~ParallelEventProcessorImpl() {
m_req_events_rpc.deregister();
}
/**
* Main function to start processing events in parallel.
......@@ -61,14 +84,17 @@ struct ParallelEventProcessorImpl {
void process(const std::vector<EventSet>& evsets,
const ParallelEventProcessor::EventProcessingWithCacheFn& function,
ParallelEventProcessorStatistics* stats) {
int size;
MPI_Comm_size(m_comm, &size);
m_is_loader = (m_loader_ranks[0] == m_my_rank);
if(size == 1)
m_no_more_consumers.set_value();
m_stats = stats;
startLoadingEventsFromTargets(evsets);
startRespondingToMPIrequests();
processEvents(function);
if(!m_mpi_xstream->is_null()) {
m_mpi_xstream->join();
}
m_stats = nullptr;
if(m_is_loader)
m_no_more_consumers.wait();
}
/**
......@@ -97,7 +123,8 @@ struct ParallelEventProcessorImpl {
DataStore(m_datastore),
m_options.cacheSize,
m_options.inputBatchSize);
// XXX instead of waking up all the threads at every event,
// we should try to fill up batches of the appropriate size in the queue
for(auto it = evset.begin(prefetcher); it != evset.end(); it++) {
EventDescriptor descriptor;
it->toDescriptor(descriptor);
......@@ -115,49 +142,32 @@ struct ParallelEventProcessorImpl {
}
}
/**
* Starts the ES that responds to MPI requests from other clients.
* This has to be done in a separate ES because MPI isn't Argobots-aware.
*/
void startRespondingToMPIrequests() {
if(!m_loader_running)
return;
m_mpi_xstream = tl::xstream::create();
m_mpi_xstream->make_thread([this](){
respondToMPIrequests();
}, tl::anonymous());
}
/**
* Function called in the above ES. This ULT will wait for requests from
* clients that don't have local events to process anymore.
*/
void respondToMPIrequests() {
int my_rank;
MPI_Comm_rank(m_comm, &my_rank);
while(m_num_active_consumers != 0) {
MPI_Status status;
size_t num_events_requested;
MPI_Recv(&num_events_requested, sizeof(num_events_requested),
MPI_BYTE, MPI_ANY_SOURCE, 1111, m_comm, &status);
if(num_events_requested == 0) num_events_requested = 1;
int consumer_rank = status.MPI_SOURCE;
std::vector<EventDescriptor> descriptorsToSend;
descriptorsToSend.reserve(num_events_requested);
{
std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
while(m_loader_running && m_event_queue.empty())
m_event_queue_cv.wait(lock);
for(unsigned i = 0; i < num_events_requested && !m_event_queue.empty(); i++) {
descriptorsToSend.push_back(m_event_queue.front());
m_event_queue.pop();
}
void requestEventsRPC(const tl::request& req, size_t max, tl::bulk& remote_mem) {
std::vector<EventDescriptor> descriptorsToSend;
descriptorsToSend.reserve(max);
{
std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
while(m_loader_running && m_event_queue.empty())
m_event_queue_cv.wait(lock);
for(unsigned i = 0; i < max && !m_event_queue.empty(); i++) {
descriptorsToSend.push_back(m_event_queue.front());
m_event_queue.pop();
}
MPI_Send(descriptorsToSend.data(),
descriptorsToSend.size()*sizeof(descriptorsToSend[0]),
MPI_BYTE, consumer_rank, 1112, m_comm);
if(descriptorsToSend.empty()) {
m_num_active_consumers -= 1;
}
if(descriptorsToSend.size() != 0) {
std::vector<std::pair<void*, size_t>> segment =
{{ descriptorsToSend.data(), sizeof(descriptorsToSend[0])*descriptorsToSend.size() }};
auto local_mem = m_datastore->m_engine.expose(segment, tl::bulk_mode::read_only);
remote_mem.on(req.get_endpoint()) << local_mem;
}
req.respond(static_cast<size_t>(descriptorsToSend.size()));
if(descriptorsToSend.empty()) {
m_num_active_consumers -= 1;
if(m_num_active_consumers == 0) {
m_no_more_consumers.set_value(); // allow the destructor to complete
}
}
}
......@@ -171,11 +181,9 @@ struct ParallelEventProcessorImpl {
bool requestEvents(std::vector<EventDescriptor>& descriptors) {
double t1 = tl::timer::wtime();
double t2;
int my_rank;
MPI_Comm_rank(m_comm, &my_rank);
while(m_loader_ranks.size() != 0) {
int loader_rank = m_loader_ranks[0];
if(loader_rank == my_rank) {
if(loader_rank == m_my_rank) {
std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
while(m_event_queue.empty() && m_loader_running)
m_event_queue_cv.wait(lock);
......@@ -202,15 +210,16 @@ struct ParallelEventProcessorImpl {
m_loader_ranks.erase(m_loader_ranks.begin());
}
} else {
size_t num_events_requested = m_options.outputBatchSize;
MPI_Send(&num_events_requested, sizeof(num_events_requested), MPI_BYTE, loader_rank, 1111, m_comm);
MPI_Status status;
descriptors.resize(num_events_requested);
MPI_Recv(descriptors.data(), sizeof(descriptors[0])*descriptors.size(),
MPI_BYTE, loader_rank, 1112, m_comm, &status);
int count;
MPI_Get_count(&status, MPI_BYTE, &count);
size_t num_actual_events = count/sizeof(descriptors[0]);
size_t max = m_options.outputBatchSize;
descriptors.resize(max);
std::vector<std::pair<void*, size_t>> segment =
{{ descriptors.data(), sizeof(descriptors[0])*descriptors.size() }};
auto local_mem = m_datastore->m_engine.expose(segment, tl::bulk_mode::write_only);
size_t num_actual_events = m_req_events_rpc
.on(m_provider_handles[loader_rank])(max, local_mem);
if(num_actual_events != 0) {
descriptors.resize(num_actual_events);
t2 = tl::timer::wtime();
......
......@@ -18,7 +18,7 @@ int main(int argc, char* argv[])
sleep(1);
// Create the datastore
hepnos::DataStore ds = hepnos::DataStore::connect(argv[1]);
hepnos::DataStore ds = hepnos::DataStore::connect(argv[1], true);
datastore = &ds;
// Get the top level suite from the registry
......