Commit bc5e8250 authored by Matthieu Dorier's avatar Matthieu Dorier

started integration of bake-bulk

parent 59fd13a9
...@@ -17,6 +17,10 @@ AM_CXXFLAGS = -std=c++14 $(AM_CFLAGS) ...@@ -17,6 +17,10 @@ AM_CXXFLAGS = -std=c++14 $(AM_CFLAGS)
SERVER_LIBS = @SERVER_LIBS@ SERVER_LIBS = @SERVER_LIBS@
CLIENT_LIBS = @CLIENT_LIBS@ CLIENT_LIBS = @CLIENT_LIBS@
SERVER_CPPFLAGS = @SERVER_CPPFLAGS@
CLIENT_CPPFLAGS = @CLIENT_CPPFLAGS@
SERVER_CFLAGS = @SERVER_CFLAGS@
CLIENT_CFLAGS = @CLIENT_CFLAGS@
lib_LTLIBRARIES = \ lib_LTLIBRARIES = \
src/client/libmobject-store.la \ src/client/libmobject-store.la \
......
...@@ -75,7 +75,7 @@ CFLAGS="$ARGOBOTS_CFLAGS $CFLAGS" ...@@ -75,7 +75,7 @@ CFLAGS="$ARGOBOTS_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([SDSKEYVALSERVER],[kv-server],[], PKG_CHECK_MODULES([SDSKEYVALSERVER],[kv-server],[],
AC_MSG_ERROR([Could not find working sds-keyval server installation!]) ) AC_MSG_ERROR([Could not find working sds-keyval server installation!]) )
SERVER_LIBS="$SDSKEYVALSERVER_LIBS $SERVER_LIBS" SERVER_LIBS="$SDSKEYVALSERVER_LIBS $SERVER_LIBS -lboost_system"
SERVER_CPPFLAGS="$SDSKEYVALSERVER_CFLAGS $SERVER_CPPFLAGS" SERVER_CPPFLAGS="$SDSKEYVALSERVER_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$SDSKEYVALSERVER_CFLAGS $SERVER_CFLAGS" SERVER_CFLAGS="$SDSKEYVALSERVER_CFLAGS $SERVER_CFLAGS"
...@@ -91,11 +91,17 @@ SERVER_LIBS="$PMEM_LIBS $SERVER_LIBS" ...@@ -91,11 +91,17 @@ SERVER_LIBS="$PMEM_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$PMEM_CFLAGS $SERVER_CPPFLAGS" SERVER_CPPFLAGS="$PMEM_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$PMEM_CFLAGS $SERVER_CFLAGS" SERVER_CFLAGS="$PMEM_CFLAGS $SERVER_CFLAGS"
PKG_CHECK_MODULES([BAKE],[bake-bulk-server],[], PKG_CHECK_MODULES([BAKESERVER],[bake-bulk-server],[],
AC_MSG_ERROR([Could not find working BAKE installation!]) ) AC_MSG_ERROR([Could not find working BAKE server installation!]) )
SERVER_LIBS="$BAKE_LIBS $SERVER_LIBS" SERVER_LIBS="$BAKESERVER_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$BAKE_CFLAGS $SERVER_CPPFLAGS" SERVER_CPPFLAGS="$BAKESERVER_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$BAKE_CFLAGS $SERVER_CFLAGS" SERVER_CFLAGS="$BAKESERVER_CFLAGS $SERVER_CFLAGS"
PKG_CHECK_MODULES([BAKECLIENT],[bake-bulk-client],[],
AC_MSG_ERROR([Could not find working BAKE client installation!]) )
SERVER_LIBS="$BAKECLIENT_LIBS $SERVER_LIBS"
SERVER_CPPFLAGS="$BAKECLIENT_CFLAGS $SERVER_CPPFLAGS"
SERVER_CFLAGS="$BAKECLIENT_CFLAGS $SERVER_CFLAGS"
# check that SSG was compiled with MPI support # check that SSG was compiled with MPI support
AC_CHECK_LIB([ssg], [ssg_group_create_mpi], AC_CHECK_LIB([ssg], [ssg_group_create_mpi],
...@@ -123,5 +129,9 @@ AM_CONDITIONAL(HAVE_RADOS, test x"$with_rados" == "xyes") ...@@ -123,5 +129,9 @@ AM_CONDITIONAL(HAVE_RADOS, test x"$with_rados" == "xyes")
AC_SUBST(SERVER_LIBS) AC_SUBST(SERVER_LIBS)
AC_SUBST(CLIENT_LIBS) AC_SUBST(CLIENT_LIBS)
AC_SUBST(SERVER_CPPFLAGS)
AC_SUBST(CLIENT_CPPFLAGS)
AC_SUBST(SERVER_CFLAGS)
AC_SUBST(CLIENT_CFLAGS)
AC_CONFIG_FILES([Makefile]) AC_CONFIG_FILES([Makefile])
AC_OUTPUT AC_OUTPUT
...@@ -59,15 +59,19 @@ src_server_libmobject_server_la_SOURCES = \ ...@@ -59,15 +59,19 @@ src_server_libmobject_server_la_SOURCES = \
src/server/fake/fake-write-op.cpp \ src/server/fake/fake-write-op.cpp \
src/server/fake/fake-read-op.cpp \ src/server/fake/fake-read-op.cpp \
src/server/fake/fake-db.cpp \ src/server/fake/fake-db.cpp \
src/server/core/core-write-op.cpp \
src/server/core/core-read-op.cpp \
src/server/print-write-op.c \ src/server/print-write-op.c \
src/server/print-read-op.c src/server/print-read-op.c
src_server_libmobject_server_la_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS} src_server_libmobject_server_la_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_libmobject_server_la_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_libmobject_server_la_LIBADD = src/omap-iter/libomap-iter.la \ src_server_libmobject_server_la_LIBADD = src/omap-iter/libomap-iter.la \
src/io-chain/libio-chain.la ${SERVER_LIBS} src/io-chain/libio-chain.la ${SERVER_LIBS}
src_server_mobject_server_daemon_SOURCES = \ src_server_mobject_server_daemon_SOURCES = \
src/server/mobject-server-daemon.c src/server/mobject-server-daemon.c
src_server_mobject_server_daemon_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS} src_server_mobject_server_daemon_CPPFLAGS = ${AM_CPPFLAGS} ${SERVER_CPPFLAGS}
src_server_mobject_server_daemon_CFLAGS = ${AM_CFLAGS} ${SERVER_CFLAGS}
src_server_mobject_server_daemon_LDADD = \ src_server_mobject_server_daemon_LDADD = \
src/server/libmobject-server.la ${SERVER_LIBS} src/server/libmobject-server.la ${SERVER_LIBS}
......
#include <map>
#include <string>
#include <iostream>
#include "src/server/core/core-read-op.h"
#include "src/server/visitor-args.h"
#include "src/io-chain/read-op-visitor.h"
#include "src/io-chain/read-resp-impl.h"
#include "src/omap-iter/omap-iter-impl.h"
static void read_op_exec_begin(void*);
static void read_op_exec_stat(void*, uint64_t*, time_t*, int*);
static void read_op_exec_read(void*, uint64_t, size_t, buffer_u, size_t*, int*);
static void read_op_exec_omap_get_keys(void*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*);
static struct read_op_visitor read_op_exec = {
.visit_begin = read_op_exec_begin,
.visit_stat = read_op_exec_stat,
.visit_read = read_op_exec_read,
.visit_omap_get_keys = read_op_exec_omap_get_keys,
.visit_omap_get_vals = read_op_exec_omap_get_vals,
.visit_omap_get_vals_by_keys = read_op_exec_omap_get_vals_by_keys,
.visit_end = read_op_exec_end
};
extern "C" void core_read_op(mobject_store_read_op_t read_op, server_visitor_args_t vargs)
{
execute_read_op_visitor(&read_op_exec, read_op, (void*)vargs);
}
void read_op_exec_begin(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_stat(void* u, uint64_t* psize, time_t* pmtime, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_read(void* u, uint64_t offset, size_t len, buffer_u buf, size_t* bytes_read, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_omap_get_keys(void* u, const char* start_after, uint64_t max_return,
mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_omap_get_vals(void* u, const char* start_after, const char* filter_prefix, uint64_t max_return, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_omap_get_vals_by_keys(void* u, char const* const* keys, size_t num_keys, mobject_store_omap_iter_t* iter, int* prval)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void read_op_exec_end(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
#ifndef __CORE_READ_OP_H
#define __CORE_READ_OP_H
#include <margo.h>
#include "libmobject-store.h"
#include "src/server/visitor-args.h"
#ifdef __cplusplus
extern "C" {
#endif
void core_read_op(mobject_store_read_op_t read_op, server_visitor_args_t vargs);
#ifdef __cplusplus
}
#endif
#endif
#include <map>
#include <string>
#include <iostream>
#include "src/server/visitor-args.h"
#include "src/io-chain/write-op-visitor.h"
static void write_op_exec_begin(void*);
static void write_op_exec_end(void*);
static void write_op_exec_create(void*, int);
static void write_op_exec_write(void*, buffer_u, size_t, uint64_t);
static void write_op_exec_write_full(void*, buffer_u, size_t);
static void write_op_exec_writesame(void*, buffer_u, size_t, size_t, uint64_t);
static void write_op_exec_append(void*, buffer_u, size_t);
static void write_op_exec_remove(void*);
static void write_op_exec_truncate(void*, uint64_t);
static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);
static struct write_op_visitor write_op_exec = {
.visit_begin = write_op_exec_begin,
.visit_create = write_op_exec_create,
.visit_write = write_op_exec_write,
.visit_write_full = write_op_exec_write_full,
.visit_writesame = write_op_exec_writesame,
.visit_append = write_op_exec_append,
.visit_remove = write_op_exec_remove,
.visit_truncate = write_op_exec_truncate,
.visit_zero = write_op_exec_zero,
.visit_omap_set = write_op_exec_omap_set,
.visit_omap_rm_keys = write_op_exec_omap_rm_keys,
.visit_end = write_op_exec_end
};
extern "C" void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs)
{
/* Execute the operation chain */
execute_write_op_visitor(&write_op_exec, write_op, (void*)vargs);
}
void write_op_exec_begin(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_end(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_create(void* u, int exclusive)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_write(void* u, buffer_u buf, size_t len, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_write_full(void* u, buffer_u buf, size_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_writesame(void* u, buffer_u buf, size_t data_len, size_t write_len, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_append(void* u, buffer_u buf, size_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_remove(void* u)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_truncate(void* u, uint64_t offset)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_zero(void* u, uint64_t offset, uint64_t len)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_omap_set(void* u, char const* const* keys,
char const* const* vals,
const size_t *lens,
size_t num)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
void write_op_exec_omap_rm_keys(void* u, char const* const* keys, size_t num_keys)
{
auto vargs = static_cast<server_visitor_args_t>(u);
}
#ifndef __CORE_WRITE_OP_H
#define __CORE_WRITE_OP_H
#include <margo.h>
#include "libmobject-store.h"
#include "src/server/visitor-args.h"
#ifdef __cplusplus
extern "C" {
#endif
void core_write_op(mobject_store_write_op_t write_op, server_visitor_args_t vargs);
#ifdef __cplusplus
}
#endif
#endif
...@@ -17,7 +17,7 @@ static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t ...@@ -17,7 +17,7 @@ static void read_op_exec_omap_get_vals(void*, const char*, const char*, uint64_t
static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*); static void read_op_exec_omap_get_vals_by_keys(void*, char const* const*, size_t, mobject_store_omap_iter_t*, int*);
static void read_op_exec_end(void*); static void read_op_exec_end(void*);
struct read_op_visitor read_op_exec = { static struct read_op_visitor read_op_exec = {
.visit_begin = read_op_exec_begin, .visit_begin = read_op_exec_begin,
.visit_stat = read_op_exec_stat, .visit_stat = read_op_exec_stat,
.visit_read = read_op_exec_read, .visit_read = read_op_exec_read,
......
...@@ -20,7 +20,7 @@ static void write_op_exec_zero(void*, uint64_t, uint64_t); ...@@ -20,7 +20,7 @@ static void write_op_exec_zero(void*, uint64_t, uint64_t);
static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t); static void write_op_exec_omap_set(void*, char const* const*, char const* const*, const size_t*, size_t);
static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t); static void write_op_exec_omap_rm_keys(void*, char const* const*, size_t);
struct write_op_visitor write_op_exec = { static struct write_op_visitor write_op_exec = {
.visit_begin = write_op_exec_begin, .visit_begin = write_op_exec_begin,
.visit_create = write_op_exec_create, .visit_create = write_op_exec_create,
.visit_write = write_op_exec_write, .visit_write = write_op_exec_write,
......
...@@ -9,7 +9,8 @@ ...@@ -9,7 +9,8 @@
#include <abt.h> #include <abt.h>
#include <margo.h> #include <margo.h>
//#include <sds-keyval.h> //#include <sds-keyval.h>
//#include <bake-bulk-server.h> #include <bake-bulk-server.h>
#include <bake-bulk-client.h>
//#include <libpmemobj.h> //#include <libpmemobj.h>
#include <ssg-mpi.h> #include <ssg-mpi.h>
...@@ -21,8 +22,10 @@ ...@@ -21,8 +22,10 @@
#include "src/io-chain/write-op-impl.h" #include "src/io-chain/write-op-impl.h"
#include "src/io-chain/read-op-impl.h" #include "src/io-chain/read-op-impl.h"
#include "src/server/visitor-args.h" #include "src/server/visitor-args.h"
#include "src/server/fake/fake-write-op.h" //#include "src/server/fake/fake-write-op.h"
#include "src/server/fake/fake-read-op.h" //#include "src/server/fake/fake-write-op.h"
#include "src/server/core/core-read-op.h"
#include "src/server/core/core-write-op.h"
typedef struct mobject_server_context typedef struct mobject_server_context
{ {
...@@ -30,7 +33,7 @@ typedef struct mobject_server_context ...@@ -30,7 +33,7 @@ typedef struct mobject_server_context
margo_instance_id mid; margo_instance_id mid;
/* TODO bake, sds-keyval stuff */ /* TODO bake, sds-keyval stuff */
ssg_group_id_t gid; ssg_group_id_t gid;
bake_target_id_t bake_id;
/* server shutdown conditional logic */ /* server shutdown conditional logic */
ABT_mutex shutdown_mutex; ABT_mutex shutdown_mutex;
ABT_cond shutdown_cond; ABT_cond shutdown_cond;
...@@ -74,7 +77,6 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file) ...@@ -74,7 +77,6 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
ABT_mutex_create(&srv_ctx->shutdown_mutex); ABT_mutex_create(&srv_ctx->shutdown_mutex);
ABT_cond_create(&srv_ctx->shutdown_cond); ABT_cond_create(&srv_ctx->shutdown_cond);
/* TODO bake-bulk */
/* TODO sds-keyval */ /* TODO sds-keyval */
# if 0 # if 0
kv_context *metadata; kv_context *metadata;
...@@ -123,6 +125,16 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file) ...@@ -123,6 +125,16 @@ int mobject_server_init(margo_instance_id mid, const char *cluster_file)
} }
} }
/* initialize bake-bulk */
/* server part */
struct bake_pool_info* pool_info = bake_server_makepool("/dev/shm/mobject.dat");
bake_server_register(mid, pool_info);
// XXX: check return values for the above two calls
/* client part */
hg_addr_t self_addr = ssg_get_addr(srv_ctx->gid, my_id);
bake_probe_instance(mid, self_addr, &(srv_ctx->bake_id));
// XXX: check return value of the above calls
g_srv_ctx = srv_ctx; g_srv_ctx = srv_ctx;
return 0; return 0;
...@@ -217,7 +229,8 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h) ...@@ -217,7 +229,8 @@ static hg_return_t mobject_write_op_ult(hg_handle_t h)
/* Execute the operation chain */ /* Execute the operation chain */
//print_write_op(in.write_op, in.object_name); //print_write_op(in.write_op, in.object_name);
fake_write_op(in.write_op, &vargs); //fake_write_op(in.write_op, &vargs);
core_write_op(in.write_op, &vargs);
// set the return value of the RPC // set the return value of the RPC
out.ret = 0; out.ret = 0;
...@@ -262,7 +275,8 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h) ...@@ -262,7 +275,8 @@ static hg_return_t mobject_read_op_ult(hg_handle_t h)
/* Compute the result. */ /* Compute the result. */
//print_read_op(in.read_op, in.object_name); //print_read_op(in.read_op, in.object_name);
fake_read_op(in.read_op, &vargs); //fake_read_op(in.read_op, &vargs);
core_read_op(in.read_op, &vargs);
out.responses = resp; out.responses = resp;
...@@ -304,6 +318,11 @@ DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult) ...@@ -304,6 +318,11 @@ DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx) static void mobject_server_cleanup(mobject_server_context_t *srv_ctx)
{ {
// cleanup bake-bulk
bake_shutdown_service(srv_ctx->bake_id);
bake_release_instance(srv_ctx->bake_id);
// XXX: check the return value of these calls
ssg_group_destroy(srv_ctx->gid); ssg_group_destroy(srv_ctx->gid);
ssg_finalize(); ssg_finalize();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment