Commit a2d524e2 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

product loading now working in PEP

parent 83f99920
...@@ -26,6 +26,7 @@ class ProductID { ...@@ -26,6 +26,7 @@ class ProductID {
friend class AsyncPrefetcherImpl; friend class AsyncPrefetcherImpl;
friend class SyncPrefetcherImpl; friend class SyncPrefetcherImpl;
friend struct ProductCacheImpl; friend struct ProductCacheImpl;
friend class ParallelEventProcessorImpl;
friend class boost::serialization::access; friend class boost::serialization::access;
public: public:
......
...@@ -256,13 +256,22 @@ class DataStoreImpl { ...@@ -256,13 +256,22 @@ class DataStoreImpl {
return result; return result;
} }
const sdskv::database& locateProductDb(const ProductID& productID) const { long unsigned computeProductDbIndex(const ProductID& productID) const {
// hash the name to get the provider id // hash the name to get the provider id
long unsigned db_idx = 0; long unsigned db_idx = 0;
uint64_t hash; uint64_t hash;
hash = hashString(productID.m_key.c_str(), sizeof(ItemDescriptor)); // we are taking only the dataset+run+subrun part of the productID
hash = hashString(productID.m_key.c_str(), SubRunDescriptorLength);
ch_placement_find_closest(m_product_dbs.chi, hash, 1, &db_idx); ch_placement_find_closest(m_product_dbs.chi, hash, 1, &db_idx);
return m_product_dbs.dbs[db_idx]; return db_idx;
}
const sdskv::database& locateProductDb(const ProductID& productID) const {
return m_product_dbs.dbs[computeProductDbIndex(productID)];
}
const sdskv::database& getProductDatabase(unsigned long index) const {
return m_product_dbs.dbs[index];
} }
bool loadRawProduct(const ProductID& key, bool loadRawProduct(const ProductID& key,
...@@ -343,7 +352,7 @@ class DataStoreImpl { ...@@ -343,7 +352,7 @@ class DataStoreImpl {
auto key = buildProductID(id, productName); auto key = buildProductID(id, productName);
// find out which DB to access // find out which DB to access
auto& db = locateProductDb(key); auto& db = locateProductDb(key);
// read the value // store the value
try { try {
db.put(key.m_key, data); db.put(key.m_key, data);
} catch(sdskv::exception& ex) { } catch(sdskv::exception& ex) {
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#ifndef __HEPNOS_PARALLEL_EVENT_PROCESSOR_IMPL_HPP #ifndef __HEPNOS_PARALLEL_EVENT_PROCESSOR_IMPL_HPP
#define __HEPNOS_PARALLEL_EVENT_PROCESSOR_IMPL_HPP #define __HEPNOS_PARALLEL_EVENT_PROCESSOR_IMPL_HPP
#include <numeric>
#include <queue> #include <queue>
#include <thallium.hpp> #include <thallium.hpp>
#include "PrefetcherImpl.hpp" #include "PrefetcherImpl.hpp"
...@@ -240,32 +241,132 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm ...@@ -240,32 +241,132 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
return false; return false;
} }
void preloadProductsFor(const ItemDescriptor& descriptor, ProductCache& cache) { void preloadProductsForDescriptors(const std::vector<EventDescriptor>& descriptors,
for(auto& product_key : m_product_keys) { ProductCache& cache) {
auto product_id = DataStoreImpl::buildProductID(descriptor, product_key); if(m_product_keys.size() == 0) return;
std::string data;
bool ok = m_datastore->loadRawProduct(product_id, data); size_t pks = 0;
if(ok) { for(const auto& product_key : m_product_keys)
cache.m_impl->addRawProduct(descriptor, product_key, std::move(data)); pks += product_key.size() + sizeof(EventDescriptor);
// buffer for packed product ids
std::string packed_product_ids;
std::vector<ProductID> product_ids;
packed_product_ids.reserve(descriptors.size() * pks);
product_ids.reserve(descriptors.size());
// size of each product id
std::vector<hg_size_t> packed_product_id_sizes;
packed_product_id_sizes.reserve(descriptors.size());
// size of each value
std::vector<hg_size_t> packed_value_sizes(descriptors.size(), 0);
// build the list of packed product ids, product id sizes
size_t offset = 0;
hg_size_t count = 0;
long current_db_idx = -1;
for(const auto& descriptor : descriptors) {
// build a fake product id to get the db_index
auto fake_product_id = DataStoreImpl::buildProductID(descriptor, "");
auto db_idx = m_datastore->computeProductDbIndex(fake_product_id);
// if the db_idx changed, we need to flush the current batch
if(current_db_idx != -1 && current_db_idx != (long)db_idx) {
// get size of batch of products
packed_value_sizes.resize(count);
auto& db = m_datastore->getProductDatabase(current_db_idx);
bool b = db.length_packed(count, packed_product_ids.data(),
packed_product_id_sizes.data(),
packed_value_sizes.data());
// allocate a buffer of appropriate size for packed values
size_t buffer_size = std::accumulate(packed_value_sizes.begin(),
packed_value_sizes.end(), 0);
std::vector<char> value_buffer(buffer_size);
// get the actual values
hg_size_t actual_count = count;
db.get_packed(&actual_count, packed_product_ids.data(),
packed_product_id_sizes.data(),
buffer_size, value_buffer.data(),
packed_value_sizes.data());
if(actual_count != count)
throw Exception("get_packed failed to get correct count of product values");
// place data into cache
offset = 0;
for(unsigned i = 0; i < count; i++) {
std::string data(value_buffer.data() + offset, packed_value_sizes[i]);
cache.m_impl->addRawProduct(product_ids[i], std::move(data));
offset += packed_value_sizes[i];
}
// reset buffers and variables
offset = 0;
count = 0;
product_ids.resize(0);
packed_product_ids.resize(0);
packed_product_id_sizes.resize(0);
product_ids.reserve(descriptors.size());
packed_product_ids.reserve(descriptors.size() & pks);
packed_product_id_sizes.reserve(descriptors.size());
}
current_db_idx = db_idx;
// go through all actual product keys
for(const auto& product_key : m_product_keys) {
auto product_id = DataStoreImpl::buildProductID(descriptor, product_key);
product_ids.push_back(product_id);
auto key_size = product_id.m_key.size();
packed_product_ids.resize(offset + key_size);
std::memcpy(const_cast<char*>(packed_product_ids.data() + offset),
product_id.m_key.data(),
key_size);
offset += key_size;
packed_product_id_sizes.push_back(key_size);
count += 1;
}
}
if(current_db_idx != -1) {
// get size of batch of products
packed_value_sizes.resize(count);
auto& db = m_datastore->getProductDatabase(current_db_idx);
bool b = db.length_packed(count, packed_product_ids.data(),
packed_product_id_sizes.data(),
packed_value_sizes.data());
// allocate a buffer of appropriate size for packed values
size_t buffer_size = std::accumulate(packed_value_sizes.begin(),
packed_value_sizes.end(), 0);
std::vector<char> value_buffer(buffer_size);
// get the actual values
hg_size_t actual_count = count;
db.get_packed(&actual_count, packed_product_ids.data(),
packed_product_id_sizes.data(),
buffer_size, value_buffer.data(),
packed_value_sizes.data());
if(actual_count != count)
throw Exception("get_packed failed to get correct count of product values");
// place data into cache
offset = 0;
for(unsigned i = 0; i < count; i++) {
std::string data(value_buffer.data() + offset, packed_value_sizes[i]);
cache.m_impl->addRawProduct(product_ids[i], std::move(data));
offset += packed_value_sizes[i];
} }
} }
} }
void processSingleEvent(const EventDescriptor& d, const ParallelEventProcessor::EventProcessingWithCacheFn& user_function) { void processSingleEvent(const EventDescriptor& d,
ProductCache cache; const ParallelEventProcessor::EventProcessingWithCacheFn& user_function,
ProductCache& cache) {
double t1, t2, t3; double t1, t2, t3;
t1 = tl::timer::wtime(); t1 = tl::timer::wtime();
Event event = Event::fromDescriptor(DataStore(m_datastore), d, false); Event event = Event::fromDescriptor(DataStore(m_datastore), d, false);
preloadProductsFor(d, cache);
t2 = tl::timer::wtime();
user_function(event, cache); user_function(event, cache);
t3 = tl::timer::wtime(); t2 = tl::timer::wtime();
if(m_stats) { if(m_stats) {
std::lock_guard<tl::mutex> lock(m_stats_mtx); std::lock_guard<tl::mutex> lock(m_stats_mtx);
m_stats->product_loading_time_stats.updateWith(t2-t1); m_stats->processing_time_stats.updateWith(t2-t1);
m_stats->acc_product_loading_time += t2-t1; m_stats->acc_event_processing_time += t2-t1;
m_stats->processing_time_stats.updateWith(t3-t2);
m_stats->acc_event_processing_time += t3-t2;
} }
if(m_async) { if(m_async) {
{ {
...@@ -284,7 +385,9 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm ...@@ -284,7 +385,9 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
double t_start = tl::timer::wtime(); double t_start = tl::timer::wtime();
std::vector<EventDescriptor> descriptors; std::vector<EventDescriptor> descriptors;
auto max_ults = m_async ? m_async->m_xstreams.size()*2 : 0; auto max_ults = m_async ? m_async->m_xstreams.size()*2 : 0;
ProductCache cache;
while(requestEvents(descriptors)) { while(requestEvents(descriptors)) {
preloadProductsForDescriptors(descriptors, cache);
for(auto& d : descriptors) { for(auto& d : descriptors) {
if(m_async) { if(m_async) {
{ // don't submit more ULTs than twice the number of ES { // don't submit more ULTs than twice the number of ES
...@@ -294,11 +397,11 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm ...@@ -294,11 +397,11 @@ struct ParallelEventProcessorImpl : public tl::provider<ParallelEventProcessorIm
} }
m_num_processing_ults += 1; m_num_processing_ults += 1;
} }
m_async->m_pool.make_thread([this, d, &user_function]() { m_async->m_pool.make_thread([this, d, &cache, &user_function]() {
processSingleEvent(d, user_function); processSingleEvent(d, user_function, cache);
}, tl::anonymous()); }, tl::anonymous());
} else { } else {
processSingleEvent(d, user_function); processSingleEvent(d, user_function, cache);
} }
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment