Commit 0e3be1bc authored by Matthieu Dorier's avatar Matthieu Dorier

started Prefetcher implementation

parent acc11cc3
......@@ -10,6 +10,7 @@
#include <hepnos/EventSet.hpp>
#include <hepnos/Exception.hpp>
#include <hepnos/KeyValueContainer.hpp>
#include <hepnos/Prefetcher.hpp>
#include <hepnos/Run.hpp>
#include <hepnos/RunNumber.hpp>
#include <hepnos/RunSet.hpp>
......
......@@ -18,6 +18,8 @@ class SubRun;
class Event;
class WriteBatch;
class WriteBatchImpl;
class Prefetcher;
class PrefetcherImpl;
class AsyncEngineImpl;
class AsyncEngine {
......@@ -29,6 +31,7 @@ class AsyncEngine {
friend class Event;
friend class KeyValueContainer;
friend class WriteBatch;
friend class Prefetcher;
private:
......
......@@ -27,6 +27,7 @@ class ItemImpl;
template<typename T, typename C = std::vector<T>> class Ptr;
class WriteBatch;
class AsyncEngine;
class Prefetcher;
/**
* The DataStore class is the main handle referencing an HEPnOS service.
......@@ -44,6 +45,7 @@ class DataStore {
friend class DataStoreImpl;
friend class AsyncEngine;
friend class EventSet;
friend class Prefetcher;
public:
......
#ifndef __HEPNOS_PREFETCHER_HPP
#define __HEPNOS_PREFETCHER_HPP
#include <memory>
namespace hepnos {
class DataStore;
class PrefetcherImpl;
class AsyncEngine;
class RunSet;
class Prefetcher {
friend class RunSet;
private:
std::shared_ptr<PrefetcherImpl> m_impl;
public:
Prefetcher(const DataStore& ds,
unsigned int cache_size=16,
unsigned int batch_size=16);
Prefetcher(const DataStore& ds, const AsyncEngine& async,
unsigned int cache_size=16,
unsigned int batch_size=16);
~Prefetcher();
Prefetcher(const Prefetcher&) = delete;
Prefetcher(Prefetcher&&) = delete;
Prefetcher& operator=(const Prefetcher&) = delete;
Prefetcher& operator=(Prefetcher&&) = delete;
unsigned int getCacheSize() const;
void setCacheSize(unsigned int size);
unsigned int getBatchSize() const;
void setBatchSize(unsigned int size);
};
}
#endif
......@@ -18,6 +18,7 @@ namespace hepnos {
constexpr const int RunDescriptorLength = 24;
class Prefetcher;
class RunSet;
struct RunDescriptor {
......@@ -173,6 +174,7 @@ class Run : public KeyValueContainer {
* Run::end() otherwise.
*/
iterator find(const SubRunNumber& srn);
iterator find(const SubRunNumber& srn, const Prefetcher& prefetcher);
/**
* @brief Searches this Run for a SubRun with
......@@ -185,6 +187,7 @@ class Run : public KeyValueContainer {
* Run::cend() otherwise.
*/
const_iterator find(const SubRunNumber&) const;
const_iterator find(const SubRunNumber&, const Prefetcher&) const;
/**
* @brief Returns an iterator referring to the first SubRun
......@@ -193,6 +196,7 @@ class Run : public KeyValueContainer {
* @return an iterator referring to the first SubRun in this Run.
*/
iterator begin();
iterator begin(const Prefetcher&);
/**
* @brief Returns an iterator referring to the end of the Run.
......@@ -210,6 +214,7 @@ class Run : public KeyValueContainer {
* @return a const_iterator referring to the first SubRun in this Run.
*/
const_iterator begin() const;
const_iterator begin(const Prefetcher&) const;
/**
* @brief Returns a const_iterator referring to the end of the Run.
......@@ -227,6 +232,7 @@ class Run : public KeyValueContainer {
* @return a const_iterator referring to the first SubRun in this Run.
*/
const_iterator cbegin() const;
const_iterator cbegin(const Prefetcher&) const;
/**
* @brief Returns a const_iterator referring to the end of the Run.
......@@ -248,6 +254,7 @@ class Run : public KeyValueContainer {
* if all subrun numbers are lower.
*/
iterator lower_bound(const SubRunNumber&);
iterator lower_bound(const SubRunNumber&, const Prefetcher&);
/**
* @brief Returns a const_iterator pointing to the first SubRun in this
......@@ -260,6 +267,7 @@ class Run : public KeyValueContainer {
* if all subrun numbers are lower.
*/
const_iterator lower_bound(const SubRunNumber&) const;
const_iterator lower_bound(const SubRunNumber&, const Prefetcher&) const;
/**
* @brief Returns an iterator pointing to the first SubRun in the
......@@ -272,6 +280,7 @@ class Run : public KeyValueContainer {
* no such SubRun exists.
*/
iterator upper_bound(const SubRunNumber&);
iterator upper_bound(const SubRunNumber&, const Prefetcher&);
/**
* @brief Returns a const_iterator pointing to the first SubRun in the
......@@ -284,6 +293,7 @@ class Run : public KeyValueContainer {
* no such SubRun exists.
*/
const_iterator upper_bound(const SubRunNumber&) const;
const_iterator upper_bound(const SubRunNumber&, const Prefetcher&) const;
/**
* @brief Accesses an existing ubun using the []
......
......@@ -15,6 +15,8 @@
namespace hepnos {
class Prefetcher;
/**
* @brief The RunSet class is a helper class to access Runs
* stored in a particular DataSet.
......@@ -110,6 +112,7 @@ class RunSet {
* RunSet::end() otherwise.
*/
iterator find(const RunNumber& runNumber);
iterator find(const RunNumber& runNumber, const Prefetcher& prefetcher);
/**
* @brief Searches this RunSet for a Run with
......@@ -123,6 +126,7 @@ class RunSet {
* RunSet::cend() otherwise.
*/
const_iterator find(const RunNumber& runNumber) const;
const_iterator find(const RunNumber& runNumber, const Prefetcher& prefetcher) const;
/**
......@@ -132,6 +136,7 @@ class RunSet {
* @return an iterator referring to the first Run in this RunSet.
*/
iterator begin();
iterator begin(const Prefetcher& prefetcher);
/**
* @brief Returns an iterator referring to the end of the RunSet.
......@@ -149,6 +154,7 @@ class RunSet {
* @return an iterator referring to the first Run in this RunSet.
*/
const_iterator begin() const;
const_iterator begin(const Prefetcher& prefetcher) const;
/**
* @brief Returns a const_iterator referring to the end of the RunSet.
......@@ -166,6 +172,7 @@ class RunSet {
* @return a const_iterator referring to the first Run in this RunSet.
*/
const_iterator cbegin() const;
const_iterator cbegin(const Prefetcher& prefetcher) const;
/**
* @brief Returns a const_iterator referring to the end of the RunSet.
......@@ -187,6 +194,7 @@ class RunSet {
* if such a Run does not exist.
*/
iterator lower_bound(const RunNumber& lb);
iterator lower_bound(const RunNumber& lb, const Prefetcher& prefetcher);
/**
* @brief Returns a const_iterator pointing to the first Run in this
......@@ -199,6 +207,7 @@ class RunSet {
* if such a Run does not exist.
*/
const_iterator lower_bound(const RunNumber& lb) const;
const_iterator lower_bound(const RunNumber& lb, const Prefetcher& prefetcher) const;
/**
* @brief Returns an iterator pointing to the first Run in the
......@@ -211,6 +220,7 @@ class RunSet {
* no such a Run exist.
*/
iterator upper_bound(const RunNumber& ub);
iterator upper_bound(const RunNumber& ub,const Prefetcher& prefetcher);
/**
* @brief Returns a const_iterator pointing to the first Run in the
......@@ -223,10 +233,13 @@ class RunSet {
* no such a Run exist.
*/
const_iterator upper_bound(const RunNumber& ub) const;
const_iterator upper_bound(const RunNumber& ub, const Prefetcher& prefetcher) const;
};
class RunSet::const_iterator {
friend class RunSet;
protected:
/**
......
......@@ -16,6 +16,8 @@
namespace hepnos {
class Prefetcher;
constexpr const int SubRunDescriptorLength = 32;
struct SubRunDescriptor {
......@@ -175,6 +177,7 @@ class SubRun : public KeyValueContainer {
* SubRun::end() otherwise.
*/
iterator find(const EventNumber& en);
iterator find(const EventNumber& en, const Prefetcher& prefetcher);
/**
* @brief Searches this SubRun for an Event with
......@@ -187,6 +190,7 @@ class SubRun : public KeyValueContainer {
* SubRun::cend() otherwise.
*/
const_iterator find(const EventNumber& en) const;
const_iterator find(const EventNumber& en, const Prefetcher& prefetcher) const;
/**
* @brief Returns an iterator referring to the first Event
......@@ -195,6 +199,7 @@ class SubRun : public KeyValueContainer {
* @return an iterator referring to the first Event in this SubRun.
*/
iterator begin();
iterator begin(const Prefetcher& prefetcher);
/**
* @brief Returns an iterator referring to the end of the SubRun.
......@@ -212,6 +217,7 @@ class SubRun : public KeyValueContainer {
* @return a const_iterator referring to the first Event in this SubRun.
*/
const_iterator begin() const;
const_iterator begin(const Prefetcher& prefetcher) const;
/**
* @brief Returns a const_iterator referring to the end of the SubRun.
......@@ -229,6 +235,7 @@ class SubRun : public KeyValueContainer {
* @return a const_iterator referring to the first Event in this SubRun.
*/
const_iterator cbegin() const;
const_iterator cbegin(const Prefetcher& prefetcher) const;
/**
* @brief Returns a const_iterator referring to the end of the SubRun.
......@@ -250,6 +257,7 @@ class SubRun : public KeyValueContainer {
* if all event numbers are lower.
*/
iterator lower_bound(const EventNumber&);
iterator lower_bound(const EventNumber&, const Prefetcher&);
/**
* @brief Returns a const_iterator pointing to the first Event in this
......@@ -262,6 +270,7 @@ class SubRun : public KeyValueContainer {
* if all event numbers are lower.
*/
const_iterator lower_bound(const SubRunNumber&) const;
const_iterator lower_bound(const SubRunNumber&, const Prefetcher&) const;
/**
* @brief Returns an iterator pointing to the first Event in the
......@@ -274,6 +283,7 @@ class SubRun : public KeyValueContainer {
* no such Event exists.
*/
iterator upper_bound(const EventNumber&);
iterator upper_bound(const EventNumber&, const Prefetcher&);
/**
* @brief Returns a const_iterator pointing to the first Event in the
......@@ -286,6 +296,7 @@ class SubRun : public KeyValueContainer {
* no such Event exists.
*/
const_iterator upper_bound(const SubRunNumber&) const;
const_iterator upper_bound(const SubRunNumber&, const Prefetcher&) const;
/**
* @brief Accesses an existing event using the []
......
......@@ -7,6 +7,7 @@ set(hepnos-src DataStore.cpp
Event.cpp
UUID.cpp
WriteBatch.cpp
Prefetcher.cpp
AsyncEngine.cpp
EventSet.cpp)
......
#include "hepnos/Prefetcher.hpp"
#include "hepnos/AsyncEngine.hpp"
#include "PrefetcherImpl.hpp"
namespace hepnos {
Prefetcher::Prefetcher(const DataStore& ds, unsigned int cache_size, unsigned int batch_size)
: m_impl(std::make_shared<PrefetcherImpl>(ds.m_impl)) {
m_impl->m_cache_size = cache_size;
m_impl->m_batch_size = batch_size;
}
Prefetcher::Prefetcher(const DataStore& ds, const AsyncEngine& async, unsigned int cache_size, unsigned int batch_size)
: m_impl(std::make_shared<PrefetcherImpl>(ds.m_impl, async.m_impl)) {
m_impl->m_cache_size = cache_size;
m_impl->m_batch_size = batch_size;
}
Prefetcher::~Prefetcher() {}
unsigned int Prefetcher::getCacheSize() const {
return m_impl->m_cache_size;
}
void Prefetcher::setCacheSize(unsigned int size) {
m_impl->m_cache_size = size;
}
unsigned int Prefetcher::getBatchSize() const {
return m_impl->m_batch_size;
}
void Prefetcher::setBatchSize(unsigned int size) {
m_impl->m_batch_size = size;
}
}
#ifndef __HEPNOS_PREFETCHER_IMPL_HPP
#define __HEPNOS_PREFETCHER_IMPL_HPP
#include <set>
#include "DataStoreImpl.hpp"
#include "AsyncEngineImpl.hpp"
namespace hepnos {
class PrefetcherImpl {
public:
struct ItemPtrComparator {
bool operator()(const std::shared_ptr<ItemImpl>& lhs,
const std::shared_ptr<ItemImpl>& rhs) const {
return lhs->m_descriptor < rhs->m_descriptor;
}
};
std::shared_ptr<DataStoreImpl> m_datastore;
std::shared_ptr<AsyncEngineImpl> m_async;
unsigned int m_cache_size = 16;
unsigned int m_batch_size = 1;
mutable std::set<std::shared_ptr<ItemImpl>, ItemPtrComparator> m_cache;
bool m_associated = false;
PrefetcherImpl(const std::shared_ptr<DataStoreImpl>& ds)
: m_datastore(ds) {}
PrefetcherImpl(const std::shared_ptr<DataStoreImpl>& ds,
const std::shared_ptr<AsyncEngineImpl>& async)
: m_datastore(ds)
, m_async(async) {}
void prefetchFrom(const ItemType& item_type,
const ItemType& prefix_type,
const std::shared_ptr<ItemImpl>& current,
int target=-1) const
{
auto last = current;
while(m_cache.size() != m_cache_size) {
std::vector<std::shared_ptr<ItemImpl>> items;
size_t s = m_datastore->nextItems(item_type, prefix_type, last, items, m_batch_size, target);
if(s != 0)
last = items[items.size()-1];
for(auto& item : items) {
m_cache.insert(std::move(item));
}
if(s < m_batch_size) break;
}
}
size_t nextItems(
const ItemType& item_type,
const ItemType& prefix_type,
const std::shared_ptr<ItemImpl>& current,
std::vector<std::shared_ptr<ItemImpl>>& result,
size_t maxItems,
int target=-1) const
{
auto ub = m_cache.upper_bound(current);
if(ub == m_cache.end()) {
m_cache.clear();
prefetchFrom(item_type, prefix_type, current, target);
}
ub = m_cache.upper_bound(current);
result.clear();
if(ub == m_cache.end()) {
return 0;
} else {
auto it = ub;
result.clear();
for(size_t i=0; i < maxItems && it != m_cache.end(); i++, it++) {
result.push_back(*it);
}
m_cache.erase(ub, it);
}
return result.size();
}
};
}
#endif
......@@ -8,12 +8,58 @@
#include <string>
#include "hepnos/DataSet.hpp"
#include "hepnos/RunSet.hpp"
#include "hepnos/Prefetcher.hpp"
#include "DataSetImpl.hpp"
#include "DataStoreImpl.hpp"
#include "ItemImpl.hpp"
#include "PrefetcherImpl.hpp"
namespace hepnos {
////////////////////////////////////////////////////////////////////////////////////////////
// RunSet::const_iterator::Impl declaration
////////////////////////////////////////////////////////////////////////////////////////////
class RunSet::const_iterator::Impl {
public:
Run m_current_run;
std::shared_ptr<PrefetcherImpl> m_prefetcher;
Impl()
: m_current_run()
{}
Impl(const Run& run)
: m_current_run(run)
{}
Impl(Run&& run)
: m_current_run(std::move(run))
{}
Impl(const Impl& other)
: m_current_run(other.m_current_run)
{}
~Impl() {
if(m_prefetcher)
m_prefetcher->m_associated = false;
}
bool operator==(const Impl& other) const {
return m_current_run == other.m_current_run;
}
void setPrefetcher(const std::shared_ptr<PrefetcherImpl>& p) {
if(p->m_associated)
throw Exception("Prefetcher object already in use");
if(m_prefetcher)
m_prefetcher->m_associated = false;
m_prefetcher = p;
m_prefetcher->m_associated = true;
}
};
////////////////////////////////////////////////////////////////////////////////////////////
// RunSet implementation
////////////////////////////////////////////////////////////////////////////////////////////
......@@ -49,11 +95,25 @@ RunSet::iterator RunSet::find(const RunNumber& runNumber) {
runNumber));
}
RunSet::iterator RunSet::find(const RunNumber& runNumber, const Prefetcher& prefetcher) {
auto it = find(runNumber);
if(it != end())
it.m_impl->setPrefetcher(prefetcher.m_impl);
return it;
}
RunSet::const_iterator RunSet::find(const RunNumber& runNumber) const {
iterator it = const_cast<RunSet*>(this)->find(runNumber);
return it;
}
RunSet::const_iterator RunSet::find(const RunNumber& runNumber, const Prefetcher& prefetcher) const {
iterator it = const_cast<RunSet*>(this)->find(runNumber);
if(it != end())
it.m_impl->setPrefetcher(prefetcher.m_impl);
return it;
}
RunSet::iterator RunSet::begin() {
auto it = find(0);
if(it != end()) return it;
......@@ -68,6 +128,13 @@ RunSet::iterator RunSet::begin() {
else return end();
}
RunSet::iterator RunSet::begin(const Prefetcher& prefetcher) {
auto it = begin();
if(it != end())
it.m_impl->setPrefetcher(prefetcher.m_impl);
return it;
}
RunSet::iterator RunSet::end() {
return RunSet_end;
}
......@@ -76,6 +143,13 @@ RunSet::const_iterator RunSet::cbegin() const {
return const_iterator(const_cast<RunSet*>(this)->begin());
}
RunSet::const_iterator RunSet::cbegin(const Prefetcher& prefetcher) const {
auto it = cbegin();
if(it != cend())
it.m_impl->setPrefetcher(prefetcher.m_impl);
return it;
}
RunSet::const_iterator RunSet::cend() const {
return RunSet_end;
}
......@@ -84,6 +158,13 @@ RunSet::const_iterator RunSet::begin() const {
return const_iterator(const_cast<RunSet*>(this)->begin());
}
RunSet::const_iterator RunSet::begin(const Prefetcher& prefetcher) const {
auto it = const_iterator(const_cast<RunSet*>(this)->begin());
if(it != end())
it.m_impl->setPrefetcher(prefetcher.m_impl);
return it;
}
RunSet::const_iterator RunSet::end() const {
return RunSet_end;
}
......@@ -116,11 +197,25 @@ RunSet::iterator RunSet::lower_bound(const RunNumber& lb) {
}
}
RunSet::iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& prefetcher) {
auto it = lower_bound(lb);
if(it != end())
it.m_impl->setPrefetcher(prefetcher.m_impl);
return it;
}
RunSet::const_iterator RunSet::lower_bound(const RunNumber& lb) const {
iterator it = const_cast<RunSet*>(this)->lower_bound(lb);
return it;
}
RunSet::const_iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& prefetcher) const {
iterator it = const_cast<RunSet*>(this)->lower_bound(lb);
if(it != end())
it.m_impl->setPrefetcher(prefetcher.m_impl);
return it;
}
RunSet::iterator RunSet::upper_bound(const RunNumber& ub) {
Run run(std::make_shared<ItemImpl>(m_impl->m_datastore,
m_impl->m_uuid, ub));
......@@ -129,39 +224,26 @@ RunSet::iterator RunSet::upper_bound(const RunNumber& ub) {
else return iterator(run);
}
RunSet::iterator RunSet::upper_bound(const RunNumber& ub, const Prefetcher& prefetcher) {
auto it = upper_bound(ub);
if(it != end())
it.m_impl->setPrefetcher(prefetcher.m_impl);
return it;
}
RunSet::const_iterator RunSet::upper_bound(const RunNumber& ub) const {
iterator it = const_cast<RunSet*>(this)->upper_bound(ub);
return it;
}
////////////////////////////////////////////////////////////////////////////////////////////
// RunSet::const_iterator::Impl implementation
////////////////////////////////////////////////////////////////////////////////////////////
class RunSet::const_iterator::Impl {
public:
Run m_current_run;
Impl()
: m_current_run()
{}
Impl(const Run& run)
: m_current_run(run)
{}
Impl(Run&& run)
: m_current_run(std::move(run))
{}
Impl(const Impl& other)
: m_current_run(other.m_current_run)
{}
RunSet::const_iterator RunSet::upper_bound(const RunNumber& ub, const Prefetcher& prefetcher) const {
iterator it = const_cast<RunSet*>(this)->upper_bound(ub);
if(it != end())
it.m_impl->setPrefetcher(prefetcher.m_impl);
return it;
}
bool operator==(const Impl& other) const {
return m_current_run == other.m_current_run;
}
};
////////////////////////////////////////////////////////////////////////////////////////////
// RunSet::const_iterator implementation
......@@ -200,7 +282,18 @@ RunSet::const_iterator::self_type RunSet::const_iterator::operator++() {
if(!m_impl) {
throw Exception("Trying to increment an invalid iterator");
}
m_impl->m_current_run = m_impl->m_current_run.next();
if(!m_impl->m_prefetcher)
m_impl->m_current_run = m_impl->m_current_run.next();
else {
std::vector<std::shared_ptr<ItemImpl>> next_runs;
size_t s = m_impl->m_prefetcher->nextItems(ItemType::RUN,
ItemType::DATASET, m_impl->m_current_run.m_impl, next_runs, 1);
if(s == 1) {
m_impl->m_current_run.m_impl = std::move(next_runs[0]);
} else {
m_impl->m_current_run = Run();
}
}
return *this;
}
......
......@@ -185,3 +185,48 @@ void RunSetTest::testAsync() {
CPPUNIT_ASSERT(mds[i].valid());
}
}
void RunSetTest::testPrefetcher() {
auto root = datastore->root();
DataSet mds = root.createDataSet("matthieu_prefetch");
CPPUNIT_ASSERT(mds.valid());
for(unsigned i=0; i < 20; i++) {
Run r = mds.createRun(i);
CPPUNIT_ASSERT(r.valid());
}
// test begin/end
{
Prefetcher prefetcher(*datastore);
unsigned i=0;
for(auto it = mds.runs().begin(prefetcher); it != mds.runs().end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
i += 1;
}
}
// test lower_bound
{
Prefetcher prefetcher(*datastore);
unsigned i=5;
auto it = mds.runs().lower_bound(5);
for(; it != mds.runs().end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
i += 1;
}
}
// test lower_bound
{
Prefetcher prefetcher(*datastore);
unsigned i=6;
auto it = mds.runs().upper_bound(5);
for(; it != mds.runs().end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
i += 1;
}
}
}
......@@ -16,6 +16,7 @@ class RunSetTest : public CppUnit::TestFixture
CPPUNIT_TEST( testLowerUpperBounds );
CPPUNIT_TEST( testCreateSubRuns );
CPPUNIT_TEST( testAsync );
CPPUNIT_TEST( testPrefetcher );
CPPUNIT_TEST_SUITE_END();
public:
...