Commit d005458e authored by Shane Snyder's avatar Shane Snyder
Browse files

retool existing code into a server library

parent ec0a8846
......@@ -13,7 +13,7 @@ CLEANFILES = $(bin_SCRIPTS)
MAINTAINERCLEANFILES =
EXTRA_DIST =
BUILT_SOURCES =
include_HEADERS = include/bake-bulk.h include/bake-bulk-client.h
include_HEADERS = include/bake-bulk.h include/bake-bulk-client.h include/bake-bulk-server.h
EXTRA_DIST += \
prepare.sh
......@@ -26,8 +26,9 @@ AM_LIBS =
AM_CXXFLAGS = $(AM_CFLAGS)
lib_LTLIBRARIES = src/libbake-bulk-client.la
lib_LTLIBRARIES = src/libbake-bulk-client.la src/libbake-bulk-server.la
src_libbake_bulk_client_la_SOURCES =
src_libbake_bulk_server_la_SOURCES =
LDADD = src/libbake-bulk-client.la
......
......@@ -8,7 +8,6 @@
#define __BAKE_BULK_CLIENT_H
#include <stdint.h>
#include "bake-bulk.h"
/**
......
/*
* (C) 2016 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __BAKE_BULK_SERVER_H
#define __BAKE_BULK_SERVER_H
#include <margo.h>
#include <libpmemobj.h>
#include "bake-bulk.h"
struct bake_bulk_root
{
bake_target_id_t target_id;
};
/**
* Register a bake server instance for a given Margo instance.
*
* @param[in] mid Margo instance identifier
* @param[in] bb_pmem_pool libpmem pool to use for the bake storage service
* @param[in] bb_pmem_root libpmem root for the bake pool
*/
void bake_server_register(
margo_instance_id mid,
PMEMobjpool *bb_pmem_pool,
struct bake_bulk_root *bb_pmem_root);
#endif /* __BAKE_BULK_SERVER_H */
src_libbake_bulk_client_la_SOURCES += \
src/bake-bulk-client.c
src_libbake_bulk_server_la_SOURCES += \
src/bake-bulk-server.c
bin_PROGRAMS += \
src/bake-bulk-server \
src/bb-shutdown \
src/bb-copy-to \
src/bb-copy-from \
src/bb-latency-bench
src_bake_bulk_server_SOURCES = \
src/bake-bulk-server.c \
src/bake-bulk-rpc.c
......@@ -5,7 +5,7 @@
*/
#include <assert.h>
#include <bake-bulk.h>
#include <bake-bulk-client.h>
#include <margo.h>
#include "uthash.h"
#include "bake-bulk-rpc.h"
......
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <assert.h>
#include <libpmemobj.h>
#include "bake-bulk-rpc.h"
/* TODO: this should not be global in the long run; server may provide access
* to multiple targets
*/
extern PMEMobjpool *g_pmem_pool;
extern struct bake_bulk_root *g_bake_bulk_root;
/* definition of internal region_id_t identifier for libpmemobj back end */
typedef struct {
PMEMoid oid;
uint64_t size;
} pmemobj_region_id_t;
/* service a remote RPC that instructs the server daemon to shut down */
static void bake_bulk_shutdown_ult(hg_handle_t handle)
{
hg_return_t hret;
struct hg_info *hgi;
margo_instance_id mid;
// printf("Got RPC request to shutdown.\n");
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
HG_Destroy(handle);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
*/
margo_finalize(mid);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_shutdown_ult)
/* service a remote RPC that creates a bulk region */
static void bake_bulk_create_ult(hg_handle_t handle)
{
bake_bulk_create_out_t out;
bake_bulk_create_in_t in;
hg_return_t hret;
pmemobj_region_id_t* prid;
/* TODO: this check needs to be somewhere else */
assert(sizeof(pmemobj_region_id_t) <= BAKE_BULK_REGION_ID_DATA_SIZE);
// printf("Got RPC request to create bulk region.\n");
memset(&out, 0, sizeof(out));
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)out.rid.data;
prid->size = in.region_size;
out.ret = pmemobj_alloc(g_pmem_pool, &prid->oid, in.region_size, 0, NULL, NULL);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_create_ult)
/* service a remote RPC that writes to a bulk region */
static void bake_bulk_write_ult(hg_handle_t handle)
{
bake_bulk_write_out_t out;
bake_bulk_write_in_t in;
hg_return_t hret;
char* buffer;
hg_size_t size;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
pmemobj_region_id_t* prid;
// printf("Got RPC request to write bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
size = HG_Bulk_get_size(in.bulk_handle);
/* create bulk handle for local side of transfer */
hret = HG_Bulk_create(hgi->hg_class, 1, (void**)(&buffer), &size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle,
0, bulk_handle, 0, size);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
out.ret = 0;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_write_ult)
/* service a remote RPC that writes to a bulk region in eager mode */
static void bake_bulk_eager_write_ult(hg_handle_t handle)
{
bake_bulk_eager_write_out_t out;
bake_bulk_eager_write_in_t in;
hg_return_t hret;
char* buffer;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
pmemobj_region_id_t* prid;
// printf("Got RPC request to write bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
memcpy(buffer, in.buffer, in.size);
out.ret = 0;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_write_ult)
/* service a remote RPC that persists to a bulk region */
static void bake_bulk_persist_ult(hg_handle_t handle)
{
bake_bulk_persist_out_t out;
bake_bulk_persist_in_t in;
hg_return_t hret;
char* buffer;
pmemobj_region_id_t* prid;
// printf("Got RPC request to persist bulk region.\n");
memset(&out, 0, sizeof(out));
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
/* TODO: should this have an abt shim in case it blocks? */
pmemobj_persist(g_pmem_pool, buffer, prid->size);
out.ret = 0;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_persist_ult)
/* service a remote RPC that retrieves the size of a bulk region */
static void bake_bulk_get_size_ult(hg_handle_t handle)
{
bake_bulk_get_size_out_t out;
bake_bulk_get_size_in_t in;
hg_return_t hret;
pmemobj_region_id_t* prid;
// printf("Got RPC request to get_size bulk region.\n");
memset(&out, 0, sizeof(out));
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* kind of cheating here; the size is encoded in the RID */
out.size = prid->size;
out.ret = 0;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)
/* service a remote RPC for a no-op */
static void bake_bulk_noop_ult(hg_handle_t handle)
{
// printf("Got RPC request to noop bulk region.\n");
HG_Respond(handle, NULL, NULL, NULL);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_noop_ult)
/* TODO consolidate with write handler; read and write are nearly identical */
/* service a remote RPC that reads to a bulk region */
static void bake_bulk_read_ult(hg_handle_t handle)
{
bake_bulk_read_out_t out;
bake_bulk_read_in_t in;
hg_return_t hret;
char* buffer;
hg_size_t size;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
pmemobj_region_id_t* prid;
// printf("Got RPC request to read bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
size = HG_Bulk_get_size(in.bulk_handle);
/* create bulk handle for local side of transfer */
hret = HG_Bulk_create(hgi->hg_class, 1, (void**)(&buffer), &size,
HG_BULK_READ_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle,
0, bulk_handle, 0, size);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
out.ret = 0;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
/* service a remote RPC that reads to a bulk region and eagerly sends
* response */
static void bake_bulk_eager_read_ult(hg_handle_t handle)
{
bake_bulk_eager_read_out_t out;
bake_bulk_eager_read_in_t in;
hg_return_t hret;
char* buffer;
hg_size_t size;
struct hg_info *hgi;
margo_instance_id mid;
pmemobj_region_id_t* prid;
// printf("Got RPC request to read bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
out.ret = 0;
out.buffer = buffer;
out.size = in.size;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_read_ult)
/* service a remote RPC that probes for a target id */
static void bake_bulk_probe_ult(hg_handle_t handle)
{
bake_bulk_probe_out_t out;
// printf("Got RPC request to probe bulk region.\n");
memset(&out, 0, sizeof(out));
out.ret = 0;
out.bti = g_bake_bulk_root->target_id;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_probe_ult)
......@@ -101,15 +101,6 @@ DECLARE_MARGO_RPC_HANDLER(bake_bulk_probe_ult)
/* noop */
DECLARE_MARGO_RPC_HANDLER(bake_bulk_noop_ult)
/* TODO: this should be somewhere else, just putting in this header for
* convenience right now. The type should only be visible to the server
* daemon and the rpc handlers.
*/
struct bake_bulk_root
{
bake_target_id_t target_id;
};
/* TODO: where should the encoder defs live? Not in bake-bulk-rpc.c because
* we don't really need the rpc handlers to be linked into clients...
......
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <uuid/uuid.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include <libpmemobj.h>
#include <bake-bulk-server.h>
int main(int argc, char **argv)
{
int ret;
margo_instance_id mid;
ABT_xstream handler_xstream;
ABT_pool handler_pool;
hg_context_t *hg_context;
hg_class_t *hg_class;
char target_string[64];
PMEMoid root_oid;
PMEMobjpool *bb_pmem_pool = NULL;
struct bake_bulk_root *bb_pmem_root = NULL;
if(argc != 3)
{
fprintf(stderr, "Usage: bake-bulk-server <HG listening addr> <pmem pool>\n");
fprintf(stderr, " Example: ./bake-bulk-server tcp://localhost:1234 /dev/shm/foo.dat\n");
return(-1);
}
/* open pmem pool */
bb_pmem_pool = pmemobj_open(argv[2], NULL);
if(!bb_pmem_pool)
{
fprintf(stderr, "pmemobj_open: %s\n", pmemobj_errormsg());
return(-1);
}
/* find root */
root_oid = pmemobj_root(bb_pmem_pool, sizeof(*bb_pmem_root));
bb_pmem_root = pmemobj_direct(root_oid);
if(uuid_is_null(bb_pmem_root->target_id.id))
{
uuid_generate(bb_pmem_root->target_id.id);
pmemobj_persist(bb_pmem_pool, bb_pmem_root, sizeof(*bb_pmem_root));
}
uuid_unparse(bb_pmem_root->target_id.id, target_string);
fprintf(stderr, "BAKE target ID: %s\n", target_string);
/* boilerplate HG initialization steps */
/***************************************/
hg_class = HG_Init(argv[1], HG_TRUE);
if(!hg_class)