diff --git a/include/hepnos/Exception.hpp b/include/hepnos/Exception.hpp index cfa4a8fdf621f5236a3bc21dbb8270fa74f925a1..d0e8b35db57fd635820c6ecbd7464294497380b0 100644 --- a/include/hepnos/Exception.hpp +++ b/include/hepnos/Exception.hpp @@ -17,7 +17,10 @@ class Exception : public std::exception public: + Exception() = default; Exception(const std::string& msg) : m_msg(msg){} + Exception(const Exception&) = default; + Exception& operator=(const Exception&) = default; virtual const char* what() const noexcept override { diff --git a/src/private/WriteBatchImpl.hpp b/src/private/WriteBatchImpl.hpp index 464bfc4e34304c224e39d2ee7f53a6042d0856c2..3d8fcf8f39cacb854fddf0f6c228abd08c5dccdc 100644 --- a/src/private/WriteBatchImpl.hpp +++ b/src/private/WriteBatchImpl.hpp @@ -17,6 +17,29 @@ namespace hepnos { class WriteBatch::Impl { + struct writer_thread_args { + unsigned long db_idx = 0; + std::pair, std::vector>* keyvals = nullptr; + Impl* batch = nullptr; + Exception ex; + bool ok = true; + }; + + static void writer_thread(void* x) { + writer_thread_args* args = static_cast(x); + auto batch = args->batch; + auto db_idx = args->db_idx; + auto keyvals = args->keyvals; + const auto& keys = keyvals->first; + const auto& vals = keyvals->second; + try { + batch->m_datastore->m_impl->storeMultiple(db_idx, keys, vals); + } catch(Exception& ex) { + args->ok = false; + args->ex = ex; + } + } + public: DataStore* m_datastore; @@ -46,12 +69,26 @@ class WriteBatch::Impl { } void flush() { - for(const auto& e : m_entries) { + ABT_xstream es = ABT_XSTREAM_NULL; + ABT_xstream_self(&es); + auto num_threads = m_entries.size(); + std::vector threads(num_threads); + std::vector args(num_threads); + unsigned i=0; + for(auto& e : m_entries) { auto db_idx = e.first; - const auto& keys = e.second.first; - const auto& vals = e.second.second; - m_datastore->m_impl->storeMultiple(db_idx, keys, vals); + args[i].db_idx = e.first; + args[i].batch = this; + args[i].keyvals = &e.second; + ABT_thread_create_on_xstream(es, &writer_thread, &args[i], ABT_THREAD_ATTR_NULL, &threads[i]); + i += 1; + } + ABT_thread_join_many(num_threads, threads.data()); + ABT_thread_free_many(num_threads, threads.data()); + for(auto& a : args) { + if(not a.ok) throw a.ex; } + m_entries.clear(); } ~Impl() {