Commit a29d8576 authored by Matthieu Dorier's avatar Matthieu Dorier

name_map and oid_map now managed by sdskv

parent 4ad2c7d5
......@@ -21,6 +21,11 @@ static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*);
static oid_t get_oid_from_name(
sdskv_provider_handle_t ph,
sdskv_database_id_t name_db_id,
const char* name);
struct read_request_t {
double timestamp; // timestamp at which the segment was created
uint64_t absolute_start_index; // start index within the object
......@@ -74,11 +79,17 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
vargs->oid = oid;
}
if(oid == 0) {
*prval = -1;
return;
}
oid_t oid = name_map[object_name];
segment_key_t lb;
lb.oid = oid;
......@@ -143,12 +154,17 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r
int ret;
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
vargs->oid = oid;
}
if(oid == 0) {
*prval = -1;
return;
}
oid_t oid = name_map[object_name];
omap_iter_create(iter);
size_t lb_size = sizeof(omap_key_t)+strlen(start_after);
......@@ -201,12 +217,17 @@ void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* fi
int ret;
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
vargs->oid = oid;
}
if(oid == 0) {
*prval = -1;
return;
}
oid_t oid = name_map[object_name];
hg_size_t max_items = 10;
// TODO make this changeable
......@@ -286,13 +307,18 @@ void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t
int ret;
*prval = 0;
// find oid
if(name_map.count(object_name) == 0) {
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
oid = get_oid_from_name(sdskv_ph, name_db_id, object_name);
vargs->oid = oid;
}
if(oid == 0) {
*prval = -1;
return;
}
oid_t oid = name_map[object_name];
omap_iter_create(iter);
// figure out key sizes
......@@ -332,3 +358,15 @@ void read_op_exec_end(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
static oid_t get_oid_from_name(
sdskv_provider_handle_t ph,
sdskv_database_id_t name_db_id,
const char* name)
{
oid_t result = 0;
hg_size_t oid_size = sizeof(result);
int ret = sdskv_get(ph, name_db_id, (const void*)name, strlen(name+1), (void*)&result, &oid_size);
if(ret != SDSKV_SUCCESS) return 0;
return result;
}
......@@ -21,7 +21,12 @@ static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);
static oid_t get_or_create_oid(const char* name);
static oid_t get_or_create_oid(
sdskv_provider_handle_t ph,
sdskv_database_id_t name_db_id,
sdskv_database_id_t oid_db_id,
const char* object_name);
static void insert_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, bake_region_id_t* region, double ts = -1.0);
static void insert_small_region_log_entry(oid_t oid, uint64_t offset, uint64_t len, const char* data, double ts = -1.0);
static void insert_zero_log_entry(oid_t oid, uint64_t offset, uint64_t len, double ts=-1.0);
......@@ -62,13 +67,23 @@ void write_op_exec_end(void* u)
void write_op_exec_create(void* u, int exclusive)
{
auto vargs = static_cast<server_visitor_args_t>(u);
get_or_create_oid(vargs->object_name);
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
}
void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
}
bake_provider_handle_t bake_ph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
......@@ -106,7 +121,14 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
// and re-create it.
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
}
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
......@@ -130,7 +152,14 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
}
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
......@@ -163,7 +192,14 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
}
bake_provider_handle_t bph = vargs->srv_ctx->bake_ph;
bake_target_id_t bti = vargs->srv_ctx->bake_tid;
......@@ -199,7 +235,14 @@ void write_op_exec_remove(void* u)
void write_op_exec_truncate(void* u, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
}
insert_punch_log_entry(oid, offset);
}
......@@ -207,7 +250,14 @@ void write_op_exec_truncate(void* u, uint64_t offset)
void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
oid_t oid = vargs->oid;
if(oid == 0) {
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
}
insert_zero_log_entry(oid, offset, len);
}
......@@ -219,9 +269,16 @@ void write_op_exec_omap_set(void* u, char const* const* keys,
{
int ret;
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
oid_t oid = vargs->oid;
if(oid == 0) {
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
}
for(auto i=0; i<num; i++) {
ret = sdskv_put(sdskv_ph, omap_db_id,
(const void*)keys[i], strlen(keys[i])+1,
......@@ -230,56 +287,67 @@ void write_op_exec_omap_set(void* u, char const* const* keys,
fprintf(stderr, "write_op_exec_omap_set: error in sdskv_put() (ret = %d)\n", ret);
}
}
#if 0
for(auto i=0; i<num; i++) {
std::vector<char> val(vals[i], vals[i]+lens[i]);
omap_key_t omk;
omk.oid = oid;
omk.key = std::string(keys[i]);
omap_map[std::move(omk)] = std::move(val);
}
#endif
}
void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
auto vargs = static_cast<server_visitor_args_t>(u);
oid_t oid = get_or_create_oid(vargs->object_name);
int ret;
auto vargs = static_cast<server_visitor_args_t>(u);
sdskv_provider_handle_t sdskv_ph = vargs->srv_ctx->sdskv_ph;
sdskv_database_id_t name_db_id = vargs->srv_ctx->name_db_id;
sdskv_database_id_t oid_db_id = vargs->srv_ctx->oid_db_id;
sdskv_database_id_t omap_db_id = vargs->srv_ctx->omap_db_id;
oid_t oid = vargs->oid;
if(oid == 0) {
oid = get_or_create_oid(sdskv_ph, name_db_id, oid_db_id, vargs->object_name);
vargs->oid = oid;
}
for(auto i=0; i<num_keys; i++) {
ret = sdskv_erase(sdskv_ph, omap_db_id,
(const void*)keys[i], strlen(keys[i])+1);
if(ret != SDSKV_SUCCESS)
fprintf(stderr, "write_op_exec_omap_rm_keys: error in sdskv_erase() (ret = %d)\n", ret);
}
#if 0
for(auto i=0; i < num_keys; i++) {
omap_key_t omk;
omk.oid = oid;
omk.key = std::string(keys[i]);
omap_map.erase(omk);
}
#endif
}
oid_t get_or_create_oid(const char* object_name)
oid_t get_or_create_oid(
sdskv_provider_handle_t ph,
sdskv_database_id_t name_db_id,
sdskv_database_id_t oid_db_id,
const char* object_name)
{
oid_t oid = 0;
std::string name(object_name);
// check that the object exists, if not, create the object
if(name_map.count(name) == 0) {
hg_size_t vsize = sizeof(oid);
int ret;
ret = sdskv_get(ph, name_db_id, (const void*)object_name,
strlen(object_name)+1, &oid, &vsize);
if(SDSKV_ERR_UNKNOWN_KEY == ret) {
std::hash<std::string> hash_fn;
oid = hash_fn(name);
while(oid_map.count(oid) != 0 || oid == 0) {
oid = hash_fn(std::string(object_name));
ret = SDSKV_SUCCESS;
while(ret == SDSKV_SUCCESS) {
hg_size_t s = 0;
if(oid != 0) {
ret = sdskv_length(ph, oid_db_id, (const void*)&oid,
sizeof(oid), &s);
}
oid += 1;
}
name_map[name] = oid;
oid_map[oid] = name;
// we make sure we stopped at an unknown key (not another SDSKV error)
if(ret != SDSKV_ERR_UNKNOWN_KEY) return 0;
// set name => oid
ret = sdskv_put(ph, name_db_id, (const void*)object_name,
strlen(object_name)+1, &oid, sizeof(oid));
if(ret != SDSKV_SUCCESS) return 0;
// set oid => name
ret = sdskv_put(ph, oid_db_id, &oid, sizeof(oid),
(const void*)object_name, strlen(object_name)+1);
if(ret != SDSKV_SUCCESS) return 0;
} else {
oid = name_map[name];
oid = 0;
}
return oid;
}
......
......@@ -182,6 +182,7 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h)
server_visitor_args vargs;
vargs.object_name = in.object_name;
vargs.oid = 0;
vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data_mplex(mid, info->id, info->target_id);
if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR;
......@@ -234,6 +235,7 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
server_visitor_args vargs;
vargs.object_name = in.object_name;
vargs.oid = 0;
vargs.pool_name = in.pool_name;
vargs.srv_ctx = margo_registered_data_mplex(mid, info->id, info->target_id);
if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR;
......
......@@ -3,6 +3,7 @@
#include <margo.h>
#include "libmobject-store.h"
#include "src/server/core/key-types.h"
#include "src/server/mobject-server-context.h"
#ifdef __cplusplus
......@@ -11,6 +12,7 @@ extern "C" {
typedef struct {
const char* object_name;
oid_t oid;
const char* pool_name;
struct mobject_server_context* srv_ctx;
const char* client_addr_str;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment