Commit edfc30fd authored by Matthieu Dorier's avatar Matthieu Dorier

started changing the ParallelEventProcessImpl into a provider

parent c9b9a2f0
......@@ -22,6 +22,7 @@ struct ParallelEventProcessorOptions {
unsigned cacheSize = std::numeric_limits<unsigned>::max(); // cache size of internal prefetcher
unsigned inputBatchSize = 16; // size of batches loaded from HEPnOS
unsigned outputBatchSize = 16; // size of batches sent over MPI to workers
uint16_t providerID = 0; // provider id to use (if multiple ParallelEventProcessor instances are used)
};
struct ParallelEventProcessorStatistics {
......
......@@ -17,7 +17,7 @@ namespace hepnos {
namespace tl = thallium;
struct ParallelEventProcessorImpl {
struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorImpl> {
std::shared_ptr<DataStoreImpl> m_datastore;
std::shared_ptr<AsyncEngineImpl> m_async;
......@@ -27,6 +27,8 @@ struct ParallelEventProcessorImpl {
std::vector<int> m_targets;
std::unordered_set<std::string> m_product_keys;
tl::remote_procedure m_req_events_rpc;
bool m_loader_running = false;
std::queue<EventDescriptor> m_event_queue;
tl::mutex m_event_queue_mtx;
......@@ -46,14 +48,18 @@ struct ParallelEventProcessorImpl {
std::shared_ptr<DataStoreImpl> ds,
MPI_Comm comm,
const ParallelEventProcessorOptions& options)
: m_datastore(std::move(ds))
: tl::provider<ParallelEventProcessorImpl>(ds->m_engine, options.providerID)
, m_datastore(std::move(ds))
, m_comm(comm)
, m_options(options) {
, m_options(options)
, m_req_events_rpc(define("hepnos_pep_req_events", &ParallelEventProcessorImpl::requestEventsRPC)) {
MPI_Comm_size(comm, &m_num_active_consumers);
m_num_active_consumers -= 1;
}
~ParallelEventProcessorImpl() {}
~ParallelEventProcessorImpl() {
m_req_events_rpc.deregister();
}
/**
* Main function to start processing events in parallel.
......@@ -162,6 +168,10 @@ struct ParallelEventProcessorImpl {
}
}
void requestEventsRPC(const tl::request& req, size_t max, tl::bulk& remote_mem) {
// TODO
}
/**
* This function tries to fill out the provided vector with a batch of
* Event descriptors taken locally or from loader processes.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment