Commit 44c69388 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

started implementing distributed work queue

parent bef015bb
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_H
......@@ -15,6 +15,7 @@
#include <hepnos/EventSet.hpp>
#include <hepnos/Exception.hpp>
#include <hepnos/KeyValueContainer.hpp>
#include <hepnos/ParallelEventProcessor.hpp>
#include <hepnos/Prefetcher.hpp>
#include <hepnos/Run.hpp>
#include <hepnos/RunNumber.hpp>
......
/*
* (C) 2019 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_ASYNC_ENGINE_H
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_DATA_SET_H
......@@ -115,7 +115,7 @@ class DataSet : public KeyValueContainer {
std::string fullname() const;
/**
* @brief Gets the next DataSet from this DataSet in
* @brief Gets the next DataSet from this DataSet in
* alphabetical order within the same container.
* If no such dataset exists, this function returns
* a DataSet instance such that valid() == false.
......@@ -192,8 +192,8 @@ class DataSet : public KeyValueContainer {
* @brief Creates a dataset with a given name inside the
* DataSet. This name must not have the '/' and '%' characters.
* A DataSet object pointing to the created dataset is returned.
* If a dataset with this name already exists in the DataStore,
* it is not created, but a DataSet object pointing to the
* If a dataset with this name already exists in the DataStore,
* it is not created, but a DataSet object pointing to the
* existing one is returned instead.
*
* @param name Name of DataSet.
......@@ -268,7 +268,7 @@ class DataSet : public KeyValueContainer {
class const_iterator;
/**
* @brief Searches this DataSet for a DataSet with
* @brief Searches this DataSet for a DataSet with
* the provided path and returns an iterator to it if found,
* otherwise it returns an iterator to DataStore::end().
*
......@@ -280,8 +280,8 @@ class DataSet : public KeyValueContainer {
iterator find(const std::string& datasetPath);
/**
* @brief Searches this DataSet for an DataSet with
* the provided path and returns a const_iterator to it
* @brief Searches this DataSet for an DataSet with
* the provided path and returns a const_iterator to it
* if found, otherwise it returns an iterator to DataSet::end().
*
* @param datasetPath Path of the DataSet to find.
......@@ -345,38 +345,38 @@ class DataSet : public KeyValueContainer {
/**
* @brief Returns an iterator pointing to the first DataSet in this
* DataSet, whose name is not considered to go before lb
* DataSet, whose name is not considered to go before lb
* (i.e., either it is equal or goes after, alphabetically).
*
* @param lb DataSet name to search for.
*
* @return An iterator to the the first DataSet in this DataSet
* whose name is not considered to go before lb, or DataStore::end()
* @return An iterator to the the first DataSet in this DataSet
* whose name is not considered to go before lb, or DataStore::end()
* if all keys are considered to go before it.
*/
iterator lower_bound(const std::string& lb);
/**
* @brief Returns a const_iterator pointing to the first DataSet in this
* DataSet whose name is not considered to go before lb
* DataSet whose name is not considered to go before lb
* (i.e., either it is equal or goes after, alphabetically).
*
* @param lb DataSet name to search for.
*
* @return A const_iterator to the the first DataSet in the DataSet
* whose name is not considered to go before lb, or DataSet::cend()
* @return A const_iterator to the the first DataSet in the DataSet
* whose name is not considered to go before lb, or DataSet::cend()
* if all DataSet names are considered to go before it.
*/
const_iterator lower_bound(const std::string& lb) const;
/**
* @brief Returns an iterator pointing to the first DataSet in the
* @brief Returns an iterator pointing to the first DataSet in the
* DataStore whose key is considered to go after ub.
*
* @param ub DataSet name to search for.
*
* @return An iterator to the the first DataSet in this DataSet,
* whose name is considered to go after ub, or DataSet::end() if
* whose name is considered to go after ub, or DataSet::end() if
* no DataSet names are considered to go after it.
*/
iterator upper_bound(const std::string& ub);
......@@ -387,8 +387,8 @@ class DataSet : public KeyValueContainer {
*
* @param ub DataSet name to search for.
*
* @return A const_iterator to the the first DataSet in this DataSet
* whose name is considered to go after ub, or DataSet::end() if
* @return A const_iterator to the the first DataSet in this DataSet
* whose name is considered to go after ub, or DataSet::end() if
* no DataSet names are considered to go after it.
*/
const_iterator upper_bound(const std::string& ub) const;
......@@ -443,7 +443,7 @@ class DataSet::const_iterator {
/**
* @brief Constructor. Creates a const_iterator pointing
* to a given DataSet. The DataSet may or may not be valid.
* to a given DataSet. The DataSet may or may not be valid.
*
* @param current DataSet to make the const_iterator point to.
*/
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_DATA_STORE_H
......@@ -27,6 +27,8 @@ class ItemImpl;
template<typename T, typename C = std::vector<T>> class Ptr;
class WriteBatch;
class AsyncEngine;
class ParallelEventProcessor;
class ParallelEventProcessorImpl;
class Prefetcher;
/**
......@@ -45,6 +47,8 @@ class DataStore {
friend class DataStoreImpl;
friend class AsyncEngine;
friend class EventSet;
friend class ParallelEventProcessor;
friend class ParallelEventProcessorImpl;
friend class Prefetcher;
public:
......@@ -98,7 +102,7 @@ class DataStore {
* @return This DataStore.
*/
DataStore& operator=(DataStore&& other) = default;
/**
* @brief Destructor.
*/
......@@ -236,7 +240,7 @@ class DataStore {
* @param t Product.
* @param std::integral_constant type trait indicating T is POD.
*
* @return true if the data was loaded successfuly, false otherwise.
* @return true if the data was loaded successfuly, false otherwise.
*/
template<typename T>
bool loadProductImpl(const ProductID& productID, T& t, const std::integral_constant<bool, true>&);
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_DEMANGLE_H
......@@ -20,7 +20,7 @@ namespace hepnos {
*/
template<typename T>
std::string demangle() {
char const * name = typeid(T).name();
char const * name = typeid(T).name();
return boost::core::demangle(name);
}
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_EVENT_H
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_EVENT_NUMBER_H
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_EVENT_SET_H
......@@ -231,7 +231,7 @@ class EventSet::const_iterator {
* @brief Dereference operator. Returns a const reference
* to the DataSet this const_iterator points to.
*
* @return a const reference to the DataSet this
* @return a const reference to the DataSet this
* const_iterator points to.
*/
const reference operator*();
......@@ -240,7 +240,7 @@ class EventSet::const_iterator {
* @brief Returns a const pointer to the DataSet this
* const_iterator points to.
*
* @return a const pointer to the DataSet this
* @return a const pointer to the DataSet this
* const_iterator points to.
*/
const pointer operator->();
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_EXCEPTION_H
......@@ -31,7 +31,7 @@ class Exception : public std::exception
* @param msg Error message.
*/
Exception(const std::string& msg) : m_msg(msg){}
/**
* @brief Copy constructor.
*/
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_INPUT_ARCHIVE_H
......@@ -30,7 +30,7 @@ class InputArchive : public iarchive {
private:
DataStore m_datastore;
DataStore m_datastore;
public:
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_ITEM_TYPE_HPP
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_KEYVAL_CONTAINER_H
......@@ -262,10 +262,10 @@ class KeyValueContainer {
}
/**
* @brief Loads a value associated with a key from the
* KeyValueContainer. The type of the key should have
* operator<< available to stream it into a std::stringstream
* for the purpose of converting it into an std::string.
* @brief Loads a value associated with a key from the
* KeyValueContainer. The type of the key should have
* operator<< available to stream it into a std::stringstream
* for the purpose of converting it into an std::string.
* The resulting string must not have the "/" or "%" characters.
* The type of the value must be serializable using Boost.
*
......
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_PARALLEL_EVENT_PROCESSOR_HPP
#define __HEPNOS_PARALLEL_EVENT_PROCESSOR_HPP
#include <mpi.h>
#include <hepnos/Prefetcher.hpp>
#include <hepnos/DataStore.hpp>
namespace hepnos {
struct ParallelEventProcessorImpl;
struct DispatchPolicy {
size_t eventsPerBlock = 16;
};
struct ParallelEventProcessorStatistics {
size_t total_events_processed = 0;
size_t local_events_processed = 0;
double total_time = 0.0; // total time in the process function, in seconds
double total_processing_time = 0.0; // total processing time, in seconds
Statistics<double,double> processing_time_stats; // statistics on single-event processing times
Statistics<double,double> waiting_time_stats; // statictics on time between calls to user-provided function
};
/**
* @brief The ParallelEventProcessor enables running a function in parallel
* on all the Events in a given container. The ParallelEventProcessor will
* attempt to optimize accesses to the HEPnOS service and dispatch work to
* members of the provided communicator to aim for load balancing.
*/
class ParallelEventProcessor {
std::shared_ptr<ParallelEventProcessorImpl> m_impl;
public:
typedef std::function<void(const Event&)> EventProcessingFn;
/**
* @brief Constructor. Builds a ParallelEventProcessor to navigate a dataset.
* This constructor involves collective communications across members of the
* provided communicator.
*
* @param datastore Datastore
* @param comm Communicator gathering participating processes
* @param prefetcher Prefetcher to use when reading events from storage
* @param policy Dispatch policy to use when sending events to workers
*/
ParallelEventProcessor(const DataStore& datastore,
MPI_Comm comm,
const Prefetcher& prefetcher,
const DispatchPolicy& policy = DispatchPolicy());
/**
* @brief Destructor. This destructor involves collective comunications
* across the members of the underlying communicator.
*/
~ParallelEventProcessor();
/**
* @brief Tells the ParallelEventProcessor to preload objects of
* certain type and with a certain label before executing the user callback.
*
* @tparam T Type of product to load
* @param label Label associated with the product
*/
template<typename T>
void preload(const std::string& label) {
std::string productKey = label + "#" + demangle<T>();
preloadImpl(productKey);
}
/**
* @brief Process all the events in the dataset. This function involves
* collective communications across members of the underlying communicator.
*
* @param dataset Dataset in which to process events
* @param function Function to execute on events
* @param stats Pointer to a statistics object to fill
*/
void process(const DataSet& dataset,
const EventProcessingFn& function,
ParallelEventProcessorStatistics* stats = nullptr);
private:
void preloadImpl(const std::string& productKey);
};
}
#endif
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_PREFETCHABLE_HPP
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_PREFETCHER_HPP
......@@ -61,7 +61,7 @@ class Prefetcher {
* @param cache_size maximum number of items that can be stored in the cache.
* @param batch_size how many items to prefetch at once.
*/
Prefetcher(const AsyncEngine& async,
Prefetcher(const AsyncEngine& async,
unsigned int cache_size=16,
unsigned int batch_size=16);
......@@ -73,12 +73,12 @@ class Prefetcher {
/**
* @brief Deleted copy constructor.
*/
Prefetcher(const Prefetcher&) = delete;
Prefetcher(const Prefetcher&) = default;
/**
* @brief Deleted move constructor.
*/
Prefetcher(Prefetcher&&) = delete;
Prefetcher(Prefetcher&&) = default;
/**
* @brief Deleted copy-assignment operator.
......@@ -94,7 +94,7 @@ class Prefetcher {
* @return Cache size.
*/
unsigned int getCacheSize() const;
/**
* @brief Set the cache size.
*
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_PRODUCT_ID_H
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_PTR_H
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_RUN_H
......@@ -30,7 +30,7 @@ constexpr const int RunDescriptorLength = 24;
* serialize the representation of a Run for
* the purpose of sending it to another process, where
* the corresponding Run can be reconstructed without
* involving the DataStore.
* involving the DataStore.
*/
struct RunDescriptor {
char data[RunDescriptorLength];
......@@ -146,22 +146,22 @@ class Run : public KeyValueContainer {
ProductID storeRawData(WriteBatch& batch, const std::string& key, const char* value, size_t vsize) override;
/**
* @see KeyValueContainer::loadRawData()
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const std::string& key, std::string& buffer) const override;
/**
* @see KeyValueContainer::loadRawData()
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const std::string& key, char* value, size_t* vsize) const override;
/**
* @see KeyValueContainer::loadRawData()
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, std::string& buffer) const override;
/**
* @see KeyValueContainer::loadRawData()
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const override;
......@@ -196,7 +196,7 @@ class Run : public KeyValueContainer {
class iterator;
/**
* @brief Searches this Run for an SubRun with
* @brief Searches this Run for an SubRun with
* the provided number and returns an iterator to it if found,
* otherwise it returns an iterator to Run::end().
*
......@@ -221,8 +221,8 @@ class Run : public KeyValueContainer {
iterator find(const SubRunNumber& srn, const Prefetcher& prefetcher);
/**
* @brief Searches this Run for a SubRun with
* the provided number and returns a const_iterator to it
* @brief Searches this Run for a SubRun with
* the provided number and returns a const_iterator to it
* if found, otherwise it returns an iterator to Run::end().
*
* @param srn SubRunNumber of the SubRun to find.
......@@ -255,11 +255,11 @@ class Run : public KeyValueContainer {
/**
* @brief Returns an iterator referring to the first SubRun in
* this Run and using the given
* this Run and using the given
*
* @param Prefetcher
*
* @return
* @return
*/
iterator begin(const Prefetcher&);
......@@ -330,7 +330,7 @@ class Run : public KeyValueContainer {
* @param lb SubRunNumber lower bound to search for.
*
* @return An iterator to the first SubRun in this Run
* whose whose SubRunNumber is not lower than lb, or Run::end()
* whose whose SubRunNumber is not lower than lb, or Run::end()
* if all subrun numbers are lower.
*/
iterator lower_bound(const SubRunNumber&);
......@@ -348,7 +348,7 @@ class Run : public KeyValueContainer {
* @param lb SubRunNumber lower bound to search for.
*
* @return A const_iterator to the first SubRun in this Run
* whose whose SubRunNumber is not lower than lb, or Run::cend()
* whose whose SubRunNumber is not lower than lb, or Run::cend()
* if all subrun numbers are lower.
*/
const_iterator lower_bound(const SubRunNumber&) const;
......@@ -360,13 +360,13 @@ class Run : public KeyValueContainer {
const_iterator lower_bound(const SubRunNumber&, const Prefetcher&) const;
/**
* @brief Returns an iterator pointing to the first SubRun in the
* @brief Returns an iterator pointing to the first SubRun in the
* Run whose SubRunNumber is greater than ub.
*
* @param ub SubRunNumber upper bound to search for.
*
* @return An iterator to the first SubRun in this Run,
* whose SubRunNumber is greater than ub, or Run::end() if
* whose SubRunNumber is greater than ub, or Run::end() if
* no such SubRun exists.
*/
iterator upper_bound(const SubRunNumber&);
......@@ -378,13 +378,13 @@ class Run : public KeyValueContainer {
iterator upper_bound(const SubRunNumber&, const Prefetcher&);
/**
* @brief Returns a const_iterator pointing to the first SubRun in the
* @brief Returns a const_iterator pointing to the first SubRun in the
* Run whose SubRunNumber is greater than ub.
*
* @param ub SubRunNumber upper bound to search for.
*
* @return An const_iterator to the first SubRun in this Run,
* whose SubRunNumber is greater than ub, or Run::cend() if
* whose SubRunNumber is greater than ub, or Run::cend() if
* no such SubRun exists.
*/
const_iterator upper_bound(const SubRunNumber&) const;
......@@ -490,7 +490,7 @@ class Run::const_iterator {
/**
* @brief Constructor. Creates a const_iterator pointing
* to a given SubRun. The SubRun may or may not be valid.
* to a given SubRun. The SubRun may or may not be valid.
*
* @param current SubRun to make the const_iterator point to.
*/
......@@ -498,7 +498,7 @@ class Run::const_iterator {
/**
* @brief Constructor. Creates a const_iterator pointing
* to a given SubRun. The SubRun may or may not be valid.
* to a given SubRun. The SubRun may or may not be valid.
*
* @param current SubRun to make the const_iterator point to.
*/
......@@ -569,7 +569,7 @@ class Run::const_iterator {
* @brief Dereference operator. Returns a const reference
* to the SubRun this const_iterator points to.
*
* @return a const reference to the DataSet this
* @return a const reference to the DataSet this
* const_iterator points to.
*/
const reference operator*();
......@@ -578,7 +578,7 @@ class Run::const_iterator {
* @brief Returns a const pointer to the SubRun this
* const_iterator points to.
*
* @return a const pointer to the SubRun this
* @return a const pointer to the SubRun this
* const_iterator points to.
*/
const pointer operator->();
......
/*
* (C) 2018 The University of Chicago
*
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_RUN_SET_H
......@@ -101,9 +101,9 @@ class RunSet {
Run operator[](const RunNumber& runNumber);
/**
* @brief Searches this RunSet for a Run with