Commit e27261e2 authored by Philip Carns's avatar Philip Carns

begin to stub out multiplex support

parent 615b275d
......@@ -39,5 +39,6 @@ include Make.rules
include $(top_srcdir)/src/Makefile.subdir
include $(top_srcdir)/examples/Makefile.subdir
include $(top_srcdir)/examples/multiplex/Makefile.subdir
include $(top_srcdir)/tests/Makefile.subdir
noinst_PROGRAMS += examples/multiplex/margo-example-mp-server
examples_multiplex_margo_example_mp_server_SOURCES = \
examples/multiplex/margo-example-mp-server.c \
examples/multiplex/svc1.c
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "svc1.h"
/* example server program that starts a skeleton for sub-services within
* this process to register with
*/
/* this is a "common" rpc that is handled by the core daemon
*/
static void my_rpc_shutdown_ult(hg_handle_t handle)
{
hg_return_t hret;
struct hg_info *hgi;
margo_instance_id mid;
printf("Got RPC request to shutdown\n");
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
HG_Destroy(handle);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
*/
margo_finalize(mid);
return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
int main(int argc, char **argv)
{
int ret;
margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
char proto[12] = {0};
int i;
hg_addr_t addr_self;
char addr_self_string[128];
hg_size_t addr_self_string_sz = 128;
if(argc != 2)
{
fprintf(stderr, "Usage: ./server <listen_addr>\n");
fprintf(stderr, "Example: ./server na+sm://\n");
return(-1);
}
/* boilerplate HG initialization steps */
/***************************************/
hg_class = HG_Init(argv[1], HG_TRUE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
return(-1);
}
/* figure out what address this server is listening on */
ret = HG_Addr_self(hg_class, &addr_self);
if(ret != HG_SUCCESS)
{
fprintf(stderr, "Error: HG_Addr_self()\n");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return(-1);
}
ret = HG_Addr_to_string(hg_class, addr_self_string, &addr_self_string_sz, addr_self);
if(ret != HG_SUCCESS)
{
fprintf(stderr, "Error: HG_Addr_self()\n");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
HG_Addr_free(hg_class, addr_self);
return(-1);
}
HG_Addr_free(hg_class, addr_self);
for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++)
proto[i] = argv[1][i];
printf("# accepting RPCs on address \"%s://%s\"\n", proto, addr_self_string);
/* set up argobots */
/***************************************/
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1);
}
/* actually start margo */
/***************************************/
mid = margo_init(0, 0, hg_context);
assert(mid);
/* register svc1, with mplex_id 1, to execute on the default handler pool
* used by Margo
*/
ret = svc1_register(mid, margo_get_handler_pool(mid), 1);
assert(ret == 0);
/* NOTE: there isn't anything else for the server to do at this point
* except wait for itself to be shut down. The
* margo_wait_for_finalize() call here yields to let Margo drive
* progress until that happens.
*/
margo_wait_for_finalize(mid);
ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return(0);
}
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <assert.h>
#include "svc1.h"
static void svc1_do_thing_ult(hg_handle_t handle)
{
hg_return_t hret;
svc1_do_thing_out_t out;
svc1_do_thing_in_t in;
int ret;
hg_size_t size;
void *buffer;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
ret = HG_Get_input(handle, &in);
assert(ret == HG_SUCCESS);
printf("Got RPC request with input_val: %d\n", in.input_val);
out.ret = 0;
/* set up target buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
/* register local target buffer for bulk access */
hgi = HG_Get_info(handle);
assert(hgi);
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer,
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
mid = margo_hg_class_to_instance(hgi->hg_class);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
hgi->addr, in.bulk_handle, 0,
bulk_handle, 0, size);
assert(ret == 0);
HG_Free_input(handle, &in);
hret = HG_Respond(handle, NULL, NULL, &out);
assert(hret == HG_SUCCESS);
HG_Bulk_free(bulk_handle);
HG_Destroy(handle);
free(buffer);
return;
}
DEFINE_MARGO_RPC_HANDLER(svc1_do_thing_ult)
static void svc1_do_other_thing_ult(hg_handle_t handle)
{
hg_return_t hret;
svc1_do_other_thing_out_t out;
svc1_do_other_thing_in_t in;
int ret;
hg_size_t size;
void *buffer;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
ret = HG_Get_input(handle, &in);
assert(ret == HG_SUCCESS);
printf("Got RPC request with input_val: %d\n", in.input_val);
out.ret = 0;
/* set up target buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
/* register local target buffer for bulk access */
hgi = HG_Get_info(handle);
assert(hgi);
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer,
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
mid = margo_hg_class_to_instance(hgi->hg_class);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
hgi->addr, in.bulk_handle, 0,
bulk_handle, 0, size);
assert(ret == 0);
HG_Free_input(handle, &in);
hret = HG_Respond(handle, NULL, NULL, &out);
assert(hret == HG_SUCCESS);
HG_Bulk_free(bulk_handle);
HG_Destroy(handle);
free(buffer);
return;
}
DEFINE_MARGO_RPC_HANDLER(svc1_do_other_thing_ult)
int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
hg_return_t hret;
hg_id_t id;
hg_bool_t flag;
hret = HG_Registered_name(margo_get_class(mid), "svc1_do_thing", &id, &flag);
if(hret != HG_SUCCESS)
{
return(-1);
}
/* TODO: for each function:
* - check if registered with mercury or not
* - if not, then register
* - register with margo
* - this will put into hash table in mid that can map <id,mplex_id> to
* <pool>, checking for duplicate first
*
* - elsewhere:
* - new variant of DEFINE_MARGO_RPC_HANDLER that:
* - looks up registered margo thing
* - creates thread targeting pool
*/
return(-1);
}
void svc1_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
/* TODO: undo what was done in svc1_register() */
return;
}
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __SVC1
#define __SVC1
#include <margo.h>
/* this is an example service that will be registered as a unit on a
* centralized progress engine */
int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
void svc1_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
MERCURY_GEN_PROC(svc1_do_thing_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(svc1_do_thing_in_t,
((int32_t)(input_val))\
((hg_bulk_t)(bulk_handle)))
DECLARE_MARGO_RPC_HANDLER(svc1_do_thing_ult)
DECLARE_MARGO_RPC_HANDLER(svc1_do_thing_shutdown_ult)
MERCURY_GEN_PROC(svc1_do_other_thing_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(svc1_do_other_thing_in_t,
((int32_t)(input_val))\
((hg_bulk_t)(bulk_handle)))
DECLARE_MARGO_RPC_HANDLER(svc1_do_other_thing_ult)
DECLARE_MARGO_RPC_HANDLER(svc1_do_other_thing_shutdown_ult)
#endif /* __SVC1 */
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment