Commit 8030782e authored by Matthieu Dorier's avatar Matthieu Dorier

started integration of bake-bulk

parent a21fb678
......@@ -17,6 +17,10 @@ AM_CXXFLAGS = -std=c++14 $(AM_CFLAGS)
SERVER_LIBS = @SERVER_LIBS@
CLIENT_LIBS = @CLIENT_LIBS@
SERVER_CPPFLAGS = @SERVER_CPPFLAGS@
CLIENT_CPPFLAGS = @CLIENT_CPPFLAGS@
SERVER_CFLAGS = @SERVER_CFLAGS@
CLIENT_CFLAGS = @CLIENT_CFLAGS@
lib_LTLIBRARIES = \
src/client/libmobject-store.la \
......
......@@ -75,7 +75,7 @@ CFLAGS="$ARGOBOTS_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([SDSKEYVALSERVER],[kv-server],[],
AC_MSG_ERROR([Could not find working sds-keyval server installation!]) )
SERVER_LIBS="$SDSKEYVALSERVER_LIBS $SERVER_LIBS"
SERVER_LIBS="$SDSKEYVALSERVER_LIBS $SERVER_LIBS -lboost_system"
SERVER_CPPFLAGS="$SDSKEYVALSERVER_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$SDSKEYVALSERVER_CFLAGS $SERVER_CFLAGS"
......@@ -91,11 +91,17 @@ SERVER_LIBS="$PMEM_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$PMEM_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$PMEM_CFLAGS $SERVER_CFLAGS"
PKG_CHECK_MODULES([BAKE],[bake-bulk-server],[],
AC_MSG_ERROR([Could not find working BAKE installation!]) )
SERVER_LIBS="$BAKE_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$BAKE_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$BAKE_CFLAGS $SERVER_CFLAGS"
PKG_CHECK_MODULES([BAKESERVER],[bake-bulk-server],[],
AC_MSG_ERROR([Could not find working BAKE server installation!]) )
SERVER_LIBS="$BAKESERVER_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$BAKESERVER_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$BAKESERVER_CFLAGS $SERVER_CFLAGS"
PKG_CHECK_MODULES([BAKECLIENT],[bake-bulk-client],[],
AC_MSG_ERROR([Could not find working BAKE client installation!]) )
SERVER_LIBS="$BAKECLIENT_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$BAKECLIENT_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$BAKECLIENT_CFLAGS $SERVER_CFLAGS"
# check that SSG was compiled with MPI support
AC_CHECK_LIB([ssg], [ssg_group_create_mpi],
......@@ -123,5 +129,9 @@ AM_CONDITIONAL(HAVE_RADOS, test x"$with_rados" == "xyes")
AC_SUBST(SERVER_LIBS)
AC_SUBST(CLIENT_LIBS)
AC_SUBST(SERVER_CPPFLAGS)
AC_SUBST(CLIENT_CPPFLAGS)
AC_SUBST(SERVER_CFLAGS)
AC_SUBST(CLIENT_CFLAGS)
AC_CONFIG_FILES([Makefile])
AC_OUTPUT
......@@ -60,15 +60,19 @@ src_server_libmobject_server_la_SOURCES = \
src/server/fake/fake-write-op.cpp \
src/server/fake/fake-read-op.cpp \
src/server/fake/fake-db.cpp \
src/server/core/core-write-op.cpp \
src/server/core/core-read-op.cpp \
src/server/print-write-op.c \
src/server/print-read-op.c
src_server_libmobject_server_la_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_libmobject_server_la_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_libmobject_server_la_LIBADD = src/omap-iter/libomap-iter.la \
src/io-chain/libio-chain.la ${SERVER_LIBS}
src_server_mobject_server_daemon_SOURCES = \
src/server/mobject-server-daemon.c
src_server_mobject_server_daemon_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_mobject_server_daemon_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_mobject_server_daemon_LDADD = \
src/server/libmobject-server.la ${SERVER_LIBS}
......
#include <map>
#include <string>
#include <iostream>
#include "src/server/core/core-read-op.h"
#include "src/server/visitor-args.h"
#include "src/io-chain/read-op-visitor.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/omap-iter/omap-iter-impl.h"
static void read_op_exec_begin(void*);
static void read_op_exec_stat(void*, uint64_t*, time_t*, int*);
static void read_op_exec_read(void*, uint64_t, size_t, buffer_u, size_t*, int*);
static void read_op_exec_omap_get_keys(void*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*);
static struct read_op_visitor read_op_exec = {
.visit_begin = read_op_exec_begin,
.visit_stat = read_op_exec_stat,
.visit_read = read_op_exec_read,
.visit_omap_get_keys = read_op_exec_omap_get_keys,
.visit_omap_get_vals = read_op_exec_omap_get_vals,
.visit_omap_get_vals_by_keys = read_op_exec_omap_get_vals_by_keys,
.visit_end = read_op_exec_end
};
extern "C" void core_read_op(mobject_store_read_op_t read_op, server_visitor_args_t vargs)
{
execute_read_op_visitor(&read_op_exec, read_op, (void*)vargs);
}
void read_op_exec_begin(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_end(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
#ifndef __CORE_READ_OP_H
#define __CORE_READ_OP_H
#include <margo.h>
#include "libmobject-store.h"
#include "src/server/visitor-args.h"
#ifdef __cplusplus
extern "C" {
#endif
void core_read_op(mobject_store_read_op_t read_op, server_visitor_args_t vargs);
#ifdef __cplusplus
}
#endif
#endif
#include <map>
#include <string>
#include <iostream>
#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int);
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
static void write_op_exec_write_full(void*, buffer_u, size_t);
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
static void write_op_exec_truncate(void*, uint64_t);
static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);
static struct write_op_visitor write_op_exec = {
.visit_begin = write_op_exec_begin,
.visit_create = write_op_exec_create,
.visit_write = write_op_exec_write,
.visit_write_full = write_op_exec_write_full,
.visit_writesame = write_op_exec_writesame,
.visit_append = write_op_exec_append,
.visit_remove = write_op_exec_remove,
.visit_truncate = write_op_exec_truncate,
.visit_zero = write_op_exec_zero,
.visit_omap_set = write_op_exec_omap_set,
.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
.visit_end = write_op_exec_end
};
extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
/* Execute the operation chain */
execute_write_op_visitor(&write_op_exec, write_op, (void*)vargs);
}
void write_op_exec_begin(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_end(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_create(void* u, int exclusive)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_remove(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_truncate(void* u, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_omap_set(void* u, char const* const* keys,
char const* const* vals,
const size_t *lens,
size_t num)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
#ifndef __CORE_WRITE_OP_H
#define __CORE_WRITE_OP_H
#include <margo.h>
#include "libmobject-store.h"
#include "src/server/visitor-args.h"
#ifdef __cplusplus
extern "C" {
#endif
void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs);
#ifdef __cplusplus
}
#endif
#endif
......@@ -17,7 +17,7 @@ static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*);
struct read_op_visitor read_op_exec = {
static struct read_op_visitor read_op_exec = {
.visit_begin = read_op_exec_begin,
.visit_stat = read_op_exec_stat,
.visit_read = read_op_exec_read,
......
......@@ -20,7 +20,7 @@ static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);
struct write_op_visitor write_op_exec = {
static struct write_op_visitor write_op_exec = {
.visit_begin = write_op_exec_begin,
.visit_create = write_op_exec_create,
.visit_write = write_op_exec_write,
......
......@@ -9,7 +9,8 @@
#include <abt.h>
#include <margo.h>
//#include <sds-keyval.h>
//#include <bake-bulk-server.h>
#include <bake-bulk-server.h>
#include <bake-bulk-client.h>
//#include <libpmemobj.h>
#include <ssg-mpi.h>
......@@ -21,8 +22,10 @@
#include "src/io-chain/write-op-impl.h"
#include "src/io-chain/read-op-impl.h"
#include "src/server/visitor-args.h"
#include "src/server/fake/fake-write-op.h"
#include "src/server/fake/fake-read-op.h"
//#include "src/server/fake/fake-write-op.h"
//#include "src/server/fake/fake-write-op.h"
#include "src/server/core/core-read-op.h"
#include "src/server/core/core-write-op.h"
typedef struct mobject_server_context
{
......@@ -30,7 +33,7 @@ typedef struct mobject_server_context
margo_instance_id mid;
/* TODO bake, sds-keyval stuff */
ssg_group_id_t gid;
bake_target_id_t bake_id;
/* server shutdown conditional logic */
ABT_mutex shutdown_mutex;
ABT_cond shutdown_cond;
......@@ -74,7 +77,6 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
ABT_mutex_create(&srv_ctx->shutdown_mutex);
ABT_cond_create(&srv_ctx->shutdown_cond);
/* TODO bake-bulk */
/* TODO sds-keyval */
# if 0
kv_context *metadata;
......@@ -123,6 +125,16 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
}
}
/* initialize bake-bulk */
/* server part */
struct bake_pool_info* pool_info = bake_server_makepool("/dev/shm/mobject.dat");
bake_server_register(mid, pool_info);
// XXX: check return values for the above two calls
/* client part */
hg_addr_t self_addr = ssg_get_addr(srv_ctx->gid, my_id);
bake_probe_instance(mid, self_addr, &(srv_ctx->bake_id));
// XXX: check return value of the above calls
g_srv_ctx = srv_ctx;
return 0;
......@@ -217,7 +229,8 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h)
/* Execute the operation chain */
//print_write_op(in.write_op, in.object_name);
fake_write_op(in.write_op, &vargs);
//fake_write_op(in.write_op, &vargs);
core_write_op(in.write_op, &vargs);
// set the return value of the RPC
out.ret = 0;
......@@ -262,7 +275,8 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
/* Compute the result. */
//print_read_op(in.read_op, in.object_name);
fake_read_op(in.read_op, &vargs);
//fake_read_op(in.read_op, &vargs);
core_read_op(in.read_op, &vargs);
out.responses = resp;
......@@ -304,6 +318,11 @@ DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx)
{
// cleanup bake-bulk
bake_shutdown_service(srv_ctx->bake_id);
bake_release_instance(srv_ctx->bake_id);
// XXX: check the return value of these calls
ssg_group_destroy(srv_ctx->gid);
ssg_finalize();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment