Commit 9592387a authored by Matthieu Dorier's avatar Matthieu Dorier

added product cache

parent eb69bc6f
......@@ -17,6 +17,7 @@
#include <hepnos/KeyValueContainer.hpp>
#include <hepnos/ParallelEventProcessor.hpp>
#include <hepnos/Prefetcher.hpp>
#include <hepnos/ProductCache.hpp>
#include <hepnos/Run.hpp>
#include <hepnos/RunNumber.hpp>
#include <hepnos/RunSet.hpp>
......
......@@ -12,6 +12,7 @@
#include <hepnos/AsyncEngine.hpp>
#include <hepnos/Statistics.hpp>
#include <hepnos/DataStore.hpp>
#include <hepnos/ProductCache.hpp>
namespace hepnos {
......@@ -45,6 +46,7 @@ class ParallelEventProcessor {
public:
typedef std::function<void(const Event&)> EventProcessingFn;
typedef std::function<void(const Event&, const ProductCache& cache)> EventProcessingWithCacheFn;
/**
* @brief Constructor. Builds a ParallelEventProcessor to navigate a dataset.
......@@ -100,9 +102,19 @@ class ParallelEventProcessor {
* @param stats Pointer to a statistics object to fill
*/
void process(const DataSet& dataset,
const EventProcessingFn& function,
const EventProcessingWithCacheFn& function,
ParallelEventProcessorStatistics* stats = nullptr);
void process(const DataSet& dataset,
const EventProcessingFn& function,
ParallelEventProcessorStatistics* stats = nullptr) {
process(dataset,
[&function](const Event& ev, const ProductCache&) {
function(ev);
},
stats);
}
private:
void preloadImpl(const std::string& productKey);
......
/*
 * (C) 2020 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */
#ifndef __HEPNOS_PRODUCT_CACHE_HPP
#define __HEPNOS_PRODUCT_CACHE_HPP
#include <memory>
namespace hepnos {
// Forward declarations: the implementation and the internal classes
// that are allowed to reach into it.
struct ProductCacheImpl;
struct ParallelEventProcessorImpl;
struct SyncPrefetcherImpl;
struct AsyncPrefetcherImpl;
/**
 * @brief The ProductCache is a handle to an in-memory store of raw
 * (serialized) products, keyed by product id. It is filled by the
 * prefetchers and the ParallelEventProcessor (its friends below, which
 * access m_impl directly) and handed to user callbacks so products can
 * be looked up without contacting the datastore.
 *
 * The handle has shared-pointer semantics: copies share the same
 * underlying ProductCacheImpl.
 */
class ProductCache {
    friend struct ParallelEventProcessorImpl;
    friend struct SyncPrefetcherImpl;
    friend struct AsyncPrefetcherImpl;
    // Shared implementation; copies of this handle alias the same cache.
    std::shared_ptr<ProductCacheImpl> m_impl;
    public:
    /**
     * @brief Default constructor; creates an empty cache.
     */
    ProductCache();
    /**
     * @brief Destructor.
     */
    ~ProductCache();
    /**
     * @brief Copy constructor (shares the underlying cache).
     */
    ProductCache(const ProductCache&);
    /**
     * @brief Move constructor.
     */
    ProductCache(ProductCache&&);
    /**
     * @brief Copy-assignment operator (shares the underlying cache).
     */
    ProductCache& operator=(const ProductCache&);
    /**
     * @brief Move-assignment operator.
     */
    ProductCache& operator=(ProductCache&&);
    /**
     * @brief Removes all the products currently held by the cache.
     */
    void clear();
    /**
     * @brief Returns the number of products currently in the cache.
     */
    size_t size() const;
};
}
#endif
......@@ -25,6 +25,7 @@ class ProductID {
friend class WriteBatchImpl;
friend class AsyncPrefetcherImpl;
friend class SyncPrefetcherImpl;
friend struct ProductCacheImpl;
friend class boost::serialization::access;
public:
......
......@@ -42,7 +42,7 @@ class AsyncPrefetcherImpl : public PrefetcherImpl {
{
std::unique_lock<tl::mutex> lock(m_product_cache_mtx);
if(ok) {
m_product_cache[product_id.m_key] = std::move(data);
m_product_cache.m_impl->addRawProduct(product_id, std::move(data));
}
m_products_loading.erase(product_id.m_key);
}
......@@ -193,12 +193,9 @@ class AsyncPrefetcherImpl : public PrefetcherImpl {
std::string& data) const override {
auto product_id = DataStoreImpl::buildProductID(id, productName);
std::unique_lock<tl::mutex> lock(m_product_cache_mtx);
auto it = m_product_cache.find(product_id.m_key);
if(it != m_product_cache.end()) {
if(m_product_cache.m_impl->loadRawProduct(product_id, data)) {
// product found right away
auto& product = it->second;
data = std::move(product);
m_product_cache.erase(it);
m_product_cache.m_impl->removeRawProduct(product_id);
} else {
// product not found, check if prefetching is pending
if(m_products_loading.count(product_id.m_key) == 0) {
......@@ -211,15 +208,14 @@ class AsyncPrefetcherImpl : public PrefetcherImpl {
m_product_cache_cv.wait(lock, [this, &product_id]() {
return m_products_loading.count(product_id.m_key) == 0; });
// check again if the product is available
it = m_product_cache.find(product_id.m_key);
if(it == m_product_cache.end()) {
// product is not available
if(m_product_cache.m_impl->loadRawProduct(product_id, data)) {
// product found
m_product_cache.m_impl->removeRawProduct(product_id);
return true;
} else{
// product not found
return false;
}
// product is available
auto& product = it->second;
data = std::move(product);
m_product_cache.erase(it);
}
}
return true;
......@@ -230,13 +226,12 @@ class AsyncPrefetcherImpl : public PrefetcherImpl {
char* value, size_t* vsize) const override {
auto product_id = DataStoreImpl::buildProductID(id, productName);
std::unique_lock<tl::mutex> lock(m_product_cache_mtx);
auto it = m_product_cache.find(product_id.m_key);
if(it != m_product_cache.end()) {
std::string data;
if(m_product_cache.m_impl->loadRawProduct(product_id, data)) {
// product found right away
auto& product = it->second;
*vsize = product.size();
std::memcpy(value, product.data(), *vsize);
m_product_cache.erase(it);
m_product_cache.m_impl->removeRawProduct(product_id);
*vsize = data.size();
std::memcpy(value, data.data(), *vsize);
} else {
// product not found, check if prefetching is pending
if(m_products_loading.count(product_id.m_key) == 0) {
......@@ -249,16 +244,16 @@ class AsyncPrefetcherImpl : public PrefetcherImpl {
m_product_cache_cv.wait(lock, [this, &product_id]() {
return m_products_loading.count(product_id.m_key) == 0; });
// check again if the product is available
it = m_product_cache.find(product_id.m_key);
if(it == m_product_cache.end()) {
// product is not available
if(m_product_cache.m_impl->loadRawProduct(product_id, data)) {
// product found
m_product_cache.m_impl->removeRawProduct(product_id);
*vsize = data.size();
std::memcpy(value, data.data(), *vsize);
return true;
} else{
// product not found
return false;
}
// product is available
auto& product = it->second;
*vsize = product.size();
std::memcpy(value, product.data(), *vsize);
m_product_cache.erase(it);
}
}
return true;
......
......@@ -9,6 +9,7 @@ set(hepnos-src DataStore.cpp
WriteBatch.cpp
ParallelEventProcessor.cpp
Prefetcher.cpp
ProductCache.cpp
AsyncEngine.cpp
EventSet.cpp)
......
#ifndef __HEPNOS_ITEM_DESCRIPTOR_HPP
#define __HEPNOS_ITEM_DESCRIPTOR_HPP
#include "hepnos/Event.hpp"
#include "hepnos/SubRun.hpp"
#include "hepnos/Run.hpp"
#include "hepnos/RunNumber.hpp"
#include "hepnos/SubRunNumber.hpp"
#include "hepnos/EventNumber.hpp"
......@@ -21,6 +24,21 @@ struct ItemDescriptor {
std::memset(dataset.data, '\0', sizeof(dataset.data));
}
/**
 * @brief Implicit conversion from an EventDescriptor.
 *
 * NOTE(review): copies the raw bytes of the descriptor on top of this
 * object — assumes ItemDescriptor is trivially copyable and that its
 * leading EventDescriptorLength bytes share the EventDescriptor layout;
 * TODO confirm against the descriptor definitions.
 */
ItemDescriptor(const EventDescriptor& ed) {
std::memcpy(this, &ed, EventDescriptorLength);
}
/**
 * @brief Implicit conversion from a SubRunDescriptor.
 *
 * Copies the sub-run's raw descriptor bytes, then marks the event
 * number invalid since a sub-run does not identify an event.
 * NOTE(review): same trivially-copyable layout assumption as the
 * EventDescriptor overload — TODO confirm.
 */
ItemDescriptor(const SubRunDescriptor& srd) {
std::memcpy(this, &srd, SubRunDescriptorLength);
event = InvalidEventNumber;
}
/**
 * @brief Implicit conversion from a RunDescriptor.
 *
 * Copies the run's raw descriptor bytes, then marks both the event and
 * sub-run numbers invalid since a run identifies neither.
 * NOTE(review): same trivially-copyable layout assumption as the
 * EventDescriptor overload — TODO confirm.
 */
ItemDescriptor(const RunDescriptor& rd) {
std::memcpy(this, &rd, RunDescriptorLength);
event = InvalidEventNumber;
subrun = InvalidSubRunNumber;
}
ItemDescriptor(const UUID& ds,
const RunNumber& rn = InvalidRunNumber,
const SubRunNumber& srn = InvalidSubRunNumber,
......
......@@ -60,7 +60,7 @@ ParallelEventProcessor::~ParallelEventProcessor() {
void ParallelEventProcessor::process(
const DataSet& dataset,
const EventProcessingFn& function,
const EventProcessingWithCacheFn& function,
ParallelEventProcessorStatistics* stats) {
std::vector<EventSet> ev_sets;
for(auto t : m_impl->m_targets) {
......@@ -70,7 +70,7 @@ void ParallelEventProcessor::process(
}
/**
 * @brief Registers a product key so that the corresponding product will
 * be preloaded into the ProductCache before each event is dispatched to
 * the user callback.
 *
 * @param productKey Key identifying the product to preload.
 */
void ParallelEventProcessor::preloadImpl(const std::string& productKey) {
m_impl->m_product_keys.insert(productKey);
}
}
......@@ -9,6 +9,7 @@
#include <queue>
#include <thallium.hpp>
#include "PrefetcherImpl.hpp"
#include "ProductCacheImpl.hpp"
#include "hepnos/EventSet.hpp"
#include "hepnos/ParallelEventProcessor.hpp"
......@@ -18,19 +19,20 @@ namespace tl = thallium;
struct ParallelEventProcessorImpl {
std::shared_ptr<DataStoreImpl> m_datastore;
MPI_Comm m_comm;
ParallelEventProcessorOptions m_options;
std::vector<int> m_loader_ranks;
std::vector<int> m_targets;
std::shared_ptr<DataStoreImpl> m_datastore;
MPI_Comm m_comm;
ParallelEventProcessorOptions m_options;
std::vector<int> m_loader_ranks;
std::vector<int> m_targets;
std::unordered_set<std::string> m_product_keys;
bool m_loader_running = false;
std::queue<EventDescriptor> m_event_queue;
tl::mutex m_event_queue_mtx;
tl::condition_variable m_event_queue_cv;
bool m_loader_running = false;
std::queue<EventDescriptor> m_event_queue;
tl::mutex m_event_queue_mtx;
tl::condition_variable m_event_queue_cv;
int m_num_active_consumers;
tl::managed<tl::xstream> m_mpi_xstream;
int m_num_active_consumers;
tl::managed<tl::xstream> m_mpi_xstream;
ParallelEventProcessorStatistics* m_stats = nullptr;
......@@ -51,7 +53,7 @@ struct ParallelEventProcessorImpl {
* Main function to start processing events in parallel.
*/
void process(const std::vector<EventSet>& evsets,
const ParallelEventProcessor::EventProcessingFn& function,
const ParallelEventProcessor::EventProcessingWithCacheFn& function,
ParallelEventProcessorStatistics* stats) {
m_stats = stats;
startLoadingEventsFromTargets(evsets);
......@@ -154,6 +156,12 @@ struct ParallelEventProcessorImpl {
}
}
/**
* This function tries to fill out the provided vector with a batch of
* Event descriptors taken locally or from loader processes.
* It returns true if new EventDescriptors were put in the vector,
* false otherwise.
*/
bool requestEvents(std::vector<EventDescriptor>& descriptors) {
int my_rank;
MPI_Comm_rank(m_comm, &my_rank);
......@@ -199,18 +207,35 @@ struct ParallelEventProcessorImpl {
return false;
}
void processEvents(const ParallelEventProcessor::EventProcessingFn& user_function) {
/**
 * @brief For every registered product key, tries to load the raw
 * product associated with the given item from the datastore and, on
 * success, stores it in the provided cache. Products that fail to load
 * are silently skipped (the user callback will simply miss in the
 * cache and may fetch them itself).
 *
 * @param descriptor Descriptor of the item whose products to preload.
 * @param cache Cache to fill.
 */
void preloadProductsFor(const ItemDescriptor& descriptor, ProductCache& cache) {
for(auto& product_key : m_product_keys) {
auto product_id = DataStoreImpl::buildProductID(descriptor, product_key);
std::string data;
bool ok = m_datastore->loadRawProduct(product_id, data);
if(ok) {
// move the freshly loaded bytes into the cache to avoid a copy
cache.m_impl->addRawProduct(descriptor, product_key, std::move(data));
}
}
}
/**
* This function keeps requesting new events and call the user-provided callback.
*/
void processEvents(const ParallelEventProcessor::EventProcessingWithCacheFn& user_function) {
if(m_stats) *m_stats = ParallelEventProcessorStatistics();
double t_start = tl::timer::wtime();
std::vector<EventDescriptor> descriptors;
double t1;
double t2 = tl::timer::wtime();
ProductCache cache;
while(requestEvents(descriptors)) {
for(auto& d : descriptors) {
cache.clear();
Event event = Event::fromDescriptor(DataStore(m_datastore), d, false);
preloadProductsFor(d, cache);
t1 = tl::timer::wtime();
if(m_stats) m_stats->waiting_time_stats.updateWith(t1-t2);
user_function(event);
user_function(event, cache);
t2 = tl::timer::wtime();
if(m_stats) {
m_stats->processing_time_stats.updateWith(t2 - t1);
......
......@@ -5,6 +5,7 @@
#include <unordered_set>
#include <unordered_map>
#include "hepnos/Prefetcher.hpp"
#include "hepnos/ProductCache.hpp"
#include "DataStoreImpl.hpp"
#include "AsyncEngineImpl.hpp"
......@@ -30,7 +31,7 @@ class PrefetcherImpl {
bool m_associated = false;
std::vector<std::string> m_active_product_keys;
mutable std::set<std::shared_ptr<ItemImpl>, ItemPtrComparator> m_item_cache;
mutable std::unordered_map<std::string, std::string> m_product_cache;
mutable ProductCache m_product_cache;
PrefetcherImpl(const std::shared_ptr<DataStoreImpl>& ds)
: m_datastore(ds) {}
......
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "hepnos/ProductCache.hpp"
#include "DataStoreImpl.hpp"
#include "ProductCacheImpl.hpp"
#include <thallium.hpp>
namespace hepnos {
namespace tl = thallium;
// Default constructor: creates a fresh, empty cache implementation.
ProductCache::ProductCache()
: m_impl(std::make_shared<ProductCacheImpl>()) {}
// Copy/move operations are defaulted: copies share the same
// ProductCacheImpl through the shared_ptr (shallow, handle semantics).
ProductCache::~ProductCache() = default;
ProductCache::ProductCache(const ProductCache&) = default;
ProductCache::ProductCache(ProductCache&&) = default;
ProductCache& ProductCache::operator=(const ProductCache&) = default;
ProductCache& ProductCache::operator=(ProductCache&&) = default;
void ProductCache::clear() {
auto& impl = *m_impl;
impl.m_lock.wrlock();
impl.m_map.clear();
impl.m_lock.unlock();
}
size_t ProductCache::size() const {
auto& impl = *m_impl;
impl.m_lock.rdlock();
size_t s = impl.m_map.size();
impl.m_lock.unlock();
return s;
}
}
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "hepnos/ProductCache.hpp"
#include "ItemDescriptor.hpp"
#include "DataStoreImpl.hpp"
#include <unordered_map>
#include <thallium.hpp>
namespace hepnos {
namespace tl = thallium;
/**
 * @brief Implementation of the ProductCache: a map from a product id's
 * string key to the product's raw (serialized) bytes, protected by a
 * reader/writer lock. All member functions take the lock themselves,
 * so callers must NOT already hold it (the lock is presumably
 * non-reentrant — TODO confirm thallium rwlock semantics).
 */
struct ProductCacheImpl {
// mutable so that const lookup functions can still take the lock
mutable tl::rwlock m_lock;
// product key -> raw serialized product data
std::unordered_map<std::string, std::string> m_map;
/**
 * @brief Looks up a product by id and copies its data out.
 * The product remains in the cache (data is copied, not moved).
 *
 * @return true if the product was found, false otherwise.
 */
bool loadRawProduct(const ProductID& product_id, std::string& data) const {
m_lock.rdlock();
auto it = m_map.find(product_id.m_key);
auto found = it != m_map.end();
if(found) {
data = it->second;
}
m_lock.unlock();
return found;
}
/**
 * @brief Convenience overload: builds the ProductID from an item
 * descriptor and a product name, then looks it up.
 */
bool loadRawProduct(const ItemDescriptor& id,
const std::string& productName,
std::string& data) const {
auto product_id = DataStoreImpl::buildProductID(id, productName);
return loadRawProduct(product_id, data);
}
/**
 * @brief Checks whether a product with the given id is in the cache
 * without copying its data.
 */
bool hasProduct(const ProductID& product_id) const {
m_lock.rdlock();
auto it = m_map.find(product_id.m_key);
auto found = it != m_map.end();
m_lock.unlock();
return found;
}
/**
 * @brief Convenience overload of hasProduct taking an item
 * descriptor and a product name.
 */
bool hasProduct(const ItemDescriptor& id,
const std::string& productName) const {
auto product_id = DataStoreImpl::buildProductID(id, productName);
return hasProduct(product_id);
}
/**
 * @brief Inserts (or overwrites) a product's data, copying it.
 */
void addRawProduct(const ProductID& product_id,
const std::string& data) {
m_lock.wrlock();
m_map[product_id.m_key] = data;
m_lock.unlock();
}
/**
 * @brief Copying convenience overload taking an item descriptor and
 * a product name.
 */
void addRawProduct(const ItemDescriptor& id,
const std::string& productName,
const std::string& data) {
auto product_id = DataStoreImpl::buildProductID(id, productName);
addRawProduct(product_id, data);
}
/**
 * @brief Inserts (or overwrites) a product's data, moving it in to
 * avoid a copy.
 */
void addRawProduct(const ProductID& product_id,
std::string&& data) {
m_lock.wrlock();
m_map[product_id.m_key] = std::move(data);
m_lock.unlock();
}
/**
 * @brief Moving convenience overload taking an item descriptor and
 * a product name.
 */
void addRawProduct(const ItemDescriptor& id,
const std::string& productName,
std::string&& data) {
auto product_id = DataStoreImpl::buildProductID(id, productName);
addRawProduct(product_id, std::move(data));
}
/**
 * @brief Removes a product from the cache, if present.
 */
void removeRawProduct(const ProductID& product_id) {
m_lock.wrlock();
m_map.erase(product_id.m_key);
m_lock.unlock();
}
/**
 * @brief Convenience overload of removeRawProduct taking an item
 * descriptor and a product name.
 */
void removeRawProduct(const ItemDescriptor& id,
const std::string& productName) {
auto product_id = DataStoreImpl::buildProductID(id, productName);
removeRawProduct(product_id);
}
};
}
......@@ -9,6 +9,7 @@
#include <set>
#include <unordered_set>
#include <unordered_map>
#include "ProductCacheImpl.hpp"
#include "DataStoreImpl.hpp"
#include "AsyncEngineImpl.hpp"
#include "PrefetcherImpl.hpp"
......@@ -25,15 +26,14 @@ class SyncPrefetcherImpl : public PrefetcherImpl {
/**
 * @brief Prefetches, for the given item, every product whose key was
 * activated on this prefetcher, storing the raw data into the product
 * cache so later loadRawProduct calls can be served locally.
 *
 * Fix: the span contained merge/diff residue — both the pre-change and
 * post-change lines were present, leaving a duplicate declaration of
 * product_id, a dangling iterator-based lookup, and two competing
 * inserts moving from the same string. Resolved to the cache-based
 * version.
 *
 * @param itemImpl Item whose registered products should be prefetched.
 */
void fetchRequestedProducts(const std::shared_ptr<ItemImpl>& itemImpl) const override {
    auto& descriptor = itemImpl->m_descriptor;
    for(auto& key : m_active_product_keys) {
        // skip products that are already cached
        if(m_product_cache.m_impl->hasProduct(descriptor, key))
            continue;
        auto product_id = DataStoreImpl::buildProductID(descriptor, key);
        std::string data;
        bool ok = m_datastore->loadRawProduct(product_id, data);
        if(ok) {
            update_product_statistics(data.size());
            // move the loaded bytes into the cache to avoid a copy
            m_product_cache.m_impl->addRawProduct(descriptor, key, std::move(data));
        }
    }
}
......@@ -91,15 +91,13 @@ class SyncPrefetcherImpl : public PrefetcherImpl {
const std::string& productName,
std::string& data) const override {
auto product_id = DataStoreImpl::buildProductID(id, productName);
auto it = m_product_cache.find(product_id.m_key);
if(it == m_product_cache.end()) {
if(m_stats) m_stats->product_cache_miss += 1;
return m_datastore->loadRawProduct(product_id, data);
} else {
if(m_product_cache.m_impl->loadRawProduct(product_id, data)) {
if(m_stats) m_stats->product_cache_hit += 1;
data = std::move(it->second);
m_product_cache.erase(it);
m_product_cache.m_impl->removeRawProduct(product_id);
return true;
} else {
if(m_stats) m_stats->product_cache_miss += 1;
return m_datastore->loadRawProduct(product_id, data);
}
}
......@@ -107,15 +105,16 @@ class SyncPrefetcherImpl : public PrefetcherImpl {
const std::string& productName,
char* value, size_t* vsize) const override {
auto product_id = DataStoreImpl::buildProductID(id, productName);
auto it = m_product_cache.find(product_id.m_key);
if(it == m_product_cache.end()) {
if(m_stats) m_stats->product_cache_miss += 1;
return m_datastore->loadRawProduct(id, productName, value, vsize);
} else {
*vsize = it->second.size();
std::string data;
if(m_product_cache.m_impl->loadRawProduct(product_id, data)) {
if(m_stats) m_stats->product_cache_hit += 1;
std::memcpy(value, it->second.data(), *vsize);
*vsize = data.size();
if(*vsize) std::memcpy(value, data.data(), *vsize);
m_product_cache.m_impl->removeRawProduct(product_id);
return true;
} else {
if(m_stats) m_stats->product_cache_miss += 1;
return m_datastore->loadRawProduct(id, productName, value, vsize);
}
}
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment