Commit 59135965 authored by Matthieu Dorier's avatar Matthieu Dorier

Prefetcher enabled for EventSet iterators

parent a99a7121
......@@ -15,6 +15,7 @@
namespace hepnos {
class Prefetcher;
class EventSetImpl;
/**
......@@ -95,6 +96,7 @@ class EventSet {
* @return an iterator referring to the first Event in this EventSet.
*/
iterator begin();
iterator begin(const Prefetcher& prefetcher);
/**
* @brief Returns an iterator referring to the end of the EventSet.
......@@ -112,6 +114,7 @@ class EventSet {
* @return an iterator referring to the first Event in this EventSet.
*/
const_iterator begin() const;
const_iterator begin(const Prefetcher& prefetcher) const;
/**
* @brief Returns a const_iterator referring to the end of the EventSet.
......@@ -129,6 +132,7 @@ class EventSet {
* @return a const_iterator referring to the first Event in this EventSet.
*/
const_iterator cbegin() const;
const_iterator cbegin(const Prefetcher& prefetcher) const;
/**
* @brief Returns a const_iterator referring to the end of the EventSet.
......
......@@ -8,6 +8,8 @@
#include <string>
#include "hepnos/DataSet.hpp"
#include "hepnos/EventSet.hpp"
#include "hepnos/Prefetcher.hpp"
#include "PrefetcherImpl.hpp"
#include "EventSetImpl.hpp"
#include "DataStoreImpl.hpp"
#include "ItemImpl.hpp"
......@@ -24,6 +26,7 @@ class EventSet::const_iterator::Impl {
public:
Event m_current_event;
std::shared_ptr<PrefetcherImpl> m_prefetcher;
int m_target = 0;
int m_num_targets = 0;
......@@ -49,6 +52,11 @@ class EventSet::const_iterator::Impl {
, m_num_targets(other.m_num_targets)
{}
~Impl() {
if(m_prefetcher)
m_prefetcher->m_associated = false;
}
bool operator==(const Impl& other) const {
auto v1 = m_current_event.valid();
auto v2 = other.m_current_event.valid();
......@@ -59,6 +67,15 @@ class EventSet::const_iterator::Impl {
&& m_target == other.m_target
&& m_num_targets == other.m_num_targets;
}
void setPrefetcher(const std::shared_ptr<PrefetcherImpl>& p) {
if(p->m_associated)
throw Exception("Prefetcher object already in use");
if(m_prefetcher)
m_prefetcher->m_associated = false;
m_prefetcher = p;
m_prefetcher->m_associated = true;
}
};
////////////////////////////////////////////////////////////////////////////////////////////
......@@ -102,9 +119,19 @@ EventSet::const_iterator::self_type EventSet::const_iterator::operator++() {
auto& ds = m_impl->m_current_event.m_impl->m_datastore;
if(m_impl->m_num_targets == 0) { // single target access
size_t s = ds->nextItems(ItemType::EVENT, ItemType::DATASET,
m_impl->m_current_event.m_impl,
next_events, 1, m_impl->m_target);
size_t s = 0;
if(!m_impl->m_prefetcher) {
s = ds->nextItems(ItemType::EVENT,
ItemType::DATASET,
m_impl->m_current_event.m_impl,
next_events, 1, m_impl->m_target);
} else {
s = m_impl->m_prefetcher->nextItems(
ItemType::EVENT,
ItemType::DATASET,
m_impl->m_current_event.m_impl,
next_events, 1, m_impl->m_target);
}
if(s != 0) {
m_impl->m_current_event.m_impl = std::move(next_events[0]);
} else {
......@@ -112,9 +139,17 @@ EventSet::const_iterator::self_type EventSet::const_iterator::operator++() {
}
} else { // multi-target access
// try to get next event from current target
size_t s = ds->nextItems(ItemType::EVENT, ItemType::DATASET,
m_impl->m_current_event.m_impl,
next_events, 1, m_impl->m_target);
size_t s = 0;
if(!m_impl->m_prefetcher) {
s = ds->nextItems(ItemType::EVENT, ItemType::DATASET,
m_impl->m_current_event.m_impl,
next_events, 1, m_impl->m_target);
} else {
s = m_impl->m_prefetcher->nextItems(
ItemType::EVENT, ItemType::DATASET,
m_impl->m_current_event.m_impl,
next_events, 1, m_impl->m_target);
}
if(s == 1) {
// event found
m_impl->m_current_event.m_impl = std::move(next_events[0]);
......@@ -133,8 +168,15 @@ EventSet::const_iterator::self_type EventSet::const_iterator::operator++() {
return *this;
}
// if event (0,0,0) does not exist, then try to get the next one
size_t s = ds->nextItems(ItemType::EVENT, ItemType::DATASET,
event_impl000, next_events, 1, m_impl->m_target);
size_t s = 0;
if(!m_impl->m_prefetcher) {
s = ds->nextItems(ItemType::EVENT, ItemType::DATASET,
event_impl000, next_events, 1, m_impl->m_target);
} else {
s = m_impl->m_prefetcher->nextItems(
ItemType::EVENT, ItemType::DATASET,
event_impl000, next_events, 1, m_impl->m_target);
}
if(s == 1) {
// item found, make the iterator point to it
m_impl->m_current_event.m_impl = std::move(next_events[0]);
......@@ -266,6 +308,16 @@ EventSet::iterator EventSet::begin() {
return end();
}
EventSet::iterator EventSet::begin(const Prefetcher& prefetcher) {
auto it = begin();
if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::EVENT, ItemType::DATASET,
it.m_impl->m_current_event.m_impl, it.m_impl->m_target);
}
return it;
}
EventSet::iterator EventSet::end() {
return EventSet_end;
}
......@@ -274,6 +326,10 @@ EventSet::const_iterator EventSet::cbegin() const {
return const_iterator(const_cast<EventSet*>(this)->begin());
}
EventSet::const_iterator EventSet::cbegin(const Prefetcher& prefetcher) const {
return const_iterator(const_cast<EventSet*>(this)->begin(prefetcher));
}
EventSet::const_iterator EventSet::cend() const {
return EventSet_end;
}
......@@ -282,6 +338,10 @@ EventSet::const_iterator EventSet::begin() const {
return const_iterator(const_cast<EventSet*>(this)->begin());
}
EventSet::const_iterator EventSet::begin(const Prefetcher& prefetcher) const {
return const_iterator(const_cast<EventSet*>(this)->begin(prefetcher));
}
EventSet::const_iterator EventSet::end() const {
return EventSet_end;
}
......
......@@ -429,7 +429,18 @@ Run::const_iterator::self_type Run::const_iterator::operator++() {
if(!m_impl) {
throw Exception("Trying to increment an invalid iterator");
}
m_impl->m_current_subrun = m_impl->m_current_subrun.next();
if(!m_impl->m_prefetcher) {
m_impl->m_current_subrun = m_impl->m_current_subrun.next();
} else {
std::vector<std::shared_ptr<ItemImpl>> next_subruns;
size_t s = m_impl->m_prefetcher->nextItems(ItemType::SUBRUN,
ItemType::SUBRUN, m_impl->m_current_subrun.m_impl, next_subruns, 1);
if(s == 1) {
m_impl->m_current_subrun.m_impl = std::move(next_subruns[0]);
} else {
m_impl->m_current_subrun = SubRun();
}
}
return *this;
}
......
......@@ -431,7 +431,18 @@ SubRun::const_iterator::self_type SubRun::const_iterator::operator++() {
if(!m_impl) {
throw Exception("Trying to increment an invalid iterator");
}
m_impl->m_current_event = m_impl->m_current_event.next();
if(!m_impl->m_prefetcher) {
m_impl->m_current_event = m_impl->m_current_event.next();
} else {
std::vector<std::shared_ptr<ItemImpl>> next_events;
size_t s = m_impl->m_prefetcher->nextItems(ItemType::EVENT,
ItemType::SUBRUN, m_impl->m_current_event.m_impl, next_events, 1);
if(s == 1) {
m_impl->m_current_event.m_impl = std::move(next_events[0]);
} else {
m_impl->m_current_event = Event();
}
}
return *this;
}
......
......@@ -165,3 +165,66 @@ void EventSetTest::testBeginEnd() {
CPPUNIT_ASSERT_EQUAL(1,(int)i);
}
void EventSetTest::testPrefetcher() {
auto root = datastore->root();
DataSet mds = root["matthieu"];
CPPUNIT_ASSERT(mds.valid());
{
Prefetcher prefetcher(*datastore);
std::vector<std::tuple<RunNumber, SubRunNumber, EventNumber>> events;
// iteration target by target
unsigned i = 0;
for(int target = 0; target < datastore->numTargets(hepnos::ItemType::EVENT); target++) {
auto eventset = mds.events(target);
for(auto it = eventset.begin(prefetcher); it != eventset.end(); it++) {
auto& ev = *it;
CPPUNIT_ASSERT(ev.valid());
i += 1;
auto ev_number = ev.number();
auto sr = ev.subrun();
auto sr_number = sr.number();
auto r_number = sr.run().number();
events.emplace_back(r_number, sr_number, ev_number);
}
}
CPPUNIT_ASSERT_EQUAL(2*3*4, (int)i);
std::sort(events.begin(), events.end(), [](const auto& t1, const auto& t2) {
auto run1 = std::get<0>(t1);
auto subrun1 = std::get<1>(t1);
auto event1 = std::get<2>(t1);
auto run2 = std::get<0>(t2);
auto subrun2 = std::get<1>(t2);
auto event2 = std::get<2>(t2);
if(run1 < run2) return true;
if(run1 > run2) return false;
if(subrun1 < subrun2) return true;
if(subrun1 > subrun2) return false;
if(event1 < event2) return true;
return false;
});
unsigned e = 0;
for(unsigned i=3; i < 5; i++) {
for(unsigned j=6; j < 9; j++) {
for(unsigned k=1; k < 5; k++, e++) {
CPPUNIT_ASSERT_EQUAL(i, (unsigned)std::get<0>(events[e]));
CPPUNIT_ASSERT_EQUAL(j, (unsigned)std::get<1>(events[e]));
CPPUNIT_ASSERT_EQUAL(k, (unsigned)std::get<2>(events[e]));
}
}
}
}
{
Prefetcher prefetcher(*datastore);
// iteration all targets at once
unsigned i = 0;
auto eventset = mds.events();
for(auto it = eventset.begin(prefetcher); it != eventset.end(); it++) {
auto& ev = *it;
CPPUNIT_ASSERT(ev.valid());
i += 1;
}
CPPUNIT_ASSERT_EQUAL(2*3*4, (int)i);
}
}
......@@ -12,6 +12,7 @@ class EventSetTest : public CppUnit::TestFixture
CPPUNIT_TEST( testFillDataStore );
CPPUNIT_TEST( testInvalid );
CPPUNIT_TEST( testBeginEnd );
CPPUNIT_TEST( testPrefetcher );
CPPUNIT_TEST_SUITE_END();
public:
......@@ -22,6 +23,7 @@ class EventSetTest : public CppUnit::TestFixture
void testFillDataStore();
void testInvalid();
void testBeginEnd();
void testPrefetcher();
};
#endif
......@@ -210,7 +210,7 @@ void RunSetTest::testPrefetcher() {
{
Prefetcher prefetcher(*datastore);
unsigned i=5;
auto it = mds.runs().lower_bound(5);
auto it = mds.runs().lower_bound(5, prefetcher);
for(; it != mds.runs().end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
......@@ -221,7 +221,7 @@ void RunSetTest::testPrefetcher() {
{
Prefetcher prefetcher(*datastore);
unsigned i=6;
auto it = mds.runs().upper_bound(5);
auto it = mds.runs().upper_bound(5, prefetcher);
for(; it != mds.runs().end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
......
......@@ -224,7 +224,7 @@ void RunTest::testPrefetcher() {
{
Prefetcher prefetcher(*datastore);
unsigned i=5;
auto it = r.lower_bound(5);
auto it = r.lower_bound(5, prefetcher);
for(; it != r.end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
......@@ -235,7 +235,7 @@ void RunTest::testPrefetcher() {
{
Prefetcher prefetcher(*datastore);
unsigned i=6;
auto it = r.upper_bound(5);
auto it = r.upper_bound(5, prefetcher);
for(; it != r.end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
......
......@@ -225,7 +225,7 @@ void SubRunTest::testPrefetcher() {
{
Prefetcher prefetcher(*datastore);
unsigned i=5;
auto it = sr.lower_bound(5);
auto it = sr.lower_bound(5, prefetcher);
for(; it != sr.end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
......@@ -236,7 +236,7 @@ void SubRunTest::testPrefetcher() {
{
Prefetcher prefetcher(*datastore);
unsigned i=6;
auto it = sr.upper_bound(5);
auto it = sr.upper_bound(5, prefetcher);
for(; it != sr.end(); it++) {
CPPUNIT_ASSERT(it->valid());
CPPUNIT_ASSERT(it->number() == i);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment