Commit d4ccfc77 authored by Matthieu Dorier

fixed issue with PEP: made the MPI rank a member (m_my_rank), set m_is_loader up front in process() instead of requestEvents(), and moved the no-more-consumers wait from the destructor to the end of process()

parent 0145a03b
@@ -23,6 +23,7 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorImpl> {
     std::shared_ptr<DataStoreImpl> m_datastore;
     std::shared_ptr<AsyncEngineImpl> m_async;
     MPI_Comm m_comm;
+    int m_my_rank;
     ParallelEventProcessorOptions m_options;
     std::vector<int> m_loader_ranks;
     std::vector<int> m_targets;
@@ -57,8 +58,7 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorImpl> {
     , m_options(options)
     , m_req_events_rpc(define("hepnos_pep_req_events", &ParallelEventProcessorImpl::requestEventsRPC)) {
         int size;
-        int rank;
-        MPI_Comm_rank(comm, &rank);
+        MPI_Comm_rank(comm, &m_my_rank);
         MPI_Comm_size(comm, &size);
         m_num_active_consumers = size-1;
         // exchange addresses
@@ -68,15 +68,13 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorImpl> {
         MPI_Allgather(my_addr_str.data(), 1024, MPI_BYTE, all_addresses_packed.data(), 1024, MPI_BYTE, comm);
         m_provider_handles.resize(size);
         for(unsigned i=0; i < size; i++) {
-            unsigned j = (rank + i) % size;
+            unsigned j = (m_my_rank + i) % size;
             auto ep = m_datastore->m_engine.lookup(all_addresses_packed.data()+j*1024);
             m_provider_handles[j] = tl::provider_handle(std::move(ep), options.providerID);
         }
     }

     ~ParallelEventProcessorImpl() {
-        if(m_is_loader)
-            m_no_more_consumers.wait();
         m_req_events_rpc.deregister();
     }
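For context, the constructor hunk above uses a common MPI/Thallium bootstrap pattern: each rank packs its Mercury address into a fixed 1024-byte slot, all slots are exchanged with MPI_Allgather, and each rank then resolves one provider handle per peer. The sketch below is illustrative, not HEPnOS code; `exchange_addresses` and its parameters are made-up names for the example. The `(rank + i) % size` rotation staggers the lookups so all ranks do not contact rank 0 first.

```cpp
// Illustrative sketch, not HEPnOS code: fixed-slot address exchange.
#include <mpi.h>
#include <thallium.hpp>
#include <cstdint>
#include <string>
#include <vector>

namespace tl = thallium;

std::vector<tl::provider_handle> exchange_addresses(tl::engine& engine,
                                                    MPI_Comm comm,
                                                    uint16_t provider_id) {
    int rank, size;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);
    // Pack this process' address into a fixed-size, zero-initialized slot.
    std::string my_addr = static_cast<std::string>(engine.self());
    std::vector<char> my_slot(1024, '\0');
    my_addr.copy(my_slot.data(), my_slot.size() - 1);
    // Gather every rank's slot into one contiguous buffer.
    std::vector<char> all_slots(1024 * (size_t)size);
    MPI_Allgather(my_slot.data(), 1024, MPI_BYTE,
                  all_slots.data(), 1024, MPI_BYTE, comm);
    // Resolve endpoints, rotated so each rank starts the lookups at itself.
    std::vector<tl::provider_handle> handles(size);
    for(int i = 0; i < size; i++) {
        int j = (rank + i) % size;
        auto ep = engine.lookup(all_slots.data() + (size_t)j * 1024);
        handles[j] = tl::provider_handle(std::move(ep), provider_id);
    }
    return handles;
}
```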
@@ -86,10 +84,17 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorImpl> {
     void process(const std::vector<EventSet>& evsets,
                  const ParallelEventProcessor::EventProcessingWithCacheFn& function,
                  ParallelEventProcessorStatistics* stats) {
+        int size;
+        MPI_Comm_size(m_comm, &size);
+        m_is_loader = (m_loader_ranks[0] == m_my_rank);
+        if(size == 1)
+            m_no_more_consumers.set_value();
         m_stats = stats;
         startLoadingEventsFromTargets(evsets);
         processEvents(function);
         m_stats = nullptr;
+        if(m_is_loader)
+            m_no_more_consumers.wait();
     }

     /**
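The process() hunk above is the heart of the fix: the loader now waits for consumers at the end of process() rather than in the destructor, and a single-rank run sets the eventual itself, since with no consumers nothing would ever signal it and wait() would hang. The standalone sketch below is not HEPnOS code; the names mirror the diff, and it assumes tl::eventual<void> exposes set_value()/wait() exactly as m_no_more_consumers is used in the diff.

```cpp
// Illustrative sketch, not HEPnOS code: the loader/consumer shutdown handshake.
#include <thallium.hpp>
#include <atomic>

namespace tl = thallium;

struct LoaderShutdown {
    std::atomic<int>   m_num_active_consumers{0}; // set to size-1 at startup
    tl::eventual<void> m_no_more_consumers;

    // RPC-handler side: a consumer announces it will request no more events.
    void consumerDone() {
        if(--m_num_active_consumers == 0)
            m_no_more_consumers.set_value();
    }

    // End of process(): only the loader blocks, and only until every
    // consumer has checked out.
    void finish(bool is_loader, int comm_size) {
        if(comm_size == 1)              // no consumers exist to signal us
            m_no_more_consumers.set_value();
        if(is_loader)
            m_no_more_consumers.wait();
    }
};
```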
@@ -138,7 +143,6 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorImpl> {
     }

     void requestEventsRPC(const tl::request& req, size_t max, tl::bulk& remote_mem) {
         std::vector<EventDescriptor> descriptorsToSend;
-        descriptorsToSend.reserve(max);
         {
@@ -177,12 +181,9 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorImpl> {
     bool requestEvents(std::vector<EventDescriptor>& descriptors) {
         double t1 = tl::timer::wtime();
         double t2;
-        int my_rank;
-        MPI_Comm_rank(m_comm, &my_rank);
         while(m_loader_ranks.size() != 0) {
             int loader_rank = m_loader_ranks[0];
-            if(loader_rank == my_rank) {
-                m_is_loader = true;
+            if(loader_rank == m_my_rank) {
                 std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
                 while(m_event_queue.empty() && m_loader_running)
                     m_event_queue_cv.wait(lock);
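The last hunk shows the loader-local fast path of requestEvents(): when the current loader rank is this rank, descriptors are popped straight from the local queue under a Thallium mutex and condition variable instead of going through the hepnos_pep_req_events RPC. Note that m_is_loader is no longer set here; after this commit, process() computes it up front from m_loader_ranks[0]. The sketch below is illustrative, not HEPnOS code; the member names mirror the diff, everything else is assumed.

```cpp
// Illustrative sketch, not HEPnOS code: blocking pop from the local event queue.
#include <thallium.hpp>
#include <deque>
#include <mutex>
#include <vector>

namespace tl = thallium;

struct EventDescriptor { /* opaque event payload */ };

struct LocalEventQueue {
    std::deque<EventDescriptor> m_event_queue;
    tl::mutex                   m_event_queue_mtx;
    tl::condition_variable      m_event_queue_cv;
    bool                        m_loader_running = true;

    // Block until events are available or the loader stops producing,
    // then move up to max descriptors out. Returns false once drained.
    bool pop(std::vector<EventDescriptor>& out, size_t max) {
        std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
        while(m_event_queue.empty() && m_loader_running)
            m_event_queue_cv.wait(lock);
        while(!m_event_queue.empty() && out.size() < max) {
            out.push_back(m_event_queue.front());
            m_event_queue.pop_front();
        }
        return !out.empty();
    }
};
```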