Commit 9d29aaa0 authored by Matthieu Dorier's avatar Matthieu Dorier

implemented a way to prefetch objects during iterations

parent 3cc9962a
...@@ -164,6 +164,8 @@ class DataSet : public KeyValueContainer { ...@@ -164,6 +164,8 @@ class DataSet : public KeyValueContainer {
*/ */
bool loadRawData(const std::string& key, std::string& value) const override; bool loadRawData(const std::string& key, std::string& value) const override;
bool loadRawData(const std::string& key, char* value, size_t* vsize) const override; bool loadRawData(const std::string& key, char* value, size_t* vsize) const override;
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, std::string& value) const override;
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const override;
/** /**
* @brief Comparison operator. * @brief Comparison operator.
......
...@@ -128,6 +128,8 @@ class Event : public KeyValueContainer { ...@@ -128,6 +128,8 @@ class Event : public KeyValueContainer {
*/ */
bool loadRawData(const std::string& key, std::string& value) const override; bool loadRawData(const std::string& key, std::string& value) const override;
bool loadRawData(const std::string& key, char* value, size_t* vsize) const override; bool loadRawData(const std::string& key, char* value, size_t* vsize) const override;
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, std::string& buffer) const override;
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const override;
/** /**
* @brief Compares this Event with another Event. The Events must point to * @brief Compares this Event with another Event. The Events must point to
......
...@@ -22,6 +22,7 @@ namespace hepnos { ...@@ -22,6 +22,7 @@ namespace hepnos {
class WriteBatch; class WriteBatch;
class AsyncEngine; class AsyncEngine;
class Prefetcher;
class KeyValueContainer { class KeyValueContainer {
...@@ -104,6 +105,10 @@ class KeyValueContainer { ...@@ -104,6 +105,10 @@ class KeyValueContainer {
virtual bool loadRawData(const std::string& key, char* value, size_t* vsize) const = 0; virtual bool loadRawData(const std::string& key, char* value, size_t* vsize) const = 0;
virtual bool loadRawData(const Prefetcher& prefetcher, const std::string& key, std::string& buffer) const = 0;
virtual bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const = 0;
/** /**
* @brief Stores a key/value pair into the KeyValueContainer. * @brief Stores a key/value pair into the KeyValueContainer.
* The type of the key should have operator<< available * The type of the key should have operator<< available
...@@ -207,6 +212,16 @@ class KeyValueContainer { ...@@ -207,6 +212,16 @@ class KeyValueContainer {
return loadVectorImpl(key, value, std::is_pod<V>()); return loadVectorImpl(key, value, std::is_pod<V>());
} }
template<typename K, typename V>
bool load(const Prefetcher& prefetcher, const K& key, V& value) const {
return loadImpl(prefetcher, key, value, std::is_pod<V>());
}
template<typename K, typename V>
bool load(const Prefetcher& prefetcher, const K& key, std::vector<V>& value) const {
return loadVectorImpl(prefetcher, key, value, std::is_pod<V>());
}
private: private:
/** /**
...@@ -297,6 +312,19 @@ class KeyValueContainer { ...@@ -297,6 +312,19 @@ class KeyValueContainer {
return vsize == sizeof(value); return vsize == sizeof(value);
} }
template<typename K, typename V>
bool loadImpl(const Prefetcher& prefetcher, const K& key, V& value,
const std::integral_constant<bool, true>&) const {
std::string buffer;
std::stringstream ss_key;
ss_key << key << "#" << demangle<V>();
size_t vsize = sizeof(value);
if(!loadRawData(prefetcher, ss_key.str(), reinterpret_cast<char*>(&value), &vsize)) {
return false;
}
return vsize == sizeof(value);
}
/** /**
* @brief Implementation of the load function when the value type is not a POD. * @brief Implementation of the load function when the value type is not a POD.
*/ */
...@@ -319,6 +347,25 @@ class KeyValueContainer { ...@@ -319,6 +347,25 @@ class KeyValueContainer {
return true; return true;
} }
template<typename K, typename V>
bool loadImpl(const Prefetcher& prefetcher, const K& key, V& value,
const std::integral_constant<bool, false>&) const {
std::string buffer;
std::stringstream ss_key;
ss_key << key << "#" << demangle<V>();
if(!loadRawData(prefetcher, ss_key.str(), buffer)) {
return false;
}
try {
std::stringstream ss(buffer);
InputArchive ia(datastore(), ss);
ia >> value;
} catch(...) {
throw Exception("Exception occured during serialization");
}
return true;
}
/** /**
* @brief Implementation of the load function when the value type is a vector of POD. * @brief Implementation of the load function when the value type is a vector of POD.
*/ */
...@@ -344,6 +391,28 @@ class KeyValueContainer { ...@@ -344,6 +391,28 @@ class KeyValueContainer {
return true; return true;
} }
template<typename K, typename V>
bool loadVectorImpl(const Prefetcher& prefetcher, const K& key, std::vector<V>& value,
const std::integral_constant<bool, true>&) const {
std::string buffer;
std::stringstream ss_key;
ss_key << key << "#" << demangle<std::vector<V>>();
if(!loadRawData(prefetcher, ss_key.str(), buffer)) {
return false;
}
size_t count = 0;
if(buffer.size() < sizeof(count)) {
return false;
}
std::memcpy(&count, buffer.data(), sizeof(count));
if(buffer.size() != sizeof(count) + count*sizeof(V)) {
return false;
}
value.resize(count);
std::memcpy(value.data(), buffer.data()+sizeof(count), count*sizeof(V));
return true;
}
/** /**
* @brief Implementation of the load function when the value type is a vector of non-POD. * @brief Implementation of the load function when the value type is a vector of non-POD.
*/ */
...@@ -370,6 +439,31 @@ class KeyValueContainer { ...@@ -370,6 +439,31 @@ class KeyValueContainer {
} }
return true; return true;
} }
template<typename K, typename V>
bool loadVectorImpl(const Prefetcher& prefetcher, const K& key, std::vector<V>& value,
const std::integral_constant<bool, false>&) const {
std::string buffer;
std::stringstream ss_key;
ss_key << key << "#" << demangle<std::vector<V>>();
if(!loadRawData(prefetcher, ss_key.str(), buffer)) {
return false;
}
try {
std::stringstream ss(buffer);
InputArchive ia(datastore(), ss);
size_t count = 0;
ia >> count;
value.resize(count);
for(unsigned i=0; i<count; i++) {
ia >> value[i];
}
} catch(...) {
throw Exception("Exception occured during serialization");
}
return true;
}
/** /**
* @brief Creates the string key based on the provided key * @brief Creates the string key based on the provided key
* and the type of the value. Serializes the value into a string. * and the type of the value. Serializes the value into a string.
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#define __HEPNOS_PREFETCHER_HPP #define __HEPNOS_PREFETCHER_HPP
#include <memory> #include <memory>
#include <hepnos/Demangle.hpp>
#include <hepnos/Prefetchable.hpp> #include <hepnos/Prefetchable.hpp>
namespace hepnos { namespace hepnos {
...@@ -13,6 +14,7 @@ class RunSet; ...@@ -13,6 +14,7 @@ class RunSet;
class EventSet; class EventSet;
class Run; class Run;
class SubRun; class SubRun;
class Event;
class Prefetcher { class Prefetcher {
...@@ -20,10 +22,7 @@ class Prefetcher { ...@@ -20,10 +22,7 @@ class Prefetcher {
friend class EventSet; friend class EventSet;
friend class Run; friend class Run;
friend class SubRun; friend class SubRun;
friend class Event;
private:
std::shared_ptr<PrefetcherImpl> m_impl;
public: public:
...@@ -48,6 +47,18 @@ class Prefetcher { ...@@ -48,6 +47,18 @@ class Prefetcher {
Prefetchable<Container> operator()(const Container& c) const { Prefetchable<Container> operator()(const Container& c) const {
return Prefetchable<Container>(c, *this); return Prefetchable<Container>(c, *this);
} }
template<typename V>
void fetchProduct(const std::string& label, bool fetch=true) const {
fetchProductImpl(label + "#" + demangle<V>(), fetch);
}
private:
std::shared_ptr<PrefetcherImpl> m_impl;
void fetchProductImpl(const std::string& labelAndType, bool fetch) const;
}; };
} }
......
...@@ -20,6 +20,7 @@ class ProductID { ...@@ -20,6 +20,7 @@ class ProductID {
friend class DataStoreImpl; friend class DataStoreImpl;
friend class AsyncEngineImpl; friend class AsyncEngineImpl;
friend class WriteBatchImpl; friend class WriteBatchImpl;
friend class PrefetcherImpl;
friend class boost::serialization::access; friend class boost::serialization::access;
public: public:
......
...@@ -132,6 +132,8 @@ class Run : public KeyValueContainer { ...@@ -132,6 +132,8 @@ class Run : public KeyValueContainer {
*/ */
bool loadRawData(const std::string& key, std::string& buffer) const override; bool loadRawData(const std::string& key, std::string& buffer) const override;
bool loadRawData(const std::string& key, char* value, size_t* vsize) const override; bool loadRawData(const std::string& key, char* value, size_t* vsize) const override;
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, std::string& buffer) const override;
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const override;
/** /**
* @brief Compares this Run with another Run. The Runs must point to * @brief Compares this Run with another Run. The Runs must point to
......
...@@ -130,6 +130,8 @@ class SubRun : public KeyValueContainer { ...@@ -130,6 +130,8 @@ class SubRun : public KeyValueContainer {
*/ */
bool loadRawData(const std::string& key, std::string& buffer) const override; bool loadRawData(const std::string& key, std::string& buffer) const override;
bool loadRawData(const std::string& key, char* value, size_t* vsize) const override; bool loadRawData(const std::string& key, char* value, size_t* vsize) const override;
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, std::string& buffer) const override;
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const override;
/** /**
* @brief Compares this SubRun with another SubRun. The SubRuns must point to * @brief Compares this SubRun with another SubRun. The SubRuns must point to
......
...@@ -7,12 +7,14 @@ ...@@ -7,12 +7,14 @@
#include "hepnos/Run.hpp" #include "hepnos/Run.hpp"
#include "hepnos/RunSet.hpp" #include "hepnos/RunSet.hpp"
#include "hepnos/AsyncEngine.hpp" #include "hepnos/AsyncEngine.hpp"
#include "hepnos/Prefetcher.hpp"
#include "ItemImpl.hpp" #include "ItemImpl.hpp"
#include "DataSetImpl.hpp" #include "DataSetImpl.hpp"
#include "EventSetImpl.hpp" #include "EventSetImpl.hpp"
#include "DataStoreImpl.hpp" #include "DataStoreImpl.hpp"
#include "AsyncEngineImpl.hpp" #include "AsyncEngineImpl.hpp"
#include "WriteBatchImpl.hpp" #include "WriteBatchImpl.hpp"
#include "PrefetcherImpl.hpp"
namespace hepnos { namespace hepnos {
...@@ -97,6 +99,24 @@ bool DataSet::loadRawData(const std::string& key, char* value, size_t* vsize) co ...@@ -97,6 +99,24 @@ bool DataSet::loadRawData(const std::string& key, char* value, size_t* vsize) co
return m_impl->m_datastore->loadRawProduct(id, key, value, vsize); return m_impl->m_datastore->loadRawProduct(id, key, value, vsize);
} }
bool DataSet::loadRawData(const Prefetcher& prefetcher, const std::string& key, std::string& buffer) const {
if(!valid()) {
throw Exception("Calling DataSet member function on an invalid DataSet");
}
(void)prefetcher; // prefetcher isn't usable with a DataSet
ItemDescriptor id(m_impl->m_uuid);
return m_impl->m_datastore->loadRawProduct(id, key, buffer);
}
bool DataSet::loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const {
if(!valid()) {
throw Exception("Calling DataSet member function on an invalid DataSet");
}
(void)prefetcher; // prefetcher isn't usable with a DataSet
ItemDescriptor id(m_impl->m_uuid);
return m_impl->m_datastore->loadRawProduct(id, key, value, vsize);
}
bool DataSet::operator==(const DataSet& other) const { bool DataSet::operator==(const DataSet& other) const {
bool v1 = valid(); bool v1 = valid();
bool v2 = other.valid(); bool v2 = other.valid();
......
...@@ -5,10 +5,12 @@ ...@@ -5,10 +5,12 @@
*/ */
#include "hepnos/Event.hpp" #include "hepnos/Event.hpp"
#include "hepnos/AsyncEngine.hpp" #include "hepnos/AsyncEngine.hpp"
#include "hepnos/Prefetcher.hpp"
#include "ItemImpl.hpp" #include "ItemImpl.hpp"
#include "DataStoreImpl.hpp" #include "DataStoreImpl.hpp"
#include "WriteBatchImpl.hpp" #include "WriteBatchImpl.hpp"
#include "AsyncEngineImpl.hpp" #include "AsyncEngineImpl.hpp"
#include "PrefetcherImpl.hpp"
namespace hepnos { namespace hepnos {
...@@ -104,6 +106,22 @@ bool Event::loadRawData(const std::string& key, char* value, size_t* vsize) cons ...@@ -104,6 +106,22 @@ bool Event::loadRawData(const std::string& key, char* value, size_t* vsize) cons
return m_impl->m_datastore->loadRawProduct(id, key, value, vsize); return m_impl->m_datastore->loadRawProduct(id, key, value, vsize);
} }
bool Event::loadRawData(const Prefetcher& prefetcher, const std::string& key, std::string& buffer) const {
if(!valid()) {
throw Exception("Calling Event member function on an invalid Event object");
}
auto& id = m_impl->m_descriptor;
return prefetcher.m_impl->loadRawProduct(id, key, buffer);
}
bool Event::loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const {
if(!valid()) {
throw Exception("Calling DataSet member function on an invalid DataSet");
}
auto& id = m_impl->m_descriptor;
return prefetcher.m_impl->loadRawProduct(id, key, value, vsize);
}
bool Event::operator==(const Event& other) const { bool Event::operator==(const Event& other) const {
bool v1 = valid(); bool v1 = valid();
bool v2 = other.valid(); bool v2 = other.valid();
......
...@@ -34,4 +34,12 @@ void Prefetcher::setBatchSize(unsigned int size) { ...@@ -34,4 +34,12 @@ void Prefetcher::setBatchSize(unsigned int size) {
m_impl->m_batch_size = size; m_impl->m_batch_size = size;
} }
void Prefetcher::fetchProductImpl(const std::string& label, bool fetch=true) const {
if(fetch) {
m_impl->m_active_product_keys.insert(label);
} else {
m_impl->m_active_product_keys.erase(label);
}
}
} }
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
#define __HEPNOS_PREFETCHER_IMPL_HPP #define __HEPNOS_PREFETCHER_IMPL_HPP
#include <set> #include <set>
#include <unordered_set>
#include <unordered_map>
#include "DataStoreImpl.hpp" #include "DataStoreImpl.hpp"
#include "AsyncEngineImpl.hpp" #include "AsyncEngineImpl.hpp"
...@@ -22,8 +24,10 @@ class PrefetcherImpl { ...@@ -22,8 +24,10 @@ class PrefetcherImpl {
std::shared_ptr<AsyncEngineImpl> m_async; std::shared_ptr<AsyncEngineImpl> m_async;
unsigned int m_cache_size = 16; unsigned int m_cache_size = 16;
unsigned int m_batch_size = 1; unsigned int m_batch_size = 1;
mutable std::set<std::shared_ptr<ItemImpl>, ItemPtrComparator> m_cache;
bool m_associated = false; bool m_associated = false;
std::unordered_set<std::string> m_active_product_keys;
mutable std::set<std::shared_ptr<ItemImpl>, ItemPtrComparator> m_item_cache;
mutable std::unordered_map<std::string, std::string> m_product_cache;
PrefetcherImpl(const std::shared_ptr<DataStoreImpl>& ds) PrefetcherImpl(const std::shared_ptr<DataStoreImpl>& ds)
: m_datastore(ds) {} : m_datastore(ds) {}
...@@ -33,19 +37,35 @@ class PrefetcherImpl { ...@@ -33,19 +37,35 @@ class PrefetcherImpl {
: m_datastore(ds) : m_datastore(ds)
, m_async(async) {} , m_async(async) {}
void fetchRequestedProducts(const std::shared_ptr<ItemImpl>& itemImpl) const {
auto& descriptor = itemImpl->m_descriptor;
for(auto& key : m_active_product_keys) {
auto product_id = DataStoreImpl::buildProductID(descriptor, key);
auto it = m_product_cache.find(product_id.m_key);
if(it != m_product_cache.end())
continue;
std::string data;
bool ok = m_datastore->loadRawProduct(product_id, data);
if(ok) {
m_product_cache[product_id.m_key] = std::move(data);
}
}
}
void prefetchFrom(const ItemType& item_type, void prefetchFrom(const ItemType& item_type,
const ItemType& prefix_type, const ItemType& prefix_type,
const std::shared_ptr<ItemImpl>& current, const std::shared_ptr<ItemImpl>& current,
int target=-1) const int target=-1) const
{ {
auto last = current; auto last = current;
while(m_cache.size() != m_cache_size) { while(m_item_cache.size() != m_cache_size) {
std::vector<std::shared_ptr<ItemImpl>> items; std::vector<std::shared_ptr<ItemImpl>> items;
size_t s = m_datastore->nextItems(item_type, prefix_type, last, items, m_batch_size, target); size_t s = m_datastore->nextItems(item_type, prefix_type, last, items, m_batch_size, target);
if(s != 0) if(s != 0)
last = items[items.size()-1]; last = items[items.size()-1];
for(auto& item : items) { for(auto& item : items) {
m_cache.insert(std::move(item)); fetchRequestedProducts(item);
m_item_cache.insert(std::move(item));
} }
if(s < m_batch_size) break; if(s < m_batch_size) break;
} }
...@@ -59,25 +79,53 @@ class PrefetcherImpl { ...@@ -59,25 +79,53 @@ class PrefetcherImpl {
size_t maxItems, size_t maxItems,
int target=-1) const int target=-1) const
{ {
auto ub = m_cache.upper_bound(current); auto ub = m_item_cache.upper_bound(current);
if(ub == m_cache.end()) { if(ub == m_item_cache.end()) {
m_cache.clear(); m_item_cache.clear();
prefetchFrom(item_type, prefix_type, current, target); prefetchFrom(item_type, prefix_type, current, target);
} }
ub = m_cache.upper_bound(current); ub = m_item_cache.upper_bound(current);
result.clear(); result.clear();
if(ub == m_cache.end()) { if(ub == m_item_cache.end()) {
return 0; return 0;
} else { } else {
auto it = ub; auto it = ub;
result.clear(); result.clear();
for(size_t i=0; i < maxItems && it != m_cache.end(); i++, it++) { for(size_t i=0; i < maxItems && it != m_item_cache.end(); i++, it++) {
result.push_back(*it); result.push_back(*it);
} }
m_cache.erase(ub, it); m_item_cache.erase(ub, it);
} }
return result.size(); return result.size();
} }
bool loadRawProduct(const ItemDescriptor& id,
const std::string& productName,
std::string& data) const {
auto product_id = DataStoreImpl::buildProductID(id, productName);
auto it = m_product_cache.find(product_id.m_key);
if(it == m_product_cache.end()) {
return m_datastore->loadRawProduct(product_id, data);
} else {
data = std::move(it->second);
m_product_cache.erase(it);
return true;
}
}
bool loadRawProduct(const ItemDescriptor& id,
const std::string& productName,
char* value, size_t* vsize) const {
auto product_id = DataStoreImpl::buildProductID(id, productName);
auto it = m_product_cache.find(product_id.m_key);
if(it == m_product_cache.end()) {
return m_datastore->loadRawProduct(id, productName, value, vsize);
} else {
*vsize = it->second.size();
std::memcpy(value, it->second.data(), *vsize);
return true;
}
}
}; };
} }
......
...@@ -146,6 +146,22 @@ bool Run::loadRawData(const std::string& key, char* value, size_t* vsize) const ...@@ -146,6 +146,22 @@ bool Run::loadRawData(const std::string& key, char* value, size_t* vsize) const
return m_impl->m_datastore->loadRawProduct(id, key, value, vsize); return m_impl->m_datastore->loadRawProduct(id, key, value, vsize);
} }
bool Run::loadRawData(const Prefetcher& prefetcher, const std::string& key, std::string& buffer) const {
if(!valid()) {
throw Exception("Calling Run member function on an invalid Run object");
}
const ItemDescriptor& id = m_impl->m_descriptor;
return prefetcher.m_impl->loadRawProduct(id, key, buffer);
}
bool Run::loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const {
if(!valid()) {
throw Exception("Calling DataSet member function on an invalid DataSet");
}
const ItemDescriptor& id = m_impl->m_descriptor;
return prefetcher.m_impl->loadRawProduct(id, key, value, vsize);
}
bool Run::operator==(const Run& other) const { bool Run::operator==(const Run& other) const {
bool v1 = valid(); bool v1 = valid();
bool v2 = other.valid(); bool v2 = other.valid();
...@@ -228,6 +244,7 @@ Run::iterator Run::find(const SubRunNumber& subRunNumber, const Prefetcher& pref ...@@ -228,6 +244,7 @@ Run::iterator Run::find(const SubRunNumber& subRunNumber, const Prefetcher& pref
auto it = find(subRunNumber); auto it = find(subRunNumber);
if(it != end()) { if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_subrun.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl); prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl);
} }
return it; return it;
...@@ -256,6 +273,7 @@ Run::iterator Run::begin(const Prefetcher& prefetcher) { ...@@ -256,6 +273,7 @@ Run::iterator Run::begin(const Prefetcher& prefetcher) {
auto it = begin(); auto it = begin();
if(it != end()) { if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_subrun.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl); prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl);
} }
return it; return it;
...@@ -330,6 +348,7 @@ Run::iterator Run::lower_bound(const SubRunNumber& lb, const Prefetcher& prefetc ...@@ -330,6 +348,7 @@ Run::iterator Run::lower_bound(const SubRunNumber& lb, const Prefetcher& prefetc
auto it = lower_bound(lb); auto it = lower_bound(lb);
if(it != end()) { if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_subrun.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl); prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl);
} }
return it; return it;
...@@ -359,6 +378,7 @@ Run::iterator Run::upper_bound(const SubRunNumber& ub, const Prefetcher& prefetc ...@@ -359,6 +378,7 @@ Run::iterator Run::upper_bound(const SubRunNumber& ub, const Prefetcher& prefetc
auto it = upper_bound(ub); auto it = upper_bound(ub);
if(it != end()) { if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_subrun.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl); prefetcher.m_impl->prefetchFrom(ItemType::SUBRUN, ItemType::RUN, it.m_impl->m_current_subrun.m_impl);
} }
return it; return it;
......
...@@ -136,6 +136,7 @@ RunSet::iterator RunSet::begin(const Prefetcher& prefetcher) { ...@@ -136,6 +136,7 @@ RunSet::iterator RunSet::begin(const Prefetcher& prefetcher) {
auto it = begin(); auto it = begin();
if(it != end()) { if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl); prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
} }
return it; return it;
...@@ -153,6 +154,7 @@ RunSet::const_iterator RunSet::cbegin(const Prefetcher& prefetcher) const { ...@@ -153,6 +154,7 @@ RunSet::const_iterator RunSet::cbegin(const Prefetcher& prefetcher) const {
auto it = cbegin(); auto it = cbegin();
if(it != cend()) { if(it != cend()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl); prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
} }
return it; return it;
...@@ -170,6 +172,7 @@ RunSet::const_iterator RunSet::begin(const Prefetcher& prefetcher) const { ...@@ -170,6 +172,7 @@ RunSet::const_iterator RunSet::begin(const Prefetcher& prefetcher) const {
auto it = const_iterator(const_cast<RunSet*>(this)->begin()); auto it = const_iterator(const_cast<RunSet*>(this)->begin());
if(it != end()) { if(it != end()) {
it.m_impl->setPrefetcher(prefetcher.m_impl); it.m_impl->setPrefetcher(prefetcher.m_impl);
prefetcher.m_impl->fetchRequestedProducts(it.m_impl->m_current_run.m_impl);
prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl); prefetcher.m_impl->prefetchFrom(ItemType::RUN, ItemType::DATASET, it.m_impl->m_current_run.m_impl);
} }
return it; return it;
...@@ -211,6 +214,7 @@ RunSet::iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& pref ...@@ -211,6 +214,7 @@ RunSet::iterator RunSet::lower_bound(const RunNumber& lb, const Prefetcher& pref