Commit 5becf802 authored by Philip Carns's avatar Philip Carns

Merge branch 'bake-server-daemon-to-lib' into 'master'

refactor bake-bulk to include a server library

See merge request !1
parents 1e31e3a7 7dca03eb
......@@ -13,7 +13,7 @@ CLEANFILES = $(bin_SCRIPTS)
MAINTAINERCLEANFILES =
EXTRA_DIST =
BUILT_SOURCES =
include_HEADERS = include/bake-bulk.h
include_HEADERS = include/bake-bulk.h include/bake-bulk-client.h include/bake-bulk-server.h
EXTRA_DIST += \
prepare.sh
......@@ -26,10 +26,11 @@ AM_LIBS =
AM_CXXFLAGS = $(AM_CFLAGS)
lib_LTLIBRARIES = src/libbake-bulk.la
src_libbake_bulk_la_SOURCES =
lib_LTLIBRARIES = src/libbake-bulk-client.la src/libbake-bulk-server.la
src_libbake_bulk_client_la_SOURCES =
src_libbake_bulk_server_la_SOURCES =
LDADD = src/libbake-bulk.la
LDADD = src/libbake-bulk-client.la src/libbake-bulk-server.la
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/bake-bulk.pc
......
/*
* (C) 2016 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __BAKE_BULK_CLIENT_H
#define __BAKE_BULK_CLIENT_H
#include <stdint.h>
#include "bake-bulk.h"
/**
* Obtain identifying information for a bake target through the provided
* remote mercury address.
*
* @param [in] mecury_dest Mercury address in string form
* @param [out] bti BAKE target identifier
* @returns 0 on success, -1 on failure
*/
int bake_probe_instance(
const char *mercury_dest,
bake_target_id_t *bti);
/**
* Create a bounded-size bulk data region. The resulting region can be
* written using bulk write operations, and can be persisted (once writes are
* complete) with a a bulk persist operation. The region is not valid for
* read access until persisted.
*
* @param [in] bti BAKE target identifier
* @param [in] region_size size of region to be created
* @param [out] rid identifier for new region
* @returns 0 on success, -1 on failure
*/
int bake_bulk_create(
bake_target_id_t bti,
uint64_t region_size,
bake_bulk_region_id_t *rid);
/**
* Writes into a region that was previously created with bake_bulk_create().
* Result is not guaranteed to be persistent until explicit
* bake_bulk_persist() call.
*
* Results are undefined if multiple writers (from same process or different
* processes) perform overlapping writes.
*
* @param [in] bti BAKE target identifier
* @param [in] rid identifier for region
* @param [in] region_offset offset into the target region to write
* @param [in] buf local memory buffer to write
* @param [in] buf_size size of local memory buffer to write
* @returns 0 on success, -1 on failure
*/
int bake_bulk_write(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void const *buf,
uint64_t buf_size);
/**
* Persist a bulk region. The region is considered immutable at this point
* and reads may be performed on the region.
*
* @param [in] bti BAKE target identifier
* @param [in] rid identifier for region
* @returns 0 on success, -1 on failure
*/
int bake_bulk_persist(
bake_target_id_t bti,
bake_bulk_region_id_t rid);
/**
* Check the size of an existing region.
*
* @param [in] bti BAKE target identifier
* @param [in] rid identifier for region
* @param [out] size sizes of region
* @returns 0 on success, -1 on failure
*/
int bake_bulk_get_size(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t *region_size);
/**
* Reads from a region that was previously persisted with bake_bulk_persist().
*
* NOTE: for now at least, this call does not support "short" reads. It
* either succeeds in reading the requested size or not.
*
* @param [in] bti BAKE target identifier
* @param [in] rid region identifier
* @param [in] region_offset offset into the target region to read from
* @param [in] buf local memory buffer read into
* @param [in] buf_size size of local memory buffer to read into
* @returns 0 on success, -1 on failure
*/
int bake_bulk_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void *buf,
uint64_t buf_size);
/**
* Release local resources associated with access to a target; does not
* modify the target in any way.
*
* @param [in] bti BAKE target_identifier
*/
void bake_release_instance(
bake_target_id_t bti);
/**
* Utility function to shut down a remote service
*
* @param [in] bti Bake target identifier
* @returns 0 on success, -1 on fialure
*/
int bake_shutdown_service(bake_target_id_t bti);
/* NOTE: code below is a copy of the bulk portion of the proposed BAKE API.
* Commented out for now but leaving it in place for reference
*/
/**
* Issue a no-op
*
* @param [in] bti BAKE target identifier
* @returns 0 on success, -1 on failure
*/
int bake_bulk_noop(
bake_target_id_t bti);
#endif /* __BAKE_BULK__CLIENT_H */
/*
* (C) 2016 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __BAKE_BULK_SERVER_H
#define __BAKE_BULK_SERVER_H
#include <margo.h>
#include <libpmemobj.h>
#include "bake-bulk.h"
struct bake_bulk_root
{
bake_target_id_t target_id;
};
/**
* Register a bake server instance for a given Margo instance.
*
* @param[in] mid Margo instance identifier
* @param[in] bb_pmem_pool libpmem pool to use for the bake storage service
* @param[in] bb_pmem_root libpmem root for the bake pool
*/
void bake_server_register(
margo_instance_id mid,
PMEMobjpool *bb_pmem_pool,
struct bake_bulk_root *bb_pmem_root);
#endif /* __BAKE_BULK_SERVER_H */
......@@ -27,131 +27,6 @@ typedef struct {
char data[BAKE_BULK_REGION_ID_DATA_SIZE];
} bake_bulk_region_id_t;
/**
* Obtain identifying information for a bake target through the provided
* remote mercury address.
*
* @param [in] mecury_dest Mercury address in string form
* @param [out] bti BAKE target identifier
* @returns 0 on success, -1 on failure
*/
int bake_probe_instance(
const char *mercury_dest,
bake_target_id_t *bti);
/**
* Create a bounded-size bulk data region. The resulting region can be
* written using bulk write operations, and can be persisted (once writes are
* complete) with a a bulk persist operation. The region is not valid for
* read access until persisted.
*
* @param [in] bti BAKE target identifier
* @param [in] region_size size of region to be created
* @param [out] rid identifier for new region
* @returns 0 on success, -1 on failure
*/
int bake_bulk_create(
bake_target_id_t bti,
uint64_t region_size,
bake_bulk_region_id_t *rid);
/**
* Writes into a region that was previously created with bake_bulk_create().
* Result is not guaranteed to be persistent until explicit
* bake_bulk_persist() call.
*
* Results are undefined if multiple writers (from same process or different
* processes) perform overlapping writes.
*
* @param [in] bti BAKE target identifier
* @param [in] rid identifier for region
* @param [in] region_offset offset into the target region to write
* @param [in] buf local memory buffer to write
* @param [in] buf_size size of local memory buffer to write
* @returns 0 on success, -1 on failure
*/
int bake_bulk_write(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void const *buf,
uint64_t buf_size);
/**
* Persist a bulk region. The region is considered immutable at this point
* and reads may be performed on the region.
*
* @param [in] bti BAKE target identifier
* @param [in] rid identifier for region
* @returns 0 on success, -1 on failure
*/
int bake_bulk_persist(
bake_target_id_t bti,
bake_bulk_region_id_t rid);
/**
* Check the size of an existing region.
*
* @param [in] bti BAKE target identifier
* @param [in] rid identifier for region
* @param [out] size sizes of region
* @returns 0 on success, -1 on failure
*/
int bake_bulk_get_size(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t *region_size);
/**
* Reads from a region that was previously persisted with bake_bulk_persist().
*
* NOTE: for now at least, this call does not support "short" reads. It
* either succeeds in reading the requested size or not.
*
* @param [in] bti BAKE target identifier
* @param [in] rid region identifier
* @param [in] region_offset offset into the target region to read from
* @param [in] buf local memory buffer read into
* @param [in] buf_size size of local memory buffer to read into
* @returns 0 on success, -1 on failure
*/
int bake_bulk_read(
bake_target_id_t bti,
bake_bulk_region_id_t rid,
uint64_t region_offset,
void *buf,
uint64_t buf_size);
/**
* Release local resources associated with access to a target; does not
* modify the target in any way.
*
* @param [in] bti BAKE target_identifier
*/
void bake_release_instance(
bake_target_id_t bti);
/**
* Utility function to shut down a remote service
*
* @param [in] bti Bake target identifier
* @returns 0 on success, -1 on fialure
*/
int bake_shutdown_service(bake_target_id_t bti);
/* NOTE: code below is a copy of the bulk portion of the proposed BAKE API.
* Commented out for now but leaving it in place for reference
*/
/**
* Issue a no-op
*
* @param [in] bti BAKE target identifier
* @returns 0 on success, -1 on failure
*/
int bake_bulk_noop(
bake_target_id_t bti);
#if 0
/// ==== Some high-level goals ====
......
src_libbake_bulk_la_SOURCES += \
src/bake-bulk.c
src_libbake_bulk_client_la_SOURCES += \
src/bake-bulk-client.c
src_libbake_bulk_server_la_SOURCES += \
src/bake-bulk-server.c
bin_PROGRAMS += \
src/bake-bulk-server \
src/bake-bulk-server-daemon \
src/bb-shutdown \
src/bb-copy-to \
src/bb-copy-from \
src/bb-latency-bench
src_bake_bulk_server_SOURCES = \
src/bake-bulk-server.c \
src/bake-bulk-rpc.c
......@@ -5,7 +5,7 @@
*/
#include <assert.h>
#include <bake-bulk.h>
#include <bake-bulk-client.h>
#include <margo.h>
#include "uthash.h"
#include "bake-bulk-rpc.h"
......
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <assert.h>
#include <libpmemobj.h>
#include "bake-bulk-rpc.h"
/* TODO: this should not be global in the long run; server may provide access
* to multiple targets
*/
extern PMEMobjpool *g_pmem_pool;
extern struct bake_bulk_root *g_bake_bulk_root;
/* definition of internal region_id_t identifier for libpmemobj back end */
typedef struct {
PMEMoid oid;
uint64_t size;
} pmemobj_region_id_t;
/* service a remote RPC that instructs the server daemon to shut down */
static void bake_bulk_shutdown_ult(hg_handle_t handle)
{
hg_return_t hret;
struct hg_info *hgi;
margo_instance_id mid;
// printf("Got RPC request to shutdown.\n");
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
HG_Destroy(handle);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
*/
margo_finalize(mid);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_shutdown_ult)
/* service a remote RPC that creates a bulk region */
static void bake_bulk_create_ult(hg_handle_t handle)
{
bake_bulk_create_out_t out;
bake_bulk_create_in_t in;
hg_return_t hret;
pmemobj_region_id_t* prid;
/* TODO: this check needs to be somewhere else */
assert(sizeof(pmemobj_region_id_t) <= BAKE_BULK_REGION_ID_DATA_SIZE);
// printf("Got RPC request to create bulk region.\n");
memset(&out, 0, sizeof(out));
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)out.rid.data;
prid->size = in.region_size;
out.ret = pmemobj_alloc(g_pmem_pool, &prid->oid, in.region_size, 0, NULL, NULL);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_create_ult)
/* service a remote RPC that writes to a bulk region */
static void bake_bulk_write_ult(hg_handle_t handle)
{
bake_bulk_write_out_t out;
bake_bulk_write_in_t in;
hg_return_t hret;
char* buffer;
hg_size_t size;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
pmemobj_region_id_t* prid;
// printf("Got RPC request to write bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
size = HG_Bulk_get_size(in.bulk_handle);
/* create bulk handle for local side of transfer */
hret = HG_Bulk_create(hgi->hg_class, 1, (void**)(&buffer), &size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle,
0, bulk_handle, 0, size);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
out.ret = 0;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_write_ult)
/* service a remote RPC that writes to a bulk region in eager mode */
static void bake_bulk_eager_write_ult(hg_handle_t handle)
{
bake_bulk_eager_write_out_t out;
bake_bulk_eager_write_in_t in;
hg_return_t hret;
char* buffer;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
pmemobj_region_id_t* prid;
// printf("Got RPC request to write bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
memcpy(buffer, in.buffer, in.size);
out.ret = 0;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_eager_write_ult)
/* service a remote RPC that persists to a bulk region */
static void bake_bulk_persist_ult(hg_handle_t handle)
{
bake_bulk_persist_out_t out;
bake_bulk_persist_in_t in;
hg_return_t hret;
char* buffer;
pmemobj_region_id_t* prid;
// printf("Got RPC request to persist bulk region.\n");
memset(&out, 0, sizeof(out));
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
/* TODO: should this have an abt shim in case it blocks? */
pmemobj_persist(g_pmem_pool, buffer, prid->size);
out.ret = 0;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_persist_ult)
/* service a remote RPC that retrieves the size of a bulk region */
static void bake_bulk_get_size_ult(hg_handle_t handle)
{
bake_bulk_get_size_out_t out;
bake_bulk_get_size_in_t in;
hg_return_t hret;
pmemobj_region_id_t* prid;
// printf("Got RPC request to get_size bulk region.\n");
memset(&out, 0, sizeof(out));
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* kind of cheating here; the size is encoded in the RID */
out.size = prid->size;
out.ret = 0;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_get_size_ult)
/* service a remote RPC for a no-op */
static void bake_bulk_noop_ult(hg_handle_t handle)
{
// printf("Got RPC request to noop bulk region.\n");
HG_Respond(handle, NULL, NULL, NULL);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_noop_ult)
/* TODO consolidate with write handler; read and write are nearly identical */
/* service a remote RPC that reads to a bulk region */
static void bake_bulk_read_ult(hg_handle_t handle)
{
bake_bulk_read_out_t out;
bake_bulk_read_in_t in;
hg_return_t hret;
char* buffer;
hg_size_t size;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
pmemobj_region_id_t* prid;
// printf("Got RPC request to read bulk region.\n");
memset(&out, 0, sizeof(out));
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = HG_Get_input(handle, &in);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
prid = (pmemobj_region_id_t*)in.rid.data;
/* find memory address for target object */
buffer = pmemobj_direct(prid->oid);
if(!buffer)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
size = HG_Bulk_get_size(in.bulk_handle);
/* create bulk handle for local side of transfer */
hret = HG_Bulk_create(hgi->hg_class, 1, (void**)(&buffer), &size,
HG_BULK_READ_ONLY, &bulk_handle);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
hret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle,
0, bulk_handle, 0, size);
if(hret != HG_SUCCESS)
{
out.ret = -1;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
out.ret = 0;
HG_Bulk_free(bulk_handle);
HG_Free_input(handle, &in);
HG_Respond(handle, NULL, NULL, &out);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(bake_bulk_read_ult)
/* service a remote RPC that reads to a bulk region and eagerly sends
* response */
static void bake_bulk_eager_read_ult(hg_handle_t handle)
{
bake_bulk_eager_read_out_t out;
bake_bulk_eager_read_in_t in;
hg_return_t hret;
char* buffer;
hg_size_t size;