Commit 597f5400 authored by Matthieu Dorier's avatar Matthieu Dorier

added possibility to load products from cache

parent 9a4a0030
......@@ -170,6 +170,16 @@ class DataSet : public KeyValueContainer {
*/
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const override;
/**
* @brief Looks up the raw data stored under the given key for this DataSet
* in the provided ProductCache only; the DataStore is not queried.
* Throws an Exception if this DataSet is invalid.
*
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const ProductCache& cache, const std::string& key, std::string& buffer) const override;
/**
* @brief Same cache-only lookup as the std::string overload, but the data
* is placed in a caller-provided buffer (value) whose size is passed in,
* and whose actual data size is returned, through vsize.
* Throws an Exception if this DataSet is invalid.
*
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const ProductCache& cache, const std::string& key, char* value, size_t* vsize) const override;
/**
* @brief Comparison operator.
*
......
......@@ -152,6 +152,16 @@ class Event : public KeyValueContainer {
*/
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const override;
/**
* @brief Looks up the raw data stored under the given key for this Event
* in the provided ProductCache only; the DataStore is not queried.
* Throws an Exception if this Event is invalid.
*
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const ProductCache& cache, const std::string& key, std::string& buffer) const override;
/**
* @brief Same cache-only lookup as the std::string overload, but the data
* is placed in a caller-provided buffer (value) whose size is passed in,
* and whose actual data size is returned, through vsize.
* Throws an Exception if this Event is invalid.
*
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const ProductCache& cache, const std::string& key, char* value, size_t* vsize) const override;
/**
* @brief Compares this Event with another Event. The Events must point to
* the same event number within the same container.
......
......@@ -23,6 +23,7 @@ namespace hepnos {
class WriteBatch;
class AsyncEngine;
class Prefetcher;
class ProductCache;
class KeyValueContainer {
......@@ -155,6 +156,7 @@ class KeyValueContainer {
* (or scheduled to be prefetched) and fall back to looking up in the underlying
* DataStore if it hasn't.
*
* @param prefetcher Prefetcher to look into first.
* @param key Key.
* @param value Buffer in which to put the binary data.
* @param vsize in: size of the buffer, out: size of the actual data.
......@@ -163,6 +165,35 @@ class KeyValueContainer {
*/
virtual bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const = 0;
/**
* @brief Loads binary data associated with a particular key from the container.
* This function will look in the product cache for the requested object.
* Note that contrary to the Prefetcher, this function will NOT fall back to looking
* up into the DataStore.
*
* @param cache ProductCache to look into (the only place that is searched).
* @param key Key.
* @param buffer String in which to put the binary data.
*
* @return true if the key was found in the cache and the read succeeded, false otherwise.
*/
virtual bool loadRawData(const ProductCache& cache, const std::string& key, std::string& buffer) const = 0;
/**
* @brief Loads binary data associated with a particular key from the container
* into a caller-provided buffer.
* This function will look in the product cache for the requested object.
* Note that contrary to the Prefetcher, this function will NOT fall back to looking
* up into the DataStore.
*
* @param cache ProductCache to look into (the only place that is searched).
* @param key Key.
* @param value Buffer in which to put the binary data.
* @param vsize in: size of the buffer, out: size of the actual data.
*
* @return true if the key was found in the cache and the read succeeded, false otherwise.
*/
virtual bool loadRawData(const ProductCache& cache, const std::string& key, char* value, size_t* vsize) const = 0;
/**
* @brief Stores a key/value pair into the KeyValueContainer.
* The type of the key should have operator<< available
......@@ -307,6 +338,24 @@ class KeyValueContainer {
return loadVectorImpl(prefetcher, key, value, std::is_pod<V>());
}
/**
 * @brief Version of load that looks for the requested key in the
 * ProductCache argument. The implementation is selected at compile
 * time depending on whether V is a POD type.
 */
template<typename K, typename V>
bool load(const ProductCache& cache, const K& key, V& value) const {
    const std::is_pod<V> pod_tag{};
    return loadImpl(cache, key, value, pod_tag);
}
/**
 * @brief Version of load for vectors that looks for the requested key
 * in the ProductCache argument. The implementation is selected at
 * compile time depending on whether the element type V is a POD type.
 */
template<typename K, typename V>
bool load(const ProductCache& cache, const K& key, std::vector<V>& value) const {
    const std::is_pod<V> pod_tag{};
    return loadVectorImpl(cache, key, value, pod_tag);
}
private:
/**
......@@ -413,6 +462,22 @@ class KeyValueContainer {
return vsize == sizeof(value);
}
/**
 * @brief Implementation of the load function with a cache, used when the
 * value type is a POD (selected through the integral_constant<bool, true> tag).
 *
 * The lookup key is built by streaming the user key, a "#" separator and
 * the result of demangle<V>(). The raw bytes are written directly into
 * the memory of value.
 *
 * @param cache ProductCache in which to look for the product.
 * @param key User-provided key (must be streamable with operator<<).
 * @param value Object receiving the raw bytes.
 *
 * @return true if the raw load succeeded and the size reported back
 *         equals sizeof(value), false otherwise.
 */
template<typename K, typename V>
bool loadImpl(const ProductCache& cache, const K& key, V& value,
              const std::integral_constant<bool, true>&) const {
    std::stringstream ss_key;
    ss_key << key << "#" << demangle<V>();
    // in: capacity of the destination object, out: size of the stored data
    size_t vsize = sizeof(value);
    if(!loadRawData(cache, ss_key.str(), reinterpret_cast<char*>(&value), &vsize)) {
        return false;
    }
    // a size mismatch means the stored bytes do not describe a V
    return vsize == sizeof(value);
}
/**
* @brief Implementation of the load function when the value type is not a POD.
*/
......@@ -457,6 +522,28 @@ class KeyValueContainer {
return true;
}
/**
* @brief Implementation of the load function with a prefetcher.
*/
template<typename K, typename V>
bool loadImpl(const ProductCache& cache, const K& key, V& value,
const std::integral_constant<bool, false>&) const {
std::string buffer;
std::stringstream ss_key;
ss_key << key << "#" << demangle<V>();
if(!loadRawData(cache, ss_key.str(), buffer)) {
return false;
}
try {
std::stringstream ss(buffer);
InputArchive ia(datastore(), ss);
ia >> value;
} catch(...) {
throw Exception("Exception occured during serialization");
}
return true;
}
/**
* @brief Implementation of the load function when the value type is a vector of POD.
*/
......@@ -507,6 +594,31 @@ class KeyValueContainer {
return true;
}
/**
 * @brief Implementation of the load function with a cache, used when the
 * value type is a vector of POD (selected through the
 * integral_constant<bool, true> tag).
 *
 * The cached buffer is expected to hold a size_t element count followed
 * by exactly count*sizeof(V) bytes of element data.
 *
 * @param cache ProductCache in which to look for the product.
 * @param key User-provided key (must be streamable with operator<<).
 * @param value Vector resized to the stored count and filled with the data.
 *
 * @return true if the key was found in the cache and the buffer has the
 *         expected layout, false otherwise.
 */
template<typename K, typename V>
bool loadVectorImpl(const ProductCache& cache, const K& key, std::vector<V>& value,
                    const std::integral_constant<bool, true>&) const {
    std::string buffer;
    std::stringstream ss_key;
    ss_key << key << "#" << demangle<std::vector<V>>();
    if(!loadRawData(cache, ss_key.str(), buffer)) {
        return false;
    }
    size_t count = 0;
    if(buffer.size() < sizeof(count)) {
        return false;
    }
    std::memcpy(&count, buffer.data(), sizeof(count));
    const size_t payload_size = buffer.size() - sizeof(count);
    // Validate with a division rather than count*sizeof(V): the product can
    // wrap around for a corrupt count and spuriously match the buffer size.
    if(payload_size % sizeof(V) != 0 || payload_size / sizeof(V) != count) {
        return false;
    }
    value.resize(count);
    // value.data() may be null for an empty vector; skip the copy in that case
    if(payload_size != 0) {
        std::memcpy(value.data(), buffer.data()+sizeof(count), payload_size);
    }
    return true;
}
/**
* @brief Implementation of the load function when the value type is a vector of non-POD.
*/
......@@ -561,6 +673,33 @@ class KeyValueContainer {
return true;
}
/**
 * @brief Implementation of the load function with a cache, used when the
 * value type is a vector of non-POD (selected through the
 * integral_constant<bool, false> tag).
 *
 * The raw bytes found in the cache are fed to an InputArchive; an element
 * count is read first, then each element in order. Any exception raised
 * while doing so is rethrown as a hepnos Exception.
 *
 * @param cache ProductCache in which to look for the product.
 * @param key User-provided key (must be streamable with operator<<).
 * @param value Vector resized to the stored count and filled element by element.
 *
 * @return true if the key was found in the cache, false otherwise.
 */
template<typename K, typename V>
bool loadVectorImpl(const ProductCache& cache, const K& key, std::vector<V>& value,
                    const std::integral_constant<bool, false>&) const {
    std::string buffer;
    std::stringstream ss_key;
    ss_key << key << "#" << demangle<std::vector<V>>();
    if(!loadRawData(cache, ss_key.str(), buffer)) {
        return false;
    }
    try {
        std::stringstream ss(buffer);
        InputArchive ia(datastore(), ss);
        size_t count = 0;
        ia >> count;
        value.resize(count);
        // iterate over the elements themselves: an unsigned index compared
        // against a size_t count would wrap around for very large counts
        for(auto& element : value) {
            ia >> element;
        }
    } catch(...) {
        throw Exception("Exception occured during serialization");
    }
    return true;
}
/**
* @brief Creates the string key based on the provided key
* and the type of the value. Serializes the value into a string.
......
......@@ -10,13 +10,25 @@
namespace hepnos {
class Event;
class SubRun;
class Run;
class DataSet;
struct ProductCacheImpl;
struct ParallelEventProcessorImpl;
struct SyncPrefetcherImpl;
struct AsyncPrefetcherImpl;
/**
* @brief The ProductCache is used in ParallelEventProcessor to
* cache products associated with events.
*/
class ProductCache {
friend class DataSet;
friend class Run;
friend class SubRun;
friend class Event;
friend struct ParallelEventProcessorImpl;
friend struct SyncPrefetcherImpl;
friend struct AsyncPrefetcherImpl;
......
......@@ -165,6 +165,16 @@ class Run : public KeyValueContainer {
*/
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const override;
/**
* @brief Looks up the raw data stored under the given key for this Run
* in the provided ProductCache only; the DataStore is not queried.
* Throws an Exception if this Run is invalid.
*
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const ProductCache& cache, const std::string& key, std::string& buffer) const override;
/**
* @brief Same cache-only lookup as the std::string overload, but the data
* is placed in a caller-provided buffer (value) whose size is passed in,
* and whose actual data size is returned, through vsize.
* Throws an Exception if this Run is invalid.
*
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const ProductCache& cache, const std::string& key, char* value, size_t* vsize) const override;
/**
* @brief Compares this Run with another Run. The Runs must point to
* the same run number within the same container.
......
......@@ -163,6 +163,16 @@ class SubRun : public KeyValueContainer {
*/
bool loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const override;
/**
* @brief Looks up the raw data stored under the given key for this SubRun
* in the provided ProductCache only; the DataStore is not queried.
* Throws an Exception if this SubRun is invalid.
*
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const ProductCache& cache, const std::string& key, std::string& buffer) const override;
/**
* @brief Same cache-only lookup as the std::string overload, but the data
* is placed in a caller-provided buffer (value) whose size is passed in,
* and whose actual data size is returned, through vsize.
* Throws an Exception if this SubRun is invalid.
*
* @see KeyValueContainer::loadRawData()
*/
bool loadRawData(const ProductCache& cache, const std::string& key, char* value, size_t* vsize) const override;
/**
* @brief Compares this SubRun with another SubRun. The SubRuns must point to
* the same subrun number within the same container.
......
......@@ -15,6 +15,7 @@
#include "AsyncEngineImpl.hpp"
#include "WriteBatchImpl.hpp"
#include "PrefetcherImpl.hpp"
#include "ProductCacheImpl.hpp"
namespace hepnos {
......@@ -117,6 +118,22 @@ bool DataSet::loadRawData(const Prefetcher& prefetcher, const std::string& key,
return m_impl->m_datastore->loadRawProduct(id, key, value, vsize);
}
bool DataSet::loadRawData(const ProductCache& cache, const std::string& key, std::string& buffer) const {
if(!valid()) {
throw Exception("Calling DataSet member function on an invalid DataSet");
}
ItemDescriptor id(m_impl->m_uuid);
return cache.m_impl->loadRawProduct(id, key, buffer);
}
bool DataSet::loadRawData(const ProductCache& cache, const std::string& key, char* value, size_t* vsize) const {
if(!valid()) {
throw Exception("Calling DataSet member function on an invalid DataSet");
}
ItemDescriptor id(m_impl->m_uuid);
return cache.m_impl->loadRawProduct(id, key, value, vsize);
}
bool DataSet::operator==(const DataSet& other) const {
bool v1 = valid();
bool v2 = other.valid();
......
......@@ -11,6 +11,7 @@
#include "WriteBatchImpl.hpp"
#include "AsyncEngineImpl.hpp"
#include "PrefetcherImpl.hpp"
#include "ProductCacheImpl.hpp"
namespace hepnos {
......@@ -116,12 +117,28 @@ bool Event::loadRawData(const Prefetcher& prefetcher, const std::string& key, st
// Loads the raw bytes of a product attached to this Event through the
// given Prefetcher, writing them into the caller-provided buffer.
// vsize carries the buffer size in and the data size out.
// Throws an Exception if this Event is invalid.
bool Event::loadRawData(const Prefetcher& prefetcher, const std::string& key, char* value, size_t* vsize) const {
    if(!valid()) {
        // the message must name Event (not DataSet) so that the failure
        // can be traced back to the right container type
        throw Exception("Calling Event member function on an invalid Event");
    }
    auto& id = m_impl->m_descriptor;
    return prefetcher.m_impl->loadRawProduct(id, key, value, vsize);
}
// Cache-only lookup of a product attached to this Event, identified by the
// Event's descriptor. Throws if the Event is invalid.
bool Event::loadRawData(const ProductCache& cache, const std::string& key, std::string& buffer) const {
    if(valid()) {
        return cache.m_impl->loadRawProduct(m_impl->m_descriptor, key, buffer);
    }
    throw Exception("Calling Event member function on an invalid Event object");
}
// Cache-only lookup of a product attached to this Event, writing into a
// caller-provided buffer. Throws if the Event is invalid.
bool Event::loadRawData(const ProductCache& cache, const std::string& key, char* value, size_t* vsize) const {
    if(valid()) {
        return cache.m_impl->loadRawProduct(m_impl->m_descriptor, key, value, vsize);
    }
    throw Exception("Calling Event member function on an invalid Event");
}
bool Event::operator==(const Event& other) const {
bool v1 = valid();
bool v2 = other.valid();
......
......@@ -29,6 +29,19 @@ struct ProductCacheImpl {
return found;
}
/**
 * @brief Copies the cached bytes of a product into a caller-provided buffer.
 * The lookup is done under the read lock, using product_id.m_key.
 *
 * @param product_id Product to look for.
 * @param value Destination buffer.
 * @param vsize in: capacity of value in bytes; out: size of the cached data
 *              (left untouched if the product is not in the cache).
 *
 * @return true if the product is in the cache and fits in the buffer;
 *         false if it is absent, or if the buffer is too small (nothing is
 *         copied in that case and *vsize reports the size of the data).
 */
bool loadRawProduct(const ProductID& product_id, char* value, size_t* vsize) const {
    bool loaded = false;
    m_lock.rdlock();
    auto it = m_map.find(product_id.m_key);
    if(it != m_map.end()) {
        auto& data = it->second;
        const size_t capacity = *vsize;
        *vsize = data.size();
        // never write past the end of the caller's buffer
        if(*vsize <= capacity) {
            if(*vsize) std::memcpy(value, data.data(), *vsize);
            loaded = true;
        }
    }
    m_lock.unlock();
    return loaded;
}
bool loadRawProduct(const ItemDescriptor& id,
const std::string& productName,
std::string& data) const {
......@@ -36,6 +49,13 @@ struct ProductCacheImpl {
return loadRawProduct(product_id, data);
}
// Convenience overload: derives the ProductID from the container descriptor
// and the product name, then forwards to the ProductID-based lookup.
bool loadRawProduct(const ItemDescriptor& id,
                    const std::string& productName,
                    char* value, size_t* vsize) const {
    return loadRawProduct(DataStoreImpl::buildProductID(id, productName), value, vsize);
}
bool hasProduct(const ProductID& product_id) const {
m_lock.rdlock();
auto it = m_map.find(product_id.m_key);
......
......@@ -12,6 +12,7 @@
#include "DataStoreImpl.hpp"
#include "WriteBatchImpl.hpp"
#include "AsyncEngineImpl.hpp"
#include "ProductCacheImpl.hpp"
namespace hepnos {
......@@ -161,6 +162,22 @@ bool Run::loadRawData(const Prefetcher& prefetcher, const std::string& key, char
return prefetcher.m_impl->loadRawProduct(id, key, value, vsize);
}
// Cache-only lookup of a product attached to this Run, identified by the
// Run's descriptor. Throws if the Run is invalid.
bool Run::loadRawData(const ProductCache& cache, const std::string& key, std::string& buffer) const {
    if(valid()) {
        return cache.m_impl->loadRawProduct(m_impl->m_descriptor, key, buffer);
    }
    throw Exception("Calling Run member function on an invalid Run object");
}
// Cache-only lookup of a product attached to this Run, writing into a
// caller-provided buffer. Throws if the Run is invalid.
bool Run::loadRawData(const ProductCache& cache, const std::string& key, char* value, size_t* vsize) const {
    if(valid()) {
        return cache.m_impl->loadRawProduct(m_impl->m_descriptor, key, value, vsize);
    }
    throw Exception("Calling Run member function on an invalid Run");
}
bool Run::operator==(const Run& other) const {
bool v1 = valid();
bool v2 = other.valid();
......
......@@ -12,6 +12,7 @@
#include "DataStoreImpl.hpp"
#include "WriteBatchImpl.hpp"
#include "AsyncEngineImpl.hpp"
#include "ProductCacheImpl.hpp"
namespace hepnos {
......@@ -173,6 +174,22 @@ bool SubRun::loadRawData(const Prefetcher& prefetcher, const std::string& key, c
return prefetcher.m_impl->loadRawProduct(id, key, value, vsize);
}
// Cache-only lookup of a product attached to this SubRun, identified by the
// SubRun's descriptor. Throws if the SubRun is invalid.
bool SubRun::loadRawData(const ProductCache& cache, const std::string& key, std::string& buffer) const {
    if(valid()) {
        return cache.m_impl->loadRawProduct(m_impl->m_descriptor, key, buffer);
    }
    throw Exception("Calling SubRun member function on invalid SubRun object");
}
// Cache-only lookup of a product attached to this SubRun, writing into a
// caller-provided buffer. vsize carries the buffer size in and the data
// size out. Throws an Exception if this SubRun is invalid.
bool SubRun::loadRawData(const ProductCache& cache, const std::string& key, char* value, size_t* vsize) const {
    if(!valid()) {
        // name the right container type in the diagnostic (was "DataSet")
        throw Exception("Calling SubRun member function on an invalid SubRun");
    }
    auto& id = m_impl->m_descriptor;
    return cache.m_impl->loadRawProduct(id, key, value, vsize);
}
bool SubRun::operator==(const SubRun& other) const {
bool v1 = valid();
bool v2 = other.valid();
......
......@@ -73,4 +73,4 @@ add_test(NAME WriteBatchTest COMMAND run-test.sh ./WriteBatchTest)
add_test(NAME AsyncWriteBatchTest COMMAND run-test.sh ./AsyncWriteBatchTest)
add_test(NAME PtrTest COMMAND run-test.sh ./PtrTest)
add_test(NAME RestartTest COMMAND run-two-tests.sh ./WriteAndRestartTest ./RestartAndReadTest)
add_test(NAME ParallelMPITest COMMAND run-test.sh "mpirun -np 4 ./ParallelMPITest" 30)
add_test(NAME ParallelMPITest COMMAND run-test.sh "mpirun -np 4 ./ParallelMPITest" 120)
#include <algorithm>
#include <thallium.hpp>
#include "ParallelMPITest.hpp"
#include "TestObjects.hpp"
#include "CppUnitAdditionalMacros.hpp"
CPPUNIT_TEST_SUITE_REGISTRATION( ParallelMPITest );
......@@ -16,7 +17,14 @@ void ParallelMPITest::setUp() {
for(unsigned j = 0; j < 8; j++) {
auto subrun = run.createSubRun(j);
for(unsigned k = 0; k < 8; k++) {
subrun.createEvent(k);
TestObjectA a;
a.x() = k;
a.y() = k*2.0;
TestObjectB b;
b.a() = k;
auto ev = subrun.createEvent(k);
ev.store("abc", a);
ev.store("abc", b);
}
}
MPI_Barrier(MPI_COMM_WORLD);
......@@ -189,3 +197,75 @@ void ParallelMPITest::testParallelEventProcessorAsync() {
}
}
}
/**
 * Runs a ParallelEventProcessor over the "matthieu" dataset with
 * TestObjectA products labelled "abc" preloaded, and checks from inside the
 * per-event callback that:
 *   - the TestObjectA product can be loaded from the ProductCache and its
 *     x() matches the event number;
 *   - the TestObjectB product, which was not preloaded, is NOT found in
 *     the cache.
 * The (run, subrun, event) triplets processed by every rank are then
 * gathered on rank 0, which verifies that each of the size*8*8 expected
 * events was seen exactly once.
 */
void ParallelMPITest::testParallelEventProcessorWithProducts() {
    auto mds = datastore->root()["matthieu"];
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    int size;
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    // unsigned view of the communicator size, to compare against unsigned
    // counters without mixing signedness
    const unsigned num_ranks = static_cast<unsigned>(size);

    ParallelEventProcessorStatistics stats;
    ParallelEventProcessor parallel_processor(*datastore, MPI_COMM_WORLD);
    // only TestObjectA is preloaded: loading a TestObjectB from the cache
    // is expected to fail in the callback below
    parallel_processor.preload<TestObjectA>("abc");

    std::vector<item> items;
    parallel_processor.process(mds,
        [&items, rank](const Event& ev, const ProductCache& cache) {
            SubRun sr = ev.subrun();
            Run r = sr.run();
            std::cout << "Rank " << rank << " invoking lambda for item " <<
                r.number() << " " << sr.number() << " " << ev.number() << std::endl;
            items.emplace_back(r.number(), sr.number(), ev.number());
            TestObjectA a;
            TestObjectB b;
            CPPUNIT_ASSERT(ev.load(cache, "abc", a));
            CPPUNIT_ASSERT(!ev.load(cache, "abc", b));
            CPPUNIT_ASSERT(a.x() == ev.number());
            // busy-wait to emulate some processing time for the event
            // (0.1 in tl::timer::wtime units, presumably seconds)
            double t = tl::timer::wtime();
            while(tl::timer::wtime() - t < 0.1) {}
        },
        &stats
    );

    std::cout << "Rank " << rank << " statistics:\n"
        << " total_events_processed = " << stats.total_events_processed << "\n"
        << " local_events_processed = " << stats.local_events_processed << "\n"
        << " total_time = " << stats.total_time << "\n"
        << " acc_event_processing_time = " << stats.acc_event_processing_time << "\n"
        << " acc_product_loading_time = " << stats.acc_product_loading_time << "\n"
        << " processing_time_stats = " << stats.processing_time_stats << "\n"
        << " product_loading_time_stats = " << stats.product_loading_time_stats << "\n"
        << " waiting_time_stats = " << stats.waiting_time_stats << std::endl;

    if(rank != 0) {
        // send the number of locally processed items, then the items themselves
        int num_local_items = static_cast<int>(items.size());
        MPI_Send(&num_local_items, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
        if(num_local_items) {
            MPI_Send(items.data(), static_cast<int>(items.size()*sizeof(item)),
                     MPI_BYTE, 0, 0, MPI_COMM_WORLD);
        }
    } else {
        // rank 0 appends the items of every other rank to its own
        for(int source = 1; source < size; source++) {
            int num_items = 0;
            MPI_Recv(&num_items, 1, MPI_INT, source, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            items.resize(items.size() + num_items);
            if(num_items) {
                MPI_Recv(&items[items.size() - num_items],
                         static_cast<int>(sizeof(item)*num_items),
                         MPI_BYTE, source, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            }
        }
        std::sort(items.begin(), items.end());
        CPPUNIT_ASSERT(items.size() == static_cast<size_t>(num_ranks)*8*8);
        // once sorted, the items must enumerate every (run, subrun, event)
        // triplet in order
        unsigned x = 0;
        for(unsigned i = 0; i < num_ranks; i++) {
            for(unsigned j = 0; j < 8; j++) {
                for(unsigned k = 0; k < 8; k++) {
                    auto& e = items[x];
                    CPPUNIT_ASSERT(e.run == i && e.subrun == j && e.event == k);
                    x += 1;
                }
            }
        }
    }
}
......@@ -10,8 +10,9 @@ extern hepnos::DataStore* datastore;
class ParallelMPITest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE( ParallelMPITest );
// CPPUNIT_TEST( testParallelEventProcessor );
CPPUNIT_TEST( testParallelEventProcessor );
CPPUNIT_TEST( testParallelEventProcessorAsync );
CPPUNIT_TEST( testParallelEventProcessorWithProducts );
CPPUNIT_TEST_SUITE_END();
public:
......@@ -21,6 +22,7 @@ class ParallelMPITest : public CppUnit::TestFixture
void testParallelEventProcessor();
void testParallelEventProcessorAsync();
void testParallelEventProcessorWithProducts();
};
#endif
......@@ -15,7 +15,7 @@ cp config.yaml $TEST_DIR/config.yaml
CFG_FILE=$TEST_DIR/config.yaml
sed -i -e "s|XXX|${TEST_DIR}/database|g" $CFG_FILE
hepnos_test_start_servers 2 2 60 $CFG_FILE $CON_FILE
hepnos_test_start_servers 2 2 ${timeout_sec} $CFG_FILE $CON_FILE
export HEPNOS_CONFIG_FILE=$CON_FILE
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment