Commit 85627dc4 authored by Matthieu Dorier's avatar Matthieu Dorier

implemented async engine capabilities for Datasets, runs, subruns, and events

parent 9223062e
......@@ -11,11 +11,22 @@
namespace hepnos {
class DataStore;
class AsyncEngineImpl;
class DataSet;
class Run;
class SubRun;
class Event;
class WriteBatch;
class WriteBatchImpl;
class AsyncEngineImpl;
class AsyncEngine {
friend class DataStore;
friend class DataSet;
friend class Run;
friend class SubRun;
friend class Event;
friend class KeyValueContainer;
friend class WriteBatch;
private:
......@@ -31,6 +42,7 @@ class AsyncEngine {
AsyncEngine(AsyncEngine&&) = default;
AsyncEngine& operator=(AsyncEngine&&) = default;
void wait();
};
}
......
......@@ -149,6 +149,7 @@ class DataSet : public KeyValueContainer {
*/
ProductID storeRawData(const std::string& key, const char* value, size_t vsize) override;
ProductID storeRawData(WriteBatch& batch, const std::string& key, const char* value, size_t vsize) override;
ProductID storeRawData(AsyncEngine& engine, const std::string& key, const char* value, size_t vsize) override;
/**
* @brief Loads binary data associated with a particular key from the DataSet.
......@@ -210,6 +211,7 @@ class DataSet : public KeyValueContainer {
* @return A Run instance pointing to the created run.
*/
Run createRun(const RunNumber& runNumber);
Run createRun(AsyncEngine& async, const RunNumber& runNumber);
Run createRun(WriteBatch& batch, const RunNumber& runNumber);
/**
......
......@@ -106,6 +106,7 @@ class Event : public KeyValueContainer {
*/
ProductID storeRawData(const std::string& key, const char* value, size_t vsize) override;
ProductID storeRawData(WriteBatch& batch, const std::string& key, const char* value, size_t vsize) override;
ProductID storeRawData(AsyncEngine& async, const std::string& key, const char* value, size_t vsize) override;
/**
* @brief Loads raw key/value data from this Event.
......
......@@ -21,6 +21,7 @@
namespace hepnos {
class WriteBatch;
class AsyncEngine;
class KeyValueContainer {
......@@ -88,6 +89,8 @@ class KeyValueContainer {
*/
virtual ProductID storeRawData(WriteBatch& batch, const std::string& key, const char* value, size_t vsize) = 0;
virtual ProductID storeRawData(AsyncEngine& async, const std::string& key, const char* value, size_t vsize) = 0;
/**
* @brief Loads raw key/value data from this KeyValueContainer.
* This function is virtual and must be overloaded in the child class.
......@@ -141,6 +144,11 @@ class KeyValueContainer {
return storeImpl(batch, key, value, std::is_pod<V>());
}
template<typename K, typename V>
ProductID store(AsyncEngine& async, const K& key, const V& value) {
return storeImpl(async, key, value, std::is_pod<V>());
}
/**
* @brief Version of store when the value is an std::vector.
*/
......@@ -161,6 +169,16 @@ class KeyValueContainer {
return storeRawData(batch, key_str, val_str.data(), val_str.size());
}
/**
* @brief Version of store when the value is an std::vector.
*/
template<typename K, typename V>
ProductID store(AsyncEngine& async, const K& key, const std::vector<V>& value, int start=0, int end=-1) {
std::string key_str, val_str;
serializeKeyValueVector(std::is_pod<V>(), key, value, key_str, val_str, start, end);
return storeRawData(async, key_str, val_str.data(), val_str.size());
}
/**
* @brief Loads a value associated with a key from the
* KeyValueContainer. The type of the key should have
......@@ -215,6 +233,18 @@ class KeyValueContainer {
return storeRawData(batch, key_str, val_str.data(), val_str.size());
}
/**
* @brief Implementation of the store function with AsyncEngine
* and the value type is not am std::vector and not a POD.
*/
template<typename K, typename V>
ProductID storeImpl(AsyncEngine& async, const K& key, const V& value,
const std::integral_constant<bool, false>&) {
std::string key_str, val_str;
serializeKeyValue(key, value, key_str, val_str);
return storeRawData(async, key_str, val_str.data(), val_str.size());
}
/**
* @brief Implementation of the store function when the value
* type is a POD.
......@@ -239,6 +269,18 @@ class KeyValueContainer {
return storeRawData(batch, key_str, reinterpret_cast<const char*>(&value), sizeof(value));
}
/**
* @brief Implementation of the store function with AsyncEngine
* when the value type is a POD.
*/
template<typename K, typename V>
ProductID storeImpl(AsyncEngine& async, const K& key, const V& value,
const std::integral_constant<bool, true>&) {
std::string key_str;
serializeKeyValue(key, value, key_str);
return storeRawData(async, key_str, reinterpret_cast<const char*>(&value), sizeof(value));
}
/**
* @brief Implementation of the load function when the value type is a POD.
*/
......
......@@ -18,6 +18,7 @@ class ProductID {
friend class DataStore;
friend class DataStoreImpl;
friend class AsyncEngineImpl;
friend class WriteBatchImpl;
friend class boost::serialization::access;
......
......@@ -111,6 +111,7 @@ class Run : public KeyValueContainer {
* @return a valid ProductID if the key did not already exist, an invalid one otherwise.
*/
ProductID storeRawData(const std::string& key, const char* value, size_t vsize) override;
ProductID storeRawData(AsyncEngine& async, const std::string& key, const char* value, size_t vsize) override;
ProductID storeRawData(WriteBatch& batch, const std::string& key, const char* value, size_t vsize) override;
/**
......@@ -300,6 +301,7 @@ class Run : public KeyValueContainer {
* @return a handle to the created or existing SubRun.
*/
SubRun createSubRun(const SubRunNumber& subRunNumber);
SubRun createSubRun(AsyncEngine& async, const SubRunNumber& subRunNumber);
SubRun createSubRun(WriteBatch& batch, const SubRunNumber& subRunNumber);
};
......
......@@ -109,6 +109,7 @@ class SubRun : public KeyValueContainer {
*/
ProductID storeRawData(const std::string& key, const char* value, size_t vsize) override;
ProductID storeRawData(WriteBatch& batch, const std::string& key, const char* value, size_t vsize) override;
ProductID storeRawData(AsyncEngine& async, const std::string& key, const char* value, size_t vsize) override;
/**
* @brief Loads raw key/value data from this SubRun.
......@@ -298,6 +299,7 @@ class SubRun : public KeyValueContainer {
*/
Event createEvent(const EventNumber& eventNumber);
Event createEvent(WriteBatch& batch, const EventNumber& eventNumber);
Event createEvent(AsyncEngine& batch, const EventNumber& eventNumber);
};
class SubRun::const_iterator {
......
......@@ -7,4 +7,8 @@ namespace hepnos {
AsyncEngine::AsyncEngine(DataStore& ds, size_t num_threads)
: m_impl(std::make_shared<AsyncEngineImpl>(ds.m_impl, num_threads)) {}
void AsyncEngine::wait() {
m_impl->wait();
}
}
......@@ -51,6 +51,68 @@ class AsyncEngineImpl {
}
}
ProductID storeRawProduct(const ItemDescriptor& id,
const std::string& productName,
const char* value, size_t vsize)
{
// build the key
auto product_id = m_datastore->buildProductID(id, productName);
// make a thread that will store the data
m_pool.make_thread([product_id, // passed by copy
ds=m_datastore, // shared pointer
data=std::string(value,vsize)]() { // create new string
auto& db = ds->locateProductDb(product_id);
try {
db.put(product_id.m_key, data);
} catch(sdskv::exception& ex) {
// TODO handle exception
}
});
return product_id;
}
bool createItem(const UUID& containerUUID,
const RunNumber& run_number,
const SubRunNumber& subrun_number = InvalidSubRunNumber,
const EventNumber& event_number = InvalidEventNumber)
{
// build the key
ItemDescriptor id;
id.dataset = containerUUID;
id.run = run_number;
id.subrun = subrun_number;
id.event = event_number;
// make a thread that will store the data
m_pool.make_thread([id, ds=m_datastore]() {
// locate db
auto& db = ds->locateItemDb(id);
try {
db.put(&id, sizeof(id), nullptr, 0);
} catch(sdskv::exception& ex) {
if(!ex.error() == SDSKV_ERR_KEYEXISTS) {
// TODO handle exception
}
}
});
return true;
}
void wait() {
// join the current set of ES
for(auto& es : m_xstreams) {
es->join();
}
// create a new set of ES
std::vector<tl::managed<tl::xstream>> new_es;
for(unsigned i=0; i < m_xstreams.size(); i++) {
new_es.push_back(tl::xstream::create(tl::scheduler::predef::deflt, m_pool));
}
// replace old es
m_xstreams = std::move(new_es);
// starting new ES
for(auto& es : m_xstreams) es->start();
}
};
}
......
......@@ -6,9 +6,11 @@
#include "hepnos/DataSet.hpp"
#include "hepnos/Run.hpp"
#include "hepnos/RunSet.hpp"
#include "hepnos/AsyncEngine.hpp"
#include "ItemImpl.hpp"
#include "DataSetImpl.hpp"
#include "DataStoreImpl.hpp"
#include "AsyncEngineImpl.hpp"
#include "WriteBatchImpl.hpp"
namespace hepnos {
......@@ -61,6 +63,15 @@ ProductID DataSet::storeRawData(WriteBatch& batch, const std::string& key, const
return batch.m_impl->storeRawProduct(id, key, value, vsize);
}
ProductID DataSet::storeRawData(AsyncEngine& async, const std::string& key, const char* value, size_t vsize) {
if(!valid()) {
throw Exception("Calling DataSet member function on an invalid DataSet");
}
// forward the call to the async engine's store function
ItemDescriptor id(m_impl->m_uuid);
return async.m_impl->storeRawProduct(id, key, value, vsize);
}
bool DataSet::loadRawData(const std::string& key, std::string& buffer) const {
if(!valid()) {
throw Exception("Calling DataSet member function on an invalid DataSet");
......@@ -152,6 +163,17 @@ Run DataSet::createRun(WriteBatch& batch, const RunNumber& runNumber) {
runNumber));
}
Run DataSet::createRun(AsyncEngine& async, const RunNumber& runNumber) {
if(InvalidRunNumber == runNumber) {
throw Exception("Trying to create a Run with InvalidRunNumber");
}
async.m_impl->createItem(m_impl->m_uuid, runNumber);
return Run(std::make_shared<ItemImpl>(
m_impl->m_datastore,
m_impl->m_uuid,
runNumber));
}
DataSet DataSet::operator[](const std::string& datasetName) const {
auto it = find(datasetName);
if(!it->valid())
......
......@@ -4,9 +4,11 @@
* See COPYRIGHT in top-level directory.
*/
#include "hepnos/Event.hpp"
#include "hepnos/AsyncEngine.hpp"
#include "ItemImpl.hpp"
#include "DataStoreImpl.hpp"
#include "WriteBatchImpl.hpp"
#include "AsyncEngineImpl.hpp"
namespace hepnos {
......@@ -53,11 +55,20 @@ ProductID Event::storeRawData(WriteBatch& batch, const std::string& key, const c
if(!valid()) {
throw Exception("Calling Event member function on an invalid Event object");
}
// forward the call to the datastore's store function
// forward the call to the batch's store function
auto& id = m_impl->m_descriptor;
return batch.m_impl->storeRawProduct(id, key, value, vsize);
}
ProductID Event::storeRawData(AsyncEngine& async, const std::string& key, const char* value, size_t vsize) {
if(!valid()) {
throw Exception("Calling Event member function on an invalid Event object");
}
// forward the call to the async engine's store function
auto& id = m_impl->m_descriptor;
return async.m_impl->storeRawProduct(id, key, value, vsize);
}
bool Event::loadRawData(const std::string& key, std::string& buffer) const {
if(!valid()) {
throw Exception("Calling Event member function on an invalid Event object");
......
......@@ -4,10 +4,12 @@
* See COPYRIGHT in top-level directory.
*/
#include "hepnos/Run.hpp"
#include "hepnos/AsyncEngine.hpp"
#include "ItemImpl.hpp"
#include "ItemImpl.hpp"
#include "DataStoreImpl.hpp"
#include "WriteBatchImpl.hpp"
#include "AsyncEngineImpl.hpp"
namespace hepnos {
......@@ -56,11 +58,20 @@ ProductID Run::storeRawData(WriteBatch& batch, const std::string& key, const cha
if(!valid()) {
throw Exception("Calling Run member function on an invalid Run object");
}
// forward the call to the datastore's store function
// forward the call to the batch's store function
const ItemDescriptor& id = m_impl->m_descriptor;
return batch.m_impl->storeRawProduct(id, key, value, vsize);
}
ProductID Run::storeRawData(AsyncEngine& async, const std::string& key, const char* value, size_t vsize) {
if(!valid()) {
throw Exception("Calling Run member function on an invalid Run object");
}
// forward the call to the async engine's store function
const ItemDescriptor& id = m_impl->m_descriptor;
return async.m_impl->storeRawProduct(id, key, value, vsize);
}
bool Run::loadRawData(const std::string& key, std::string& buffer) const {
if(!valid()) {
throw Exception("Calling Run member function on an invalid Run object");
......@@ -119,6 +130,16 @@ SubRun Run::createSubRun(WriteBatch& batch, const SubRunNumber& subRunNumber) {
return SubRun(std::move(new_subrun_impl));
}
SubRun Run::createSubRun(AsyncEngine& async, const SubRunNumber& subRunNumber) {
if(!valid()) {
throw Exception("Calling Run member function on an invalid Run object");
}
ItemDescriptor& id = m_impl->m_descriptor;
async.m_impl->createItem(id.dataset, id.run, subRunNumber);
auto new_subrun_impl = std::make_shared<ItemImpl>(m_impl->m_datastore, id.dataset, id.run, subRunNumber);
return SubRun(std::move(new_subrun_impl));
}
SubRun Run::operator[](const SubRunNumber& subRunNumber) const {
auto it = find(subRunNumber);
if(!it->valid())
......
......@@ -6,9 +6,11 @@
#include <memory>
#include "hepnos/SubRun.hpp"
#include "hepnos/AsyncEngine.hpp"
#include "ItemImpl.hpp"
#include "DataStoreImpl.hpp"
#include "WriteBatchImpl.hpp"
#include "AsyncEngineImpl.hpp"
namespace hepnos {
......@@ -56,11 +58,20 @@ ProductID SubRun::storeRawData(WriteBatch& batch, const std::string& key, const
if(!valid()) {
throw Exception("Calling SubRun member function on invalid SubRun object");
}
// forward the call to the datastore's store function
// forward the call to the batch's store function
auto& id = m_impl->m_descriptor;
return batch.m_impl->storeRawProduct(id, key, value, vsize);
}
ProductID SubRun::storeRawData(AsyncEngine& async, const std::string& key, const char* value, size_t vsize) {
if(!valid()) {
throw Exception("Calling SubRun member function on invalid SubRun object");
}
// forward the call to async engine's store function
auto& id = m_impl->m_descriptor;
return async.m_impl->storeRawProduct(id, key, value, vsize);
}
bool SubRun::loadRawData(const std::string& key, std::string& buffer) const {
if(!valid()) {
throw Exception("Calling SubRun member function on invalid SubRun object");
......@@ -114,6 +125,15 @@ Event SubRun::createEvent(WriteBatch& batch, const EventNumber& eventNumber) {
return Event(std::make_shared<ItemImpl>(m_impl->m_datastore, id.dataset, id.run, id.subrun, eventNumber));
}
Event SubRun::createEvent(AsyncEngine& async, const EventNumber& eventNumber) {
if(!valid()) {
throw Exception("Calling SubRun member function on invalid SubRun object");
}
auto& id = m_impl->m_descriptor;
async.m_impl->createItem(id.dataset, id.run, id.subrun, eventNumber);
return Event(std::make_shared<ItemImpl>(m_impl->m_datastore, id.dataset, id.run, id.subrun, eventNumber));
}
Event SubRun::operator[](const EventNumber& eventNumber) const {
auto it = find(eventNumber);
if(!it->valid())
......
......@@ -25,7 +25,17 @@ void AsyncWriteBatchTest::testAsyncWriteBatchRun() {
{
hepnos::WriteBatch batch(*datastore, async_engine);
for(auto i = 0; i < 10; i++) {
for(auto i = 0; i < 5; i++) {
auto run = dataset.createRun(batch, i);
CPPUNIT_ASSERT(run.store(batch, key1, out_obj_a));
CPPUNIT_ASSERT(run.store(batch, key1, out_obj_b));
}
}
async_engine.wait(); // useless, but just to test
{
hepnos::WriteBatch batch(*datastore, async_engine);
for(auto i = 5; i < 10; i++) {
auto run = dataset.createRun(batch, i);
CPPUNIT_ASSERT(run.store(batch, key1, out_obj_a));
CPPUNIT_ASSERT(run.store(batch, key1, out_obj_b));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment