Commit eb69bc6f authored by Matthieu Dorier's avatar Matthieu Dorier

added output batching in parallel processor

parent 44c69388
......@@ -6,16 +6,21 @@
#ifndef __HEPNOS_PARALLEL_EVENT_PROCESSOR_HPP
#define __HEPNOS_PARALLEL_EVENT_PROCESSOR_HPP
#include <limits>
#include <mpi.h>
#include <hepnos/Prefetcher.hpp>
#include <hepnos/Demangle.hpp>
#include <hepnos/AsyncEngine.hpp>
#include <hepnos/Statistics.hpp>
#include <hepnos/DataStore.hpp>
namespace hepnos {
struct ParallelEventProcessorImpl;
struct DispatchPolicy {
size_t eventsPerBlock = 16;
struct ParallelEventProcessorOptions {
unsigned cacheSize = std::numeric_limits<unsigned>::max(); // cache size of internal prefetcher
unsigned inputBatchSize = 16; // size of batches loaded from HEPnOS
unsigned outputBatchSize = 16; // size of batches sent over MPI to workers
};
struct ParallelEventProcessorStatistics {
......@@ -48,13 +53,24 @@ class ParallelEventProcessor {
*
* @param datastore Datastore
* @param comm Communicator gathering participating processes
* @param prefetcher Prefetcher to use when reading events from storage
* @param policy Dispatch policy to use when sending events to workers
* @param options Options on how to carry out batching and dispatch
*/
ParallelEventProcessor(const DataStore& datastore,
MPI_Comm comm,
const Prefetcher& prefetcher,
const DispatchPolicy& policy = DispatchPolicy());
const ParallelEventProcessorOptions& options = ParallelEventProcessorOptions());
/**
* @brief Constructor. Builds a ParallelEventProcessor to navigate a dataset.
* This constructor involves collective communications across members of the
* provided communicator.
*
* @param async AsyncEngine to use to access the storage in the background
* @param comm Communicator gathering participating processes
* @param options Options on how to carry out batching and dispatch
*/
ParallelEventProcessor(const AsyncEngine& async,
MPI_Comm comm,
const ParallelEventProcessorOptions& options = ParallelEventProcessorOptions());
/**
* @brief Destructor. This destructor involves collective communications
......
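For illustration only (not part of the diff): a minimal sketch of how the new options struct and constructors might be used. The header path, the `datastore` argument, and the commented-out AsyncEngine construction are assumptions, not taken from this commit.

```cpp
#include <mpi.h>
#include <hepnos/DataStore.hpp>
#include <hepnos/ParallelEventProcessor.hpp> // header path assumed

// "datastore" is assumed to be an already-connected hepnos::DataStore
void setupProcessor(const hepnos::DataStore& datastore) {
    hepnos::ParallelEventProcessorOptions options;
    options.cacheSize       = 64; // events cached by the internal prefetcher
    options.inputBatchSize  = 32; // events loaded from HEPnOS per batch
    options.outputBatchSize = 8;  // event descriptors sent per MPI message

    hepnos::ParallelEventProcessor processor(datastore, MPI_COMM_WORLD, options);
    // processor.process(...) would then be called with the event sets
    // and a user-provided processing callback.

    // Alternatively, with background storage access (AsyncEngine
    // constructor signature assumed):
    // hepnos::AsyncEngine async(datastore, 2);
    // hepnos::ParallelEventProcessor asyncProcessor(async, MPI_COMM_WORLD, options);
}
```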
......@@ -11,9 +11,8 @@ namespace hepnos {
ParallelEventProcessor::ParallelEventProcessor(
const DataStore& datastore,
MPI_Comm comm,
const Prefetcher& prefetcher,
const DispatchPolicy& policy)
: m_impl(std::make_shared<ParallelEventProcessorImpl>(datastore.m_impl, comm, prefetcher, policy))
const ParallelEventProcessorOptions& options)
: m_impl(std::make_shared<ParallelEventProcessorImpl>(datastore.m_impl, comm, options))
{
int num_procs;
int my_rank;
......
......@@ -16,14 +16,11 @@ namespace hepnos {
namespace tl = thallium;
typedef std::vector<EventDescriptor> DescriptorBatch;
struct ParallelEventProcessorImpl {
std::shared_ptr<DataStoreImpl> m_datastore;
MPI_Comm m_comm;
Prefetcher m_prefetcher;
DispatchPolicy m_policy;
ParallelEventProcessorOptions m_options;
std::vector<int> m_loader_ranks;
std::vector<int> m_targets;
......@@ -40,18 +37,19 @@ struct ParallelEventProcessorImpl {
ParallelEventProcessorImpl(
std::shared_ptr<DataStoreImpl> ds,
MPI_Comm comm,
const Prefetcher& prefetcher,
const DispatchPolicy& policy)
const ParallelEventProcessorOptions& options)
: m_datastore(std::move(ds))
, m_comm(comm)
, m_prefetcher(prefetcher)
, m_policy(policy) {
, m_options(options) {
MPI_Comm_size(comm, &m_num_active_consumers);
m_num_active_consumers -= 1;
}
~ParallelEventProcessorImpl() {}
/**
* Main function to start processing events in parallel.
*/
void process(const std::vector<EventSet>& evsets,
const ParallelEventProcessor::EventProcessingFn& function,
ParallelEventProcessorStatistics* stats) {
......@@ -65,6 +63,10 @@ struct ParallelEventProcessorImpl {
m_stats = nullptr;
}
/**
* Starts the ULT that loads events from HEPnOS. This ULT is posted
* on the first pool of the current ES.
*/
void startLoadingEventsFromTargets(const std::vector<EventSet>& evsets) {
if(evsets.size() == 0) {
return;
......@@ -75,9 +77,20 @@ struct ParallelEventProcessorImpl {
}, tl::anonymous());
}
/**
* Body of the ULT that loads events from HEPnOS. This ULT loops over
* the EventSets and, within each EventSet, over its Events, pushing
* their descriptors into the event queue.
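* A Prefetcher is built locally from m_options (cacheSize, inputBatchSize)
* rather than being passed in by the caller.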
*/
void loadEventsFromTargets(const std::vector<EventSet>& evsets) {
for(auto& evset : evsets) {
for(auto it = evset.begin(m_prefetcher); it != evset.end(); it++) {
Prefetcher prefetcher(
DataStore(m_datastore),
m_options.cacheSize,
m_options.inputBatchSize);
for(auto it = evset.begin(prefetcher); it != evset.end(); it++) {
EventDescriptor descriptor;
it->toDescriptor(descriptor);
{
......@@ -94,6 +107,10 @@ struct ParallelEventProcessorImpl {
}
}
/**
* Starts the ES that responds to MPI requests from other clients.
* This has to be done in a separate ES because MPI isn't Argobots-aware.
*/
void startRespondingToMPIrequests() {
if(!m_loader_running)
return;
......@@ -103,60 +120,76 @@ struct ParallelEventProcessorImpl {
}, tl::anonymous());
}
/**
* Function executed in the above ES. This ULT waits for requests from
* clients that no longer have local events to process.
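* Each request carries the number of events the client wants (tag 1111);
* the reply is a batch of up to that many descriptors (tag 1112). An
* empty reply tells the client that no more events are available.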
*/
void respondToMPIrequests() {
int my_rank;
MPI_Comm_rank(m_comm, &my_rank);
while(m_num_active_consumers != 0) {
MPI_Status status;
MPI_Recv(NULL, 0, MPI_BYTE, MPI_ANY_SOURCE, 1111, m_comm, &status);
size_t num_events_requested;
MPI_Recv(&num_events_requested, sizeof(num_events_requested),
MPI_BYTE, MPI_ANY_SOURCE, 1111, m_comm, &status);
if(num_events_requested == 0) num_events_requested = 1;
int consumer_rank = status.MPI_SOURCE;
bool shouldSendEvent;
EventDescriptor descriptorToSend;
std::vector<EventDescriptor> descriptorsToSend;
descriptorsToSend.reserve(num_events_requested);
{
std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
while(m_loader_running && m_event_queue.size() == 0)
while(m_loader_running && m_event_queue.empty())
m_event_queue_cv.wait(lock);
if(m_event_queue.size() > 0) {
descriptorToSend = m_event_queue.front();
for(unsigned i = 0; i < num_events_requested && !m_event_queue.empty(); i++) {
descriptorsToSend.push_back(m_event_queue.front());
m_event_queue.pop();
shouldSendEvent = true;
} else {
shouldSendEvent = false;
}
}
if(shouldSendEvent) {
MPI_Send(&descriptorToSend, sizeof(descriptorToSend), MPI_BYTE, consumer_rank, 1112, m_comm);
} else {
MPI_Send(NULL, 0, MPI_BYTE, consumer_rank, 1112, m_comm);
MPI_Send(descriptorsToSend.data(),
descriptorsToSend.size()*sizeof(descriptorsToSend[0]),
MPI_BYTE, consumer_rank, 1112, m_comm);
if(descriptorsToSend.empty()) {
m_num_active_consumers -= 1;
}
}
}
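/**
 * Requests a batch of up to outputBatchSize events, either from the local
 * event queue when this rank is the current loader, or over MPI from the
 * loader rank otherwise. Fills the descriptors vector and returns true,
 * or returns false once no loader has events left.
 */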
bool requestEvents(EventDescriptor& descriptor) {
bool requestEvents(std::vector<EventDescriptor>& descriptors) {
int my_rank;
MPI_Comm_rank(m_comm, &my_rank);
while(m_loader_ranks.size() != 0) {
int loader_rank = m_loader_ranks[0];
if(loader_rank == my_rank) {
std::unique_lock<tl::mutex> lock(m_event_queue_mtx);
while(m_event_queue.size() == 0 && m_loader_running)
while(m_event_queue.empty() && m_loader_running)
m_event_queue_cv.wait(lock);
if(m_event_queue.size() > 0) {
descriptor = m_event_queue.front();
size_t num_events_requested = m_options.outputBatchSize;
descriptors.resize(num_events_requested);
size_t num_actual_events = 0;
for(unsigned i = 0; i < num_events_requested && !m_event_queue.empty(); i++) {
descriptors[i] = m_event_queue.front();
m_event_queue.pop();
num_actual_events += 1;
if(m_stats) m_stats->local_events_processed += 1;
}
if(num_actual_events != 0) {
descriptors.resize(num_actual_events);
return true;
} else {
m_loader_ranks.erase(m_loader_ranks.begin());
}
} else {
MPI_Send(NULL, 0, MPI_BYTE, loader_rank, 1111, m_comm);
size_t num_events_requested = m_options.outputBatchSize;
MPI_Send(&num_events_requested, sizeof(num_events_requested), MPI_BYTE, loader_rank, 1111, m_comm);
MPI_Status status;
MPI_Recv(&descriptor, sizeof(descriptor), MPI_BYTE, loader_rank, 1112, m_comm, &status);
descriptors.resize(num_events_requested);
MPI_Recv(descriptors.data(), sizeof(descriptors[0])*descriptors.size(),
MPI_BYTE, loader_rank, 1112, m_comm, &status);
int count;
MPI_Get_count(&status, MPI_BYTE, &count);
if(count == sizeof(descriptor)) {
size_t num_actual_events = count/sizeof(descriptors[0]);
if(num_actual_events != 0) {
descriptors.resize(num_actual_events);
return true;
} else {
m_loader_ranks.erase(m_loader_ranks.begin());
......@@ -169,19 +202,21 @@ struct ParallelEventProcessorImpl {
void processEvents(const ParallelEventProcessor::EventProcessingFn& user_function) {
if(m_stats) *m_stats = ParallelEventProcessorStatistics();
double t_start = tl::timer::wtime();
EventDescriptor descriptor;
std::vector<EventDescriptor> descriptors;
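// filled by requestEvents with a batch of up to outputBatchSize descriptors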
double t1;
double t2 = tl::timer::wtime();
while(requestEvents(descriptor)) {
Event event = Event::fromDescriptor(DataStore(m_datastore), descriptor, false);
t1 = tl::timer::wtime();
if(m_stats) m_stats->waiting_time_stats.updateWith(t1-t2);
user_function(event);
t2 = tl::timer::wtime();
if(m_stats) {
m_stats->processing_time_stats.updateWith(t2 - t1);
m_stats->total_processing_time += t2 - t1;
m_stats->total_events_processed += 1;
while(requestEvents(descriptors)) {
for(auto& d : descriptors) {
Event event = Event::fromDescriptor(DataStore(m_datastore), d, false);
t1 = tl::timer::wtime();
if(m_stats) m_stats->waiting_time_stats.updateWith(t1-t2);
user_function(event);
t2 = tl::timer::wtime();
if(m_stats) {
m_stats->processing_time_stats.updateWith(t2 - t1);
m_stats->total_processing_time += t2 - t1;
m_stats->total_events_processed += 1;
}
}
}
double t_end = tl::timer::wtime();
......
......@@ -4,6 +4,7 @@
#include <set>
#include <unordered_set>
#include <unordered_map>
#include "hepnos/Prefetcher.hpp"
#include "DataStoreImpl.hpp"
#include "AsyncEngineImpl.hpp"
......
......@@ -59,9 +59,7 @@ void ParallelMPITest::testParallelEventProcessor() {
MPI_Comm_size(MPI_COMM_WORLD, &size);
ParallelEventProcessorStatistics stats;
Prefetcher prefetcher(*datastore);
ParallelEventProcessor parallel_processor(*datastore, MPI_COMM_WORLD, prefetcher);
ParallelEventProcessor parallel_processor(*datastore, MPI_COMM_WORLD);
std::vector<item> items;
parallel_processor.process(mds,
[&items, rank](const Event& ev) {
......
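As a side note, a hedged variant of the construction above with non-default batching (the values are arbitrary, not from this commit):

```cpp
ParallelEventProcessorOptions options;
options.outputBatchSize = 32; // dispatch 32 event descriptors per MPI message
ParallelEventProcessor parallel_processor(*datastore, MPI_COMM_WORLD, options);
```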