Commit 1a3f5f7f authored by Matthieu Dorier's avatar Matthieu Dorier

implemented parallel flush

parent d7048475
......@@ -17,7 +17,10 @@ class Exception : public std::exception
public:
Exception() = default;
Exception(const std::string& msg) : m_msg(msg){}
Exception(const Exception&) = default;
Exception& operator=(const Exception&) = default;
virtual const char* what() const noexcept override
{
......
......@@ -17,6 +17,29 @@ namespace hepnos {
class WriteBatch::Impl {
struct writer_thread_args {
unsigned long db_idx = 0;
std::pair<std::vector<std::string>, std::vector<std::string>>* keyvals = nullptr;
Impl* batch = nullptr;
Exception ex;
bool ok = true;
};
static void writer_thread(void* x) {
writer_thread_args* args = static_cast<writer_thread_args*>(x);
auto batch = args->batch;
auto db_idx = args->db_idx;
auto keyvals = args->keyvals;
const auto& keys = keyvals->first;
const auto& vals = keyvals->second;
try {
batch->m_datastore->m_impl->storeMultiple(db_idx, keys, vals);
} catch(Exception& ex) {
args->ok = false;
args->ex = ex;
}
}
public:
DataStore* m_datastore;
......@@ -46,12 +69,26 @@ class WriteBatch::Impl {
}
void flush() {
for(const auto& e : m_entries) {
ABT_xstream es = ABT_XSTREAM_NULL;
ABT_xstream_self(&es);
auto num_threads = m_entries.size();
std::vector<ABT_thread> threads(num_threads);
std::vector<writer_thread_args> args(num_threads);
unsigned i=0;
for(auto& e : m_entries) {
auto db_idx = e.first;
const auto& keys = e.second.first;
const auto& vals = e.second.second;
m_datastore->m_impl->storeMultiple(db_idx, keys, vals);
args[i].db_idx = e.first;
args[i].batch = this;
args[i].keyvals = &e.second;
ABT_thread_create_on_xstream(es, &writer_thread, &args[i], ABT_THREAD_ATTR_NULL, &threads[i]);
i += 1;
}
ABT_thread_join_many(num_threads, threads.data());
ABT_thread_free_many(num_threads, threads.data());
for(auto& a : args) {
if(not a.ok) throw a.ex;
}
m_entries.clear();
}
~Impl() {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment