Commit 24dc1b19 authored by Matthieu Dorier's avatar Matthieu Dorier

added error handling in AsyncEngine and added async tests in LoadStoreTest

parent 85627dc4
......@@ -7,6 +7,7 @@
#define __HEPNOS_ASYNC_ENGINE_H
#include <memory>
#include <vector>
namespace hepnos {
......@@ -43,6 +44,7 @@ class AsyncEngine {
AsyncEngine& operator=(AsyncEngine&&) = default;
void wait();
const std::vector<std::string>& errors() const;
};
}
......
......@@ -11,4 +11,8 @@ void AsyncEngine::wait() {
m_impl->wait();
}
const std::vector<std::string>& AsyncEngine::errors() const {
return m_impl->m_errors;
}
}
......@@ -14,10 +14,13 @@ class WriteBatchImpl;
class AsyncEngineImpl {
friend class WriteBatchImpl;
friend class AsyncEngine;
std::shared_ptr<DataStoreImpl> m_datastore;
tl::pool m_pool;
std::vector<tl::managed<tl::xstream>> m_xstreams;
std::vector<std::string> m_errors;
tl::mutex m_errors_mtx;
public:
......@@ -55,17 +58,28 @@ class AsyncEngineImpl {
const std::string& productName,
const char* value, size_t vsize)
{
// build the key
auto product_id = m_datastore->buildProductID(id, productName);
// make a thread that will store the data
m_pool.make_thread([product_id, // passed by copy
auto product_id = m_datastore->buildProductID(id, productName);
m_pool.make_thread([this,
product_id, id, productName,// passed by copy
ds=m_datastore, // shared pointer
data=std::string(value,vsize)]() { // create new string
auto& db = ds->locateProductDb(product_id);
try {
db.put(product_id.m_key, data);
} catch(sdskv::exception& ex) {
// TODO handle exception
std::lock_guard<tl::mutex> lock(m_errors_mtx);
if(ex.error() == SDSKV_ERR_KEYEXISTS) {
m_errors.push_back(
std::string("Product ")
+productName
+" already exists for item "
+id.to_string());
} else {
m_errors.push_back(
std::string("SDSKV error: ")
+ex.what());
}
}
});
return product_id;
......@@ -83,14 +97,17 @@ class AsyncEngineImpl {
id.subrun = subrun_number;
id.event = event_number;
// make a thread that will store the data
m_pool.make_thread([id, ds=m_datastore]() {
m_pool.make_thread([this, id, ds=m_datastore]() {
// locate db
auto& db = ds->locateItemDb(id);
try {
db.put(&id, sizeof(id), nullptr, 0);
} catch(sdskv::exception& ex) {
if(!ex.error() == SDSKV_ERR_KEYEXISTS) {
// TODO handle exception
std::lock_guard<tl::mutex> lock(m_errors_mtx);
m_errors.push_back(
std::string("SDSKV error: ")
+ex.what());
}
}
});
......
......@@ -73,6 +73,14 @@ struct ItemDescriptor {
template<typename S>
friend S& operator<<(S& s, const ItemDescriptor& d);
std::string to_string() const {
return std::string("[")
+ dataset.to_string() + ", "
+ std::to_string(run) + ", "
+ std::to_string(subrun) + ", "
+ std::to_string(event) + "]";
}
};
template<typename S>
......
......@@ -13,14 +13,22 @@ void LoadStoreTest::tearDown() {}
void LoadStoreTest::testFillDataStore() {
auto root = datastore->root();
auto mds = root.createDataSet("matthieu");
CPPUNIT_ASSERT(mds.valid());
Run r1 = mds.createRun(42);
auto ds1 = root.createDataSet("matthieu");
CPPUNIT_ASSERT(ds1.valid());
Run r1 = ds1.createRun(42);
CPPUNIT_ASSERT(r1.valid());
SubRun sr1 = r1.createSubRun(3);
CPPUNIT_ASSERT(sr1.valid());
Event ev1 = sr1.createEvent(22);
CPPUNIT_ASSERT(ev1.valid());
DataSet ds2 = root.createDataSet("matthieu_async");
CPPUNIT_ASSERT(ds2.valid());
Run r2 = ds2.createRun(42);
CPPUNIT_ASSERT(r2.valid());
SubRun sr2 = r2.createSubRun(3);
CPPUNIT_ASSERT(sr2.valid());
Event ev2 = sr2.createEvent(22);
CPPUNIT_ASSERT(ev2.valid());
}
void LoadStoreTest::testLoadStoreDataSet() {
......@@ -199,3 +207,205 @@ void LoadStoreTest::testLoadStoreEvent() {
CPPUNIT_ASSERT(in_obj_b == out_obj_b);
}
// Async Tests
void LoadStoreTest::testAsyncLoadStoreDataSet() {
auto root = datastore->root();
auto mds = root["matthieu_async"];
auto run = mds[42];
auto subrun = run[3];
auto event = subrun[22];
CPPUNIT_ASSERT(mds.valid());
CPPUNIT_ASSERT(run.valid());
CPPUNIT_ASSERT(subrun.valid());
CPPUNIT_ASSERT(event.valid());
TestObjectA out_obj_a;
out_obj_a.x() = 44;
out_obj_a.y() = 1.2;
TestObjectB out_obj_b;
out_obj_b.a() = 33;
out_obj_b.b() = "you";
std::string key1 = "mykey";
hepnos::AsyncEngine async(*datastore, 1);
// we can store obj_a
CPPUNIT_ASSERT(mds.store(async, key1, out_obj_a));
// we cannot store at that key again something of the same type,
// but we will know that only when checking the async engine
// for errors
TestObjectA tmpa;
CPPUNIT_ASSERT(mds.store(async, key1, tmpa));
// we can store obj_b at the same key because it's not the same type
CPPUNIT_ASSERT(mds.store(async, key1, out_obj_b));
async.wait();
// there should be one error logged
CPPUNIT_ASSERT(async.errors().size() == 1);
TestObjectA in_obj_a;
TestObjectB in_obj_b;
std::string key2 = "otherkey";
// we can't load something at a key that does not exist
CPPUNIT_ASSERT(!mds.load(key2, in_obj_a));
// we can reload obj_a from key1
CPPUNIT_ASSERT(mds.load(key1, in_obj_a));
// and they are the same
CPPUNIT_ASSERT(in_obj_a == out_obj_a);
// we can reload obj_b from key1
CPPUNIT_ASSERT(mds.load(key1, in_obj_b));
// and they are the same
CPPUNIT_ASSERT(in_obj_b == out_obj_b);
}
void LoadStoreTest::testAsyncLoadStoreRun() {
auto root = datastore->root();
auto mds = root["matthieu_async"];
auto run = mds[42];
auto subrun = run[3];
auto event = subrun[22];
CPPUNIT_ASSERT(mds.valid());
CPPUNIT_ASSERT(run.valid());
CPPUNIT_ASSERT(subrun.valid());
CPPUNIT_ASSERT(event.valid());
TestObjectA out_obj_a;
out_obj_a.x() = 44;
out_obj_a.y() = 1.2;
TestObjectB out_obj_b;
out_obj_b.a() = 33;
out_obj_b.b() = "you";
std::string key1 = "mykey";
hepnos::AsyncEngine async(*datastore, 1);
// we can store obj_a
CPPUNIT_ASSERT(run.store(async, key1, out_obj_a));
// we cannot store at that key again something of the same type
// but we will know about it only later when checking for errors
TestObjectA tmpa;
CPPUNIT_ASSERT(run.store(async, key1, tmpa));
// we can store obj_b at the same key because it's not the same type
CPPUNIT_ASSERT(run.store(async, key1, out_obj_b));
async.wait();
// there should be one error logged
CPPUNIT_ASSERT(async.errors().size() == 1);
TestObjectA in_obj_a;
TestObjectB in_obj_b;
std::string key2 = "otherkey";
// we can't load something at a key that does not exist
CPPUNIT_ASSERT(!run.load(key2, in_obj_a));
// we can reload obj_a from key1
CPPUNIT_ASSERT(run.load(key1, in_obj_a));
// and they are the same
CPPUNIT_ASSERT(in_obj_a == out_obj_a);
// we can reload obj_b from key1
CPPUNIT_ASSERT(run.load(key1, in_obj_b));
// and they are the same
CPPUNIT_ASSERT(in_obj_b == out_obj_b);
}
void LoadStoreTest::testAsyncLoadStoreSubRun() {
auto root = datastore->root();
auto mds = root["matthieu_async"];
auto run = mds[42];
auto subrun = run[3];
auto event = subrun[22];
CPPUNIT_ASSERT(mds.valid());
CPPUNIT_ASSERT(run.valid());
CPPUNIT_ASSERT(subrun.valid());
CPPUNIT_ASSERT(event.valid());
TestObjectA out_obj_a;
out_obj_a.x() = 44;
out_obj_a.y() = 1.2;
TestObjectB out_obj_b;
out_obj_b.a() = 33;
out_obj_b.b() = "you";
std::string key1 = "mykey";
hepnos::AsyncEngine async(*datastore, 1);
// we can store obj_a
CPPUNIT_ASSERT(subrun.store(async, key1, out_obj_a));
// we cannot store at that key again something of the same type
// but we will know about it only when checking for errors
TestObjectA tmpa;
CPPUNIT_ASSERT(subrun.store(async, key1, tmpa));
// we can store obj_b at the same key because it's not the same type
CPPUNIT_ASSERT(subrun.store(async, key1, out_obj_b));
async.wait();
// there should be one error logged
CPPUNIT_ASSERT(async.errors().size() == 1);
TestObjectA in_obj_a;
TestObjectB in_obj_b;
std::string key2 = "otherkey";
// we can't load something at a key that does not exist
CPPUNIT_ASSERT(!subrun.load(key2, in_obj_a));
// we can reload obj_a from key1
CPPUNIT_ASSERT(subrun.load(key1, in_obj_a));
// and they are the same
CPPUNIT_ASSERT(in_obj_a == out_obj_a);
// we can reload obj_b from key1
CPPUNIT_ASSERT(subrun.load(key1, in_obj_b));
// and they are the same
CPPUNIT_ASSERT(in_obj_b == out_obj_b);
}
void LoadStoreTest::testAsyncLoadStoreEvent() {
auto root = datastore->root();
auto mds = root["matthieu_async"];
auto run = mds[42];
auto subrun = run[3];
auto event = subrun[22];
CPPUNIT_ASSERT(mds.valid());
CPPUNIT_ASSERT(run.valid());
CPPUNIT_ASSERT(subrun.valid());
CPPUNIT_ASSERT(event.valid());
TestObjectA out_obj_a;
out_obj_a.x() = 44;
out_obj_a.y() = 1.2;
TestObjectB out_obj_b;
out_obj_b.a() = 33;
out_obj_b.b() = "you";
std::string key1 = "mykey";
hepnos::AsyncEngine async(*datastore, 1);
// we can store obj_a
CPPUNIT_ASSERT(event.store(async, key1, out_obj_a));
// we cannot store at that key again something of the same type
// but we will know about it later when checking for errors
TestObjectA tmpa;
CPPUNIT_ASSERT(event.store(async, key1, tmpa));
// we can store obj_b at the same key because it's not the same type
CPPUNIT_ASSERT(event.store(async, key1, out_obj_b));
async.wait();
CPPUNIT_ASSERT(async.errors().size() == 1);
TestObjectA in_obj_a;
TestObjectB in_obj_b;
std::string key2 = "otherkey";
// we can't load something at a key that does not exist
CPPUNIT_ASSERT(!event.load(key2, in_obj_a));
// we can reload obj_a from key1
CPPUNIT_ASSERT(event.load(key1, in_obj_a));
// and they are the same
CPPUNIT_ASSERT(in_obj_a == out_obj_a);
// we can reload obj_b from key1
CPPUNIT_ASSERT(event.load(key1, in_obj_b));
// and they are the same
CPPUNIT_ASSERT(in_obj_b == out_obj_b);
}
......@@ -14,6 +14,10 @@ class LoadStoreTest : public CppUnit::TestFixture
CPPUNIT_TEST( testLoadStoreRun );
CPPUNIT_TEST( testLoadStoreSubRun );
CPPUNIT_TEST( testLoadStoreEvent );
CPPUNIT_TEST( testAsyncLoadStoreDataSet );
CPPUNIT_TEST( testAsyncLoadStoreRun );
CPPUNIT_TEST( testAsyncLoadStoreSubRun );
CPPUNIT_TEST( testAsyncLoadStoreEvent );
CPPUNIT_TEST_SUITE_END();
public:
......@@ -26,6 +30,10 @@ class LoadStoreTest : public CppUnit::TestFixture
void testLoadStoreRun();
void testLoadStoreSubRun();
void testLoadStoreEvent();
void testAsyncLoadStoreDataSet();
void testAsyncLoadStoreRun();
void testAsyncLoadStoreSubRun();
void testAsyncLoadStoreEvent();
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment