Commit 9fb2d579 authored by Matthieu Dorier
Browse files

Merge branch 'dev-sdskv-integration' into 'master'

Dev sdskv integration

See merge request !4
parents 6f285fdd 935e2a52
......@@ -194,7 +194,7 @@ extern "C" {
void mobject_write_op_zero(
mobject_store_write_op_t write_op,
uint64_t offset,
uint64_t len);
size_t len);
/**
* Set key/value pairs on an object
......
......@@ -9,6 +9,7 @@
#include <margo.h>
#include <bake-client.h>
#include <sdskv-client.h>
/* server-side utilities and routines. Clients are looking for either
* libmobject-store.h or librados-mobject-store.h */
......@@ -24,6 +25,7 @@ typedef struct mobject_server_context* mobject_provider_t;
* @param[in] mplex_id multiplex id of the provider
* @param[in] pool Argobots pool for the provider
* @param[in] bake_ph Bake provider handle to use to write/read data
* @param[in] sdskv_ph SDSKV provider handle to use to access metadata
* @param[in] cluster_file file name to write cluster connect info to
* @param[out] provider resulting provider
*
......@@ -34,6 +36,7 @@ int mobject_provider_register(
uint8_t mplex_id,
ABT_pool pool,
bake_provider_handle_t bake_ph,
sdskv_provider_handle_t sdskv_ph,
const char *cluster_file,
mobject_provider_t* provider);
......
......@@ -66,8 +66,7 @@ src_server_libmobject_server_la_SOURCES = \
src/server/core/core-write-op.cpp \
src/server/core/core-read-op.cpp \
src/server/printer/print-write-op.c \
src/server/printer/print-read-op.c \
src/server/core/fake-kv.cpp
src/server/printer/print-read-op.c
src_server_libmobject_server_la_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_libmobject_server_la_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_libmobject_server_la_LIBADD = src/omap-iter/libomap-iter.la \
......
......@@ -386,7 +386,8 @@ int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
if(r != 0) return r;
r = mobject_read_op_operate(mph,read_op, ioctx->pool_name, oid, flags);
return 0;
mobject_provider_handle_release(mph);
return r;
}
// send a shutdown signal to a server cluster
......
......@@ -42,8 +42,8 @@ void mobject_write_op_create(mobject_store_write_op_t write_op,
void mobject_write_op_write(mobject_store_write_op_t write_op,
const char *buffer,
size_t len,
uint64_t offset)
uint64_t offset,
size_t len)
{
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");
......@@ -53,7 +53,7 @@ void mobject_write_op_write(mobject_store_write_op_t write_op,
action->buffer.as_pointer = buffer;
action->len = len;
action->offset = offset;
WRITE_ACTION_UPCAST(base, action);
DL_APPEND(write_op->actions, base);
......@@ -98,9 +98,9 @@ void mobject_write_op_write_full(mobject_store_write_op_t write_op,
void mobject_write_op_write_same(mobject_store_write_op_t write_op,
const char *buffer,
uint64_t offset,
size_t data_len,
size_t write_len,
uint64_t offset)
size_t write_len)
{
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");
......@@ -186,7 +186,7 @@ void mobject_write_op_truncate(mobject_store_write_op_t write_op,
void mobject_write_op_zero(mobject_store_write_op_t write_op,
uint64_t offset,
uint64_t len)
size_t len)
{
MOBJECT_ASSERT(write_op != MOBJECT_WRITE_OP_NULL, "invalid mobject_store_write_op_t obect");
MOBJECT_ASSERT(!(write_op->ready), "can't modify a write_op that is ready to be processed");
......
......@@ -2,6 +2,7 @@
#include <string>
#include <iostream>
#include <algorithm>
#include <vector>
#include <list>
#include <bake-client.h>
#include "src/server/core/core-read-op.h"
......@@ -9,9 +10,19 @@
#include "src/io-chain/read-op-visitor.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/omap-iter/omap-iter-impl.h"
#include "src/server/core/fake-kv.hpp"
#include "src/server/core/key-types.h"
#include "src/server/core/covermap.hpp"
// Nesting depth used for indentation by the tracing macros below.
// It is only touched by the commented-out (verbose) macro variants.
static int tabs = 0;
/*
#define ENTERING {for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[ENTERING]>> %s\n",__FUNCTION__); tabs += 1;}
#define LEAVING {tabs -= 1; for(int i=0; i<tabs; i++) fprintf(stderr," "); fprintf(stderr,"[LEAVING]<<< %s\n",__FUNCTION__); }
#define ERROR {for(int i=0; i<(tabs+1); i++) fprintf(stderr, " "); fprintf(stderr,"[ERROR] "); }
*/
// Active definitions: tracing is disabled, all three macros expand to nothing.
// Note that a statement written as `ERROR fprintf(stderr, ...)` therefore
// reduces to the bare fprintf, i.e. error messages are still printed, just
// without the "[ERROR]" prefix and indentation.
#define ENTERING
#define LEAVING
#define ERROR
static void read_op_exec_begin(void*);
static void read_op_exec_stat(void*, uint64_t*, time_t*, int*);
static void read_op_exec_read(void*, uint64_t, size_t, buffer_u, size_t*, int*);
......@@ -20,6 +31,17 @@ static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*);
/* defined in core-write-op.cpp */
/* Returns a size (in bytes, presumably -- confirm in core-write-op.cpp) for
 * object `oid`, computed from the segment database `seg_db_id` reached through
 * the SDSKV provider handle `ph`. `ts` is a timestamp; read_op_exec_stat passes
 * the current ABT_get_wtime() value. */
extern uint64_t mobject_compute_object_size(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, double ts);
/* Looks up the object id associated with `name` in the name database
 * `name_db_id` through the SDSKV provider handle `ph`.
 * read_op_exec_begin treats a return value of 0 as "no oid available";
 * NOTE(review): the definition is outside this view -- confirm that 0 is
 * what it returns for an unknown name. */
static oid_t get_oid_from_name(
sdskv_provider_handle_t ph,
sdskv_database_id_t name_db_id,
const char* name);
struct read_request_t {
double timestamp; // timestamp at which the segment was created
uint64_t absolute_start_index; // start index within the object
......@@ -47,22 +69,55 @@ extern "C" void core_read_op(mobject_store_read_op_t read_op, server_visitor_arg
/**
 * Visitor callback invoked before the individual read actions are executed.
 *
 * Makes sure the visitor arguments carry an object id: when vargs->oid is
 * still 0, the id is looked up from the object name through
 * get_oid_from_name() and cached back into the arguments so that the
 * subsequent callbacks can use it. If the id is still 0 afterwards a
 * message is written to stderr; no error is reported to the caller from here.
 *
 * @param u  opaque pointer, cast to server_visitor_args_t
 */
void read_op_exec_begin(void* u)
{
    ENTERING;
    auto args = static_cast<server_visitor_args_t>(u);

    if(args->oid == 0) {
        // id not resolved yet: resolve it from the object name and cache it
        args->oid = get_oid_from_name(args->srv_ctx->sdskv_ph,
                                      args->srv_ctx->name_db_id,
                                      args->object_name);
    }

    if(args->oid == 0) {
        ERROR fprintf(stderr,"oid == 0\n");
    }
    LEAVING;
}
/**
 * Visitor callback for the "stat" read action.
 *
 * Stores in *psize the value returned by mobject_compute_object_size() for
 * the object id carried by the visitor arguments, evaluated at the current
 * ABT_get_wtime() timestamp.
 *
 * @param u       opaque pointer, cast to server_visitor_args_t
 * @param psize   [out] receives the computed object size
 * @param pmtime  [out] NOT written by this function (no modification time is
 *                computed here); the caller's value is left untouched
 * @param prval   [out] 0 on success, -1 when no object id is available
 */
void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{
    ENTERING;
    auto vargs = static_cast<server_visitor_args_t>(u);
    sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
    sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;

    // Report success explicitly, like the other read callbacks do, instead
    // of leaving *prval at whatever value the caller happened to pass in.
    *prval = 0;

    // the oid is expected to have been resolved by read_op_exec_begin
    oid_t oid = vargs->oid;
    if(oid == 0) {
        *prval = -1;
        ERROR fprintf(stderr,"oid == 0\n");
        LEAVING;
        return;
    }

    double ts = ABT_get_wtime();
    *psize = mobject_compute_object_size(sdskv_ph, seg_db_id, oid, ts);
    LEAVING;
}
#if 0
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* object_name = vargs->object_name;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
int ret;
......@@ -73,11 +128,13 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
oid_t oid = vargs->oid;
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
oid_t oid = name_map[object_name];
segment_key_t lb;
lb.oid = oid;
......@@ -88,7 +145,7 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
while(!coverage.full() && it != segment_map.end() && it->first.oid == oid) {
const segment_key_t& seg = it->first;
const bake_region_id_t& region = it->second;
switch(seg.type) {
case seg_type_t::ZERO:
coverage.set(seg.start_index, seg.end_index);
......@@ -104,8 +161,13 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset;
bake_proxy_read(bph, region, region_offset, remote_bulk,
ret = bake_proxy_read(bph, region, region_offset, remote_bulk,
remote_offset, remote_addr_str, segment_size);
if(ret != 0) {
ERROR fprintf(stderr,"bake_proxy_read returned %d\n", ret);
LEAVING;
return;
}
if(*bytes_read < r.end) *bytes_read = r.end;
}
break;
......@@ -122,106 +184,414 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
hg_size_t buf_sizes[1] = { segment_size };
hg_bulk_t handle;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_READ_ONLY, &handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_create returned %d\n", ret);
LEAVING;
return;
}
ret = margo_bulk_transfer(mid, HG_BULK_PUSH, remote_addr, remote_bulk, buf.as_offset+remote_offset, handle, 0, segment_size);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_transfer returned %d\n", ret);
LEAVING;
return;
}
ret = margo_bulk_free(handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_free returned %d\n", ret);
LEAVING;
return;
}
if(*bytes_read < r.end) *bytes_read = r.end;
}
}
}
it++;
}
LEAVING;
}
#endif
void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
const char* object_name = vargs->object_name;
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
int ret;
uint64_t client_start_index = offset;
uint64_t client_end_index = offset+len;
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
oid_t oid = vargs->oid;
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
oid_t oid = name_map[object_name];
omap_iter_create(iter);
omap_key_t lb;
segment_key_t lb;
lb.oid = oid;
lb.key = std::string(start_after);
lb.timestamp = ABT_get_wtime();
covermap<uint64_t> coverage(offset, offset+len);
size_t max_segments = 10; // XXX this is a pretty arbitrary number
segment_key_t segment_keys[max_segments];
void* segment_keys_addrs[max_segments];
hg_size_t segment_keys_size[max_segments];
bake_region_id_t segment_data[max_segments];
void* segment_data_addrs[max_segments];
hg_size_t segment_data_size[max_segments];
for(auto i = 0 ; i < max_segments; i++) {
segment_keys_addrs[i] = (void*)(&segment_keys[i]);
segment_keys_size[i] = sizeof(segment_key_t);
segment_data_addrs[i] = (void*)(&segment_data[i]);
segment_data_size[i] = sizeof(bake_region_id_t);
}
bool done = false;
while(!coverage.full() && !done) {
auto it = omap_map.lower_bound(lb);
if(it == omap_map.end()) return;
if(it->first.key == lb.key) it++;
// get the next max_segments segments
size_t num_segments = max_segments;
ret = sdskv_list_keyvals(sdskv_ph, seg_db_id,
(const void*)&lb, sizeof(lb),
segment_keys_addrs, segment_keys_size,
segment_data_addrs, segment_data_size,
&num_segments);
for(uint64_t i=0; it != omap_map.end() && i < max_return; it++, i++) {
omap_iter_append(*iter, it->first.key.c_str(), nullptr, 0);
if(ret != SDSKV_SUCCESS) {
ERROR fprintf(stderr, "sdskv_list_keyvals returned %d\n", ret);
*prval = -1;
LEAVING;
return;
}
size_t i;
for(i=0; i < num_segments; i++) {
const segment_key_t& seg = segment_keys[i];
const bake_region_id_t& region = segment_data[i];
if(seg.oid != oid || coverage.full()) {
done = true;
break;
}
switch(seg.type) {
case seg_type_t::ZERO:
coverage.set(seg.start_index, seg.end_index);
if(*bytes_read < seg.end_index) *bytes_read = seg.end_index;
break;
case seg_type_t::TOMBSTONE:
coverage.set(seg.start_index, seg.end_index);
if(*bytes_read < seg.start_index) *bytes_read = seg.start_index;
break;
case seg_type_t::BAKE_REGION: {
auto ranges = coverage.set(seg.start_index, seg.end_index);
for(auto r : ranges) {
uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset;
ret = bake_proxy_read(bph, region, region_offset, remote_bulk,
remote_offset, remote_addr_str, segment_size);
if(ret != 0) {
*prval = -1;
ERROR fprintf(stderr,"bake_proxy_read returned %d\n", ret);
LEAVING;
return;
}
if(*bytes_read < r.end) *bytes_read = r.end;
}
break;
} // end case seg_type_t::BAKE_REGION
case seg_type_t::SMALL_REGION: {
auto ranges = coverage.set(seg.start_index, seg.end_index);
const char* base = static_cast<const char*>((void*)(&region));
margo_instance_id mid = vargs->srv_ctx->mid;
for(auto r : ranges) {
uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset;
void* buf_ptrs[1] = { const_cast<char*>(base + region_offset) };
hg_size_t buf_sizes[1] = { segment_size };
hg_bulk_t handle;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_READ_ONLY, &handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_create returned %d\n", ret);
LEAVING;
*prval = -1;
return;
} // end if
ret = margo_bulk_transfer(mid, HG_BULK_PUSH,
remote_addr, remote_bulk,
buf.as_offset+remote_offset,
handle, 0, segment_size);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_transfer returned %d\n", ret);
*prval = -1;
LEAVING;
return;
} // end if
ret = margo_bulk_free(handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_free returned %d\n", ret);
*prval = -1;
LEAVING;
return;
} // end if
if(*bytes_read < r.end) *bytes_read = r.end;
} // end for
} // end case seg_type_t::SMALL_REGION
} // end switch
} // end for
if(num_segments != max_segments) done = true;
}
LEAVING;
}
void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
{
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
const char* object_name = vargs->object_name;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
int ret;
*prval = 0;
oid_t oid = vargs->oid;
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr, "oid == 0\n");
LEAVING;
return;
}
omap_iter_create(iter);
size_t lb_size = sizeof(omap_key_t)+strlen(start_after);
omap_key_t* lb = (omap_key_t*)calloc(1, lb_size);
lb->oid = oid;
strcpy(lb->key, start_after);
hg_size_t max_keys = 10;
hg_size_t key_len = MAX_OMAP_KEY_SIZE+sizeof(omap_key_t);
std::vector<void*> keys(max_keys);
std::vector<hg_size_t> ksizes(max_keys, key_len);
std::vector<std::vector<char>> buffers(max_keys, std::vector<char>(key_len));
for(auto i=0; i < max_keys; i++) keys[i] = (void*)buffers[i].data();
hg_size_t keys_retrieved = max_keys;
hg_size_t count = 0;
do {
ret = sdskv_list_keys(sdskv_ph, omap_db_id,
(const void*)lb, lb_size,
keys.data(), ksizes.data(),
&keys_retrieved);
if(ret != SDSKV_SUCCESS) {
*prval = -1;
ERROR fprintf(stderr, "sdskv_list_keys returned %d\n", ret);
break;
}
const char* k;
for(auto i = 0; i < keys_retrieved && count < max_return; i++, count++) {
// extract the actual key part, without the oid
k = ((omap_key_t*)keys[i])->key;
/* this key is not part of the same object, we should leave the loop */
if(((omap_key_t*)keys[i])->oid != oid) goto out; /* ugly way of leaving the loop, I know ... */
omap_iter_append(*iter, k, nullptr, 0);
}
strcpy(lb->key, k);
lb_size = strlen(k) + sizeof(omap_key_t);
} while(keys_retrieved == max_keys && count < max_return);
out:
free(lb);
LEAVING;
}
void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
const char* object_name = vargs->object_name;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
int ret;
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
oid_t oid = vargs->oid;
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr, "oid == 0\n");
LEAVING;
return;
}
oid_t oid = name_map[object_name];
hg_size_t max_items = std::min(max_return, (decltype(max_return))10);
hg_size_t key_len = MAX_OMAP_KEY_SIZE + sizeof(omap_key_t);
hg_size_t val_len = MAX_OMAP_VAL_SIZE;
omap_iter_create(iter);
omap_key_t lb;
lb.oid = oid;
lb.key = std::string(start_after);
auto it = omap_map.lower_bound(lb);
if(it == omap_map.end()) return;
if(it->first.key == lb.key) it++;
/* omap_key_t equivalent of start_key */
hg_size_t lb_size = key_len;
omap_key_t* lb = (omap_key_t*)calloc(1, lb_size);
lb->oid = oid;
strcpy(lb->key, start_after);
std::string prefix(filter_prefix);
for(uint64_t i=0; it != omap_map.end() && i < max_return; it++, i++) {
const std::string& k = it->first.key;
if(k.compare(0, prefix.size(), prefix) == 0) {
omap_iter_append(*iter, k.c_str(), &(it->second[0]), it->second.size());
}
/* omap_key_t equivalent of the filter_prefix */
hg_size_t prefix_size = sizeof(omap_key_t)+strlen(filter_prefix);
omap_key_t* prefix = (omap_key_t*)calloc(1, prefix_size);
prefix->oid = oid;
strcpy(prefix->key, filter_prefix);
hg_size_t prefix_actual_size = offsetof(omap_key_t, key)+strlen(filter_prefix);
/* we need the above because the prefix in sdskv is not considered a string */
/* initialize structures to pass to SDSKV functions */
std::vector<void*> keys(max_items);
std::vector<void*> vals(max_items);
std::vector<hg_size_t> ksizes(max_items, key_len);
std::vector<hg_size_t> vsizes(max_items, val_len);
std::vector<std::vector<char>> key_buffers(max_items, std::vector<char>(key_len));
std::vector<std::vector<char>> val_buffers(max_items, std::vector<char>(val_len));
for(auto i=0; i < max_items; i++) {
keys[i] = (void*)key_buffers[i].data();
vals[i] = (void*)val_buffers[i].data();
}
hg_size_t items_retrieved = max_items;
hg_size_t count = 0;
do {
ret = sdskv_list_keyvals_with_prefix(
sdskv_ph, omap_db_id,
(const void*)lb, lb_size,
(const void*)prefix, prefix_actual_size,
keys.data(), ksizes.data(),
vals.data(), vsizes.data(),
&items_retrieved);
if(ret != SDSKV_SUCCESS) {
*prval = -1;
ERROR fprintf(stderr, "sdskv_list_keyvals_with_prefix returned %d\n", ret);
break;
}
const char* k;
for(auto i = 0; i < items_retrieved && count < max_return; i++, count++) {
// extract the actual key part, without the oid
k = ((omap_key_t*)keys[i])->key;
/* this key is not part of the same object, we should leave the loop */
if(((omap_key_t*)keys[i])->oid != oid) goto out; /* ugly way of leaving the loop, I know ... */
omap_iter_append(*iter, k, (const char*)vals[i], vsizes[i]);
}
memset(lb, 0, lb_size);
lb->oid = oid;
strcpy(lb->key, k);
//lb_size = strlen(k) + sizeof(omap_key_t);
} while(items_retrieved == max_items && count < max_return);
out:
free(lb);
LEAVING;
}
void read_op_exec_omap_get_vals_by_keys(void*