Commit 9882df6e authored by Matthieu Dorier's avatar Matthieu Dorier

reimplemented WriteBatch with sdskv's put_packed function

parent ef4435f2
......@@ -9,6 +9,7 @@
#include <mutex> // for std::lock_guard and std::unique_lock
#include <iostream>
#include <unordered_map>
#include <queue>
#include <string>
#include <vector>
#include <thallium.hpp>
......@@ -25,11 +26,19 @@ class WriteBatchImpl {
public:
struct keyvals {
std::vector<std::string> m_keys;
std::vector<std::string> m_values;
std::string m_packed_keys;
std::vector<hg_size_t> m_packed_key_sizes;
std::string m_packed_vals;
std::vector<hg_size_t> m_packed_val_sizes;
keyvals() = default;
keyvals(keyvals&&) = default;
keyvals(const keyvals&) = delete;
keyvals& operator=(keyvals&&) = default;
keyvals& operator=(const keyvals&) = delete;
};
typedef std::unordered_map<const sdskv::database*, keyvals> entries_type;
typedef std::unordered_map<const sdskv::database*, std::queue<keyvals>> entries_type;
std::unique_ptr<WriteBatchStatistics> m_stats;
mutable tl::mutex m_stats_mtx;
......@@ -55,46 +64,27 @@ class WriteBatchImpl {
m_stats->batch_sizes.updateWith(batch_size);
}
static void writer_thread(WriteBatchImpl& wb,
unsigned max_batch_size,
const sdskv::database* db,
const std::vector<std::string>& keys,
const std::vector<std::string>& vals,
static void writer_thread(const sdskv::database* db,
std::queue<keyvals>& kvs_queue,
Exception* exception,
char* ok) {
*ok = 1;
std::vector<const void*> batch_keys(max_batch_size);
std::vector<size_t> batch_keys_sizes(max_batch_size);
std::vector<const void*> batch_vals(max_batch_size);
std::vector<size_t> batch_vals_sizes(max_batch_size);
size_t remaining = keys.size();
while(remaining != 0) {
unsigned j = keys.size()-remaining;
unsigned this_batch_size = std::min<unsigned>(max_batch_size, remaining);
for(unsigned i=0; i < this_batch_size; i++, j++) {
batch_keys[i] = keys[j].data();
batch_keys_sizes[i] = keys[j].size();
batch_vals[i] = vals[j].data();
batch_vals_sizes[i] = vals[j].size();
}
batch_keys.resize(this_batch_size);
batch_keys_sizes.resize(this_batch_size);
batch_vals.resize(this_batch_size);
batch_vals_sizes.resize(this_batch_size);
while(!kvs_queue.empty()) {
auto& batch = kvs_queue.front();
try {
db->put_multi(batch_keys, batch_keys_sizes, batch_vals, batch_vals_sizes);
wb.update_operation_statistics(this_batch_size);
db->put_packed(batch.m_packed_keys, batch.m_packed_key_sizes,
batch.m_packed_vals, batch.m_packed_val_sizes);
} catch(sdskv::exception& ex) {
if(ex.error() != SDSKV_ERR_KEYEXISTS) {
*ok = 0;
*exception = Exception(std::string("SDSKV error: ")+ex.what());
}
}
remaining -= this_batch_size;
kvs_queue.pop();
}
}
static void spawn_writer_threads(WriteBatchImpl& wb, unsigned max_batch_size, entries_type& entries, tl::pool& pool) {
static void spawn_writer_threads(entries_type& entries, tl::pool& pool) {
auto num_threads = entries.size();
std::vector<tl::managed<tl::thread>> threads;
std::vector<Exception> exceptions(num_threads);
......@@ -103,8 +93,8 @@ class WriteBatchImpl {
for(auto& e : entries) {
char* ok = &oks[i];
Exception* ex = &exceptions[i];
threads.push_back(pool.make_thread([&wb, max_batch_size, &e, ok, ex]() {
writer_thread(wb, max_batch_size, e.first, e.second.m_keys, e.second.m_values, ex, ok);
threads.push_back(pool.make_thread([&e, ok, ex]() {
writer_thread(e.first, e.second, ex, ok);
}));
i += 1;
}
......@@ -128,7 +118,7 @@ class WriteBatchImpl {
auto entries = std::move(batch.m_entries);
batch.m_entries.clear();
lock.unlock();
spawn_writer_threads(batch, batch.m_max_batch_size, entries, batch.m_async_engine->m_pool);
spawn_writer_threads(entries, batch.m_async_engine->m_pool);
}
}
......@@ -162,9 +152,22 @@ class WriteBatchImpl {
{
std::lock_guard<tl::mutex> g(m_mutex);
was_empty = m_entries.empty();
keyvals& entry = m_entries[&db];
entry.m_keys.push_back(product_id.m_key);
entry.m_values.emplace_back(value, vsize);
// find the queue of batches
std::queue<keyvals>& entry_queue = m_entries[&db];
if(entry_queue.empty()
|| entry_queue.back().m_packed_key_sizes.size() >= m_max_batch_size) {
entry_queue.emplace();
}
auto& kv_batch = entry_queue.back();
kv_batch.m_packed_keys += product_id.m_key;
kv_batch.m_packed_key_sizes.push_back(product_id.m_key.size());
if(vsize != 0) {
size_t offset = kv_batch.m_packed_vals.size();
kv_batch.m_packed_vals.resize(offset + vsize);
std::memcpy(const_cast<char*>(kv_batch.m_packed_vals.data()) + offset, value, vsize);
}
kv_batch.m_packed_val_sizes.push_back(vsize);
update_keyval_statistics(product_id.m_key.size(), vsize);
}
if(was_empty) {
......@@ -197,9 +200,20 @@ class WriteBatchImpl {
{
std::lock_guard<tl::mutex> lock(m_mutex);
was_empty = m_entries.empty();
auto& entry = m_entries[&db];
entry.m_keys.push_back(std::string(reinterpret_cast<char*>(&id), sizeof(id)));
entry.m_values.emplace_back();
auto& kvs_queue = m_entries[&db];
if(kvs_queue.empty()
|| kvs_queue.back().m_packed_key_sizes.size() >= m_max_batch_size) {
kvs_queue.emplace();
}
auto& kv_batch = kvs_queue.back();
size_t offset = kv_batch.m_packed_keys.size();
kv_batch.m_packed_keys.resize(offset + sizeof(id));
std::memcpy(const_cast<char*>(kv_batch.m_packed_keys.data())+offset,
reinterpret_cast<char*>(&id), sizeof(id));
kv_batch.m_packed_key_sizes.push_back(sizeof(id));
kv_batch.m_packed_val_sizes.push_back(0);
update_keyval_statistics(sizeof(id), 0);
}
if(was_empty) {
......@@ -211,7 +225,7 @@ class WriteBatchImpl {
void flush() {
if(!m_async_engine) { // flush everything here
tl::xstream es = tl::xstream::self();
spawn_writer_threads(*this, m_max_batch_size, m_entries, es.get_main_pools(1)[0]);
spawn_writer_threads(m_entries, es.get_main_pools(1)[0]);
} else { // wait for AsyncEngine to have flushed everything
{
std::lock_guard<tl::mutex> lock(m_mutex);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment