Commit 0145a03b authored by Matthieu Dorier

implemented Mercury-based PEP

parent edfc30fd
......@@ -52,6 +52,7 @@ ParallelEventProcessor::ParallelEventProcessor(
m_impl->m_loader_ranks = std::move(loader_ranks);
m_impl->m_targets = std::move(my_targets);
MPI_Barrier(comm);
}
ParallelEventProcessor::ParallelEventProcessor(
......@@ -62,11 +63,7 @@ ParallelEventProcessor::ParallelEventProcessor(
m_impl->m_async = async.m_impl;
}
/**
 * Destructor. When an implementation object is attached, performs an
 * MPI_Barrier on the implementation's communicator; otherwise does nothing.
 */
ParallelEventProcessor::~ParallelEventProcessor() {
    if(!m_impl) return;
    MPI_Barrier(m_impl->m_comm);
}
// Defaulted destructor: no explicit teardown is performed at this level.
ParallelEventProcessor::~ParallelEventProcessor() = default;
void ParallelEventProcessor::process(
const DataSet& dataset,
......
......@@ -12,6 +12,7 @@
#include "ProductCacheImpl.hpp"
#include "hepnos/EventSet.hpp"
#include "hepnos/ParallelEventProcessor.hpp"
#include <thallium/serialization/stl/vector.hpp>
namespace hepnos {
......@@ -28,6 +29,7 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
std::unordered_set<std::string> m_product_keys;
tl::remote_procedure m_req_events_rpc;
std::vector<tl::provider_handle> m_provider_handles;
bool m_loader_running = false;
std::queue<EventDescriptor> m_event_queue;
......@@ -35,7 +37,8 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
tl::condition_variable m_event_queue_cv;
int m_num_active_consumers;
tl::managed<tl::xstream> m_mpi_xstream;
tl::eventual<void> m_no_more_consumers;
bool m_is_loader = false;
size_t m_num_processing_ults = 0;
tl::mutex m_processing_ults_mtx;
......@@ -53,11 +56,27 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
, m_comm(comm)
, m_options(options)
, m_req_events_rpc(define("hepnos_pep_req_events", &ParallelEventProcessorImpl::requestEventsRPC)) {
MPI_Comm_size(comm, &m_num_active_consumers);
m_num_active_consumers -= 1;
int size;
int rank;
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &size);
m_num_active_consumers = size-1;
// exchange addresses
std::string my_addr_str = m_datastore->m_engine.self();
my_addr_str.resize(1024, '\0');
std::vector<char> all_addresses_packed(size*1024);
MPI_Allgather(my_addr_str.data(), 1024, MPI_BYTE, all_addresses_packed.data(), 1024, MPI_BYTE, comm);
m_provider_handles.resize(size);
for(unsigned i=0; i < size; i++) {
unsigned j = (rank + i) % size;
auto ep = m_datastore->m_engine.lookup(all_addresses_packed.data()+j*1024);
m_provider_handles[j] = tl::provider_handle(std::move(ep), options.providerID);
}
}
/**
 * Destructor. On a loader that still has active remote consumers, blocks
 * until m_no_more_consumers has been set, then deregisters the
 * "hepnos_pep_req_events" RPC.
 */
~ParallelEventProcessorImpl() {
    // In the visible code, m_no_more_consumers is only set by
    // requestEventsRPC when m_num_active_consumers drops to zero. If the
    // count is already zero (e.g. a single-rank communicator, where it is
    // initialized to size-1 == 0), no such RPC is expected to arrive and an
    // unconditional wait would block forever.
    // NOTE(review): m_num_active_consumers is read here without holding a
    // lock, as it is in requestEventsRPC - confirm that RPC handlers cannot
    // run concurrently with destruction.
    if(m_is_loader && m_num_active_consumers > 0)
        m_no_more_consumers.wait();
    m_req_events_rpc.deregister();
}
......@@ -69,11 +88,7 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
ParallelEventProcessorStatistics* stats) {
m_stats = stats;
startLoadingEventsFromTargets(evsets);
startRespondingToMPIrequests();
processEvents(function);
if(!m_mpi_xstream->is_null()) {
m_mpi_xstream->join();
}
m_stats = nullptr;
}
......@@ -103,7 +118,8 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
DataStore(m_datastore),
m_options.cacheSize,
m_options.inputBatchSize);
// XXX instead of waking up all the threads at every event,
// we should try to fill up batches of the appropriate size in the queue
for(auto it = evset.begin(prefetcher); it != evset.end(); it++) {
EventDescriptor descriptor;
it->toDescriptor(descriptor);
......@@ -121,55 +137,35 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
}
}
/**
 * Launches the execution stream (ES) in charge of answering MPI requests
 * coming from other clients. A dedicated ES is needed because MPI isn't
 * Argobots-aware. Does nothing when the loader is not running.
 */
