Commit 4667bc25 authored by Matthieu Dorier

added async support in parallel event processor

parent 9592387a
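
For orientation, a minimal usage sketch of the AsyncEngine-based constructor introduced by this commit, modeled on the testParallelEventProcessorAsync test added below. The header paths, the dataset name, the already-connected DataStore handle, and the helper name process_dataset_async are assumptions for illustration, not part of the commit.

#include <hepnos/DataStore.hpp>           // header paths assumed from the repository layout
#include <hepnos/DataSet.hpp>
#include <hepnos/Event.hpp>
#include <hepnos/AsyncEngine.hpp>
#include <hepnos/ParallelEventProcessor.hpp>
#include <mpi.h>
#include <iostream>

// assumes MPI_Init has already been called and 'datastore' is connected;
// "mydataset" is a placeholder dataset name
void process_dataset_async(hepnos::DataStore& datastore) {
    auto dataset = datastore.root()["mydataset"];

    // AsyncEngine backed by 2 Argobots execution streams
    hepnos::AsyncEngine async(datastore, 2);

    // new in this commit: constructing the processor from an AsyncEngine makes
    // process() dispatch each event callback as a ULT on the engine's pool,
    // capped at twice the number of execution streams
    hepnos::ParallelEventProcessorStatistics stats;
    hepnos::ParallelEventProcessor processor(async, MPI_COMM_WORLD);

    processor.process(dataset,
        [](const hepnos::Event& ev) {
            // callbacks may now run concurrently on several execution streams,
            // so any shared state they touch must be protected
            std::cout << "processing event " << ev.number() << std::endl;
        },
        &stats);

    std::cout << "total events processed: " << stats.total_events_processed << std::endl;
}

Every rank of the communicator must call process() on the same dataset; the commit adds an explicit check for this and throws an Exception otherwise.
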
......@@ -21,6 +21,8 @@ class WriteBatchImpl;
class Prefetcher;
class PrefetcherImpl;
class AsyncEngineImpl;
class ParallelEventProcessor;
class ParallelEventProcessorImpl;
/**
* @brief The AsyncEngine class uses Argobots to provide a set
......@@ -42,6 +44,8 @@ class AsyncEngine {
friend class KeyValueContainer;
friend class WriteBatch;
friend class Prefetcher;
friend class ParallelEventProcessor;
friend class ParallelEventProcessorImpl;
private:
......
......@@ -17,6 +17,7 @@ namespace hepnos {
class RunSet;
class Run;
class ParallelEventProcessor;
/**
* @brief The DataSet class represents a handle to a named dataset
......@@ -29,6 +30,7 @@ class DataSet : public KeyValueContainer {
friend class DataStore;
friend class RunSet;
friend class DataSetImpl;
friend class ParallelEventProcessor;
private:
......
......@@ -25,12 +25,14 @@ struct ParallelEventProcessorOptions {
};
struct ParallelEventProcessorStatistics {
size_t total_events_processed = 0;
size_t local_events_processed = 0;
size_t total_events_processed = 0; // total number of events processed by this process
size_t local_events_processed = 0; // total number of events that were loaded locally (not using MPI)
double total_time = 0.0; // total time in the process function, in seconds
double total_processing_time = 0.0; // total processing time, in seconds
double acc_event_processing_time = 0.0; // accumulated time spent in the user callback function, in seconds
double acc_product_loading_time = 0.0; // accumulated product loading time, in seconds
Statistics<double,double> processing_time_stats; // statistics on single-event processing times
Statistics<double,double> waiting_time_stats; // statistics on time between calls to user-provided function
Statistics<double,double> product_loading_time_stats; // statistics on single-event product loading times
Statistics<double,double> waiting_time_stats; // statistics on time spent waiting for new events to be in the queue
};
/**
......
......@@ -11,12 +11,14 @@ namespace hepnos {
class WriteBatchImpl;
class AsyncPrefetcherImpl;
class ParallelEventProcessor;
class AsyncEngineImpl {
friend class WriteBatchImpl;
friend class AsyncEngine;
friend class AsyncPrefetcherImpl;
friend class ParallelEventProcessor;
public:
......
......@@ -4,7 +4,9 @@
* See COPYRIGHT in top-level directory.
*/
#include "hepnos/ParallelEventProcessor.hpp"
#include "hepnos/AsyncEngine.hpp"
#include "ParallelEventProcessorImpl.hpp"
#include "AsyncEngineImpl.hpp"
namespace hepnos {
......@@ -52,6 +54,14 @@ ParallelEventProcessor::ParallelEventProcessor(
m_impl->m_targets = std::move(my_targets);
}
ParallelEventProcessor::ParallelEventProcessor(
const AsyncEngine& async,
MPI_Comm comm,
const ParallelEventProcessorOptions& options)
: ParallelEventProcessor(DataStore(async.m_impl->m_datastore), comm, options) {
m_impl->m_async = async.m_impl;
}
ParallelEventProcessor::~ParallelEventProcessor() {
if(m_impl) {
MPI_Barrier(m_impl->m_comm);
......@@ -62,6 +72,17 @@ void ParallelEventProcessor::process(
const DataSet& dataset,
const EventProcessingWithCacheFn& function,
ParallelEventProcessorStatistics* stats) {
// make sure this function is called on the same dataset by all the processes
UUID dsetid = dataset.m_impl->m_uuid;
UUID uuid;
MPI_Allreduce(&dsetid, &uuid, sizeof(uuid), MPI_BYTE, MPI_BAND, m_impl->m_comm);
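// the bitwise AND of all UUIDs matches the local UUID on every rank only if
// all ranks passed the same UUID; the second reduction below verifies this globally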
char c = dsetid == uuid ? 1 : 0;
char c2;
MPI_Allreduce(&c, &c2, 1, MPI_BYTE, MPI_BAND, m_impl->m_comm);
if(!c2) {
throw Exception("ParallelEventProcessor::process called on different DataSets by distinct processes");
}
// get the event sets
std::vector<EventSet> ev_sets;
for(auto t : m_impl->m_targets) {
ev_sets.push_back(dataset.events(t));
......
......@@ -19,21 +19,27 @@ namespace tl = thallium;
struct ParallelEventProcessorImpl {
std::shared_ptr<DataStoreImpl> m_datastore;
MPI_Comm m_comm;
ParallelEventProcessorOptions m_options;
std::vector<int> m_loader_ranks;
std::vector<int> m_targets;
std::unordered_set<std::string> m_product_keys;
std::shared_ptr<DataStoreImpl> m_datastore;
std::shared_ptr<AsyncEngineImpl> m_async;
MPI_Comm m_comm;
ParallelEventProcessorOptions m_options;
std::vector<int> m_loader_ranks;
std::vector<int> m_targets;
std::unordered_set<std::string> m_product_keys;
bool m_loader_running = false;
std::queue<EventDescriptor> m_event_queue;
tl::mutex m_event_queue_mtx;
tl::condition_variable m_event_queue_cv;
bool m_loader_running = false;
std::queue<EventDescriptor> m_event_queue;
tl::mutex m_event_queue_mtx;
tl::condition_variable m_event_queue_cv;
int m_num_active_consumers;
tl::managed<tl::xstream> m_mpi_xstream;
int m_num_active_consumers;
tl::managed<tl::xstream> m_mpi_xstream;
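// bookkeeping to cap and drain the event-processing ULTs spawned when an AsyncEngine is used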
size_t m_num_processing_ults = 0;
tl::mutex m_processing_ults_mtx;
tl::condition_variable m_processing_ults_cv;
tl::mutex m_stats_mtx;
ParallelEventProcessorStatistics* m_stats = nullptr;
ParallelEventProcessorImpl(
......@@ -163,6 +169,8 @@ struct ParallelEventProcessorImpl {
* false otherwise.
*/
bool requestEvents(std::vector<EventDescriptor>& descriptors) {
double t1 = tl::timer::wtime();
double t2;
int my_rank;
MPI_Comm_rank(m_comm, &my_rank);
while(m_loader_ranks.size() != 0) {
......@@ -178,10 +186,17 @@ struct ParallelEventProcessorImpl {
descriptors[i] = m_event_queue.front();
m_event_queue.pop();
num_actual_events += 1;
if(m_stats) m_stats->local_events_processed += 1;
// no need to lock m_stats_mtx, this is the only ULT modifying these counters
if(m_stats) {
m_stats->total_events_processed += 1;
m_stats->local_events_processed += 1;
}
}
if(num_actual_events != 0) {
descriptors.resize(num_actual_events);
t2 = tl::timer::wtime();
// no need to lock m_stats_mtx, this is the only ULT modifying waiting_time_stats
if(m_stats) m_stats->waiting_time_stats.updateWith(t2-t1);
return true;
} else {
m_loader_ranks.erase(m_loader_ranks.begin());
......@@ -198,12 +213,21 @@ struct ParallelEventProcessorImpl {
size_t num_actual_events = count/sizeof(descriptors[0]);
if(num_actual_events != 0) {
descriptors.resize(num_actual_events);
t2 = tl::timer::wtime();
// no need to lock m_stats_mtx, this is the only ULT modifying these fields
if(m_stats) {
m_stats->total_events_processed += num_actual_events;
m_stats->waiting_time_stats.updateWith(t2-t1);
}
return true;
} else {
m_loader_ranks.erase(m_loader_ranks.begin());
}
}
}
t2 = tl::timer::wtime();
// no need to lock m_stats_mtx, this is the only ULT modifying waiting_time_stats
if(m_stats) m_stats->waiting_time_stats.updateWith(t2-t1);
return false;
}
......@@ -218,6 +242,31 @@ struct ParallelEventProcessorImpl {
}
}
void processSingleEvent(const EventDescriptor& d, const ParallelEventProcessor::EventProcessingWithCacheFn& user_function) {
ProductCache cache;
double t1, t2, t3;
t1 = tl::timer::wtime();
Event event = Event::fromDescriptor(DataStore(m_datastore), d, false);
preloadProductsFor(d, cache);
t2 = tl::timer::wtime();
user_function(event, cache);
t3 = tl::timer::wtime();
if(m_stats) {
std::lock_guard<tl::mutex> lock(m_stats_mtx);
m_stats->product_loading_time_stats.updateWith(t2-t1);
m_stats->acc_product_loading_time += t2-t1;
m_stats->processing_time_stats.updateWith(t3-t2);
m_stats->acc_event_processing_time += t3-t2;
}
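// in async mode, release this ULT's slot and wake the dispatcher waiting for a free one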
if(m_async) {
{
std::unique_lock<tl::mutex> lock(m_processing_ults_mtx);
m_num_processing_ults -= 1;
}
m_processing_ults_cv.notify_all();
}
}
/**
* This function keeps requesting new events and call the user-provided callback.
*/
......@@ -225,25 +274,31 @@ struct ParallelEventProcessorImpl {
if(m_stats) *m_stats = ParallelEventProcessorStatistics();
double t_start = tl::timer::wtime();
std::vector<EventDescriptor> descriptors;
double t1;
double t2 = tl::timer::wtime();
ProductCache cache;
auto max_ults = m_async ? m_async->m_xstreams.size()*2 : 0;
while(requestEvents(descriptors)) {
for(auto& d : descriptors) {
cache.clear();
Event event = Event::fromDescriptor(DataStore(m_datastore), d, false);
preloadProductsFor(d, cache);
t1 = tl::timer::wtime();
if(m_stats) m_stats->waiting_time_stats.updateWith(t1-t2);
user_function(event, cache);
t2 = tl::timer::wtime();
if(m_stats) {
m_stats->processing_time_stats.updateWith(t2 - t1);
m_stats->total_processing_time += t2 - t1;
m_stats->total_events_processed += 1;
if(m_async) {
{ // don't submit more ULTs than twice the number of execution streams (ES)
std::unique_lock<tl::mutex> lock(m_processing_ults_mtx);
while(m_num_processing_ults >= max_ults) {
m_processing_ults_cv.wait(lock);
}
m_num_processing_ults += 1;
}
m_async->m_pool.make_thread([this, d, &user_function]() {
processSingleEvent(d, user_function);
}, tl::anonymous());
} else {
processSingleEvent(d, user_function);
}
}
}
{ // wait until all ULTs completed
std::unique_lock<tl::mutex> lock(m_processing_ults_mtx);
while(m_num_processing_ults != 0) {
m_processing_ults_cv.wait(lock);
}
}
double t_end = tl::timer::wtime();
if(m_stats) m_stats->total_time = t_end - t_start;
}
......
......@@ -78,8 +78,10 @@ void ParallelMPITest::testParallelEventProcessor() {
<< " total_events_processed = " << stats.total_events_processed << "\n"
<< " local_events_processed = " << stats.local_events_processed << "\n"
<< " total_time = " << stats.total_time << "\n"
<< " total_processing_time = " << stats.total_processing_time << "\n"
<< " acc_event_processing_time = " << stats.acc_event_processing_time << "\n"
<< " acc_product_loading_time = " << stats.acc_product_loading_time << "\n"
<< " processing_time_stats = " << stats.processing_time_stats << "\n"
<< " product_loading_time_stats = " << stats.product_loading_time_stats << "\n"
<< " waiting_time_stats = " << stats.waiting_time_stats << std::endl;
if(rank != 0) {
......@@ -113,3 +115,77 @@ void ParallelMPITest::testParallelEventProcessor() {
}
}
void ParallelMPITest::testParallelEventProcessorAsync() {
auto mds = datastore->root()["matthieu"];
AsyncEngine async(*datastore, 2);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
int size;
MPI_Comm_size(MPI_COMM_WORLD, &size);
ParallelEventProcessorStatistics stats;
ParallelEventProcessor parallel_processor(async, MPI_COMM_WORLD);
std::vector<item> items;
tl::mutex item_mtx;
parallel_processor.process(mds,
[&items, &item_mtx, rank](const Event& ev) {
SubRun sr = ev.subrun();
Run r = sr.run();
std::cout << "Rank " << rank << " ES " << tl::xstream::self().get_rank() << " invoking lambda for item " <<
r.number() << " " << sr.number() << " " << ev.number() << std::endl;
{
std::lock_guard<tl::mutex> lock(item_mtx);
items.emplace_back(r.number(), sr.number(), ev.number());
}
double t = tl::timer::wtime();
while(tl::timer::wtime() - t < 0.1) {
tl::thread::yield();
}
},
&stats
);
std::cout << "Rank " << rank << " statistics:\n"
<< " total_events_processed = " << stats.total_events_processed << "\n"
<< " local_events_processed = " << stats.local_events_processed << "\n"
<< " total_time = " << stats.total_time << "\n"
<< " acc_event_processing_time = " << stats.acc_event_processing_time << "\n"
<< " acc_product_loading_time = " << stats.acc_product_loading_time << "\n"
<< " processing_time_stats = " << stats.processing_time_stats << "\n"
<< " product_loading_time_stats = " << stats.product_loading_time_stats << "\n"
<< " waiting_time_stats = " << stats.waiting_time_stats << std::endl;
if(rank != 0) {
int num_local_items = items.size();
MPI_Send(&num_local_items, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
if(num_local_items) {
MPI_Send(items.data(), items.size()*sizeof(item), MPI_BYTE, 0, 0, MPI_COMM_WORLD);
}
} else {
for(unsigned j=1; j < size; j++) {
int num_items = 0;
MPI_Recv(&num_items, 1, MPI_INT, j, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
items.resize(items.size() + num_items);
if(num_items) {
MPI_Recv(&items[items.size() - num_items], sizeof(item)*num_items,
MPI_BYTE, j, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
}
std::sort(items.begin(), items.end());
CPPUNIT_ASSERT(items.size() == size*8*8);
unsigned x = 0;
for(unsigned i = 0; i < (unsigned)size; i++) {
for(unsigned j = 0; j < 8; j++) {
for(unsigned k = 0; k < 8; k++) {
auto& e = items[x];
CPPUNIT_ASSERT(e.run == i && e.subrun == j && e.event == k);
x += 1;
}
}
}
}
}
......@@ -10,7 +10,8 @@ extern hepnos::DataStore* datastore;
class ParallelMPITest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE( ParallelMPITest );
CPPUNIT_TEST( testParallelEventProcessor );
// CPPUNIT_TEST( testParallelEventProcessor );
CPPUNIT_TEST( testParallelEventProcessorAsync );
CPPUNIT_TEST_SUITE_END();
public:
......@@ -19,6 +20,7 @@ class ParallelMPITest : public CppUnit::TestFixture
void tearDown();
void testParallelEventProcessor();
void testParallelEventProcessorAsync();
};
#endif