Commit 9223062e authored by Matthieu Dorier

added async engine

parent f6dde05d
#ifndef __HEPNOS_H
#define __HEPNOS_H
#include <hepnos/AsyncEngine.hpp>
#include <hepnos/DataStore.hpp>
#include <hepnos/DataSet.hpp>
#include <hepnos/Demangle.hpp>
......
/*
 * (C) 2019 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */
#ifndef __HEPNOS_ASYNC_ENGINE_H
#define __HEPNOS_ASYNC_ENGINE_H

#include <memory>

namespace hepnos {

class DataStore;
class AsyncEngineImpl;
class WriteBatch;

class AsyncEngine {

    friend class WriteBatch;

    private:

    std::shared_ptr<AsyncEngineImpl> m_impl;

    public:

    /**
     * Creates an AsyncEngine attached to the given DataStore. If num_threads
     * is greater than 0, the engine spawns that many execution streams sharing
     * a single work pool; otherwise it reuses the calling execution stream's
     * main pool.
     */
    AsyncEngine(DataStore& ds, size_t num_threads=0);

    ~AsyncEngine() = default;
    AsyncEngine(const AsyncEngine&) = default;
    AsyncEngine& operator=(const AsyncEngine&) = default;
    AsyncEngine(AsyncEngine&&) = default;
    AsyncEngine& operator=(AsyncEngine&&) = default;
};

}

#endif
......@@ -24,6 +24,7 @@ class RunSetImpl;
class ItemImpl;
template<typename T, typename C = std::vector<T>> class Ptr;
class WriteBatch;
class AsyncEngine;
/**
* The DataStore class is the main handle referencing an HEPnOS service.
......@@ -39,6 +40,7 @@ class DataStore {
    friend class Event;
    friend class WriteBatch;
    friend class DataStoreImpl;
    friend class AsyncEngine;

    public:
......
......@@ -23,6 +23,7 @@ class Run;
class SubRun;
class Event;
class WriteBatchImpl;
class AsyncEngine;
class WriteBatch {
......@@ -40,6 +41,7 @@ class WriteBatch {
    public:

    WriteBatch(DataStore& ds);
    WriteBatch(DataStore& ds, AsyncEngine& async);

    ~WriteBatch();

    WriteBatch(const WriteBatch&) = delete;
    WriteBatch& operator=(const WriteBatch&) = delete;
......
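For context, the pieces above are meant to be used together: an AsyncEngine is constructed on top of an existing DataStore and handed to a WriteBatch, whose queued writes are then flushed in the background. Below is a minimal sketch, assuming an already-connected DataStore and using hypothetical names (write_runs_async, the "example" dataset); it mirrors the AsyncWriteBatchTest added further down, which relies on the WriteBatch flushing its queue when it goes out of scope.

#include <hepnos.hpp>

// Hypothetical helper, not part of this commit: shows how AsyncEngine and
// WriteBatch are meant to fit together.
void write_runs_async(hepnos::DataStore& datastore) {
    // Two dedicated execution streams will flush the batched writes.
    hepnos::AsyncEngine async_engine(datastore, 2);
    auto dataset = datastore.root().createDataSet("example");
    {
        hepnos::WriteBatch batch(datastore, async_engine);
        for(unsigned i = 0; i < 10; i++) {
            dataset.createRun(batch, i);   // queued in the batch, written asynchronously
        }
    }   // the tests below rely on the batch flushing its queue when it goes out of scope
}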
#include "hepnos/AsyncEngine.hpp"
#include "hepnos/DataStore.hpp"
#include "AsyncEngineImpl.hpp"
namespace hepnos {
AsyncEngine::AsyncEngine(DataStore& ds, size_t num_threads)
: m_impl(std::make_shared<AsyncEngineImpl>(ds.m_impl, num_threads)) {}
}
#ifndef __HEPNOS_ASYNC_ENGINE_IMPL_HPP
#define __HEPNOS_ASYNC_ENGINE_IMPL_HPP

#include <thallium.hpp>
#include "DataStoreImpl.hpp"
#include "hepnos/Exception.hpp"

namespace tl = thallium;

namespace hepnos {

class WriteBatchImpl;

class AsyncEngineImpl {

    friend class WriteBatchImpl;

    std::shared_ptr<DataStoreImpl>        m_datastore;
    tl::pool                              m_pool;
    std::vector<tl::managed<tl::xstream>> m_xstreams;

    public:

    AsyncEngineImpl(const std::shared_ptr<DataStoreImpl>& ds, size_t num_threads)
    : m_datastore(ds) {
        if(num_threads > 0) {
            // create a shared MPMC pool drained by num_threads execution streams
            ABT_pool p = ABT_POOL_NULL;
            int ret = ABT_pool_create_basic(ABT_POOL_FIFO, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &p);
            if(ret != ABT_SUCCESS) {
                throw Exception("Could not create Argobots thread pool");
            }
            m_pool = tl::pool(p);
            for(size_t i=0; i < num_threads; i++) {
                m_xstreams.push_back(tl::xstream::create(tl::scheduler::predef::deflt, m_pool));
            }
            for(auto& es : m_xstreams) {
                es->start();
            }
        } else {
            // no dedicated threads: reuse the calling execution stream's main pool
            auto current_es = tl::xstream::self();
            auto pools = current_es.get_main_pools(1);
            if(pools.size() != 1) {
                throw Exception("Could not get current execution stream's main Argobots pool");
            }
            m_pool = pools[0];
        }
    }

    ~AsyncEngineImpl() {
        // wait for all dedicated execution streams to complete their work
        for(auto& es : m_xstreams) {
            es->join();
        }
    }
};

}

#endif
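A side note on the thallium/Argobots idiom used by this constructor: a single MPMC pool is shared by num_threads execution streams, and other components (WriteBatchImpl below) submit work to that pool as user-level threads via make_thread. A condensed, hypothetical sketch of the submitting side (submit_work and n_tasks are illustrative names, not HEPnOS API):

#include <thallium.hpp>
#include <vector>

namespace tl = thallium;

// Hypothetical sketch: push user-level threads (ULTs) into the engine's shared
// pool; the execution streams created in AsyncEngineImpl pick them up and run them.
void submit_work(tl::pool& pool, size_t n_tasks) {
    std::vector<tl::managed<tl::thread>> ults;
    for(size_t i = 0; i < n_tasks; i++) {
        ults.push_back(pool.make_thread([]() {
            // each ULT would perform one independent write, like spawn_writer_threads does
        }));
    }
    for(auto& t : ults) t->join();   // wait for all submitted work to complete
}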
......@@ -6,7 +6,8 @@ set(hepnos-src DataStore.cpp
SubRun.cpp
Event.cpp
UUID.cpp
WriteBatch.cpp
AsyncEngine.cpp)
set(hepnos-service-src service/HEPnOSService.cpp
service/ServiceConfig.cpp
......
......@@ -5,12 +5,16 @@
*/
#include "hepnos.hpp"
#include "WriteBatchImpl.hpp"
#include "hepnos/AsyncEngine.hpp"
namespace hepnos {
WriteBatch::WriteBatch(DataStore& datastore)
: m_impl(std::make_unique<WriteBatchImpl>(datastore.m_impl)) {}
WriteBatch::WriteBatch(DataStore& datastore, AsyncEngine& async)
: m_impl(std::make_unique<WriteBatchImpl>(datastore.m_impl, async.m_impl)) {}
WriteBatch::~WriteBatch() {}
}
......@@ -6,12 +6,17 @@
#ifndef __HEPNOS_PRIVATE_WRITEBATCH_IMPL_H
#define __HEPNOS_PRIVATE_WRITEBATCH_IMPL_H
#include <mutex> // for std::lock_guard and std::unique_lock
#include <iostream>
#include <unordered_map>
#include <string>
#include <vector>
#include <thallium.hpp>
#include "hepnos/WriteBatch.hpp"
#include "DataStoreImpl.hpp"
#include "AsyncEngineImpl.hpp"
namespace tl = thallium;
namespace hepnos {
......@@ -22,33 +27,82 @@ class WriteBatchImpl {
        std::vector<std::string> m_values;
    };

    typedef std::unordered_map<const sdskv::database*, keyvals> entries_type;

    std::shared_ptr<DataStoreImpl>        m_datastore;
    std::shared_ptr<AsyncEngineImpl>      m_async_engine;
    entries_type                          m_entries;
    tl::condition_variable                m_cond;
    tl::mutex                             m_mutex;
    std::vector<tl::managed<tl::thread>>  m_async_thread;
    bool                                  m_async_thread_should_stop = false;

    // Writes one batch of key/value pairs into a single database; success or
    // failure is reported through the ok and exception output parameters.
    static void writer_thread(const sdskv::database* db,
                              const std::vector<std::string>& keys,
                              const std::vector<std::string>& vals,
                              Exception* exception,
                              char* ok) {
        *ok = 1;
        try {
            db->put_multi(keys, vals);
        } catch(Exception& ex) {
            *ok = 0;
            *exception = ex;
        }
    }

    public:

    // Spawns one writer ULT per target database in the given pool, waits for
    // all of them, and rethrows the first recorded exception, if any.
    static void spawn_writer_threads(entries_type& entries, tl::pool& pool) {
        auto num_threads = entries.size();
        std::vector<tl::managed<tl::thread>> threads;
        std::vector<Exception> exceptions(num_threads);
        std::vector<char> oks(num_threads);
        unsigned i=0;
        for(auto& e : entries) {
            char* ok = &oks[i];
            Exception* ex = &exceptions[i];
            threads.push_back(pool.make_thread([&e, ok, ex]() {
                writer_thread(e.first, e.second.m_keys, e.second.m_values, ex, ok);
            }));
            i += 1;
        }
        for(auto& t : threads) {
            t->join();
        }
        for(unsigned i=0; i < num_threads; i++) {
            if(not oks[i]) throw exceptions[i];
        }
        entries.clear();
    }

    // Background loop executed in the AsyncEngine's pool: waits until entries
    // are available (or a stop is requested), takes ownership of them, and
    // flushes them with the lock released.
    static void async_writer_thread(WriteBatchImpl& batch) {
        while(!(batch.m_async_thread_should_stop && batch.m_entries.empty())) {
            std::unique_lock<tl::mutex> lock(batch.m_mutex);
            while(batch.m_entries.empty() && !batch.m_async_thread_should_stop) {
                batch.m_cond.wait(lock);
            }
            if(batch.m_entries.empty())
                continue;
            auto entries = std::move(batch.m_entries);
            batch.m_entries.clear();
            lock.unlock();
            spawn_writer_threads(entries, batch.m_async_engine->m_pool);
        }
    }

    public:

    WriteBatchImpl(const std::shared_ptr<DataStoreImpl>& ds,
                   const std::shared_ptr<AsyncEngineImpl>& async = nullptr)
    : m_datastore(ds)
    , m_async_engine(async) {
        if(m_async_engine) {
            // an AsyncEngine was provided: start the background flusher in its pool
            m_async_thread.push_back(
                m_async_engine->m_pool.make_thread([batch=this](){
                    async_writer_thread(*batch);
                })
            );
        }
    }

    ProductID storeRawProduct(const ItemDescriptor& id,
                              const std::string& productName,
......@@ -59,9 +113,17 @@ class WriteBatchImpl {
        // locate db
        auto& db = m_datastore->locateProductDb(product_id);
        // insert in the map of entries
        bool was_empty;
        {
            std::lock_guard<tl::mutex> g(m_mutex);
            was_empty = m_entries.empty();
            keyvals& entry = m_entries[&db];
            entry.m_keys.push_back(product_id.m_key);
            entry.m_values.emplace_back(value, vsize);
        }
        // wake up the background flusher if the batch just became non-empty
        if(was_empty) {
            m_cond.notify_one();
        }
        return product_id;
    }
......@@ -79,31 +141,32 @@ class WriteBatchImpl {
        // locate db
        auto& db = m_datastore->locateItemDb(id);
        // insert in the map of entries
        bool was_empty;
        {
            std::lock_guard<tl::mutex> lock(m_mutex);
            was_empty = m_entries.empty();
            auto& entry = m_entries[&db];
            entry.m_keys.push_back(std::string(reinterpret_cast<char*>(&id), sizeof(id)));
            entry.m_values.emplace_back();
        }
        if(was_empty) {
            m_cond.notify_one();
        }
        return true;
    }

    void flush() {
        if(!m_async_engine) { // flush everything here, from the calling ES's main pool
            tl::xstream es = tl::xstream::self();
            spawn_writer_threads(m_entries, es.get_main_pools(1)[0]);
        } else { // wait for the AsyncEngine to have flushed everything
            {
                std::lock_guard<tl::mutex> lock(m_mutex);
                m_async_thread_should_stop = true;
            }
            m_cond.notify_one();
            m_async_thread[0]->join();
        }
    }

    ~WriteBatchImpl() {
......
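The WriteBatchImpl above is a producer/consumer hand-off: the store methods append to m_entries and notify the condition variable only when the batch goes from empty to non-empty, async_writer_thread swaps the whole map out under the lock and writes it with the lock released, and flush() sets a stop flag, wakes the consumer, and joins it. Below is a self-contained sketch of the same pattern using standard C++ primitives instead of thallium/Argobots (Flusher and push are illustrative names, not HEPnOS code):

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Illustrative only: same hand-off pattern as WriteBatchImpl, with std:: primitives.
class Flusher {

    std::vector<std::string>  m_entries;
    std::mutex                m_mutex;
    std::condition_variable   m_cond;
    bool                      m_should_stop = false;
    std::thread               m_thread{[this](){ run(); }};

    void run() {
        while(true) {
            std::unique_lock<std::mutex> lock(m_mutex);
            // wait until there is something to write or a stop was requested
            m_cond.wait(lock, [this](){ return m_should_stop || !m_entries.empty(); });
            if(m_entries.empty() && m_should_stop) return;
            auto batch = std::move(m_entries);   // take ownership under the lock
            m_entries.clear();
            lock.unlock();                       // write without holding the lock
            for(auto& e : batch) std::cout << "writing " << e << "\n";
        }
    }

    public:

    void push(std::string value) {
        bool was_empty;
        {
            std::lock_guard<std::mutex> g(m_mutex);
            was_empty = m_entries.empty();
            m_entries.push_back(std::move(value));
        }
        // wake the consumer only on the empty -> non-empty transition
        if(was_empty) m_cond.notify_one();
    }

    ~Flusher() {   // plays the role of flush(): request a stop, drain, then join
        {
            std::lock_guard<std::mutex> g(m_mutex);
            m_should_stop = true;
        }
        m_cond.notify_one();
        m_thread.join();
    }
};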
#include "AsyncWriteBatchTest.hpp"
#include "CppUnitAdditionalMacros.hpp"
#include "TestObjects.hpp"
CPPUNIT_TEST_SUITE_REGISTRATION( AsyncWriteBatchTest );
using namespace hepnos;
void AsyncWriteBatchTest::setUp() {}
void AsyncWriteBatchTest::tearDown() {}
void AsyncWriteBatchTest::testAsyncWriteBatchRun() {
    TestObjectA out_obj_a;
    out_obj_a.x() = 44;
    out_obj_a.y() = 1.2;
    TestObjectB out_obj_b;
    out_obj_b.a() = 33;
    out_obj_b.b() = "you";
    std::string key1 = "mykey";

    auto dataset = datastore->root().createDataSet("testAsyncWriteBatchRun");
    hepnos::AsyncEngine async_engine(*datastore, 2);

    {
        hepnos::WriteBatch batch(*datastore, async_engine);
        for(auto i = 0; i < 10; i++) {
            auto run = dataset.createRun(batch, i);
            CPPUNIT_ASSERT(run.store(batch, key1, out_obj_a));
            CPPUNIT_ASSERT(run.store(batch, key1, out_obj_b));
        }
    }

    TestObjectA in_obj_a;
    TestObjectB in_obj_b;
    for(auto i = 0; i < 10; i++) {
        auto run = dataset.runs()[i];
        CPPUNIT_ASSERT(run.valid());
        CPPUNIT_ASSERT(run.load(key1, in_obj_a));
        CPPUNIT_ASSERT(run.load(key1, in_obj_b));
        CPPUNIT_ASSERT(out_obj_a == in_obj_a);
        CPPUNIT_ASSERT(out_obj_b == in_obj_b);
    }
}
void AsyncWriteBatchTest::testAsyncWriteBatchSubRun() {
    TestObjectA out_obj_a;
    out_obj_a.x() = 44;
    out_obj_a.y() = 1.2;
    TestObjectB out_obj_b;
    out_obj_b.a() = 33;
    out_obj_b.b() = "you";
    std::string key1 = "mykey";

    auto dataset = datastore->root().createDataSet("testAsyncWriteBatchSubRun");
    hepnos::AsyncEngine async_engine(*datastore, 2);
    auto run = dataset.createRun(42);

    {
        hepnos::WriteBatch batch(*datastore, async_engine);
        for(auto i = 0; i < 10; i++) {
            auto sr = run.createSubRun(batch, i);
            CPPUNIT_ASSERT(sr.store(batch, key1, out_obj_a));
            CPPUNIT_ASSERT(sr.store(batch, key1, out_obj_b));
        }
    }

    TestObjectA in_obj_a;
    TestObjectB in_obj_b;
    for(auto i = 0; i < 10; i++) {
        auto sr = run[i];
        CPPUNIT_ASSERT(sr.valid());
        CPPUNIT_ASSERT(sr.load(key1, in_obj_a));
        CPPUNIT_ASSERT(sr.load(key1, in_obj_b));
        CPPUNIT_ASSERT(out_obj_a == in_obj_a);
        CPPUNIT_ASSERT(out_obj_b == in_obj_b);
    }
}
void AsyncWriteBatchTest::testAsyncWriteBatchEvent() {
    TestObjectA out_obj_a;
    out_obj_a.x() = 44;
    out_obj_a.y() = 1.2;
    TestObjectB out_obj_b;
    out_obj_b.a() = 33;
    out_obj_b.b() = "you";
    std::string key1 = "mykey";

    auto dataset = datastore->root().createDataSet("testAsyncWriteBatchEvent");
    hepnos::AsyncEngine async_engine(*datastore, 2);
    auto run = dataset.createRun(42);
    auto subrun = run.createSubRun(2);

    {
        hepnos::WriteBatch batch(*datastore, async_engine);
        for(auto i = 0; i < 10; i++) {
            auto e = subrun.createEvent(batch, i);
            CPPUNIT_ASSERT(e.store(batch, key1, out_obj_a));
            CPPUNIT_ASSERT(e.store(batch, key1, out_obj_b));
        }
    }

    TestObjectA in_obj_a;
    TestObjectB in_obj_b;
    for(auto i = 0; i < 10; i++) {
        auto e = subrun[i];
        CPPUNIT_ASSERT(e.valid());
        CPPUNIT_ASSERT(e.load(key1, in_obj_a));
        CPPUNIT_ASSERT(e.load(key1, in_obj_b));
        CPPUNIT_ASSERT(out_obj_a == in_obj_a);
        CPPUNIT_ASSERT(out_obj_b == in_obj_b);
    }
}
#ifndef __HEPNOS_TEST_ASYNCWRITEBATCH_H
#define __HEPNOS_TEST_ASYNCWRITEBATCH_H
#include <cppunit/extensions/HelperMacros.h>
#include <hepnos.hpp>
extern hepnos::DataStore* datastore;
class AsyncWriteBatchTest : public CppUnit::TestFixture
{
    CPPUNIT_TEST_SUITE( AsyncWriteBatchTest );
    CPPUNIT_TEST( testAsyncWriteBatchRun );
    CPPUNIT_TEST( testAsyncWriteBatchSubRun );
    CPPUNIT_TEST( testAsyncWriteBatchEvent );
    CPPUNIT_TEST_SUITE_END();

    public:

    void setUp();
    void tearDown();

    void testAsyncWriteBatchRun();
    void testAsyncWriteBatchSubRun();
    void testAsyncWriteBatchEvent();
};
#endif
......@@ -48,6 +48,9 @@ target_link_libraries(RestartAndReadTest ${CPPUNIT_LIBRARIES} hepnos ${BOOST_DEP
add_executable(WriteBatchTest WriteBatchTest.cpp HEPnOSTestMain.cpp)
target_link_libraries(WriteBatchTest ${CPPUNIT_LIBRARIES} hepnos ${BOOST_DEPS})
add_executable(AsyncWriteBatchTest AsyncWriteBatchTest.cpp HEPnOSTestMain.cpp)
target_link_libraries(AsyncWriteBatchTest ${CPPUNIT_LIBRARIES} hepnos ${BOOST_DEPS})
#add_executable(ParallelMPITest ParallelMPITest.cpp HEPnOSTestMain.cpp)
#target_link_libraries(ParallelMPITest ${CPPUNIT_LIBRARIES} hepnos ${BOOST_DEPS})
......@@ -60,6 +63,7 @@ add_test(NAME EventTest COMMAND run-test.sh ./EventTest)
add_test(NAME LoadStoreTest COMMAND run-test.sh ./LoadStoreTest)
add_test(NAME LoadStoreVectorsTest COMMAND run-test.sh ./LoadStoreVectorsTest)
add_test(NAME WriteBatchTest COMMAND run-test.sh ./WriteBatchTest)
add_test(NAME AsyncWriteBatchTest COMMAND run-test.sh ./AsyncWriteBatchTest)
add_test(NAME PtrTest COMMAND run-test.sh ./PtrTest)
add_test(NAME RestartTest COMMAND run-two-tests.sh ./WriteAndRestartTest ./RestartAndReadTest)
#add_test(NAME ParallelMPITest COMMAND run-test.sh "mpirun -np 4 ./ParallelMPITest" 30)