Commit 4df770f2 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

corrected writebatch to include statistics

parent 577af76a
......@@ -26,6 +26,7 @@ class WriteBatchImpl {
public:
struct keyvals {
size_t m_size = 0;
std::string m_packed_keys;
std::vector<hg_size_t> m_packed_key_sizes;
std::string m_packed_vals;
......@@ -64,7 +65,8 @@ class WriteBatchImpl {
m_stats->batch_sizes.updateWith(batch_size);
}
static void writer_thread(const sdskv::database* db,
static void writer_thread(WriteBatchImpl& wb,
const sdskv::database* db,
std::queue<keyvals>& kvs_queue,
Exception* exception,
char* ok) {
......@@ -80,11 +82,12 @@ class WriteBatchImpl {
*exception = Exception(std::string("SDSKV error: ")+ex.what());
}
}
wb.update_operation_statistics(batch.m_size);
kvs_queue.pop();
}
}
static void spawn_writer_threads(entries_type& entries, tl::pool& pool) {
static void spawn_writer_threads(WriteBatchImpl& wb, entries_type& entries, tl::pool& pool) {
auto num_threads = entries.size();
std::vector<tl::managed<tl::thread>> threads;
std::vector<Exception> exceptions(num_threads);
......@@ -93,8 +96,8 @@ class WriteBatchImpl {
for(auto& e : entries) {
char* ok = &oks[i];
Exception* ex = &exceptions[i];
threads.push_back(pool.make_thread([&e, ok, ex]() {
writer_thread(e.first, e.second, ex, ok);
threads.push_back(pool.make_thread([&wb, &e, ok, ex]() {
writer_thread(wb, e.first, e.second, ex, ok);
}));
i += 1;
}
......@@ -118,7 +121,7 @@ class WriteBatchImpl {
auto entries = std::move(batch.m_entries);
batch.m_entries.clear();
lock.unlock();
spawn_writer_threads(entries, batch.m_async_engine->m_pool);
spawn_writer_threads(batch, entries, batch.m_async_engine->m_pool);
}
}
......@@ -161,6 +164,7 @@ class WriteBatchImpl {
auto& kv_batch = entry_queue.back();
kv_batch.m_packed_keys += product_id.m_key;
kv_batch.m_packed_key_sizes.push_back(product_id.m_key.size());
kv_batch.m_size += 1;
if(vsize != 0) {
size_t offset = kv_batch.m_packed_vals.size();
kv_batch.m_packed_vals.resize(offset + vsize);
......@@ -213,7 +217,7 @@ class WriteBatchImpl {
reinterpret_cast<char*>(&id), sizeof(id));
kv_batch.m_packed_key_sizes.push_back(sizeof(id));
kv_batch.m_packed_val_sizes.push_back(0);
kv_batch.m_size += 1;
update_keyval_statistics(sizeof(id), 0);
}
if(was_empty) {
......@@ -225,7 +229,7 @@ class WriteBatchImpl {
void flush() {
if(!m_async_engine) { // flush everything here
tl::xstream es = tl::xstream::self();
spawn_writer_threads(m_entries, es.get_main_pools(1)[0]);
spawn_writer_threads(*this, m_entries, es.get_main_pools(1)[0]);
} else { // wait for AsyncEngine to have flushed everything
{
std::lock_guard<tl::mutex> lock(m_mutex);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment