Commit a1ef1e13 authored by Matthieu Dorier's avatar Matthieu Dorier

passing server context down to io chain visitor

parent bbf0ca8d
......@@ -49,16 +49,19 @@ void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_r
mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
omap_iter_create(iter);
}
void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
omap_iter_create(iter);
}
void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
omap_iter_create(iter);
}
void read_op_exec_end(void* u)
......
......@@ -59,7 +59,8 @@ void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_
*prval = -1;
return;
}
fake_db[name].read(vargs->mid, vargs->client_addr, vargs->bulk_handle,
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].read(mid, vargs->client_addr, vargs->bulk_handle,
buf.as_offset, offset, len, bytes_read);
*prval = 0;
}
......
......@@ -69,7 +69,8 @@ void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (write) Object " << name << " does not exist, it will be created" << std::endl;
}
fake_db[name].write(vargs->mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, len);
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].write(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, len);
}
void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
......@@ -79,7 +80,8 @@ void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (write_full) Object " << name << " does not exist, it will be created" << std::endl;
}
fake_db[name].write_full(vargs->mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len);
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].write_full(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len);
}
void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
......@@ -89,7 +91,8 @@ void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t writ
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (writesame) Object " << name << " does not exist, it will be created" << std::endl;
}
fake_db[name].writesame(vargs->mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, data_len, write_len);
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].writesame(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, offset, data_len, write_len);
}
void write_op_exec_append(void* u, buffer_u buf, size_t len)
......@@ -99,7 +102,8 @@ void write_op_exec_append(void* u, buffer_u buf, size_t len)
if(fake_db.count(name) == 0) {
std::cerr << "[FAKE-BACKEND-WARNING] (append) Object " << name << " does not exist, it will be created" << std::endl;
}
fake_db[name].append(vargs->mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len);
margo_instance_id mid = vargs->srv_ctx->mid;
fake_db[name].append(mid, vargs->client_addr, vargs->bulk_handle, buf.as_offset, len);
}
void write_op_exec_remove(void* u)
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __SERVER_MOBJECT_CONTEXT_H
#define __SERVER_MOBJECT_CONTEXT_H
#include <margo.h>
//#include <sds-keyval.h>
#include <bake-bulk-server.h>
#include <bake-bulk-client.h>
//#include <libpmemobj.h>
#include <ssg-mpi.h>
#ifdef __cplusplus
extern "C" {
#endif
struct mobject_server_context
{
/* margo, bake, sds-keyval, ssg state */
margo_instance_id mid;
/* TODO bake, sds-keyval stuff */
ssg_group_id_t gid;
bake_target_id_t bake_id;
/* server shutdown conditional logic */
ABT_mutex shutdown_mutex;
ABT_cond shutdown_cond;
int shutdown_flag;
int ref_count;
};
#ifdef __cplusplus
}
#endif
#endif
......@@ -4,7 +4,7 @@
* See COPYRIGHT in top-level directory.
*/
//#define FAKE_CPP_SERVER
#define FAKE_CPP_SERVER
#include <assert.h>
#include <mpi.h>
......@@ -17,6 +17,7 @@
#include <ssg-mpi.h>
#include "mobject-server.h"
#include "src/server/mobject-server-context.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
//#include "src/server/print-write-op.h"
......@@ -32,20 +33,6 @@
#include "src/server/core/core-write-op.h"
#endif
struct mobject_server_context
{
/* margo, bake, sds-keyval, ssg state */
margo_instance_id mid;
/* TODO bake, sds-keyval stuff */
ssg_group_id_t gid;
bake_target_id_t bake_id;
/* server shutdown conditional logic */
ABT_mutex shutdown_mutex;
ABT_cond shutdown_cond;
int shutdown_flag;
int ref_count;
} ;
static int mobject_server_register(mobject_server_context_t *srv_ctx);
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx);
......@@ -221,11 +208,12 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h)
assert(ret == HG_SUCCESS);
const struct hg_info* info = margo_get_info(h);
margo_instance_id mid = margo_hg_handle_get_instance(h);
server_visitor_args vargs;
vargs.object_name = in.object_name;
vargs.pool_name = in.pool_name;
vargs.mid = margo_hg_handle_get_instance(h);
vargs.srv_ctx = margo_registered_data(mid, info->id);
vargs.client_addr = info->addr;
vargs.bulk_handle = in.write_op->bulk_handle;
......@@ -270,11 +258,12 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
read_response_t resp = build_matching_read_responses(in.read_op);
const struct hg_info* info = margo_get_info(h);
margo_instance_id mid = margo_hg_handle_get_instance(h);
server_visitor_args vargs;
vargs.object_name = in.object_name;
vargs.pool_name = in.pool_name;
vargs.mid = margo_hg_handle_get_instance(h);
vargs.srv_ctx = margo_registered_data(mid,info->id);
vargs.client_addr = info->addr;
vargs.bulk_handle = in.read_op->bulk_handle;
......
......@@ -3,17 +3,18 @@
#include <margo.h>
#include "libmobject-store.h"
#include "src/server/mobject-server-context.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
const char* object_name;
const char* pool_name;
margo_instance_id mid;
hg_addr_t client_addr;
hg_bulk_t bulk_handle;
const char* object_name;
const char* pool_name;
struct mobject_server_context* srv_ctx;
hg_addr_t client_addr;
hg_bulk_t bulk_handle;
} server_visitor_args;
typedef server_visitor_args* server_visitor_args_t;
......
......@@ -73,7 +73,6 @@ int main(int argc, char** argv)
mobject_store_omap_iter_t iter4;
int prval4;
mobject_store_read_op_omap_get_vals(read_op, start_after2, filter_prefix2, 3, &iter4, &prval4);
// Add "omap_get_vals_by_keys" operation
const char* keys[] = {"matthieu", "robl"};
mobject_store_omap_iter_t iter5;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment