Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
sds
margo
Commits
20ace816
Commit
20ace816
authored
Aug 24, 2017
by
Shane Snyder
Browse files
port compostion example over to new api
parent
5edc0ead
Changes
6
Hide whitespace changes
Inline
Side-by-side
Makefile.am
View file @
20ace816
...
...
@@ -40,6 +40,6 @@ include Make.rules
include
$(top_srcdir)/src/Makefile.subdir
include
$(top_srcdir)/examples/Makefile.subdir
include
$(top_srcdir)/examples/multiplex/Makefile.subdir
#
include $(top_srcdir)/examples/composition/Makefile.subdir
include
$(top_srcdir)/examples/composition/Makefile.subdir
include
$(top_srcdir)/tests/Makefile.subdir
examples/composition/composed-benchmark.c
View file @
20ace816
...
...
@@ -8,7 +8,6 @@
#include
<assert.h>
#include
<unistd.h>
#include
<abt.h>
#include
<abt-snoozer.h>
#include
<margo.h>
#include
"composed-client-lib.h"
...
...
@@ -22,8 +21,7 @@ int main(int argc, char **argv)
int
i
;
int
ret
;
margo_instance_id
mid
;
hg_context_t
*
hg_context
;
hg_class_t
*
hg_class
;
hg_return_t
hret
;
hg_addr_t
delegator_svr_addr
=
HG_ADDR_NULL
;
hg_addr_t
data_xfer_svr_addr
=
HG_ADDR_NULL
;
hg_handle_t
handle
;
...
...
@@ -40,64 +38,36 @@ int main(int argc, char **argv)
ret
=
sscanf
(
argv
[
3
],
"%d"
,
&
iterations
);
assert
(
ret
==
1
);
/* boilerplate HG initialization steps */
/***************************************/
/* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present)
*/
for
(
i
=
0
;
i
<
11
&&
argv
[
1
][
i
]
!=
'\0'
&&
argv
[
1
][
i
]
!=
':'
;
i
++
)
proto
[
i
]
=
argv
[
1
][
i
];
/* TODO: this is a hack for now; I don't really want this to operate in server mode,
* but it seems like it needs to for now for sub-service to be able to get back to it
*/
hg_class
=
HG_Init
(
proto
,
HG_TRUE
);
if
(
!
hg_class
)
{
fprintf
(
stderr
,
"Error: HG_Init()
\n
"
);
return
(
-
1
);
}
hg_context
=
HG_Context_create
(
hg_class
);
if
(
!
hg_context
)
{
fprintf
(
stderr
,
"Error: HG_Context_create()
\n
"
);
HG_Finalize
(
hg_class
);
return
(
-
1
);
}
/* set up argobots */
/* actually start margo */
/***************************************/
ret
=
ABT_init
(
argc
,
argv
);
if
(
ret
!=
0
)
mid
=
margo_init
(
proto
,
MARGO_SERVER_MODE
,
0
,
-
1
);
if
(
mid
==
MARGO_INSTANCE_NULL
)
{
fprintf
(
stderr
,
"Error:
ABT
_init()
\n
"
);
fprintf
(
stderr
,
"Error:
margo
_init()
\n
"
);
return
(
-
1
);
}
/* set primary ES to idle without polling */
ret
=
ABT_snoozer_xstream_self_set
();
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_snoozer_xstream_self_set()
\n
"
);
return
(
-
1
);
}
/* actually start margo */
/***************************************/
mid
=
margo_init
(
0
,
0
,
hg_context
);
/* register core RPC */
MARGO_REGISTER
(
hg_class
,
"my_shutdown_rpc"
,
void
,
void
,
NULL
,
&
my_rpc_shutdown_id
);
my_rpc_shutdown_id
=
MARGO_REGISTER
(
mid
,
"my_shutdown_rpc"
,
void
,
void
,
NULL
);
/* register service APIs */
data_xfer_register_client
(
mid
);
composed_register_client
(
mid
);
/* find addrs for servers */
ret
=
margo_addr_lookup
(
mid
,
argv
[
2
],
&
data_xfer_svr_addr
);
assert
(
ret
==
0
);
ret
=
margo_addr_lookup
(
mid
,
argv
[
1
],
&
delegator_svr_addr
);
assert
(
ret
==
0
);
h
ret
=
margo_addr_lookup
(
mid
,
argv
[
2
],
&
data_xfer_svr_addr
);
assert
(
h
ret
==
HG_SUCCESS
);
h
ret
=
margo_addr_lookup
(
mid
,
argv
[
1
],
&
delegator_svr_addr
);
assert
(
h
ret
==
HG_SUCCESS
);
buffer
=
calloc
(
1
,
buffer_sz
);
assert
(
buffer
);
...
...
@@ -150,33 +120,28 @@ int main(int argc, char **argv)
/* send rpc(s) to shut down server(s) */
sleep
(
3
);
printf
(
"Shutting down delegator server.
\n
"
);
ret
=
HG_Create
(
hg_context
,
delegator_svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
ret
==
0
);
margo_forward
(
mid
,
handle
,
NULL
);
HG_Destroy
(
handle
);
hret
=
margo_create
(
mid
,
delegator_svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
hret
==
HG_SUCCESS
);
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
if
(
strcmp
(
argv
[
1
],
argv
[
2
]))
{
sleep
(
3
);
printf
(
"Shutting down data_xfer server.
\n
"
);
ret
=
HG_Create
(
hg_context
,
data_xfer_svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
ret
==
0
);
margo_forward
(
mid
,
handle
,
NULL
);
HG_Destroy
(
handle
);
hret
=
margo_create
(
mid
,
data_xfer_svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
hret
==
HG_SUCCESS
);
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
}
HG_A
ddr_free
(
hg_class
,
delegator_svr_addr
);
HG_A
ddr_free
(
hg_class
,
data_xfer_svr_addr
);
margo_a
ddr_free
(
mid
,
delegator_svr_addr
);
margo_a
ddr_free
(
mid
,
data_xfer_svr_addr
);
/* shut down everything */
margo_finalize
(
mid
);
ABT_finalize
();
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
free
(
buffer
);
return
(
0
);
}
examples/composition/composed-client-lib.c
View file @
20ace816
...
...
@@ -22,8 +22,8 @@ static hg_id_t data_xfer_read_id = -1;
int
composed_register_client
(
margo_instance_id
mid
)
{
MARGO_REGISTER
(
mid
,
"delegator_read"
,
delegator_read_in_t
,
delegator_read_out_t
,
NULL
,
&
delegator_read_id
);
delegator_read_id
=
MARGO_REGISTER
(
mid
,
"delegator_read"
,
delegator_read_in_t
,
delegator_read_out_t
,
NULL
);
return
(
0
);
}
...
...
@@ -31,8 +31,8 @@ int composed_register_client(margo_instance_id mid)
int
data_xfer_register_client
(
margo_instance_id
mid
)
{
MARGO_REGISTER
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
NULL
,
&
data_xfer_read_id
);
data_xfer_read_id
=
MARGO_REGISTER
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
NULL
);
return
(
0
);
}
...
...
@@ -42,39 +42,37 @@ void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_s
hg_handle_t
handle
;
delegator_read_in_t
in
;
delegator_read_out_t
out
;
int
ret
;
const
struct
hg_info
*
hgi
;
hg_return_t
hret
;
/* create handle */
ret
=
HG_Create
(
margo_get_context
(
mid
)
,
svr_addr
,
delegator_read_id
,
&
handle
);
assert
(
ret
==
0
);
h
ret
=
margo_create
(
mid
,
svr_addr
,
delegator_read_id
,
&
handle
);
assert
(
h
ret
==
HG_SUCCESS
);
/* register buffer for rdma/bulk access by server */
hgi
=
HG_Get_info
(
handle
);
assert
(
hgi
);
ret
=
HG_Bulk_create
(
hgi
->
hg_class
,
1
,
&
buffer
,
&
buffer_sz
,
hret
=
margo_bulk_create
(
mid
,
1
,
&
buffer
,
&
buffer_sz
,
HG_BULK_WRITE_ONLY
,
&
in
.
bulk_handle
);
assert
(
ret
==
0
);
assert
(
h
ret
==
HG_SUCCESS
);
in
.
data_xfer_svc_addr
=
data_xfer_svc_addr_string
;
#if 0
HG_S
et_target_id(handle, mplex_id);
margo_s
et_target_id(handle, mplex_id);
#endif
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
margo_forward
(
mid
,
handle
,
&
in
);
hret
=
margo_forward
(
mid
,
handle
,
&
in
);
assert
(
hret
==
HG_SUCCESS
);
/* decode response */
ret
=
HG_G
et_output
(
handle
,
&
out
);
assert
(
ret
==
0
);
h
ret
=
margo_g
et_output
(
handle
,
&
out
);
assert
(
h
ret
==
HG_SUCCESS
);
/* clean up resources consumed by this rpc */
HG_Bulk_free
(
in
.
bulk_handle
);
HG_Free_output
(
handle
,
&
out
);
HG_D
estroy
(
handle
);
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_d
estroy
(
handle
);
return
;
}
...
...
@@ -84,51 +82,48 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_
hg_handle_t
handle
;
data_xfer_read_in_t
in
;
data_xfer_read_out_t
out
;
int
ret
;
const
struct
hg_info
*
hgi
;
hg_return_t
hret
;
hg_addr_t
addr_self
;
char
addr_self_string
[
128
];
hg_size_t
addr_self_string_sz
=
128
;
/* create handle */
ret
=
HG_Create
(
margo_get_context
(
mid
)
,
svr_addr
,
data_xfer_read_id
,
&
handle
);
assert
(
ret
==
0
);
h
ret
=
margo_create
(
mid
,
svr_addr
,
data_xfer_read_id
,
&
handle
);
assert
(
h
ret
==
HG_SUCCESS
);
/* register buffer for rdma/bulk access by server */
hgi
=
HG_Get_info
(
handle
);
assert
(
hgi
);
ret
=
HG_Bulk_create
(
hgi
->
hg_class
,
1
,
&
buffer
,
&
buffer_sz
,
hret
=
margo_bulk_create
(
mid
,
1
,
&
buffer
,
&
buffer_sz
,
HG_BULK_WRITE_ONLY
,
&
in
.
bulk_handle
);
assert
(
ret
==
0
);
assert
(
h
ret
==
HG_SUCCESS
);
/* figure out local address */
ret
=
HG_A
ddr_self
(
margo_get_class
(
mid
)
,
&
addr_self
);
assert
(
ret
==
HG_SUCCESS
);
h
ret
=
margo_a
ddr_self
(
mid
,
&
addr_self
);
assert
(
h
ret
==
HG_SUCCESS
);
ret
=
HG_A
ddr_to_string
(
margo_get_class
(
mid
)
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
assert
(
ret
==
HG_SUCCESS
);
h
ret
=
margo_a
ddr_to_string
(
mid
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
assert
(
h
ret
==
HG_SUCCESS
);
in
.
client_addr
=
addr_self_string
;
#if 0
HG_S
et_target_id(handle, mplex_id);
margo_s
et_target_id(handle, mplex_id);
#endif
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
margo_forward
(
mid
,
handle
,
&
in
);
hret
=
margo_forward
(
mid
,
handle
,
&
in
);
assert
(
hret
==
HG_SUCCESS
);
/* decode response */
ret
=
HG_G
et_output
(
handle
,
&
out
);
assert
(
ret
==
0
);
h
ret
=
margo_g
et_output
(
handle
,
&
out
);
assert
(
h
ret
==
HG_SUCCESS
);
/* clean up resources consumed by this rpc */
HG_Bulk_free
(
in
.
bulk_handle
);
HG_Free_output
(
handle
,
&
out
);
HG_D
estroy
(
handle
);
HG_A
ddr_free
(
margo_get_class
(
mid
)
,
addr_self
);
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_d
estroy
(
handle
);
margo_a
ddr_free
(
mid
,
addr_self
);
return
;
}
examples/composition/composed-svc-daemon.c
View file @
20ace816
...
...
@@ -11,7 +11,6 @@
#include
<stdlib.h>
#include
<abt.h>
#include
<abt-snoozer.h>
#include
<margo.h>
#include
"data-xfer-service.h"
...
...
@@ -27,19 +26,17 @@
static
void
my_rpc_shutdown_ult
(
hg_handle_t
handle
)
{
hg_return_t
hret
;
const
struct
hg_info
*
hgi
;
margo_instance_id
mid
;
//printf("Got RPC request to shutdown\n");
hgi
=
HG_Get_info
(
handle
);
assert
(
hgi
);
mid
=
margo_hg_handle_get_instance
(
handle
);
assert
(
mid
!=
MARGO_INSTANCE_NULL
);
hret
=
margo_respond
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
HG_D
estroy
(
handle
);
margo_d
estroy
(
handle
);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
...
...
@@ -54,10 +51,8 @@ DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
int
main
(
int
argc
,
char
**
argv
)
{
in
t
ret
;
hg_return_
t
h
ret
;
margo_instance_id
mid
;
hg_context_t
*
hg_context
;
hg_class_t
*
hg_class
;
hg_addr_t
addr_self
;
char
addr_self_string
[
128
];
hg_size_t
addr_self_string_sz
=
128
;
...
...
@@ -75,73 +70,44 @@ int main(int argc, char **argv)
svc_list
=
strdup
(
argv
[
2
]);
assert
(
svc_list
);
/* boilerplate HG initialization steps */
/* actually start margo -- this step encapsulates the Mercury and
* Argobots initialization and must precede their use */
/* Use the calling xstream to drive progress and execute handlers. */
/***************************************/
hg_class
=
HG_Init
(
argv
[
1
],
HG_TRUE
);
if
(
!
hg_class
)
mid
=
margo_init
(
argv
[
1
],
MARGO_SERVER_MODE
,
0
,
-
1
);
if
(
mid
==
MARGO_INSTANCE_NULL
)
{
fprintf
(
stderr
,
"Error: HG_Init()
\n
"
);
return
(
-
1
);
}
hg_context
=
HG_Context_create
(
hg_class
);
if
(
!
hg_context
)
{
fprintf
(
stderr
,
"Error: HG_Context_create()
\n
"
);
HG_Finalize
(
hg_class
);
fprintf
(
stderr
,
"Error: margo_init()
\n
"
);
return
(
-
1
);
}
/* figure out what address this server is listening on */
ret
=
HG_A
ddr_self
(
hg_class
,
&
addr_self
);
if
(
ret
!=
HG_SUCCESS
)
h
ret
=
margo_a
ddr_self
(
mid
,
&
addr_self
);
if
(
h
ret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"Error: HG_Addr_self()
\n
"
);
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
fprintf
(
stderr
,
"Error: margo_addr_self()
\n
"
);
margo_finalize
(
mid
);
return
(
-
1
);
}
ret
=
HG_A
ddr_to_string
(
hg_class
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
if
(
ret
!=
HG_SUCCESS
)
h
ret
=
margo_a
ddr_to_string
(
mid
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
if
(
h
ret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"Error: HG_Addr_self()
\n
"
);
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
HG_Addr_free
(
hg_class
,
addr_self
);
fprintf
(
stderr
,
"Error: margo_addr_to_string()
\n
"
);
margo_addr_free
(
mid
,
addr_self
);
margo_finalize
(
mid
);
return
(
-
1
);
}
HG_A
ddr_free
(
hg_class
,
addr_self
);
margo_a
ddr_free
(
mid
,
addr_self
);
printf
(
"# accepting RPCs on address
\"
%s
\"\n
"
,
addr_self_string
);
/* set up argobots */
/***************************************/
ret
=
ABT_init
(
argc
,
argv
);
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_init()
\n
"
);
return
(
-
1
);
}
/* set primary ES to idle without polling */
ret
=
ABT_snoozer_xstream_self_set
();
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_snoozer_xstream_self_set()
\n
"
);
return
(
-
1
);
}
/* actually start margo */
/***************************************/
mid
=
margo_init
(
0
,
0
,
hg_context
);
assert
(
mid
);
/* register RPCs and services */
/***************************************/
/* register a shutdown RPC as just a generic handler; not part of a
* multiplexed service
*/
MARGO_REGISTER
(
mid
,
"my_shutdown_rpc"
,
void
,
void
,
my_rpc_shutdown_ult
,
MARGO_RPC_ID_IGNORE
);
MARGO_REGISTER
(
mid
,
"my_shutdown_rpc"
,
void
,
void
,
my_rpc_shutdown_ult
);
handler_pool
=
margo_get_handler_pool
(
mid
);
svc
=
strtok
(
svc_list
,
","
);
...
...
@@ -178,11 +144,5 @@ int main(int argc, char **argv)
svc2_deregister(mid, *handler_pool, 3);
#endif
ABT_finalize
();
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
return
(
0
);
}
examples/composition/data-xfer-service.c
View file @
20ace816
...
...
@@ -18,7 +18,6 @@ static void data_xfer_read_ult(hg_handle_t handle)
hg_return_t
hret
;
data_xfer_read_out_t
out
;
data_xfer_read_in_t
in
;
int
ret
;
const
struct
hg_info
*
hgi
;
margo_instance_id
mid
;
hg_addr_t
client_addr
;
...
...
@@ -28,10 +27,12 @@ static void data_xfer_read_ult(hg_handle_t handle)
pthread_t my_tid;
#endif
ret
=
HG_G
et_input
(
handle
,
&
in
);
assert
(
ret
==
HG_SUCCESS
);
hgi
=
HG_G
et_info
(
handle
);
h
ret
=
margo_g
et_input
(
handle
,
&
in
);
assert
(
h
ret
==
HG_SUCCESS
);
hgi
=
margo_g
et_info
(
handle
);
assert
(
hgi
);
mid
=
margo_hg_info_get_instance
(
hgi
);
assert
(
mid
!=
MARGO_INSTANCE_NULL
);
#if 0
ABT_xstream_self(&my_xstream);
...
...
@@ -43,8 +44,6 @@ static void data_xfer_read_ult(hg_handle_t handle)
out
.
ret
=
0
;
mid
=
margo_hg_handle_get_instance
(
handle
);
if
(
!
in
.
client_addr
)
client_addr
=
hgi
->
addr
;
else
...
...
@@ -54,20 +53,20 @@ static void data_xfer_read_ult(hg_handle_t handle)
}
/* do bulk transfer from client to server */
ret
=
margo_bulk_transfer
(
mid
,
HG_BULK_PUSH
,
h
ret
=
margo_bulk_transfer
(
mid
,
HG_BULK_PUSH
,
client_addr
,
in
.
bulk_handle
,
0
,
g_buffer_bulk_handle
,
0
,
g_buffer_size
);
assert
(
ret
==
0
);
g_buffer_bulk_handle
,
0
,
g_buffer_size
,
HG_OP_ID_IGNORE
);
assert
(
h
ret
==
HG_SUCCESS
);
if
(
in
.
client_addr
)
HG_A
ddr_free
(
margo_get_class
(
mid
)
,
client_addr
);
margo_a
ddr_free
(
mid
,
client_addr
);
HG_F
ree_input
(
handle
,
&
in
);
margo_f
ree_input
(
handle
,
&
in
);
hret
=
HG_R
espond
(
handle
,
NULL
,
NULL
,
&
out
);
hret
=
margo_r
espond
(
mid
,
handle
,
&
out
);
assert
(
hret
==
HG_SUCCESS
);
HG_D
estroy
(
handle
);
margo_d
estroy
(
handle
);
return
;
}
...
...
@@ -82,24 +81,23 @@ int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp
assert
(
g_buffer
);
/* register local target buffer for bulk access */
hret
=
HG_B
ulk_create
(
margo_get_class
(
mid
)
,
1
,
&
g_buffer
,
hret
=
margo_b
ulk_create
(
mid
,
1
,
&
g_buffer
,
&
g_buffer_size
,
HG_BULK_READ_ONLY
,
&
g_buffer_bulk_handle
);
assert
(
hret
==
HG_SUCCESS
);
/* register RPC handler */
MARGO_REGISTER_MPLEX
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
data_xfer_read_ult
,
mplex_id
,
pool
,
MARGO_RPC_ID_IGNORE
);
data_xfer_read_ult
,
mplex_id
,
pool
);
return
(
0
);
}
void
data_xfer_deregister
(
margo_instance_id
mid
,
ABT_pool
pool
,
uint32_t
mplex_id
)
{
HG_B
ulk_free
(
g_buffer_bulk_handle
);
margo_b
ulk_free
(
g_buffer_bulk_handle
);
free
(
g_buffer
);
/* TODO: undo what was done in data_xfer_register() */
return
;
}
examples/composition/delegator-service.c
View file @
20ace816
...
...
@@ -19,7 +19,6 @@ static void delegator_read_ult(hg_handle_t handle)
delegator_read_in_t
in
;
data_xfer_read_in_t
in_relay
;
data_xfer_read_out_t
out_relay
;
int
ret
;
const
struct
hg_info
*
hgi
;
margo_instance_id
mid
;
hg_addr_t
data_xfer_svc_addr
;
...
...
@@ -33,10 +32,12 @@ static void delegator_read_ult(hg_handle_t handle)
pthread_t my_tid;
#endif
ret
=
HG_G
et_input
(
handle
,
&
in
);
assert
(
ret
==
HG_SUCCESS
);
hgi
=
HG_G
et_info
(
handle
);
h
ret
=
margo_g
et_input
(
handle
,
&
in
);
assert
(
h
ret
==
HG_SUCCESS
);
hgi
=
margo_g
et_info
(
handle
);
assert
(
hgi
);
mid
=
margo_hg_info_get_instance
(
hgi
);
assert
(
mid
!=
MARGO_INSTANCE_NULL
);
#if 0
ABT_xstream_self(&my_xstream);
...
...
@@ -48,34 +49,33 @@ static void delegator_read_ult(hg_handle_t handle)
out
.
ret
=
0
;
mid
=
margo_hg_handle_get_instance
(
handle
);
hret
=
margo_addr_lookup
(
mid
,
in
.
data_xfer_svc_addr
,
&
data_xfer_svc_addr
);
assert
(
hret
==
HG_SUCCESS
);
/* relay to microservice */
hret
=
HG_Create
(
margo_get_context
(
mid
)
,
data_xfer_svc_addr
,
g_data_xfer_read_id
,
&
handle_relay
);
hret
=
margo_create
(
mid
,
data_xfer_svc_addr
,
g_data_xfer_read_id
,
&
handle_relay
);
assert
(
hret
==
HG_SUCCESS
);
/* pass through bulk handle */
in_relay
.
bulk_handle
=
in
.
bulk_handle
;
/* get addr of client to relay to data_xfer service */
hret
=
HG_A
ddr_to_string
(
margo_get_class
(
mid
)
,
client_addr_string
,
&
client_addr_string_sz
,
hgi
->
addr
);
hret
=
margo_a
ddr_to_string
(
mid
,
client_addr_string
,
&
client_addr_string_sz
,
hgi
->
addr
);
assert
(
hret
==
HG_SUCCESS
);
in_relay
.
client_addr
=
client_addr_string
;
margo_forward
(
mid
,
handle_relay
,
&
in_relay
);
hret
=
margo_forward
(
mid
,
handle_relay
,
&
in_relay
);
assert
(
hret
==
HG_SUCCESS
);
hret
=
HG_G
et_output
(
handle_relay
,
&
out_relay
);
hret
=
margo_g
et_output
(
handle_relay
,
&
out_relay
);
assert
(
hret
==
HG_SUCCESS
);
HG_F
ree_input
(
handle
,
&
in
);
HG_F
ree_output
(
handle_relay
,
&
out
);
margo_f
ree_input
(
handle
,
&
in
);
margo_f
ree_output
(
handle_relay
,
&
out
);
hret
=
HG_R
espond
(
handle
,
NULL
,
NULL
,
&
out
);
hret
=
margo_r
espond
(
mid
,
handle
,
&
out
);
assert
(
hret
==
HG_SUCCESS
);
HG_A
ddr_free
(
margo_get_class
(
mid
)
,
data_xfer_svc_addr
);
HG_D
estroy
(
handle
);
HG_D
estroy
(
handle_relay
);
margo_a
ddr_free
(
mid
,
data_xfer_svc_addr
);
margo_d
estroy
(
handle
);
margo_d
estroy
(
handle_relay
);
return
;
}
...
...
@@ -87,13 +87,13 @@ int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp
/* NOTE: this RPC may already be registered if this process has already registered a
* data-xfer service
*/
MARGO_REGISTER
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
NULL
,
&
g_data_xfer_read_id
);
g_data_xfer_read_id
=
MARGO_REGISTER
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
NULL
);
/* register RPC handler */
MARGO_REGISTER_MPLEX
(
mid
,
"delegator_read"
,
delegator_read_in_t
,
delegator_read_out_t
,
delegator_read_ult
,
mplex_id
,
pool
,
MARGO_RPC_ID_IGNORE
);
delegator_read_ult
,
mplex_id
,
pool
);
return
(
0
);