Commit f7c4c123 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

SDSKV integration working

parent c7b518c0
......@@ -31,6 +31,12 @@ static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*);
/* defined in core-write-op.cpp */
extern uint64_t mobject_compute_object_size(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, double ts);
static oid_t get_oid_from_name(
sdskv_provider_handle_t ph,
sdskv_database_id_t name_db_id,
......@@ -65,17 +71,45 @@ void read_op_exec_begin(void* u)
{
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
LEAVING;
// find oid
const char* object_name = vargs->object_name;
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
vargs->oid = oid;
}
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
LEAVING
}
void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
// TODO
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
// find oid
oid_t oid = vargs->oid;
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
double ts = ABT_get_wtime();
*psize = mobject_compute_object_size(sdskv_ph, seg_db_id, oid, ts);
LEAVING;
}
#if 0
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
ENTERING;
......@@ -84,7 +118,6 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* object_name = vargs->object_name;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
int ret;
......@@ -96,12 +129,6 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
// find oid
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
vargs->oid = oid;
}
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr,"oid == 0\n");
......@@ -182,6 +209,163 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
}
LEAVING;
}
#endif
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
bake_region_id_t rid;
hg_bulk_t remote_bulk = vargs->bulk_handle;
const char* remote_addr_str = vargs->client_addr_str;
hg_addr_t remote_addr = vargs->client_addr;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
int ret;
uint64_t client_start_index = offset;
uint64_t client_end_index = offset+len;
*prval = 0;
// find oid
oid_t oid = vargs->oid;
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
segment_key_t lb;
lb.oid = oid;
lb.timestamp = ABT_get_wtime();
covermap<uint64_t> coverage(offset, offset+len);
size_t max_segments = 10; // XXX this is a pretty arbitrary number
segment_key_t segment_keys[max_segments];
void* segment_keys_addrs[max_segments];
hg_size_t segment_keys_size[max_segments];
bake_region_id_t segment_data[max_segments];
void* segment_data_addrs[max_segments];
hg_size_t segment_data_size[max_segments];
for(auto i = 0 ; i < max_segments; i++) {
segment_keys_addrs[i] = (void*)(&segment_keys[i]);
segment_keys_size[i] = sizeof(segment_key_t);
segment_data_addrs[i] = (void*)(&segment_data[i]);
segment_data_size[i] = sizeof(bake_region_id_t);
}
bool done = false;
while(!coverage.full() && !done) {
// get the next max_segments segments
size_t num_segments = max_segments;
ret = sdskv_list_keyvals(sdskv_ph, seg_db_id,
(const void*)&lb, sizeof(lb),
segment_keys_addrs, segment_keys_size,
segment_data_addrs, segment_data_size,
&num_segments);
if(ret != SDSKV_SUCCESS) {
ERROR fprintf(stderr, "sdskv_list_keyvals returned %d\n", ret);
*prval = -1;
LEAVING;
return;
}
size_t i;
for(i=0; i < num_segments; i++) {
const segment_key_t& seg = segment_keys[i];
const bake_region_id_t& region = segment_data[i];
if(seg.oid != oid || coverage.full()) {
done = true;
break;
}
switch(seg.type) {
case seg_type_t::ZERO:
coverage.set(seg.start_index, seg.end_index);
if(*bytes_read < seg.end_index) *bytes_read = seg.end_index;
break;
case seg_type_t::TOMBSTONE:
coverage.set(seg.start_index, seg.end_index);
if(*bytes_read < seg.start_index) *bytes_read = seg.start_index;
break;
case seg_type_t::BAKE_REGION: {
auto ranges = coverage.set(seg.start_index, seg.end_index);
for(auto r : ranges) {
uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset;
ret = bake_proxy_read(bph, region, region_offset, remote_bulk,
remote_offset, remote_addr_str, segment_size);
if(ret != 0) {
*prval = -1;
ERROR fprintf(stderr,"bake_proxy_read returned %d\n", ret);
LEAVING;
return;
}
if(*bytes_read < r.end) *bytes_read = r.end;
}
break;
} // end case seg_type_t::BAKE_REGION
case seg_type_t::SMALL_REGION: {
auto ranges = coverage.set(seg.start_index, seg.end_index);
const char* base = static_cast<const char*>((void*)(&region));
margo_instance_id mid = vargs->srv_ctx->mid;
for(auto r : ranges) {
uint64_t segment_size = r.end - r.start;
uint64_t region_offset = r.start - seg.start_index;
uint64_t remote_offset = r.start - offset;
void* buf_ptrs[1] = { const_cast<char*>(base + region_offset) };
hg_size_t buf_sizes[1] = { segment_size };
hg_bulk_t handle;
ret = margo_bulk_create(mid,1, buf_ptrs, buf_sizes, HG_BULK_READ_ONLY, &handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_create returned %d\n", ret);
LEAVING;
*prval = -1;
return;
} // end if
ret = margo_bulk_transfer(mid, HG_BULK_PUSH,
remote_addr, remote_bulk,
buf.as_offset+remote_offset,
handle, 0, segment_size);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_transfer returned %d\n", ret);
*prval = -1;
LEAVING;
return;
} // end if
ret = margo_bulk_free(handle);
if(ret != HG_SUCCESS) {
ERROR fprintf(stderr,"margo_bulk_free returned %d\n", ret);
*prval = -1;
LEAVING;
return;
} // end if
if(*bytes_read < r.end) *bytes_read = r.end;
} // end for
} // end case seg_type_t::SMALL_REGION
} // end switch
} // end for
if(num_segments != max_segments) done = true;
}
LEAVING;
}
void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
......@@ -195,12 +379,6 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r
*prval = 0;
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
vargs->oid = oid;
}
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr, "oid == 0\n");
......@@ -263,12 +441,6 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi
*prval = 0;
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
vargs->oid = oid;
}
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr, "oid == 0\n");
......@@ -355,12 +527,6 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t
*prval = 0;
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
vargs->oid = oid;
}
if(oid == 0) {
*prval = -1;
ERROR fprintf(stderr, "oid == 0\n");
......
......@@ -38,11 +38,33 @@ static oid_t get_or_create_oid(
sdskv_database_id_t oid_db_id,
const char* object_name);
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_region_id_t* region, double ts = -1.0);
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts = -1.0);
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts=-1.0);
static void insert_punch_log_entry(oid_t oid, uint64_t offset, double ts=-1.0);
static uint64_t compute_size(oid_t oid, double ts);
static void insert_region_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset, uint64_t len,
bake_region_id_t* region, double ts = -1.0);
static void insert_small_region_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset, uint64_t len,
const char* data, double ts = -1.0);
static void insert_zero_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset,
uint64_t len, double ts=-1.0);
static void insert_punch_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset, double ts=-1.0);
uint64_t mobject_compute_object_size(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, double ts);
static struct write_op_visitor write_op_exec = {
.visit_begin = write_op_exec_begin,
......@@ -68,6 +90,11 @@ extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_
void write_op_exec_begin(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid_t oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
}
void write_op_exec_end(void* u)
......@@ -79,13 +106,12 @@ void write_op_exec_create(void* u, int exclusive)
{
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid_t oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
oid_t oid = vargs->oid;
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
}
/* nothing to do, the object is actually created in write_op_exec_begin
if it did not exist before */
LEAVING;
}
......@@ -94,16 +120,12 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
if(oid == 0) {
ERROR fprintf(stderr,"oid == 0\n");
return;
}
vargs->oid = oid;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
......@@ -130,7 +152,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
ERROR fprintf(stderr, "bake_persist returned %d\n", ret);
}
insert_region_log_entry(oid, offset, len, &rid);
insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid);
} else {
margo_instance_id mid = vargs->srv_ctx->mid;
char data[SMALL_REGION_THRESHOLD];
......@@ -150,7 +172,7 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
ERROR fprintf(stderr, "margo_bulk_free returned %d\n", ret);
}
insert_small_region_log_entry(oid, offset, len, data);
insert_small_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, data);
}
LEAVING;
}
......@@ -164,12 +186,11 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
}
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
......@@ -200,7 +221,7 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
LEAVING;
return;
}
insert_region_log_entry(oid, 0, len, &rid);
insert_region_log_entry(sdskv_ph, seg_db_id, oid, 0, len, &rid);
LEAVING;
}
......@@ -209,12 +230,12 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
......@@ -234,9 +255,9 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
//double ts = ABT_get_wtime();
for(i=0; i < write_len; i += data_len) {
segment_key_t seg;
// TODO normally we should have the same timestamps but right now it bugs...
insert_region_log_entry(oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
insert_region_log_entry(sdskv_ph, seg_db_id,
oid, offset+i, std::min(data_len, write_len - i), &rid);//, ts);
}
LEAVING;
}
......@@ -246,12 +267,12 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
......@@ -268,9 +289,9 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
// find out the current length of the object
double ts = ABT_get_wtime();
uint64_t offset = compute_size(oid,ts);
uint64_t offset = mobject_compute_object_size(sdskv_ph, seg_db_id, oid,ts);
insert_region_log_entry(oid, offset, len, &rid, ts);
insert_region_log_entry(sdskv_ph, seg_db_id, oid, offset, len, &rid, ts);
LEAVING;
}
......@@ -288,15 +309,14 @@ void write_op_exec_truncate(void* u, uint64_t offset)
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
}
insert_punch_log_entry(oid, offset);
insert_punch_log_entry(sdskv_ph, seg_db_id, oid, offset);
LEAVING;
}
......@@ -305,15 +325,15 @@ void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
ENTERING;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = vargs->oid;
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t seg_db_id = vargs->srv_ctx->segment_db_id;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
insert_zero_log_entry(oid, offset, len);
insert_zero_log_entry(sdskv_ph, seg_db_id, oid, offset, len);
LEAVING;
}
......@@ -331,8 +351,9 @@ void write_op_exec_omap_set(void* u, char const* const* keys,
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
oid_t oid = vargs->oid;
if(oid == 0) {
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
/* find out the max key length */
......@@ -373,8 +394,9 @@ void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_key
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
oid_t oid = vargs->oid;
if(oid == 0) {
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
ERROR fprintf(stderr,"oid == 0\n");
LEAVING;
return;
}
for(auto i=0; i<num_keys; i++) {
......@@ -439,7 +461,11 @@ oid_t get_or_create_oid(
return oid;
}
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_region_id_t* region, double ts)
static void insert_region_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset, uint64_t len,
bake_region_id_t* region, double ts)
{
ENTERING;
segment_key_t seg;
......@@ -448,11 +474,23 @@ static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, ba
seg.start_index = offset;
seg.end_index = offset+len;
seg.type = seg_type_t::BAKE_REGION;
#if 0
segment_map[seg] = *region;
#endif
int ret = sdskv_put(ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)region, sizeof(*region));
if(ret != SDSKV_SUCCESS) {
ERROR fprintf(stderr, "sdskv_put returned %d\n", ret);
}
LEAVING;
}
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts)
static void insert_small_region_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset, uint64_t len,
const char* data, double ts)
{
ENTERING;
segment_key_t seg;
......@@ -461,12 +499,23 @@ static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t l
seg.start_index = offset;
seg.end_index = offset+len;
seg.type = seg_type_t::SMALL_REGION;
#if 0
void* b = static_cast<void*>(&segment_map[seg]);
std::memcpy(b, data, len);
#endif
int ret = sdskv_put(ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)data, len);
if(ret != SDSKV_SUCCESS) {
ERROR fprintf(stderr, "sdskv_put returned %d\n", ret);
}
LEAVING;
}
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts)
static void insert_zero_log_entry(
sdskv_provider_handle_t ph,
sdskv_database_id_t seg_db_id,
oid_t oid, uint64_t offset, uint64_t len, double ts)
{
ENTERING;
segment_key_t seg;
......@@ -475,11 +524,22 @@ static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, doub
seg.start_index = offset;
seg.end_index = offset+len;
seg.type = seg_type_t::ZERO;
int ret = sdskv_put(ph, seg_db_id,
(const void*)&seg, sizeof(seg),
(const void*)nullptr, 0);
#if 0