Commit c70bfcba authored by Matthieu Dorier's avatar Matthieu Dorier

enabled statistics in prefetcher

parent 8669ee23
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include <memory> #include <memory>
#include <hepnos/Demangle.hpp> #include <hepnos/Demangle.hpp>
#include <hepnos/Prefetchable.hpp> #include <hepnos/Prefetchable.hpp>
#include <hepnos/Statistics.hpp>
namespace hepnos { namespace hepnos {
...@@ -21,6 +22,13 @@ class Run; ...@@ -21,6 +22,13 @@ class Run;
class SubRun; class SubRun;
class Event; class Event;
struct PrefetcherStatistics {
Statistics<size_t,double> batch_sizes;
Statistics<size_t,double> product_sizes;
size_t product_cache_hit = 0;
size_t product_cache_miss = 0;
};
/** /**
* @brief The Prefetcher object will actively try to prefetch * @brief The Prefetcher object will actively try to prefetch
* items from the underlying DataStore when using iterators. * items from the underlying DataStore when using iterators.
...@@ -134,6 +142,20 @@ class Prefetcher { ...@@ -134,6 +142,20 @@ class Prefetcher {
fetchProductImpl(label + "#" + demangle<V>(), fetch); fetchProductImpl(label + "#" + demangle<V>(), fetch);
} }
/**
* @brief Activate statistics collection.
*
* @param activate Whether to activate statistics.
*/
void activateStatistics(bool activate=true);
/**
* @brief Collects the usage statistics.
*
* @param stats PrefetcherStatistics object to fill.
*/
void collectStatistics(PrefetcherStatistics& stats) const;
private: private:
std::shared_ptr<PrefetcherImpl> m_impl; std::shared_ptr<PrefetcherImpl> m_impl;
......
...@@ -47,4 +47,17 @@ void Prefetcher::fetchProductImpl(const std::string& label, bool fetch=true) con ...@@ -47,4 +47,17 @@ void Prefetcher::fetchProductImpl(const std::string& label, bool fetch=true) con
} }
} }
void Prefetcher::activateStatistics(bool activate) {
if(activate) {
if(m_impl->m_stats) return;
m_impl->m_stats = std::make_unique<PrefetcherStatistics>();
} else {
m_impl->m_stats.reset();
}
}
void Prefetcher::collectStatistics(PrefetcherStatistics& stats) const {
m_impl->collectStatistics(stats);
}
} }
...@@ -20,6 +20,9 @@ class PrefetcherImpl { ...@@ -20,6 +20,9 @@ class PrefetcherImpl {
} }
}; };
mutable std::unique_ptr<PrefetcherStatistics> m_stats;
mutable tl::mutex m_stats_mtx;
std::shared_ptr<DataStoreImpl> m_datastore; std::shared_ptr<DataStoreImpl> m_datastore;
unsigned int m_cache_size = 16; unsigned int m_cache_size = 16;
unsigned int m_batch_size = 1; unsigned int m_batch_size = 1;
...@@ -33,6 +36,16 @@ class PrefetcherImpl { ...@@ -33,6 +36,16 @@ class PrefetcherImpl {
virtual ~PrefetcherImpl() = default; virtual ~PrefetcherImpl() = default;
void update_batch_statistics(size_t batch_size) const {
if(!m_stats) return;
m_stats->batch_sizes.updateWith(batch_size);
}
void update_product_statistics(size_t psize) const {
if(!m_stats) return;
m_stats->product_sizes.updateWith(psize);
}
virtual void fetchRequestedProducts(const std::shared_ptr<ItemImpl>& itemImpl) const = 0; virtual void fetchRequestedProducts(const std::shared_ptr<ItemImpl>& itemImpl) const = 0;
virtual void prefetchFrom(const ItemType& item_type, virtual void prefetchFrom(const ItemType& item_type,
...@@ -55,6 +68,12 @@ class PrefetcherImpl { ...@@ -55,6 +68,12 @@ class PrefetcherImpl {
virtual bool loadRawProduct(const ItemDescriptor& id, virtual bool loadRawProduct(const ItemDescriptor& id,
const std::string& productName, const std::string& productName,
char* value, size_t* vsize) const = 0; char* value, size_t* vsize) const = 0;
void collectStatistics(PrefetcherStatistics& stats) const {
std::unique_lock<tl::mutex> lock(m_stats_mtx);
if(m_stats)
stats = *m_stats;
}
}; };
} }
......
...@@ -27,6 +27,7 @@ class SyncPrefetcherImpl : public PrefetcherImpl { ...@@ -27,6 +27,7 @@ class SyncPrefetcherImpl : public PrefetcherImpl {
std::string data; std::string data;
bool ok = m_datastore->loadRawProduct(product_id, data); bool ok = m_datastore->loadRawProduct(product_id, data);
if(ok) { if(ok) {
update_product_statistics(data.size());
m_product_cache[product_id.m_key] = std::move(data); m_product_cache[product_id.m_key] = std::move(data);
} }
} }
...@@ -41,8 +42,10 @@ class SyncPrefetcherImpl : public PrefetcherImpl { ...@@ -41,8 +42,10 @@ class SyncPrefetcherImpl : public PrefetcherImpl {
while(m_item_cache.size() != m_cache_size) { while(m_item_cache.size() != m_cache_size) {
std::vector<std::shared_ptr<ItemImpl>> items; std::vector<std::shared_ptr<ItemImpl>> items;
size_t s = m_datastore->nextItems(item_type, prefix_type, last, items, m_batch_size, target); size_t s = m_datastore->nextItems(item_type, prefix_type, last, items, m_batch_size, target);
if(s != 0) if(s != 0) {
update_batch_statistics(s);
last = items[items.size()-1]; last = items[items.size()-1];
}
for(auto& item : items) { for(auto& item : items) {
fetchRequestedProducts(item); fetchRequestedProducts(item);
m_item_cache.insert(std::move(item)); m_item_cache.insert(std::move(item));
...@@ -85,8 +88,10 @@ class SyncPrefetcherImpl : public PrefetcherImpl { ...@@ -85,8 +88,10 @@ class SyncPrefetcherImpl : public PrefetcherImpl {
auto product_id = DataStoreImpl::buildProductID(id, productName); auto product_id = DataStoreImpl::buildProductID(id, productName);
auto it = m_product_cache.find(product_id.m_key); auto it = m_product_cache.find(product_id.m_key);
if(it == m_product_cache.end()) { if(it == m_product_cache.end()) {
m_stats->product_cache_miss += 1;
return m_datastore->loadRawProduct(product_id, data); return m_datastore->loadRawProduct(product_id, data);
} else { } else {
m_stats->product_cache_hit += 1;
data = std::move(it->second); data = std::move(it->second);
m_product_cache.erase(it); m_product_cache.erase(it);
return true; return true;
...@@ -99,9 +104,11 @@ class SyncPrefetcherImpl : public PrefetcherImpl { ...@@ -99,9 +104,11 @@ class SyncPrefetcherImpl : public PrefetcherImpl {
auto product_id = DataStoreImpl::buildProductID(id, productName); auto product_id = DataStoreImpl::buildProductID(id, productName);
auto it = m_product_cache.find(product_id.m_key); auto it = m_product_cache.find(product_id.m_key);
if(it == m_product_cache.end()) { if(it == m_product_cache.end()) {
if(m_stats) m_stats->product_cache_miss += 1;
return m_datastore->loadRawProduct(id, productName, value, vsize); return m_datastore->loadRawProduct(id, productName, value, vsize);
} else { } else {
*vsize = it->second.size(); *vsize = it->second.size();
if(m_stats) m_stats->product_cache_hit += 1;
std::memcpy(value, it->second.data(), *vsize); std::memcpy(value, it->second.data(), *vsize);
return true; return true;
} }
......
...@@ -22,7 +22,12 @@ void WriteBatch::flush() { ...@@ -22,7 +22,12 @@ void WriteBatch::flush() {
} }
void WriteBatch::activateStatistics(bool activate) { void WriteBatch::activateStatistics(bool activate) {
m_impl->m_stats_enabled = activate; if(activate) {
if(m_impl->m_stats) return;
m_impl->m_stats = std::make_unique<WriteBatchStatistics>();
} else {
m_impl->m_stats.reset();
}
} }
void WriteBatch::collectStatistics(WriteBatchStatistics& stats) const { void WriteBatch::collectStatistics(WriteBatchStatistics& stats) const {
......
...@@ -31,8 +31,7 @@ class WriteBatchImpl { ...@@ -31,8 +31,7 @@ class WriteBatchImpl {
typedef std::unordered_map<const sdskv::database*, keyvals> entries_type; typedef std::unordered_map<const sdskv::database*, keyvals> entries_type;
WriteBatchStatistics m_stats; std::unique_ptr<WriteBatchStatistics> m_stats;
bool m_stats_enabled;
mutable tl::mutex m_stats_mtx; mutable tl::mutex m_stats_mtx;
std::shared_ptr<DataStoreImpl> m_datastore; std::shared_ptr<DataStoreImpl> m_datastore;
...@@ -45,15 +44,15 @@ class WriteBatchImpl { ...@@ -45,15 +44,15 @@ class WriteBatchImpl {
bool m_async_thread_should_stop = false; bool m_async_thread_should_stop = false;
void update_keyval_statistics(size_t ksize, size_t vsize) { void update_keyval_statistics(size_t ksize, size_t vsize) {
if(!m_stats_enabled) return; if(!m_stats) return;
m_stats.key_sizes.updateWith(ksize); m_stats->key_sizes.updateWith(ksize);
if(vsize) if(vsize)
m_stats.value_sizes.updateWith(vsize); m_stats->value_sizes.updateWith(vsize);
} }
void update_operation_statistics(size_t batch_size) { void update_operation_statistics(size_t batch_size) {
if(!m_stats_enabled) return; if(!m_stats) return;
m_stats.batch_sizes.updateWith(batch_size); m_stats->batch_sizes.updateWith(batch_size);
} }
static void writer_thread(WriteBatchImpl& wb, static void writer_thread(WriteBatchImpl& wb,
...@@ -229,7 +228,8 @@ class WriteBatchImpl { ...@@ -229,7 +228,8 @@ class WriteBatchImpl {
void collectStatistics(WriteBatchStatistics& stats) const { void collectStatistics(WriteBatchStatistics& stats) const {
std::unique_lock<tl::mutex> lock(m_stats_mtx); std::unique_lock<tl::mutex> lock(m_stats_mtx);
stats = m_stats; if(m_stats)
stats = *m_stats;
} }
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment