Commit 8669ee23 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added statistics in WriteBatch

parent d4c1d4a5
/*
* (C) 2019 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __HEPNOS_STATISTICS_H
#define __HEPNOS_STATISTICS_H
namespace hepnos {
/**
* @brief Statistics is a simple helper class to compute running statistics.
*
* @tparam Number
* @tparam Double
*/
template<typename Number, typename Double=double>
struct Statistics {
size_t num = 0;
Number max = 0;
Number min = 0;
Double avg = 0;
Double var = 0;
void updateWith(Number value) {
if(num == 0) min = value;
auto n = num;
if(max < value) max = value;
if(min > value) min = value;
Double wn = ((Double)n)/((Double)(n+1));
Double w1 = 1.0/((Double)(n+1));
Double avg_n = avg;
Double var_n = var;
avg = wn*avg_n + w1*value;
var = wn*(var_n + avg_n*avg_n)
+ w1*value*value
- avg*avg;
}
};
}
#endif
......@@ -14,6 +14,7 @@
#include <hepnos/ProductID.hpp>
#include <hepnos/DataStore.hpp>
#include <hepnos/Exception.hpp>
#include <hepnos/Statistics.hpp>
namespace hepnos {
......@@ -25,6 +26,12 @@ class Event;
class WriteBatchImpl;
class AsyncEngine;
struct WriteBatchStatistics {
Statistics<size_t> batch_sizes;
Statistics<size_t> key_sizes;
Statistics<size_t> value_sizes; // only non-empty values are accounted for
};
/**
* @brief The WriteBatch oject can be used to batch
* operations such as creating Runs, SubRuns, and Events,
......@@ -100,6 +107,20 @@ class WriteBatch {
* everything is flushed.
*/
void flush();
/**
* @brief Activate statistics collection.
*
* @param activate Whether to activate statistics.
*/
void activateStatistics(bool activate=true);
/**
* @brief Collects the usage statistics.
*
* @param stats WriteBatchStatistics object to fill.
*/
void collectStatistics(WriteBatchStatistics& stats) const;
};
}
......
......@@ -21,4 +21,12 @@ void WriteBatch::flush() {
m_impl->flush();
}
void WriteBatch::activateStatistics(bool activate) {
m_impl->m_stats_enabled = activate;
}
void WriteBatch::collectStatistics(WriteBatchStatistics& stats) const {
m_impl->collectStatistics(stats);
}
}
......@@ -22,6 +22,8 @@ namespace hepnos {
class WriteBatchImpl {
public:
struct keyvals {
std::vector<std::string> m_keys;
std::vector<std::string> m_values;
......@@ -29,16 +31,33 @@ class WriteBatchImpl {
typedef std::unordered_map<const sdskv::database*, keyvals> entries_type;
std::shared_ptr<DataStoreImpl> m_datastore;
std::shared_ptr<AsyncEngineImpl> m_async_engine;
entries_type m_entries;
unsigned m_max_batch_size;
tl::condition_variable m_cond;
tl::mutex m_mutex;
std::vector<tl::managed<tl::thread>> m_async_thread;
bool m_async_thread_should_stop = false;
WriteBatchStatistics m_stats;
bool m_stats_enabled;
mutable tl::mutex m_stats_mtx;
std::shared_ptr<DataStoreImpl> m_datastore;
std::shared_ptr<AsyncEngineImpl> m_async_engine;
entries_type m_entries;
unsigned m_max_batch_size;
tl::condition_variable m_cond;
tl::mutex m_mutex;
std::vector<tl::managed<tl::thread>> m_async_thread;
bool m_async_thread_should_stop = false;
void update_keyval_statistics(size_t ksize, size_t vsize) {
if(!m_stats_enabled) return;
m_stats.key_sizes.updateWith(ksize);
if(vsize)
m_stats.value_sizes.updateWith(vsize);
}
static void writer_thread(unsigned max_batch_size,
void update_operation_statistics(size_t batch_size) {
if(!m_stats_enabled) return;
m_stats.batch_sizes.updateWith(batch_size);
}
static void writer_thread(WriteBatchImpl& wb,
unsigned max_batch_size,
const sdskv::database* db,
const std::vector<std::string>& keys,
const std::vector<std::string>& vals,
......@@ -65,6 +84,7 @@ class WriteBatchImpl {
batch_vals_sizes.resize(this_batch_size);
try {
db->put_multi(batch_keys, batch_keys_sizes, batch_vals, batch_vals_sizes);
wb.update_operation_statistics(this_batch_size);
} catch(sdskv::exception& ex) {
if(ex.error() != SDSKV_ERR_KEYEXISTS) {
*ok = 0;
......@@ -75,7 +95,7 @@ class WriteBatchImpl {
}
}
static void spawn_writer_threads(unsigned max_batch_size, entries_type& entries, tl::pool& pool) {
static void spawn_writer_threads(WriteBatchImpl& wb, unsigned max_batch_size, entries_type& entries, tl::pool& pool) {
auto num_threads = entries.size();
std::vector<tl::managed<tl::thread>> threads;
std::vector<Exception> exceptions(num_threads);
......@@ -84,8 +104,8 @@ class WriteBatchImpl {
for(auto& e : entries) {
char* ok = &oks[i];
Exception* ex = &exceptions[i];
threads.push_back(pool.make_thread([max_batch_size, &e, ok, ex]() {
writer_thread(max_batch_size, e.first, e.second.m_keys, e.second.m_values, ex, ok);
threads.push_back(pool.make_thread([&wb, max_batch_size, &e, ok, ex]() {
writer_thread(wb, max_batch_size, e.first, e.second.m_keys, e.second.m_values, ex, ok);
}));
i += 1;
}
......@@ -109,7 +129,7 @@ class WriteBatchImpl {
auto entries = std::move(batch.m_entries);
batch.m_entries.clear();
lock.unlock();
spawn_writer_threads(batch.m_max_batch_size, entries, batch.m_async_engine->m_pool);
spawn_writer_threads(batch, batch.m_max_batch_size, entries, batch.m_async_engine->m_pool);
}
}
......@@ -146,6 +166,7 @@ class WriteBatchImpl {
keyvals& entry = m_entries[&db];
entry.m_keys.push_back(product_id.m_key);
entry.m_values.emplace_back(value, vsize);
update_keyval_statistics(product_id.m_key.size(), vsize);
}
if(was_empty) {
m_cond.notify_one();
......@@ -180,6 +201,7 @@ class WriteBatchImpl {
auto& entry = m_entries[&db];
entry.m_keys.push_back(std::string(reinterpret_cast<char*>(&id), sizeof(id)));
entry.m_values.emplace_back();
update_keyval_statistics(sizeof(id), 0);
}
if(was_empty) {
m_cond.notify_one();
......@@ -190,7 +212,7 @@ class WriteBatchImpl {
void flush() {
if(!m_async_engine) { // flush everything here
tl::xstream es = tl::xstream::self();
spawn_writer_threads(m_max_batch_size, m_entries, es.get_main_pools(1)[0]);
spawn_writer_threads(*this, m_max_batch_size, m_entries, es.get_main_pools(1)[0]);
} else { // wait for AsyncEngine to have flushed everything
{
std::lock_guard<tl::mutex> lock(m_mutex);
......@@ -204,6 +226,11 @@ class WriteBatchImpl {
~WriteBatchImpl() {
flush();
}
void collectStatistics(WriteBatchStatistics& stats) const {
std::unique_lock<tl::mutex> lock(m_stats_mtx);
stats = m_stats;
}
};
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment