Commit 72057aaa authored by Matthieu Dorier
Browse files

added reads and small regions management

parent 0e7599dc
......@@ -63,7 +63,8 @@ src_server_libmobject_server_la_SOURCES = \
src/server/core/core-write-op.cpp \
src/server/core/core-read-op.cpp \
src/server/printer/print-write-op.c \
src/server/printer/print-read-op.c
src/server/printer/print-read-op.c \
src/server/core/fake-kv.cpp
src_server_libmobject_server_la_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_libmobject_server_la_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_libmobject_server_la_LIBADD = src/omap-iter/libomap-iter.la \
......
......@@ -64,6 +64,8 @@ void mobject_store_read_op_read(mobject_store_read_op_t read_op,
DL_APPEND(read_op->actions, base);
read_op->num_actions += 1;
memset(buffer, 0, len);
}
void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op,
......
#include <algorithm>
#include <iostream>
#include <limits>
#include <list>
#include <map>
#include <string>
#include <bake-bulk-client.h>
#include "src/server/core/core-read-op.h"
#include "src/server/visitor-args.h"
#include "src/io-chain/read-op-visitor.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/omap-iter/omap-iter-impl.h"
#include "src/server/core/fake-kv.hpp"
#include "src/server/core/covermap.hpp"
// Forward declarations of read-op visitor callbacks; they are referenced by
// the read_op_exec visitor table before their definitions appear.
static void read_op_exec_begin(void*);
static void read_op_exec_stat(void*, uint64_t*, time_t*, int*);
......@@ -15,6 +20,16 @@ static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t
// Forward declarations of the remaining read-op visitor callbacks, defined
// further down in this file.
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*);
/**
 * Plain record describing one piece of a read: which bytes of the object it
 * covers, where those bytes live inside a stored region, and where they go in
 * the client's buffer.
 * NOTE(review): presumably one instance per segment contributing to a read;
 * confirm against the code that fills it in.
 */
struct read_request_t {
double timestamp; // timestamp at which the segment was created
uint64_t absolute_start_index; // start index within the object
uint64_t absolute_end_index; // end index within the object
uint64_t region_start_index; // where to start within the region
uint64_t region_end_index; // where to end within the region
uint64_t client_offset; // offset within the client's buffer
bake_bulk_region_id_t region; // region id
};
static struct read_op_visitor read_op_exec = {
.visit_begin = read_op_exec_begin,
.visit_stat = read_op_exec_stat,
......@@ -43,25 +58,167 @@ void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* object_name = vargs->object_name;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
int ret;
uint64_t client_start_index = offset;
uint64_t client_end_index = offset+len;
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
*prval = -1;
return;
}
oid_t oid = name_map[object_name];
segment_key_t lb;
lb.oid = oid;
lb.timestamp = std::numeric_limits<double>::max();
auto it = segment_map.lower_bound(lb);
covermap<uint64_t> coverage(offset, offset+len);
while(!coverage.full() && it != segment_map.end() && it->first.oid == oid) {
const segment_key_t& seg = it->first;
const bake_bulk_region_id_t& region = it->second;
switch(seg.type) {
case seg_type_t::ZERO:
coverage.set(seg.start_index, seg.end_index);
if(*bytes_read < seg.end_index) *bytes_read = seg.end_index;
break;
case seg_type_t::TOMBSTONE:
coverage.set(seg.start_index, seg.end_index);
if(*bytes_read < seg.start_index) *bytes_read = seg.start_index;
break;
case seg_type_t::BAKE_REGION: {
auto ranges = coverage.set(seg.start_index, seg.end_index);
for(auto r : ranges) {
uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset;
bake_bulk_proxy_read(bti, region, region_offset, remote_bulk,
remote_offset, remote_addr_str, segment_size);
if(*bytes_read < r.end) *bytes_read = r.end;
}
break;
}
case seg_type_t::SMALL_REGION: {
auto ranges = coverage.set(seg.start_index, seg.end_index);
const char* base = static_cast<const char*>((void*)(&region));
margo_instance_id mid = vargs->srv_ctx->mid;
for(auto r : ranges) {
uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset;
void* buf_ptrs[1] = { const_cast<char*>(base + region_offset) };
hg_size_t buf_sizes[1] = { segment_size };
hg_bulk_t handle;
std::cout << "Reading from a small region" << std::endl;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_READ_ONLY, &handle);
ret = margo_bulk_transfer(mid, HG_BULK_PUSH, remote_addr, remote_bulk, buf.as_offset+remote_offset, handle, 0, segment_size);
ret = margo_bulk_free(handle);
if(*bytes_read < r.end) *bytes_read = r.end;
}
}
}
it++;
}
}
void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
// omap_iter_create(iter);
const char* object_name = vargs->object_name;
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
*prval = -1;
return;
}
oid_t oid = name_map[object_name];
omap_iter_create(iter);
omap_key_t lb;
lb.oid = oid;
lb.key = std::string(start_after);
auto it = omap_map.lower_bound(lb);
if(it == omap_map.end()) return;
if(it->first.key == lb.key) it++;
for(uint64_t i=0; it != omap_map.end() && i < max_return; it++, i++) {
omap_iter_append(*iter, it->first.key.c_str(), nullptr, 0);
}
}
void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
// omap_iter_create(iter);
const char* object_name = vargs->object_name;
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
*prval = -1;
return;
}
oid_t oid = name_map[object_name];
omap_iter_create(iter);
omap_key_t lb;
lb.oid = oid;
lb.key = std::string(start_after);
auto it = omap_map.lower_bound(lb);
if(it == omap_map.end()) return;
if(it->first.key == lb.key) it++;
std::string prefix(filter_prefix);
for(uint64_t i=0; it != omap_map.end() && i < max_return; it++, i++) {
const std::string& k = it->first.key;
if(k.compare(0, prefix.size(), prefix) == 0) {
omap_iter_append(*iter, k.c_str(), &(it->second[0]), it->second.size());
}
}
}
/**
 * Visitor callback fetching the omap values stored under an explicit list of
 * keys for the object named in the visitor arguments.
 *
 * Each key found in omap_map is appended, with its value, to a freshly
 * created iterator. Keys that are absent (or NULL) are skipped silently.
 *
 * @param u         server_visitor_args_t passed as an opaque pointer.
 * @param keys      Array of num_keys NUL-terminated key strings.
 * @param num_keys  Number of entries in keys.
 * @param iter      Receives the created iterator (untouched if the object is
 *                  unknown).
 * @param prval     0 on success, -1 if the object is unknown.
 */
void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{
    auto vargs = static_cast<server_visitor_args_t>(u);
    const char* object_name = vargs->object_name;
    *prval = 0;

    // find oid
    auto name_it = name_map.find(object_name);
    if(name_it == name_map.end()) {
        *prval = -1;
        return;
    }

    omap_iter_create(iter);

    omap_key_t key;
    key.oid = name_it->second;
    for(size_t i = 0; i < num_keys; ++i) {
        if(keys[i] == nullptr) continue; // a NULL key cannot match anything
        key.key = keys[i];
        auto it = omap_map.find(key);
        if(it == omap_map.end()) continue;
        // data() is valid even for an empty value, unlike &v[0]
        omap_iter_append(*iter, keys[i], it->second.data(), it->second.size());
    }
}
void read_op_exec_end(void* u)
......
#include <map>
#include <cstring>
#include <string>
#include <iostream>
#include <limits>
#include <bake-bulk-client.h>
#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"
#include "src/server/core/fake-kv.hpp"
// Forward declarations of write-op visitor callbacks, defined later in this file.
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
......@@ -18,6 +21,13 @@ static void write_op_exec_zero(void*, uint64_t, uint64_t);
// Forward declarations of the omap-related write-op visitor callbacks.
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);
// Returns the oid registered for `name`, registering a new one if needed.
static oid_t get_or_create_oid(const char* name);
// Helpers adding one entry to segment_map for [offset, offset+len) of object
// `oid`. A negative `ts` (the default) means "use ABT_get_wtime() as the
// entry's timestamp".
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_bulk_region_id_t* region, double ts = -1.0);
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts = -1.0);
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts=-1.0);
// Adds a tombstone entry covering everything from `offset` onwards.
static void insert_punch_log_entry(oid_t oid, uint64_t offset, double ts=-1.0);
// Computes the object's size from the segment_map entries found from the
// lower bound of (oid, ts) onwards.
static uint64_t compute_size(oid_t oid, double ts);
static struct write_op_visitor write_op_exec = {
.visit_begin = write_op_exec_begin,
.visit_create = write_op_exec_create,
......@@ -52,89 +62,136 @@ void write_op_exec_end(void* u)
/**
 * Visitor callback for "create": makes sure the object named in the visitor
 * arguments has an oid registered.
 *
 * @param u          server_visitor_args_t passed as an opaque pointer.
 * @param exclusive  Currently ignored.
 */
void write_op_exec_create(void* u, int exclusive)
{
    const char* object_name = static_cast<server_visitor_args_t>(u)->object_name;
    get_or_create_oid(object_name);
}
void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
bake_target_id_t bti = vargs->srv_ctx->bake_id;
bake_bulk_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* remote_addr = vargs->client_addr.as_string;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
int ret;
// TODO: check return values of those calls
ret = bake_bulk_create(bti, len, &rid);
ret = bake_bulk_proxy_write(bti, rid, 0, remote_bulk, buf.as_offset, remote_addr, len);
ret = bake_bulk_persist(bti, rid);
if(len > SMALL_REGION_THRESHOLD) {
// TODO: check return values of those calls
ret = bake_bulk_create(bti, len, &rid);
ret = bake_bulk_proxy_write(bti, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
ret = bake_bulk_persist(bti, rid);
// TODO: write [offset,len,rid] in sds-keyval for the specified object
insert_region_log_entry(oid, offset, len, &rid);
} else {
margo_instance_id mid = vargs->srv_ctx->mid;
char data[SMALL_REGION_THRESHOLD];
void* buf_ptrs[1] = {(void*)(&data[0])};
hg_size_t buf_sizes[1] = {len};
hg_bulk_t handle;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_WRITE_ONLY, &handle);
ret = margo_bulk_transfer(mid, HG_BULK_PULL, remote_addr, remote_bulk, buf.as_offset, handle, 0, len);
ret = margo_bulk_free(handle);
insert_small_region_log_entry(oid, offset, len, data);
}
}
/**
 * Visitor callback for "write_full": stores len bytes from the client's bulk
 * buffer in a new bake region and records it at offset 0 of the object named
 * in the visitor arguments.
 *
 * If a bake call fails, an error is printed to std::cerr and no log entry is
 * inserted. This callback has no status output to report the failure through.
 *
 * @param u    server_visitor_args_t passed as an opaque pointer.
 * @param buf  Client buffer descriptor; buf.as_offset is the offset within the
 *             client's bulk handle.
 * @param len  Number of bytes to write.
 */
void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
    // TODO: this function will not be valid if the new object is
    // smaller than its previous version. Instead we should remove the object
    // and re-create it.
    auto vargs = static_cast<server_visitor_args_t>(u);
    oid_t oid = get_or_create_oid(vargs->object_name);
    bake_target_id_t bti = vargs->srv_ctx->bake_id;
    bake_bulk_region_id_t rid;
    hg_bulk_t remote_bulk = vargs->bulk_handle;
    const char* remote_addr_str = vargs->client_addr_str;

    int ret = bake_bulk_create(bti, len, &rid);
    if(ret == 0) ret = bake_bulk_proxy_write(bti, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
    if(ret == 0) ret = bake_bulk_persist(bti, rid);
    if(ret != 0) {
        std::cerr << "write_op_exec_write_full: bake write failed (ret=" << ret << ")" << std::endl;
        return;
    }

    // TODO: write [offset,len,rid] in sds-keyval for the specified object
    insert_region_log_entry(oid, 0, len, &rid);
}
/**
 * Visitor callback for "writesame": stores one data_len-byte pattern in a new
 * bake region and records it repeatedly so that it fills write_len bytes of
 * the object starting at `offset`.
 *
 * One log entry is inserted per repetition, all pointing at the same region;
 * the last one is shortened when write_len is not a multiple of data_len.
 * A data_len of 0 is a no-op (stepping by 0 would never terminate).
 * If a bake call fails, an error is printed to std::cerr and no log entry is
 * inserted.
 *
 * @param u          server_visitor_args_t passed as an opaque pointer.
 * @param buf        Client buffer descriptor; buf.as_offset is the offset
 *                   within the client's bulk handle.
 * @param data_len   Length of the pattern in bytes.
 * @param write_len  Total number of bytes to cover.
 * @param offset     Position in the object where the first repetition starts.
 */
void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
    auto vargs = static_cast<server_visitor_args_t>(u);
    oid_t oid = get_or_create_oid(vargs->object_name);
    if(data_len == 0) return;

    bake_target_id_t bti = vargs->srv_ctx->bake_id;
    bake_bulk_region_id_t rid;
    hg_bulk_t remote_bulk = vargs->bulk_handle;
    const char* remote_addr_str = vargs->client_addr_str;

    int ret = bake_bulk_create(bti, data_len, &rid);
    if(ret == 0) ret = bake_bulk_proxy_write(bti, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, data_len);
    if(ret == 0) ret = bake_bulk_persist(bti, rid);
    if(ret != 0) {
        std::cerr << "write_op_exec_writesame: bake write failed (ret=" << ret << ")" << std::endl;
        return;
    }

    // TODO: write [offset,len,rid] in sds-keyval for the specified object
    //double ts = ABT_get_wtime();
    for(size_t i = 0; i < write_len; i += data_len) {
        // TODO normally we should have the same timestamps but right now it bugs...
        insert_region_log_entry(oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
    }
}
/**
 * Visitor callback for "append": stores len bytes from the client's bulk
 * buffer in a new bake region and records it at the current end of the
 * object named in the visitor arguments.
 *
 * The append position is compute_size(oid, ts), using the same timestamp ts
 * that is then given to the new log entry.
 * If a bake call fails, an error is printed to std::cerr and no log entry is
 * inserted.
 *
 * @param u    server_visitor_args_t passed as an opaque pointer.
 * @param buf  Client buffer descriptor; buf.as_offset is the offset within the
 *             client's bulk handle.
 * @param len  Number of bytes to append.
 */
void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
    auto vargs = static_cast<server_visitor_args_t>(u);
    oid_t oid = get_or_create_oid(vargs->object_name);
    bake_target_id_t bti = vargs->srv_ctx->bake_id;
    bake_bulk_region_id_t rid;
    hg_bulk_t remote_bulk = vargs->bulk_handle;
    const char* remote_addr_str = vargs->client_addr_str;

    int ret = bake_bulk_create(bti, len, &rid);
    if(ret == 0) ret = bake_bulk_proxy_write(bti, rid, 0, remote_bulk, buf.as_offset, remote_addr_str, len);
    if(ret == 0) ret = bake_bulk_persist(bti, rid);
    if(ret != 0) {
        std::cerr << "write_op_exec_append: bake write failed (ret=" << ret << ")" << std::endl;
        return;
    }

    // TODO: write [offset,len,rid] in sds-keyval for the specified object
    // find out the current length of the object
    double ts = ABT_get_wtime();
    uint64_t offset = compute_size(oid,ts);
    insert_region_log_entry(oid, offset, len, &rid, ts);
}
/**
 * Visitor callback for "remove": implemented as a truncate to size 0.
 *
 * @param u  server_visitor_args_t passed as an opaque pointer; forwarded as-is.
 */
void write_op_exec_remove(void* u)
{
    // TODO: technically should mark the object as removed
    const uint64_t new_size = 0;
    write_op_exec_truncate(u, new_size);
}
void write_op_exec_truncate(void* u, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
insert_punch_log_entry(oid, offset);
}
void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
insert_zero_log_entry(oid, offset, len);
}
void write_op_exec_omap_set(void* u, char const* const* keys,
......@@ -143,10 +200,110 @@ void write_op_exec_omap_set(void* u, char const* const* keys,
size_t num)
{
    // Stores (or overwrites) `num` key/value pairs in omap_map for the object
    // named in the visitor arguments; value i is the lens[i] bytes at vals[i].
    auto vargs = static_cast<server_visitor_args_t>(u);
    const oid_t oid = get_or_create_oid(vargs->object_name);
    // size_t index: `num` is a size_t, an int counter would mix signedness
    for(size_t i = 0; i < num; ++i) {
        omap_key_t omk;
        omk.oid = oid;
        omk.key = std::string(keys[i]);
        omap_map[std::move(omk)] = std::vector<char>(vals[i], vals[i]+lens[i]);
    }
}
void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
for(auto i=0; i < num_keys; i++) {
omap_key_t omk;
omk.oid = oid;
omk.key = std::string(keys[i]);
omap_map.erase(omk);
}
}
/**
 * Returns the oid registered for object_name, registering a new one first if
 * the name is not yet known.
 *
 * A new oid starts from std::hash of the name and is incremented until it is
 * non-zero and not already present in oid_map; both name_map and oid_map are
 * then updated.
 *
 * @param object_name  NUL-terminated object name.
 * @return The existing or newly assigned oid.
 */
oid_t get_or_create_oid(const char* object_name)
{
    const std::string name(object_name);

    // known object: hand back its oid
    const auto known = name_map.find(name);
    if(known != name_map.end()) {
        return known->second;
    }

    // unknown object: derive an unused, non-zero oid from the name's hash
    oid_t oid = 0;
    oid = std::hash<std::string>()(name);
    while(oid == 0 || oid_map.count(oid) != 0) {
        oid += 1;
    }
    name_map[name] = oid;
    oid_map[oid] = name;
    return oid;
}
/**
 * Records in segment_map that [offset, offset+len) of object `oid` is backed
 * by the bake region `*region`.
 *
 * @param oid     Object identifier.
 * @param offset  Start index of the segment within the object.
 * @param len     Length of the segment.
 * @param region  Region id copied into the map.
 * @param ts      Entry timestamp; a negative value selects ABT_get_wtime().
 */
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_bulk_region_id_t* region, double ts)
{
    segment_key_t key;
    key.type        = seg_type_t::BAKE_REGION;
    key.oid         = oid;
    key.start_index = offset;
    key.end_index   = offset+len;
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    segment_map[key] = *region;
}
/**
 * Records in segment_map a small segment [offset, offset+len) of object `oid`
 * whose payload is stored inline: the `len` bytes at `data` are copied over
 * the map's region-id value itself.
 * NOTE(review): nothing here checks len against sizeof(bake_bulk_region_id_t);
 * presumably callers guarantee it via SMALL_REGION_THRESHOLD -- confirm.
 *
 * @param oid     Object identifier.
 * @param offset  Start index of the segment within the object.
 * @param len     Number of bytes copied from data.
 * @param data    Source bytes.
 * @param ts      Entry timestamp; a negative value selects ABT_get_wtime().
 */
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts)
{
    segment_key_t key;
    key.type        = seg_type_t::SMALL_REGION;
    key.oid         = oid;
    key.start_index = offset;
    key.end_index   = offset+len;
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    // operator[] creates the entry if needed; the payload then overwrites its bytes
    std::memcpy(&segment_map[key], data, len);
}
/**
 * Records in segment_map that [offset, offset+len) of object `oid` is zeroed.
 * The mapped region id is a value-initialised placeholder.
 *
 * @param oid     Object identifier.
 * @param offset  Start index of the zeroed range.
 * @param len     Length of the zeroed range.
 * @param ts      Entry timestamp; a negative value selects ABT_get_wtime().
 */
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts)
{
    segment_key_t key;
    key.type        = seg_type_t::ZERO;
    key.oid         = oid;
    key.start_index = offset;
    key.end_index   = offset+len;
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    segment_map[key] = bake_bulk_region_id_t();
}
/**
 * Records in segment_map a tombstone for object `oid` covering everything
 * from `offset` up to the maximum uint64_t index. The mapped region id is a
 * value-initialised placeholder.
 *
 * @param oid     Object identifier.
 * @param offset  First index covered by the tombstone.
 * @param ts      Entry timestamp; a negative value selects ABT_get_wtime().
 */
static void insert_punch_log_entry(oid_t oid, uint64_t offset, double ts)
{
    segment_key_t key;
    key.type        = seg_type_t::TOMBSTONE;
    key.oid         = oid;
    key.start_index = offset;
    key.end_index   = std::numeric_limits<uint64_t>::max();
    if(ts < 0) {
        key.timestamp = ABT_get_wtime();
    } else {
        key.timestamp = ts;
    }
    segment_map[key] = bake_bulk_region_id_t();
}
/**
 * Computes the size of object `oid` from its segment_map entries, starting at
 * the lower bound of (oid, ts) and stopping at the first entry of another
 * object or at the first tombstone.
 *
 * Entries whose type compares below TOMBSTONE raise the size to their end
 * index. A tombstone raises it to its start index, but only when that is
 * larger than the size already found: a truncate must not shrink the size
 * established by segments visited before it. This matches how
 * read_op_exec_read treats tombstones.
 * NOTE(review): only lb.oid and lb.timestamp are set before lower_bound, and
 * the visiting order depends on segment_key_t's comparator in fake-kv.hpp
 * (presumably newest first) -- confirm.
 *
 * @param oid  Object identifier.
 * @param ts   Timestamp used as the lower-bound key.
 * @return The computed size, 0 if no entry is found.
 */
uint64_t compute_size(oid_t oid, double ts)
{
    segment_key_t lb;
    lb.oid = oid;
    lb.timestamp = ts;

    uint64_t size = 0;
    for(auto it = segment_map.lower_bound(lb);
        it != segment_map.end() && it->first.oid == oid;
        ++it) {
        const auto& seg = it->first;
        if(seg.type < seg_type_t::TOMBSTONE) {
            if(size < seg.end_index) size = seg.end_index;
        } else if(seg.type == seg_type_t::TOMBSTONE) {
            if(size < seg.start_index) size = seg.start_index;
            break;
        }
    }
    return size;
}
#ifndef COVERAGE_MAP
#define COVERAGE_MAP
#include <iostream>
#include <list>
#include <map>
template<typename T>
class covermap {
const T m_start;
const T m_end;
T m_level;
std::map<T,T> m_segments;
// Tells whether segments [start1,end1] and [start2,end2] overlap or touch:
// the segment that starts later must start no later than the other one ends.
static bool intersects(const T& start1, const T& end1, const T& start2, const T& end2) {
    return (start1 < start2) ? (start2 <= end1) : (start1 <= end2);
}
public:
struct segment {
T start;
T end;
segment() {}
segment(T s, T e)
: start(s), end(e) {}
};
// Builds an empty coverage map for the range [s, e).
// m_level is value-initialised because set() accumulates into it with +=.
covermap(T s, T e)
: m_start(s), m_end(e), m_level() {}
std::list<segment> set(T start, T end) {
// make start and end match the bounds
if(start < m_start) start = m_start;
if(end > m_end) end = m_end;
std::list<segment> result;
// easy case: the coverage map is empty
if(m_segments.empty()) {
m_segments[start] = end;
result.emplace_back(start,end);
m_level += (end-start);
return result;
}
// not easy case
// find the first segment intersecting
auto first_seg = m_segments.lower_bound(start);
// we need to go back left
if(first_seg != m_segments.begin()) first_seg--;
// check if the first segment we found intersects
if(!intersects(first_seg->first, first_seg->second, start, end)) {
// it does not intersect
first_seg++;
}
// find the first segment on the right that does NOT intersect
auto last_seg = m_segments.lower_bound(end);
// if the first_seg and last_seg are the same, then we can
// insert the new segment directly
if(first_seg == last_seg) {
result.emplace_back(start, end);
m_level += (end-start);
m_segments[start] = end;
return result;
}
// otherwise, we iterate to build the list
auto it = first_seg;
if(first_seg->first > start) result.emplace_back(start,first_seg->first);
for(; it != last_seg; it++) {
auto jt = it;
jt++;