Commit 3dcdfa06 authored by Philip Carns's avatar Philip Carns

initial skeleton derived from hgargo

parents
*.o
*.d
*.i
*.s
*.swp
margo-config.h
autom4te.cache/
config.log
config.status
Makefile
Makefile.in
margo-config.h.in
margo-config.h.in~
configure
cscope.files
aclocal.m4
Makefile.in
compile
config.guess
config.sub
depcomp
install-sh
missing
test-driver
TODO: fill this in
#
# Output dist version
#
.phony: distversion
distversion:
@echo $(VERSION)
#
# Easy way to build unit tests without running them
#
.phony: tests
tests: $(check_PROGRAMS)
AUTOMAKE_OPTIONS = foreign
ACLOCAL_AMFLAGS = -I m4
bin_PROGRAMS =
bin_SCRIPTS =
noinst_LIBRARIES =
noinst_PROGRAMS =
lib_LIBRARIES =
noinst_HEADERS =
TESTS =
XFAIL_TESTS =
check_PROGRAMS =
EXTRA_PROGRAMS =
CLEANFILES = $(bin_SCRIPTS)
MAINTAINERCLEANFILES =
EXTRA_DIST =
BUILT_SOURCES =
include_HEADERS =
EXTRA_DIST += \
prepare.sh
AM_CPPFLAGS = -I$(top_srcdir)/include
AM_CFLAGS =
AM_LIBS =
AM_CXXFLAGS = $(AM_CFLAGS)
lib_LIBRARIES += src/libmargo.a
src_libmargo_a_SOURCES =
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/margo.pc
include Make.rules
include $(top_srcdir)/src/Makefile.subdir
include $(top_srcdir)/examples/Makefile.subdir
# margo
margo is a library that provides Argobots bindings to the Mercury RPC
implementation. See the following for more details about each project:
* https://collab.mcs.anl.gov/display/ARGOBOTS/Argobots+Home
* https://mercury-hpc.github.io/
## Dependencies
* mercury (git clone --recurse-submodules https://github.com/mercury-hpc/mercury.git)
* argobots (git://git.mcs.anl.gov/argo/argobots.git)
* abt-snoozer (https://xgitlab.cels.anl.gov/sds/abt-snoozer)
* libev (e.g libev-dev package on Ubuntu or Debian)
## Building
Example configuration:
../configure --prefix=/home/pcarns/working/install \
PKG_CONFIG_PATH=/home/pcarns/working/install/lib/pkgconfig \
CFLAGS="-g -Wall"
## To Do:
* Fix licensing
* Convert dedicated thread that drives HG to an xstream
# -*- Autoconf -*-
# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.67])
AC_INIT([margo], [0.1], [],[],[])
AC_CANONICAL_TARGET
AC_CANONICAL_SYSTEM
AC_CANONICAL_HOST
AM_INIT_AUTOMAKE([foreign subdir-objects -Wall])
# we should remove this soon, only needed for automake 1.10 and older
m4_ifdef([AM_SILENT_RULES], [AM_SILENT_RULES([yes])])
AC_CONFIG_SRCDIR([README.md])
AC_CONFIG_HEADERS([margo-config.h])
# Checks for programs.
AC_PROG_CC
AM_PROG_CC_C_O
AC_PROG_CXX
AC_PROG_CXXCPP
AC_PROG_RANLIB
AC_PROG_MKDIR_P
AC_REQUIRE_CPP
AC_CHECK_SIZEOF([long int])
dnl
dnl Verify pkg-config
dnl
PKG_PROG_PKG_CONFIG
PKG_CONFIG="pkg-config --static"
PKG_CHECK_MODULES_STATIC([MERCURY],[mercury],[],
[AC_MSG_ERROR([Could not find working mercury installation!])])
LIBS="$MERCURY_LIBS $LIBS"
CPPFLAGS="$MERCURY_CFLAGS $CPPFLAGS"
CFLAGS="$MERCURY_CFLAGS $CFLAGS"
PKG_CHECK_MODULES_STATIC([ARGOBOTS],[argobots],[],
[AC_MSG_ERROR([Could not find working argobots installation!])])
LIBS="$ARGOBOTS_LIBS $LIBS"
CPPFLAGS="$ARGOBOTS_CFLAGS $CPPFLAGS"
CFLAGS="$ARGOBOTS_CFLAGS $CFLAGS"
PKG_CHECK_MODULES_STATIC([ABT_SNOOZER],[abt-snoozer],[],
[AC_MSG_ERROR([Could not find working abt-snoozer installation!])])
LIBS="$ABT_SNOOZER_LIBS $LIBS"
CPPFLAGS="$ABT_SNOOZER_CFLAGS $CPPFLAGS"
CFLAGS="$ABT_SNOOZER_CFLAGS $CFLAGS"
# TODO: the mercury tests should probably be in their own .m4
# make sure that mercury has boost preprocessors enabled
AC_MSG_CHECKING(if Boost preprocessing is enabled in Mercury)
AC_TRY_COMPILE([
#include <mercury_macros.h>
#ifndef HG_HAS_BOOST
#error HG_HAS_BOOST not set
#endif
], [],
AC_MSG_RESULT(yes),
AC_MSG_RESULT(no)
AC_MSG_ERROR([Boost preprocessing not enabled in Mercury])
)
AC_CONFIG_FILES([Makefile maint/margo.pc])
AC_OUTPUT
bin_PROGRAMS += examples/client examples/server
examples_client_SOURCES = \
examples/client.c \
examples/my-rpc.c
examples_client_LDADD = src/libhgargo.a
examples_server_SOURCES = \
examples/server.c \
examples/my-rpc.c
examples_server_LDADD = src/libhgargo.a
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include "hgargo.h"
#include "my-rpc.h"
/* This is an example client program that issues 4 concurrent RPCs, each of
* which includes a bulk transfer driven by the server.
*
* Each client operation executes as an independent ULT in Argobots.
* The HG forward call is executed using asynchronous operations.
*/
static void run_my_rpc(void *_arg);
static hg_id_t my_rpc_id;
int main(int argc, char **argv)
{
int values[4];
ABT_thread threads[4];
int i;
int ret;
ABT_xstream xstream;
ABT_pool pool;
ABT_sched sched;
ABT_pool_def pool_def;
struct hgargo_sched_data *sched_data;
struct hgargo_pool_data *pool_data;
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
ret = ABT_xstream_self(&xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = hgargo_pool_get_def(ABT_POOL_ACCESS_MPMC, &pool_def);
if(ret != 0)
{
fprintf(stderr, "Error: hgargo_pool_get_def()\n");
return(-1);
}
ret = ABT_pool_create(&pool_def, ABT_POOL_CONFIG_NULL, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_pool_create()\n");
return(-1);
}
hgargo_create_scheds(1, &pool, &sched);
ABT_sched_get_data(sched, (void**)(&sched_data));
ABT_pool_get_data(pool, (void**)(&pool_data));
ret = hgargo_setup_ev(&sched_data->ev);
if(ret < 0)
{
fprintf(stderr, "Error: hgargo_setup_ev()\n");
return(-1);
}
pool_data->ev = sched_data->ev;
ABT_sched_set_data(sched, sched_data);
ABT_pool_set_data(pool, pool_data);
ret = ABT_xstream_set_main_sched(xstream, sched);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_set_main_sched()\n");
return(-1);
}
/* initialize
* note: address here is really just being used to identify transport
*/
hgargo_init(NA_FALSE, "tcp://localhost:1234");
/* register RPC */
my_rpc_id = my_rpc_register();
for(i=0; i<4; i++)
{
values[i] = i;
/* Each fiber gets a pointer to an element of the values array to use
* as input for the run_my_rpc() function.
*/
ret = ABT_thread_create(pool, run_my_rpc, &values[i],
ABT_THREAD_ATTR_NULL, &threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_create()\n");
return(-1);
}
}
/* yield to one of the threads */
ABT_thread_yield_to(threads[0]);
for(i=0; i<4; i++)
{
ret = ABT_thread_join(threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_join()\n");
return(-1);
}
ret = ABT_thread_free(&threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_join()\n");
return(-1);
}
}
hgargo_finalize();
ABT_finalize();
return(0);
}
static void run_my_rpc(void *_arg)
{
int* val = (int*)_arg;
na_addr_t svr_addr = NA_ADDR_NULL;
hg_handle_t handle;
my_rpc_in_t in;
my_rpc_out_t out;
int ret;
hg_size_t size;
void* buffer;
struct hg_info *hgi;
printf("ULT [%d] running.\n", *val);
/* allocate buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
sprintf((char*)buffer, "Hello world!\n");
/* find addr for server */
ret = hgargo_addr_lookup("tcp://localhost:1234", &svr_addr);
assert(ret == 0);
/* create handle */
ret = hgargo_create_handle(svr_addr, my_rpc_id, &handle);
assert(ret == 0);
/* register buffer for rdma/bulk access by server */
hgi = HG_Get_info(handle);
assert(hgi);
ret = HG_Bulk_create(hgi->hg_bulk_class, 1, &buffer, &size,
HG_BULK_READ_ONLY, &in.bulk_handle);
assert(ret == 0);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in.input_val = *((int*)(_arg));
hgargo_forward(handle, &in);
/* decode response */
ret = HG_Get_output(handle, &out);
assert(ret == 0);
printf("Got response ret: %d\n", out.ret);
/* clean up resources consumed by this rpc */
HG_Bulk_free(in.bulk_handle);
HG_Free_output(handle, &out);
HG_Destroy(handle);
free(buffer);
printf("ULT [%d] done.\n", *val);
return;
}
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <assert.h>
#include "my-rpc.h"
/* my-rpc:
* This is an example RPC operation. It includes a small bulk transfer,
* driven by the server, that moves data from the client to the server. The
* server writes the data to a local file in /tmp.
*/
/* The rpc handler is defined as a single ULT in Argobots. It uses
* underlying asynchronous operations for the HG transfer, open, write, and
* close.
*/
static void my_rpc_ult(void *_arg)
{
hg_handle_t *handle = _arg;
hg_return_t hret;
my_rpc_out_t out;
my_rpc_in_t in;
int ret;
hg_size_t size;
void *buffer;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
int fd;
char filename[256];
ret = HG_Get_input(*handle, &in);
assert(ret == HG_SUCCESS);
printf("Got RPC request with input_val: %d\n", in.input_val);
out.ret = 0;
/* set up target buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
/* register local target buffer for bulk access */
hgi = HG_Get_info(*handle);
assert(hgi);
ret = HG_Bulk_create(hgi->hg_bulk_class, 1, &buffer,
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
/* do bulk transfer from client to server */
ret = hgargo_bulk_transfer(hgi->bulk_context, HG_BULK_PULL,
hgi->addr, in.bulk_handle, 0,
bulk_handle, 0, size);
assert(ret == 0);
#if 0
/* write to a file */
sprintf(filename, "/tmp/hg-fiber-%d.txt", in.input_val);
fd = fbr_eio_open(fctx, filename, O_WRONLY|O_CREAT, S_IWUSR|S_IRUSR, 0);
assert(fd > -1);
ret = fbr_eio_write(fctx, fd, buffer, 512, 0, 0);
assert(ret == 512);
fbr_eio_close(fctx, fd, 0);
#endif
hret = HG_Respond(*handle, NULL, NULL, &out);
assert(hret == HG_SUCCESS);
HG_Bulk_free(bulk_handle);
HG_Destroy(*handle);
free(buffer);
free(handle);
return;
}
DEFINE_ARGO_RPC_HANDLER(my_rpc_ult)
hg_id_t my_rpc_register(void)
{
hg_class_t* hg_class;
hg_id_t tmp;
hg_class = hgargo_get_class();
tmp = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler);
return(tmp);
}
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MY_RPC
#define __MY_RPC
#include "hgargo.h"
/* visible API for example RPC operation */
MERCURY_GEN_PROC(my_rpc_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(my_rpc_in_t,
((int32_t)(input_val))\
((hg_bulk_t)(bulk_handle)))
hg_id_t my_rpc_register(void);
#endif /* __MY_RPC */
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include "hgargo.h"
#include "my-rpc.h"
/* example server program. Starts HG engine, registers the example RPC type,
* and then executes indefinitely.
*/
int main(int argc, char **argv)
{
int ret;
ABT_eventual eventual;
int shutdown;
ABT_xstream xstream;
ABT_pool pool;
ABT_sched sched;
ABT_pool_def pool_def;
struct hgargo_sched_data *sched_data;
struct hgargo_pool_data *pool_data;
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
ret = ABT_xstream_self(&xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = hgargo_pool_get_def(ABT_POOL_ACCESS_MPMC, &pool_def);
if(ret != 0)
{
fprintf(stderr, "Error: hgargo_pool_get_def()\n");
return(-1);
}
ret = ABT_pool_create(&pool_def, ABT_POOL_CONFIG_NULL, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_pool_create()\n");
return(-1);
}
hgargo_create_scheds(1, &pool, &sched);
ABT_sched_get_data(sched, (void**)(&sched_data));
ABT_pool_get_data(pool, (void**)(&pool_data));
ret = hgargo_setup_ev(&sched_data->ev);
if(ret < 0)
{
fprintf(stderr, "Error: hgargo_setup_ev()\n");
return(-1);
}
pool_data->ev = sched_data->ev;
ABT_sched_set_data(sched, sched_data);
ABT_pool_set_data(pool, pool_data);
ret = ABT_xstream_set_main_sched(xstream, sched);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_set_main_sched()\n");
return(-1);
}
hgargo_init(NA_TRUE, "tcp://localhost:1234");
/* register RPC */
my_rpc_register();
/* suspend this ULT until someone tells us to shut down */
ret = ABT_eventual_create(sizeof(shutdown), &eventual);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_eventual_create()\n");
return(-1);
}
ABT_eventual_wait(eventual, (void**)&shutdown);
hgargo_finalize();
ABT_finalize();
return(0);
}
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MARGO
#define __MARGO
#include <mercury_bulk.h>
#include <mercury.h>
#include <mercury_macros.h>
#include <abt.h>
#include <ev.h>
/* TODO: should this library encapsulate the Mercury initialization steps?
* Right now it does for caller simplicity, but there isn't any
* technical reason. Because it hides the initialization (and context
* creation), it must provide utility functions for address lookup and handle
* creation because those require access to context pointers that are
* produced at init time.
*/
/**
* Initializes margo library, including initializing underlying libevfibers
* and Mercury instances.
* @param [in] listen flag indicating whether to accept inbound RPCs or not
* @param [in] local_addr address to listen on if listen is set
* @returns 0 on success, -1 upon error
*/
int margo_init(na_bool_t listen, const char* local_addr);
/**
* Shuts down margo library and its underlying evfibers and mercury resources
*/
void margo_finalize(void);
/**
* Retrieve the HG class for the running Mercury instance
* @returns pointer on success, NULL upon error
*/
hg_class_t* margo_get_class(void);
/**
* Retrieve the ABT pool associated with the main caller (whoever invoked the
* init function); this is where margo will execute RPC handlers.
*/
ABT_pool* margo_get_main_pool(void);
/**
* Lookup the Mercury/NA address associated with the given string
* @param [in] name string address of remote host
* @param [out] addr Mercury NA address for remote host
* @returns 0 on success, na_return_t values on error
*/
na_return_t margo_addr_lookup(const char* name, na_addr_t* addr);
/**
* Creates a handle to refer to an RPC that will be issued
* @param [in] addr address of remote host to send RPC to
* @param [in] id identifier for RPC operation
* @param [out] handle
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_create_handle(na_addr_t addr, hg_id_t id,
hg_handle_t *handle);
/**
* Forward an RPC request to a remote host
* @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_forward(
hg_handle_t handle,
void *in_struct);
/**
* Perform a bulk transfer
* @param [in] context Mercury bulk context
* @param [in] op type of operation to perform
* @param [in] origin_addr remote Mercury address
* @param [in] origin_handle remote Mercury bulk memory handle
* @param [in] origin_offset offset into remote bulk memory to access
* @param [in] local_handle local bulk memory handle
* @param [in] local_offset offset into local bulk memory to access
* @param [in] size size (in bytes) of transfer
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_bulk_transfer(
hg_bulk_context_t *context,
hg_bulk_op_t op,
na_addr_t origin_addr,
hg_bulk_t origin_handle,
size_t origin_offset,
hg_bulk_t local_handle,
size_t local_offset,
size_t size);
/**
* macro that defines a function to glue an RPC handler to a fiber
* @param [in] __name name of handler function
*/
#define DEFINE_ARGO_RPC_HANDLER(__name) \
static hg_return_t __name##_handler(hg_handle_t handle) { \
int __ret; \
ABT_pool* __pool; \
hg_handle_t* __handle = malloc(sizeof(*__handle)); \
if(!__handle) return(HG_NOMEM_ERROR); \
*__handle = handle; \
__pool = margo_get_main_pool(); \
__ret = ABT_thread_create(*__pool, __name, __handle, ABT_THREAD_ATTR_NULL, NULL); \
if(__ret != 0) { \
return(HG_NOMEM_ERROR); \
} \
return(HG_SUCCESS); \
}
#endif /* __MARGO */
# pkg.m4 - Macros to locate and utilise pkg-config. -*- Autoconf -*-
# serial 1 (pkg-config-0.24)
#
# Copyright © 2004 Scott James Remnant <scott@netsplit.com>.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
# As a special exception to the GNU General Public License, if you
# distribute this file as part of a program that contains a
# configuration script generated by Autoconf, you may include it under
# the same distribution terms that you use for the rest of that program.
# PKG_PROG_PKG_CONFIG([MIN-VERSION])
# ----------------------------------
AC_DEFUN([PKG_PROG_PKG_CONFIG],
[m4_pattern_forbid([^_?PKG_[A-Z_]+$])
m4_pattern_allow([^PKG_CONFIG(_(PATH|LIBDIR|SYSROOT_DIR|ALLOW_SYSTEM_(CFLAGS|LIBS)))?$])
m4_pattern_allow([^PKG_CONFIG_(DISABLE_UNINSTALLED|TOP_BUILD_DIR|DEBUG_SPEW)$])
AC_ARG_VAR([PKG_CONFIG], [path to pkg-config utility])
AC_ARG_VAR([PKG_CONFIG_PATH], [directories to add to pkg-config's search path])
AC_ARG_VAR([PKG_CONFIG_LIBDIR], [path overriding pkg-config's built-in search path])
if test "x$ac_cv_env_PKG_CONFIG_set" != "xset"; then
AC_PATH_TOOL([PKG_CONFIG], [pkg-config])
fi