Commit fe8457c9 authored by Matthieu Dorier
Browse files

enable setting a maximum batch size in WriteBatch

parent de312ac9
......@@ -40,8 +40,8 @@ class WriteBatch {
public:
WriteBatch(DataStore& ds);
WriteBatch(DataStore& ds, AsyncEngine& async);
WriteBatch(DataStore& ds, unsigned max_batch_size=128);
WriteBatch(DataStore& ds, AsyncEngine& async, unsigned max_batch_size=128);
~WriteBatch();
WriteBatch(const WriteBatch&) = delete;
WriteBatch& operator=(const WriteBatch&) = delete;
......
......@@ -9,11 +9,11 @@
namespace hepnos {
// Constructs a WriteBatch bound to `datastore` by handing the DataStore's
// implementation handle (datastore.m_impl) to a new WriteBatchImpl.
// NOTE(review): this looks like the pre-change side of the diff; the
// WriteBatchImpl constructor shown later in this listing takes a
// max_batch_size argument that this overload does not supply.
WriteBatch::WriteBatch(DataStore& datastore)
: m_impl(std::make_unique<WriteBatchImpl>(datastore.m_impl)) {}
// Constructs a WriteBatch bound to `datastore` with no AsyncEngine.
// `max_batch_size` is forwarded unchanged to WriteBatchImpl, together with
// the DataStore's implementation handle (datastore.m_impl).
WriteBatch::WriteBatch(DataStore& datastore, unsigned max_batch_size)
: m_impl(std::make_unique<WriteBatchImpl>(datastore.m_impl, max_batch_size)) {}
// Constructs a WriteBatch bound to `datastore` and to the AsyncEngine `async`,
// forwarding both implementation handles to a new WriteBatchImpl.
// NOTE(review): this looks like the pre-change side of the diff; the
// WriteBatchImpl constructor shown later in this listing expects
// max_batch_size as its second argument, which this overload does not pass.
WriteBatch::WriteBatch(DataStore& datastore, AsyncEngine& async)
: m_impl(std::make_unique<WriteBatchImpl>(datastore.m_impl, async.m_impl)) {}
// Constructs a WriteBatch bound to `datastore` and to the AsyncEngine `async`.
// Note the argument order passed to WriteBatchImpl: the datastore handle,
// then max_batch_size, then the async engine handle (async.m_impl).
WriteBatch::WriteBatch(DataStore& datastore, AsyncEngine& async, unsigned max_batch_size)
: m_impl(std::make_unique<WriteBatchImpl>(datastore.m_impl, max_batch_size, async.m_impl)) {}
// Empty destructor; cleanup is left to the members' own destructors.
// NOTE(review): presumably defined out-of-line so that m_impl is destroyed
// where WriteBatchImpl is a complete type — confirm against the header.
WriteBatch::~WriteBatch() {}
......
......@@ -32,28 +32,50 @@ class WriteBatchImpl {
std::shared_ptr<DataStoreImpl> m_datastore;
std::shared_ptr<AsyncEngineImpl> m_async_engine;
entries_type m_entries;
unsigned m_max_batch_size;
tl::condition_variable m_cond;
tl::mutex m_mutex;
std::vector<tl::managed<tl::thread>> m_async_thread;
bool m_async_thread_should_stop = false;
/**
 * Writes a set of key/value pairs to one database, splitting the work into
 * put_multi calls of at most max_batch_size pairs each.
 *
 * @param max_batch_size Maximum number of pairs sent per put_multi call.
 *                       A value of 0 means "no limit": all pairs are sent in
 *                       a single call (previously 0 made the loop spin
 *                       forever because no progress was made).
 * @param db             Target database.
 * @param keys           Keys to write.
 * @param vals           Values to write; vals[i] is stored under keys[i].
 *                       Assumed to have the same length as keys — TODO confirm
 *                       that callers guarantee this.
 * @param exception      Output: receives an Exception describing the failure
 *                       when a put_multi call fails with anything other than
 *                       SDSKV_ERR_KEYEXISTS.
 * @param ok             Output: set to 1 on entry, and to 0 if any batch
 *                       failed with an error other than SDSKV_ERR_KEYEXISTS.
 */
static void writer_thread(unsigned max_batch_size,
                          const sdskv::database* db,
                          const std::vector<std::string>& keys,
                          const std::vector<std::string>& vals,
                          Exception* exception,
                          char* ok) {
    *ok = 1;
    const size_t total = keys.size();
    if(total == 0) {
        return; // nothing to write
    }
    // Effective batch size: never larger than the number of pairs (avoids
    // allocating scratch space we cannot use) and never zero.
    const size_t limit = (max_batch_size == 0)
                       ? total
                       : std::min<size_t>(max_batch_size, total);
    // Scratch arrays of raw pointers/sizes, reused across batches.
    std::vector<const void*> batch_keys(limit);
    std::vector<size_t>      batch_keys_sizes(limit);
    std::vector<const void*> batch_vals(limit);
    std::vector<size_t>      batch_vals_sizes(limit);
    // size_t arithmetic throughout: the counts come from std::vector::size()
    // and must not be truncated to unsigned.
    size_t offset = 0;
    while(offset < total) {
        const size_t this_batch_size = std::min(limit, total - offset);
        // Only the final batch can be shorter; shrink the scratch arrays so
        // put_multi sees exactly this_batch_size entries.
        batch_keys.resize(this_batch_size);
        batch_keys_sizes.resize(this_batch_size);
        batch_vals.resize(this_batch_size);
        batch_vals_sizes.resize(this_batch_size);
        for(size_t i = 0; i < this_batch_size; i++) {
            const std::string& key = keys[offset + i];
            const std::string& val = vals[offset + i];
            batch_keys[i]       = key.data();
            batch_keys_sizes[i] = key.size();
            batch_vals[i]       = val.data();
            batch_vals_sizes[i] = val.size();
        }
        try {
            db->put_multi(batch_keys, batch_keys_sizes, batch_vals, batch_vals_sizes);
        } catch(sdskv::exception& ex) {
            // An already-existing key is not treated as a failure. Any other
            // error is recorded, and the remaining batches are still attempted.
            if(ex.error() != SDSKV_ERR_KEYEXISTS) {
                *ok = 0;
                *exception = Exception(std::string("SDSKV error: ")+ex.what());
            }
        }
        offset += this_batch_size;
    }
}
static void spawn_writer_threads(entries_type& entries, tl::pool& pool) {
static void spawn_writer_threads(unsigned max_batch_size, entries_type& entries, tl::pool& pool) {
auto num_threads = entries.size();
std::vector<tl::managed<tl::thread>> threads;
std::vector<Exception> exceptions(num_threads);
......@@ -62,8 +84,8 @@ class WriteBatchImpl {
for(auto& e : entries) {
char* ok = &oks[i];
Exception* ex = &exceptions[i];
threads.push_back(pool.make_thread([&e, ok, ex]() {
writer_thread(e.first, e.second.m_keys, e.second.m_values, ex, ok);
threads.push_back(pool.make_thread([max_batch_size, &e, ok, ex]() {
writer_thread(max_batch_size, e.first, e.second.m_keys, e.second.m_values, ex, ok);
}));
i += 1;
}
......@@ -87,16 +109,18 @@ class WriteBatchImpl {
auto entries = std::move(batch.m_entries);
batch.m_entries.clear();
lock.unlock();
spawn_writer_threads(entries, batch.m_async_engine->m_pool);
spawn_writer_threads(batch.m_max_batch_size, entries, batch.m_async_engine->m_pool);
}
}
public:
WriteBatchImpl(const std::shared_ptr<DataStoreImpl>& ds,
unsigned max_batch_size,
const std::shared_ptr<AsyncEngineImpl>& async = nullptr)
: m_datastore(ds)
, m_async_engine(async) {
, m_async_engine(async)
, m_max_batch_size(max_batch_size) {
if(m_async_engine) {
m_async_thread.push_back(
m_async_engine->m_pool.make_thread([batch=this](){
......@@ -160,7 +184,7 @@ class WriteBatchImpl {
void flush() {
if(!m_async_engine) { // flush everything here
tl::xstream es = tl::xstream::self();
spawn_writer_threads(m_entries, es.get_main_pools(1)[0]);
spawn_writer_threads(m_max_batch_size, m_entries, es.get_main_pools(1)[0]);
} else { // wait for AsyncEngine to have flushed everything
{
std::lock_guard<tl::mutex> lock(m_mutex);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment