Commit 5f577a11 authored by Matthieu Dorier's avatar Matthieu Dorier

Prefetcher working for RunSet and Run

parent 0e3be1bc
...@@ -9,10 +9,16 @@ class DataStore; ...@@ -9,10 +9,16 @@ class DataStore;
class PrefetcherImpl; class PrefetcherImpl;
class AsyncEngine; class AsyncEngine;
class RunSet; class RunSet;
class EventSet;
class Run;
class SubRun;
class Prefetcher { class Prefetcher {
friend class RunSet; friend class RunSet;
friend class EventSet;
friend class Run;
friend class SubRun;
private: private:
......
...@@ -346,6 +346,8 @@ class Run : public KeyValueContainer { ...@@ -346,6 +346,8 @@ class Run : public KeyValueContainer {
class Run::const_iterator { class Run::const_iterator {
friend class Run;
protected: protected:
/** /**
......
...@@ -6,14 +6,63 @@ ...@@ -6,14 +6,63 @@
#include "hepnos/Run.hpp" #include "hepnos/Run.hpp"
#include "hepnos/DataSet.hpp" #include "hepnos/DataSet.hpp"
#include "hepnos/AsyncEngine.hpp" #include "hepnos/AsyncEngine.hpp"
#include "hepnos/Prefetcher.hpp"
#include "ItemImpl.hpp" #include "ItemImpl.hpp"
#include "ItemImpl.hpp" #include "PrefetcherImpl.hpp"
#include "DataStoreImpl.hpp" #include "DataStoreImpl.hpp"
#include "WriteBatchImpl.hpp" #include "WriteBatchImpl.hpp"
#include "AsyncEngineImpl.hpp" #include "AsyncEngineImpl.hpp"
namespace hepnos { namespace hepnos {
////////////////////////////////////////////////////////////////////////////////////////////
// Run::const_iterator::Impl declaration
////////////////////////////////////////////////////////////////////////////////////////////
class Run::const_iterator::Impl {
friend class Run;
public:
SubRun m_current_subrun;
std::shared_ptr<PrefetcherImpl> m_prefetcher;
Impl()
: m_current_subrun()
{}
Impl(const SubRun& subrun)
: m_current_subrun(subrun)
{}
Impl(SubRun&& subrun)
: m_current_subrun(std::move(subrun))
{}
Impl(const Impl& other)
: m_current_subrun(other.m_current_subrun)
{}
~Impl() {
if(m_prefetcher)
m_prefetcher->m_associated = false;
}
bool operator==(const Impl& other) const {
return m_current_subrun == other.m_current_subrun;
}
void setPrefetcher(const std::shared_ptr<PrefetcherImpl>& p) {
if(p->m_associated)
throw Exception("Prefetcher object already in use");
if(m_prefetcher)
m_prefetcher->m_associated = false;
m_prefetcher = p;
m_prefetcher->m_associated = true;
}
};
static Run::iterator Run_end; static Run::iterator Run_end;
Run::Run() Run::Run()
...@@ -175,6 +224,15 @@ Run::iterator Run::find(const SubRunNumber& subRunNumber) { ...@@ -175,6 +224,15 @@ Run::iterator Run::find(const SubRunNumber& subRunNumber) {
return iterator(SubRun(std::move(new_subrun_impl))); return iterator(SubRun(std::move(new_subrun_impl)));
} }
Run::iterator Run::find(const SubRunNumber& subRunNumber, const Prefetcher& prefetcher) {
auto it = find(subRunNumber);
if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl);
}
return it;
}
Run::const_iterator Run::find(const SubRunNumber& subRunNumber) const { Run::const_iterator Run::find(const SubRunNumber& subRunNumber) const {
iterator it = const_cast<Run*>(this)->find(subRunNumber); iterator it = const_cast<Run*>(this)->find(subRunNumber);
return it; return it;
...@@ -194,6 +252,15 @@ Run::iterator Run::begin() { ...@@ -194,6 +252,15 @@ Run::iterator Run::begin() {
else return end(); else return end();
} }
Run::iterator Run::begin(const Prefetcher& prefetcher) {
auto it = begin();
if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl);
}
return it;
}
Run::iterator Run::end() { Run::iterator Run::end() {
if(!valid()) { if(!valid()) {
throw Exception("Calling Run member function on an invalid Run object"); throw Exception("Calling Run member function on an invalid Run object");
...@@ -205,6 +272,10 @@ Run::const_iterator Run::begin() const { ...@@ -205,6 +272,10 @@ Run::const_iterator Run::begin() const {
return const_iterator(const_cast<Run*>(this)->begin()); return const_iterator(const_cast<Run*>(this)->begin());
} }
Run::const_iterator Run::begin(const Prefetcher& prefetcher) const {
return const_iterator(const_cast<Run*>(this)->begin(prefetcher));
}
Run::const_iterator Run::end() const { Run::const_iterator Run::end() const {
if(!valid()) { if(!valid()) {
throw Exception("Calling Run member function on an invalid Run object"); throw Exception("Calling Run member function on an invalid Run object");
...@@ -216,6 +287,10 @@ Run::const_iterator Run::cbegin() const { ...@@ -216,6 +287,10 @@ Run::const_iterator Run::cbegin() const {
return const_iterator(const_cast<Run*>(this)->begin()); return const_iterator(const_cast<Run*>(this)->begin());
} }
Run::const_iterator Run::cbegin(const Prefetcher& prefetcher) const {
return const_iterator(const_cast<Run*>(this)->begin(prefetcher));
}
Run::const_iterator Run::cend() const { Run::const_iterator Run::cend() const {
if(!valid()) { if(!valid()) {
throw Exception("Calling Run member function on an invalid Run object"); throw Exception("Calling Run member function on an invalid Run object");
...@@ -251,11 +326,23 @@ Run::iterator Run::lower_bound(const SubRunNumber& lb) { ...@@ -251,11 +326,23 @@ Run::iterator Run::lower_bound(const SubRunNumber& lb) {
} }
} }
Run::const_iterator Run::lower_bound(const SubRunNumber& lb) const { Run::iterator Run::lower_bound(const SubRunNumber& lb, const Prefetcher& prefetcher) {
iterator it = const_cast<Run*>(this)->lower_bound(lb); auto it = lower_bound(lb);
if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl);
}
return it; return it;
} }
Run::const_iterator Run::lower_bound(const SubRunNumber& lb) const {
return const_cast<Run*>(this)->lower_bound(lb);
}
Run::const_iterator Run::lower_bound(const SubRunNumber& lb, const Prefetcher& prefetcher) const {
return const_cast<Run*>(this)->lower_bound(lb, prefetcher);
}
Run::iterator Run::upper_bound(const SubRunNumber& ub) { Run::iterator Run::upper_bound(const SubRunNumber& ub) {
if(!valid()) { if(!valid()) {
throw Exception("Calling Run member function on an invalid Run object"); throw Exception("Calling Run member function on an invalid Run object");
...@@ -268,11 +355,23 @@ Run::iterator Run::upper_bound(const SubRunNumber& ub) { ...@@ -268,11 +355,23 @@ Run::iterator Run::upper_bound(const SubRunNumber& ub) {
else return iterator(subrun); else return iterator(subrun);
} }
Run::const_iterator Run::upper_bound(const SubRunNumber& ub) const { Run::iterator Run::upper_bound(const SubRunNumber& ub, const Prefetcher& prefetcher) {
iterator it = const_cast<Run*>(this)->upper_bound(ub); auto it = upper_bound(ub);
if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl);
}
return it; return it;
} }
Run::const_iterator Run::upper_bound(const SubRunNumber& ub) const {
return const_cast<Run*>(this)->upper_bound(ub);
}
Run::const_iterator Run::upper_bound(const SubRunNumber& ub, const Prefetcher& prefetcher) const {
return const_cast<Run*>(this)->upper_bound(ub, prefetcher);
}
void Run::toDescriptor(RunDescriptor& descriptor) { void Run::toDescriptor(RunDescriptor& descriptor) {
std::memset(descriptor.data, 0, sizeof(descriptor.data)); std::memset(descriptor.data, 0, sizeof(descriptor.data));
if(!valid()) return; if(!valid()) return;
...@@ -288,35 +387,6 @@ Run Run::fromDescriptor(const DataStore& datastore, const RunDescriptor& descrip ...@@ -288,35 +387,6 @@ Run Run::fromDescriptor(const DataStore& datastore, const RunDescriptor& descrip
else return Run(); else return Run();
} }
////////////////////////////////////////////////////////////////////////////////////////////
// Run::const_iterator::Impl implementation
////////////////////////////////////////////////////////////////////////////////////////////
class Run::const_iterator::Impl {
public:
SubRun m_current_subrun;
Impl()
: m_current_subrun()
{}
Impl(const SubRun& subrun)
: m_current_subrun(subrun)
{}
Impl(SubRun&& subrun)
: m_current_subrun(std::move(subrun))
{}
Impl(const Impl& other)
: m_current_subrun(other.m_current_subrun)
{}
bool operator==(const Impl& other) const {
return m_current_subrun == other.m_current_subrun;
}
};
//////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////
// Run::const_iterator implementation // Run::const_iterator implementation
//////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////
......
...@@ -97,8 +97,10 @@ RunSet::iterator RunSet::find(const RunNumber& runNumber) { ...@@ -97,8 +97,10 @@ RunSet::iterator RunSet::find(const RunNumber& runNumber) {
RunSet::iterator RunSet::find(const RunNumber& runNumber, const Prefetcher& prefetcher) { RunSet::iterator RunSet::find(const RunNumber& runNumber, const Prefetcher& prefetcher) {
auto it = find(runNumber); auto it = find(runNumber);
if(it != end()) if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
}
return it; return it;
} }
...@@ -109,8 +111,10 @@ RunSet::const_iterator RunSet::find(const RunNumber& runNumber) const { ...@@ -109,8 +111,10 @@ RunSet::const_iterator RunSet::find(const RunNumber& runNumber) const {
RunSet::const_iterator RunSet::find(const RunNumber& runNumber, const Prefetcher& prefetcher) const { RunSet::const_iterator RunSet::find(const RunNumber& runNumber, const Prefetcher& prefetcher) const {
iterator it = const_cast<RunSet*>(this)->find(runNumber); iterator it = const_cast<RunSet*>(this)->find(runNumber);
if(it != end()) if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
}
return it; return it;
} }
...@@ -130,8 +134,10 @@ RunSet::iterator RunSet::begin() { ...@@ -130,8 +134,10 @@ RunSet::iterator RunSet::begin() {
RunSet::iterator RunSet::begin(const Prefetcher& prefetcher) { RunSet::iterator RunSet::begin(const Prefetcher& prefetcher) {
auto it = begin(); auto it = begin();
if(it != end()) if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
}
return it; return it;
} }
...@@ -145,8 +151,10 @@ RunSet::const_iterator RunSet::cbegin() const { ...@@ -145,8 +151,10 @@ RunSet::const_iterator RunSet::cbegin() const {
RunSet::const_iterator RunSet::cbegin(const Prefetcher& prefetcher) const { RunSet::const_iterator RunSet::cbegin(const Prefetcher& prefetcher) const {
auto it = cbegin(); auto it = cbegin();
if(it != cend()) if(it != cend()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
}
return it; return it;
} }
...@@ -160,8 +168,10 @@ RunSet::const_iterator RunSet::begin() const { ...@@ -160,8 +168,10 @@ RunSet::const_iterator RunSet::begin() const {
RunSet::const_iterator RunSet::begin(const Prefetcher& prefetcher) const { RunSet::const_iterator RunSet::begin(const Prefetcher& prefetcher) const {
auto it = const_iterator(const_cast<RunSet*>(this)->begin()); auto it = const_iterator(const_cast<RunSet*>(this)->begin());
if(it != end()) if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
}
return it; return it;
} }
...@@ -199,8 +209,10 @@ RunSet::iterator RunSet::lower_bound(const RunNumber& lb) { ...@@ -199,8 +209,10 @@ RunSet::iterator RunSet::lower_bound(const RunNumber& lb) {
RunSet::iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& prefetcher) { RunSet::iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& prefetcher) {
auto it = lower_bound(lb); auto it = lower_bound(lb);
if(it != end()) if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
}
return it; return it;
} }
...@@ -211,8 +223,10 @@ RunSet::const_iterator RunSet::lower_bound(const RunNumber& lb) const { ...@@ -211,8 +223,10 @@ RunSet::const_iterator RunSet::lower_bound(const RunNumber& lb) const {
RunSet::const_iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& prefetcher) const { RunSet::const_iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& prefetcher) const {
iterator it = const_cast<RunSet*>(this)->lower_bound(lb); iterator it = const_cast<RunSet*>(this)->lower_bound(lb);
if(it != end()) if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
}
return it; return it;
} }
...@@ -226,8 +240,10 @@ RunSet::iterator RunSet::upper_bound(const RunNumber& ub) { ...@@ -226,8 +240,10 @@ RunSet::iterator RunSet::upper_bound(const RunNumber& ub) {
RunSet::iterator RunSet::upper_bound(const RunNumber& ub, const Prefetcher& prefetcher) { RunSet::iterator RunSet::upper_bound(const RunNumber& ub, const Prefetcher& prefetcher) {
auto it = upper_bound(ub); auto it = upper_bound(ub);
if(it != end()) if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
}
return it; return it;
} }
...@@ -239,8 +255,10 @@ RunSet::const_iterator RunSet::upper_bound(const RunNumber& ub) const { ...@@ -239,8 +255,10 @@ RunSet::const_iterator RunSet::upper_bound(const RunNumber& ub) const {
RunSet::const_iterator RunSet::upper_bound(const RunNumber& ub, const Prefetcher& prefetcher) const { RunSet::const_iterator RunSet::upper_bound(const RunNumber& ub, const Prefetcher& prefetcher) const {
iterator it = const_cast<RunSet*>(this)->upper_bound(ub); iterator it = const_cast<RunSet*>(this)->upper_bound(ub);
if(it != end()) if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
}
return it; return it;
} }
......
...@@ -206,7 +206,6 @@ void RunSetTest::testPrefetcher() { ...@@ -206,7 +206,6 @@ void RunSetTest::testPrefetcher() {
i += 1; i += 1;
} }
} }
// test lower_bound // test lower_bound
{ {
Prefetcher prefetcher(*datastore); Prefetcher prefetcher(*datastore);
...@@ -218,7 +217,7 @@ void RunSetTest::testPrefetcher() { ...@@ -218,7 +217,7 @@ void RunSetTest::testPrefetcher() {
i += 1; i += 1;
} }
} }
// test lower_bound // test upper_bound
{ {
Prefetcher prefetcher(*datastore); Prefetcher prefetcher(*datastore);
unsigned i=6; unsigned i=6;
......
...@@ -199,5 +199,47 @@ void RunTest::testAsync() { ...@@ -199,5 +199,47 @@ void RunTest::testAsync() {
} }
void RunTest::testPrefetcher() { void RunTest::testPrefetcher() {
// TODO auto root = datastore->root();
DataSet mds = root.createDataSet("matthieu_prefetch");
CPPUNIT_ASSERT(mds.valid());
Run r = mds.createRun(42);
CPPUNIT_ASSERT(r.valid());
for(unsigned i=0; i < 20; i++) {
SubRun sr = r.createSubRun(i);
CPPUNIT_ASSERT(sr.valid());
}
// test begin/end
{
Prefetcher prefetcher(*datastore);
unsigned i=0;
for(auto it = r.begin(prefetcher); it != r.end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
i += 1;
}
}
// test lower_bound
{
Prefetcher prefetcher(*datastore);
unsigned i=5;
auto it = r.lower_bound(5);
for(; it != r.end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
i += 1;
}
}
// test upper_bound
{
Prefetcher prefetcher(*datastore);
unsigned i=6;
auto it = r.upper_bound(5);
for(; it != r.end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
i += 1;
}
}
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment