From 1a3f5f7fb9fee8ce9ffcffbf047e2a0963663d40 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Tue, 3 Sep 2019 08:13:05 -0500 Subject: [PATCH] implemented parallel flush --- include/hepnos/Exception.hpp | 3 +++ src/private/WriteBatchImpl.hpp | 45 +++++++++++++++++++++++++++++++--- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/include/hepnos/Exception.hpp b/include/hepnos/Exception.hpp index cfa4a8f..d0e8b35 100644 --- a/include/hepnos/Exception.hpp +++ b/include/hepnos/Exception.hpp @@ -17,7 +17,10 @@ class Exception : public std::exception public: + Exception() = default; Exception(const std::string& msg) : m_msg(msg){} + Exception(const Exception&) = default; + Exception& operator=(const Exception&) = default; virtual const char* what() const noexcept override { diff --git a/src/private/WriteBatchImpl.hpp b/src/private/WriteBatchImpl.hpp index 464bfc4..3d8fcf8 100644 --- a/src/private/WriteBatchImpl.hpp +++ b/src/private/WriteBatchImpl.hpp @@ -17,6 +17,29 @@ namespace hepnos { class WriteBatch::Impl { + struct writer_thread_args { + unsigned long db_idx = 0; + std::pair, std::vector>* keyvals = nullptr; + Impl* batch = nullptr; + Exception ex; + bool ok = true; + }; + + static void writer_thread(void* x) { + writer_thread_args* args = static_cast(x); + auto batch = args->batch; + auto db_idx = args->db_idx; + auto keyvals = args->keyvals; + const auto& keys = keyvals->first; + const auto& vals = keyvals->second; + try { + batch->m_datastore->m_impl->storeMultiple(db_idx, keys, vals); + } catch(Exception& ex) { + args->ok = false; + args->ex = ex; + } + } + public: DataStore* m_datastore; @@ -46,12 +69,26 @@ class WriteBatch::Impl { } void flush() { - for(const auto& e : m_entries) { + ABT_xstream es = ABT_XSTREAM_NULL; + ABT_xstream_self(&es); + auto num_threads = m_entries.size(); + std::vector threads(num_threads); + std::vector args(num_threads); + unsigned i=0; + for(auto& e : m_entries) { auto db_idx = e.first; - const auto& keys = e.second.first; - const auto& vals = e.second.second; - m_datastore->m_impl->storeMultiple(db_idx, keys, vals); + args[i].db_idx = e.first; + args[i].batch = this; + args[i].keyvals = &e.second; + ABT_thread_create_on_xstream(es, &writer_thread, &args[i], ABT_THREAD_ATTR_NULL, &threads[i]); + i += 1; + } + ABT_thread_join_many(num_threads, threads.data()); + ABT_thread_free_many(num_threads, threads.data()); + for(auto& a : args) { + if(not a.ok) throw a.ex; } + m_entries.clear(); } ~Impl() { -- 2.26.2