void startRespondingToMPIrequests() {
    if(m_loader_running) {
        // Create the ES, then push an anonymous ULT onto it that serves requests.
        m_mpi_xstream = tl::xstream::create();
        m_mpi_xstream->make_thread(
            [this]() { respondToMPIrequests(); },
            tl::anonymous());
    }
}
void requestEventsRPC(const tl::request& req, size_t max, tl::bulk& remote_mem) {
/**
* Function called in the above ES. This ULT will wait for requests from
* clients that don't have local events to process anymore.
*/
void respondToMPIrequests() {
int my_rank;
MPI_Comm_rank(m_comm, &my_rank);
while(m_num_active_consumers != 0) {
MPI_Status status;
size_t num_events_requested;
MPI_Recv(&num_events_requested, sizeof(num_events_requested),
MPI_BYTE, MPI_ANY_SOURCE, 1111, m_comm, &status);
if(num_events_requested == 0) num_events_requested = 1;
int consumer_rank = status.MPI_SOURCE;
std::vector<EventDescriptor> descriptorsToSend;
descriptorsToSend.reserve(num_events_requested);
{
std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
while(m_loader_running && m_event_queue.empty())
m_event_queue_cv.wait(lock);
for(unsigned i = 0; i < num_events_requested && !m_event_queue.empty(); i++) {
descriptorsToSend.push_back(m_event_queue.front());
m_event_queue.pop();
}
}
MPI_Send(descriptorsToSend.data(),
descriptorsToSend.size()*sizeof(descriptorsToSend[0]),
MPI_BYTE, consumer_rank, 1112, m_comm);
if(descriptorsToSend.empty()) {
m_num_active_consumers -= 1;
std::vector<EventDescriptor> descriptorsToSend;
descriptorsToSend.reserve(max);
{
std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
while(m_loader_running && m_event_queue.empty())
m_event_queue_cv.wait(lock);
for(unsigned i = 0; i < max && !m_event_queue.empty(); i++) {
descriptorsToSend.push_back(m_event_queue.front());
m_event_queue.pop();
}
}
}
void requestEventsRPC(const tl::request& req, size_t max, tl::bulk& remote_mem) {
// TODO
if(descriptorsToSend.size() != 0) {
std::vector<std::pair<void*, size_t>> segment =
{{ descriptorsToSend.data(), sizeof(descriptorsToSend[0])*descriptorsToSend.size() }};
auto local_mem = m_datastore->m_engine.expose(segment, tl::bulk_mode::read_only);
remote_mem.on(req.get_endpoint()) << local_mem;
}
req.respond(static_cast<size_t>(descriptorsToSend.size()));
if(descriptorsToSend.empty()) {
m_num_active_consumers -= 1;
if(m_num_active_consumers == 0) {
m_no_more_consumers.set_value(); // allow the destructor to complete
}
}
}
/**
......@@ -186,6 +182,7 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
while(m_loader_ranks.size() != 0) {
int loader_rank = m_loader_ranks[0];
if(loader_rank == my_rank) {
m_is_loader = true;
std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
while(m_event_queue.empty() && m_loader_running)
m_event_queue_cv.wait(lock);
......@@ -212,15 +209,16 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
m_loader_ranks.erase(m_loader_ranks.begin());
}
} else {
size_t num_events_requested = m_options.outputBatchSize;
MPI_Send(&num_events_requested, sizeof(num_events_requested), MPI_BYTE, loader_rank, 1111, m_comm);
MPI_Status status;
descriptors.resize(num_events_requested);
MPI_Recv(descriptors.data(), sizeof(descriptors[0])*descriptors.size(),
MPI_BYTE, loader_rank, 1112, m_comm, &status);
int count;
MPI_Get_count(&status, MPI_BYTE, &count);
size_t num_actual_events = count/sizeof(descriptors[0]);
size_t max = m_options.outputBatchSize;
descriptors.resize(max);
std::vector<std::pair<void*, size_t>> segment =
{{ descriptors.data(), sizeof(descriptors[0])*descriptors.size() }};
auto local_mem = m_datastore->m_engine.expose(segment, tl::bulk_mode::write_only);
size_t num_actual_events = m_req_events_rpc
.on(m_provider_handles[loader_rank])(max, local_mem);
if(num_actual_events != 0) {
descriptors.resize(num_actual_events);
t2 = tl::timer::wtime();
......
......@@ -18,7 +18,7 @@ int main(int argc, char* argv[])
sleep(1);
// Create the datastore
hepnos::DataStore ds = hepnos::DataStore::connect(argv[1]);
hepnos::DataStore ds = hepnos::DataStore::connect(argv[1], true);
datastore = &ds;
// Get the top level suite from the registry
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment