Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
sds
margo
Commits
7318e084
Commit
7318e084
authored
Aug 24, 2017
by
Shane Snyder
Browse files
port multiplex example to new api
parent
69c34459
Changes
11
Hide whitespace changes
Inline
Side-by-side
Makefile.am
View file @
7318e084
...
...
@@ -39,7 +39,7 @@ include Make.rules
include
$(top_srcdir)/src/Makefile.subdir
include
$(top_srcdir)/examples/Makefile.subdir
#
include $(top_srcdir)/examples/multiplex/Makefile.subdir
include
$(top_srcdir)/examples/multiplex/Makefile.subdir
#include $(top_srcdir)/examples/composition/Makefile.subdir
include
$(top_srcdir)/tests/Makefile.subdir
examples/margo-example-client.c
View file @
7318e084
...
...
@@ -61,10 +61,11 @@ int main(int argc, char **argv)
/* actually start margo -- margo_init() encapsulates the Mercury &
* Argobots initialization, so this step must precede their use. */
/* Use main process to drive progress (it will relinquish control to
* Mercury during blocking communication calls).
The rpc handler pool
*
is null in this example program
because this is a pure client that
*
will not be servicing
rpc requests.
* Mercury during blocking communication calls).
No RPC threads are
*
used
because this is a pure client that
will not be servicing
* rpc requests.
*/
/***************************************/
mid
=
margo_init
(
proto
,
MARGO_CLIENT_MODE
,
0
,
0
);
if
(
mid
==
MARGO_INSTANCE_NULL
)
{
...
...
examples/margo-example-server.c
View file @
7318e084
...
...
@@ -35,6 +35,7 @@ int main(int argc, char **argv)
/* actually start margo -- this step encapsulates the Mercury and
* Argobots initialization and must precede their use */
/* Use the calling xstream to drive progress and execute handlers. */
/***************************************/
mid
=
margo_init
(
argv
[
1
],
MARGO_SERVER_MODE
,
0
,
-
1
);
if
(
mid
==
MARGO_INSTANCE_NULL
)
{
...
...
@@ -54,7 +55,7 @@ int main(int argc, char **argv)
hret
=
margo_addr_to_string
(
mid
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
if
(
hret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"Error: margo_addr_
self
()
\n
"
);
fprintf
(
stderr
,
"Error: margo_addr_
to_string
()
\n
"
);
margo_addr_free
(
mid
,
addr_self
);
margo_finalize
(
mid
);
return
(
-
1
);
...
...
examples/multiplex/margo-example-mp-client.c
View file @
7318e084
...
...
@@ -8,7 +8,6 @@
#include
<assert.h>
#include
<unistd.h>
#include
<abt.h>
#include
<abt-snoozer.h>
#include
<margo.h>
#include
"svc1-client.h"
...
...
@@ -19,10 +18,8 @@ static hg_id_t my_rpc_shutdown_id;
int
main
(
int
argc
,
char
**
argv
)
{
int
i
;
int
ret
;
margo_instance_id
mid
;
hg_context_t
*
hg_context
;
hg_class_t
*
hg_class
;
hg_return_t
hret
;
hg_addr_t
svr_addr
=
HG_ADDR_NULL
;
hg_handle_t
handle
;
char
proto
[
12
]
=
{
0
};
...
...
@@ -33,58 +30,36 @@ int main(int argc, char **argv)
return
(
-
1
);
}
/* boilerplate HG initialization steps */
/***************************************/
/* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present)
*/
for
(
i
=
0
;
i
<
11
&&
argv
[
1
][
i
]
!=
'\0'
&&
argv
[
1
][
i
]
!=
':'
;
i
++
)
proto
[
i
]
=
argv
[
1
][
i
];
hg_class
=
HG_Init
(
proto
,
HG_FALSE
);
if
(
!
hg_class
)
{
fprintf
(
stderr
,
"Error: HG_Init()
\n
"
);
return
(
-
1
);
}
hg_context
=
HG_Context_create
(
hg_class
);
if
(
!
hg_context
)
{
fprintf
(
stderr
,
"Error: HG_Context_create()
\n
"
);
HG_Finalize
(
hg_class
);
return
(
-
1
);
}
/* set up argobots */
/* actually start margo -- margo_init() encapsulates the Mercury &
* Argobots initialization, so this step must precede their use. */
/* Use main process to drive progress (it will relinquish control to
* Mercury during blocking communication calls). No RPC threads are
* used because this is a pure client that will not be servicing
* rpc requests.
*/
/***************************************/
ret
=
ABT_init
(
argc
,
argv
);
if
(
ret
!=
0
)
mid
=
margo_init
(
proto
,
MARGO_CLIENT_MODE
,
0
,
0
);
if
(
mid
==
MARGO_INSTANCE_NULL
)
{
fprintf
(
stderr
,
"Error:
ABT
_init()
\n
"
);
fprintf
(
stderr
,
"Error:
margo
_init()
\n
"
);
return
(
-
1
);
}
/* set primary ES to idle without polling */
ret
=
ABT_snoozer_xstream_self_set
();
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_snoozer_xstream_self_set()
\n
"
);
return
(
-
1
);
}
/* actually start margo */
/***************************************/
mid
=
margo_init
(
0
,
0
,
hg_context
);
/* register core RPC */
MARGO_REGISTER
(
mid
,
"my_shutdown_rpc"
,
void
,
void
,
NULL
,
&
my_rpc_shutdown_id
);
my_rpc_shutdown_id
=
MARGO_REGISTER
(
mid
,
"my_shutdown_rpc"
,
void
,
void
,
NULL
);
/* register service APIs */
svc1_register_client
(
mid
);
svc2_register_client
(
mid
);
/* find addr for server */
ret
=
margo_addr_lookup
(
mid
,
argv
[
1
],
&
svr_addr
);
assert
(
ret
==
0
);
h
ret
=
margo_addr_lookup
(
mid
,
argv
[
1
],
&
svr_addr
);
assert
(
h
ret
==
HG_SUCCESS
);
svc1_do_thing
(
mid
,
svr_addr
,
1
);
svc1_do_other_thing
(
mid
,
svr_addr
,
1
);
...
...
@@ -95,22 +70,16 @@ int main(int argc, char **argv)
/* send one rpc to server to shut it down */
/* create handle */
ret
=
HG_Create
(
hg_context
,
svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
ret
==
0
);
h
ret
=
margo_create
(
mid
,
svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
h
ret
==
HG_SUCCESS
);
margo_forward
(
mid
,
handle
,
NULL
);
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
HG_A
ddr_free
(
hg_class
,
svr_addr
);
margo_a
ddr_free
(
mid
,
svr_addr
);
/* shut down everything */
margo_finalize
(
mid
);
ABT_finalize
();
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
return
(
0
);
}
examples/multiplex/margo-example-mp-server.c
View file @
7318e084
...
...
@@ -24,19 +24,17 @@
static
void
my_rpc_shutdown_ult
(
hg_handle_t
handle
)
{
hg_return_t
hret
;
const
struct
hg_info
*
hgi
;
margo_instance_id
mid
;
//printf("Got RPC request to shutdown\n");
hgi
=
HG_Get_info
(
handle
);
assert
(
hgi
);
mid
=
margo_hg_handle_get_instance
(
handle
);
assert
(
mid
!=
MARGO_INSTANCE_NULL
);
hret
=
margo_respond
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
HG_D
estroy
(
handle
);
margo_d
estroy
(
handle
);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
...
...
@@ -53,8 +51,7 @@ int main(int argc, char **argv)
{
int
ret
;
margo_instance_id
mid
;
hg_context_t
*
hg_context
;
hg_class_t
*
hg_class
;
hg_return_t
hret
;
hg_addr_t
addr_self
;
char
addr_self_string
[
128
];
hg_size_t
addr_self_string_sz
=
128
;
...
...
@@ -69,73 +66,44 @@ int main(int argc, char **argv)
return
(
-
1
);
}
/* boilerplate HG initialization steps */
/* actually start margo -- this step encapsulates the Mercury and
* Argobots initialization and must precede their use */
/* Use the calling xstream to drive progress and execute handlers. */
/***************************************/
hg_class
=
HG_Init
(
argv
[
1
],
HG_TRUE
);
if
(
!
hg_class
)
mid
=
margo_init
(
argv
[
1
],
MARGO_SERVER_MODE
,
0
,
-
1
);
if
(
mid
==
MARGO_INSTANCE_NULL
)
{
fprintf
(
stderr
,
"Error: HG_Init()
\n
"
);
return
(
-
1
);
}
hg_context
=
HG_Context_create
(
hg_class
);
if
(
!
hg_context
)
{
fprintf
(
stderr
,
"Error: HG_Context_create()
\n
"
);
HG_Finalize
(
hg_class
);
fprintf
(
stderr
,
"Error: margo_init()
\n
"
);
return
(
-
1
);
}
/* figure out what address this server is listening on */
ret
=
HG_A
ddr_self
(
hg_class
,
&
addr_self
);
if
(
ret
!=
HG_SUCCESS
)
h
ret
=
margo_a
ddr_self
(
mid
,
&
addr_self
);
if
(
h
ret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"Error: HG_Addr_self()
\n
"
);
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
fprintf
(
stderr
,
"Error: margo_addr_self()
\n
"
);
margo_finalize
(
mid
);
return
(
-
1
);
}
ret
=
HG_A
ddr_to_string
(
hg_class
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
if
(
ret
!=
HG_SUCCESS
)
h
ret
=
margo_a
ddr_to_string
(
mid
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
if
(
h
ret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"Error: HG_Addr_self()
\n
"
);
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
HG_Addr_free
(
hg_class
,
addr_self
);
fprintf
(
stderr
,
"Error: margo_addr_to_string()
\n
"
);
margo_addr_free
(
mid
,
addr_self
);
margo_finalize
(
mid
);
return
(
-
1
);
}
HG_A
ddr_free
(
hg_class
,
addr_self
);
margo_a
ddr_free
(
mid
,
addr_self
);
printf
(
"# accepting RPCs on address
\"
%s
\"\n
"
,
addr_self_string
);
/* set up argobots */
/***************************************/
ret
=
ABT_init
(
argc
,
argv
);
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_init()
\n
"
);
return
(
-
1
);
}
/* set primary ES to idle without polling */
ret
=
ABT_snoozer_xstream_self_set
();
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_snoozer_xstream_self_set()
\n
"
);
return
(
-
1
);
}
/* actually start margo */
/***************************************/
mid
=
margo_init
(
0
,
0
,
hg_context
);
assert
(
mid
);
/* register RPCs and services */
/***************************************/
/* register a shutdown RPC as just a generic handler; not part of a
* multiplexed service
*/
MARGO_REGISTER
(
hg_class
,
"my_shutdown_rpc"
,
void
,
void
,
my_rpc_shutdown_ult
,
MARGO_RPC_ID_IGNORE
);
MARGO_REGISTER
(
mid
,
"my_shutdown_rpc"
,
void
,
void
,
my_rpc_shutdown_ult
);
/* register svc1, with mplex_id 1, to execute on the default handler pool
* used by Margo
...
...
@@ -144,7 +112,7 @@ int main(int argc, char **argv)
ret
=
svc1_register
(
mid
,
*
handler_pool
,
1
);
assert
(
ret
==
0
);
/* create a dedicated and pool for another instance of svc1 */
/* create a dedicated
xstream
and pool for another instance of svc1 */
ret
=
ABT_snoozer_xstream_create
(
1
,
&
svc1_pool2
,
&
svc1_xstream2
);
assert
(
ret
==
0
);
/* register svc1, with mplex_id 2, to execute on a separate pool. This
...
...
@@ -161,7 +129,6 @@ int main(int argc, char **argv)
ret
=
svc2_register
(
mid
,
*
handler_pool
,
3
);
assert
(
ret
==
0
);
/* shut things down */
/****************************************/
...
...
@@ -172,20 +139,15 @@ int main(int argc, char **argv)
*/
margo_wait_for_finalize
(
mid
);
/* TODO: rethink this; can't touch mid after wait for finalize */
/* TODO: rethink this; can't touch mid
or use ABT
after wait for finalize */
#if 0
svc1_deregister(mid, *handler_pool, 1);
svc1_deregister(mid, svc1_pool2, 2);
svc2_deregister(mid, *handler_pool, 3);
#endif
ABT_xstream_join(svc1_xstream2);
ABT_xstream_free(&svc1_xstream2);
ABT_finalize
();
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
#endif
return
(
0
);
}
...
...
examples/multiplex/svc1-client.c
View file @
7318e084
...
...
@@ -21,11 +21,11 @@ static hg_id_t svc1_do_other_thing_id = -1;
int
svc1_register_client
(
margo_instance_id
mid
)
{
MARGO_REGISTER
(
mid
,
"svc1_do_thing"
,
svc1_do_thing_in_t
,
svc1_do_thing_out_t
,
NULL
,
&
svc1_do_thing_id
);
svc1_do_thing_id
=
MARGO_REGISTER
(
mid
,
"svc1_do_thing"
,
svc1_do_thing_in_t
,
svc1_do_thing_out_t
,
NULL
);
MARGO_REGISTER
(
mid
,
"svc1_do_other_thing"
,
svc1_do_other_thing_in_t
,
svc1_do_other_thing_out_t
,
NULL
,
&
svc1_do_other_thing_id
);
svc1_do_other_thing_id
=
MARGO_REGISTER
(
mid
,
"svc1_do_other_thing"
,
svc1_do_other_thing_in_t
,
svc1_do_other_thing_out_t
,
NULL
);
return
(
0
);
}
...
...
@@ -35,10 +35,9 @@ void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
hg_handle_t
handle
;
svc1_do_thing_in_t
in
;
svc1_do_thing_out_t
out
;
in
t
ret
;
hg_return_
t
h
ret
;
hg_size_t
size
;
void
*
buffer
;
const
struct
hg_info
*
hgi
;
/* allocate buffer for bulk transfer */
size
=
512
;
...
...
@@ -47,32 +46,32 @@ void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
sprintf
((
char
*
)
buffer
,
"Hello world!
\n
"
);
/* create handle */
ret
=
HG_Create
(
margo_get_context
(
mid
)
,
svr_addr
,
svc1_do_thing_id
,
&
handle
);
assert
(
ret
==
0
);
h
ret
=
margo_create
(
mid
,
svr_addr
,
svc1_do_thing_id
,
&
handle
);
assert
(
h
ret
==
HG_SUCCESS
);
/* register buffer for rdma/bulk access by server */
hgi
=
HG_Get_info
(
handle
);
assert
(
hgi
);
ret
=
HG_Bulk_create
(
hgi
->
hg_class
,
1
,
&
buffer
,
&
size
,
hret
=
margo_bulk_create
(
mid
,
1
,
&
buffer
,
&
size
,
HG_BULK_READ_ONLY
,
&
in
.
bulk_handle
);
assert
(
ret
==
0
);
assert
(
h
ret
==
HG_SUCCESS
);
/* XXX */
HG_Set_target_id
(
handle
,
mplex_id
);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in
.
input_val
=
0
;
margo_forward
(
mid
,
handle
,
&
in
);
hret
=
margo_forward
(
mid
,
handle
,
&
in
);
assert
(
hret
==
HG_SUCCESS
);
/* decode response */
ret
=
HG_G
et_output
(
handle
,
&
out
);
assert
(
ret
==
0
);
h
ret
=
margo_g
et_output
(
handle
,
&
out
);
assert
(
h
ret
==
HG_SUCCESS
);
/* clean up resources consumed by this rpc */
HG_Bulk_free
(
in
.
bulk_handle
);
HG_Free_output
(
handle
,
&
out
);
HG_D
estroy
(
handle
);
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_d
estroy
(
handle
);
free
(
buffer
);
return
;
...
...
@@ -83,10 +82,9 @@ void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl
hg_handle_t
handle
;
svc1_do_other_thing_in_t
in
;
svc1_do_other_thing_out_t
out
;
in
t
ret
;
hg_return_
t
h
ret
;
hg_size_t
size
;
void
*
buffer
;
const
struct
hg_info
*
hgi
;
/* allocate buffer for bulk transfer */
size
=
512
;
...
...
@@ -95,34 +93,33 @@ void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl
sprintf
((
char
*
)
buffer
,
"Hello world!
\n
"
);
/* create handle */
ret
=
HG_Create
(
margo_get_context
(
mid
)
,
svr_addr
,
svc1_do_other_thing_id
,
&
handle
);
assert
(
ret
==
0
);
h
ret
=
margo_create
(
mid
,
svr_addr
,
svc1_do_other_thing_id
,
&
handle
);
assert
(
h
ret
==
HG_SUCCESS
);
/* register buffer for rdma/bulk access by server */
hgi
=
HG_Get_info
(
handle
);
assert
(
hgi
);
ret
=
HG_Bulk_create
(
hgi
->
hg_class
,
1
,
&
buffer
,
&
size
,
hret
=
margo_bulk_create
(
mid
,
1
,
&
buffer
,
&
size
,
HG_BULK_READ_ONLY
,
&
in
.
bulk_handle
);
assert
(
ret
==
0
);
assert
(
h
ret
==
HG_SUCCESS
);
/* XXX */
HG_Set_target_id
(
handle
,
mplex_id
);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in
.
input_val
=
0
;
margo_forward
(
mid
,
handle
,
&
in
);
hret
=
margo_forward
(
mid
,
handle
,
&
in
);
assert
(
hret
==
HG_SUCCESS
);
/* decode response */
ret
=
HG_G
et_output
(
handle
,
&
out
);
assert
(
ret
==
0
);
h
ret
=
margo_g
et_output
(
handle
,
&
out
);
assert
(
h
ret
==
HG_SUCCESS
);
/* clean up resources consumed by this rpc */
HG_Bulk_free
(
in
.
bulk_handle
);
HG_Free_output
(
handle
,
&
out
);
HG_D
estroy
(
handle
);
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_d
estroy
(
handle
);
free
(
buffer
);
return
;
}
examples/multiplex/svc1-server.c
View file @
7318e084
...
...
@@ -14,7 +14,6 @@ static void svc1_do_thing_ult(hg_handle_t handle)
hg_return_t
hret
;
svc1_do_thing_out_t
out
;
svc1_do_thing_in_t
in
;
int
ret
;
hg_size_t
size
;
void
*
buffer
;
hg_bulk_t
bulk_handle
;
...
...
@@ -24,10 +23,12 @@ static void svc1_do_thing_ult(hg_handle_t handle)
ABT_xstream
my_xstream
;
pthread_t
my_tid
;
ret
=
HG_G
et_input
(
handle
,
&
in
);
assert
(
ret
==
HG_SUCCESS
);
hgi
=
HG_G
et_info
(
handle
);
h
ret
=
margo_g
et_input
(
handle
,
&
in
);
assert
(
h
ret
==
HG_SUCCESS
);
hgi
=
margo_g
et_info
(
handle
);
assert
(
hgi
);
mid
=
margo_hg_info_get_instance
(
hgi
);
assert
(
mid
!=
MARGO_INSTANCE_NULL
);
ABT_xstream_self
(
&
my_xstream
);
ABT_thread_self
(
&
my_ult
);
...
...
@@ -43,25 +44,23 @@ static void svc1_do_thing_ult(hg_handle_t handle)
assert
(
buffer
);
/* register local target buffer for bulk access */
ret
=
HG_B
ulk_create
(
hgi
->
hg_class
,
1
,
&
buffer
,
h
ret
=
margo_b
ulk_create
(
mid
,
1
,
&
buffer
,
&
size
,
HG_BULK_WRITE_ONLY
,
&
bulk_handle
);
assert
(
ret
==
0
);
mid
=
margo_hg_handle_get_instance
(
handle
);
assert
(
hret
==
HG_SUCCESS
);
/* do bulk transfer from client to server */
ret
=
margo_bulk_transfer
(
mid
,
HG_BULK_PULL
,
h
ret
=
margo_bulk_transfer
(
mid
,
HG_BULK_PULL
,
hgi
->
addr
,
in
.
bulk_handle
,
0
,
bulk_handle
,
0
,
size
);
assert
(
ret
==
0
);
bulk_handle
,
0
,
size
,
HG_OP_ID_IGNORE
);
assert
(
h
ret
==
HG_SUCCESS
);
HG_F
ree_input
(
handle
,
&
in
);
margo_f
ree_input
(
handle
,
&
in
);
hret
=
HG_R
espond
(
handle
,
NULL
,
NULL
,
&
out
);
hret
=
margo_r
espond
(
mid
,
handle
,
&
out
);
assert
(
hret
==
HG_SUCCESS
);
HG_B
ulk_free
(
bulk_handle
);
HG_D
estroy
(
handle
);
margo_b
ulk_free
(
bulk_handle
);
margo_d
estroy
(
handle
);
free
(
buffer
);
return
;
...
...
@@ -73,7 +72,6 @@ static void svc1_do_other_thing_ult(hg_handle_t handle)
hg_return_t
hret
;
svc1_do_other_thing_out_t
out
;
svc1_do_other_thing_in_t
in
;
int
ret
;
hg_size_t
size
;
void
*
buffer
;
hg_bulk_t
bulk_handle
;
...
...
@@ -83,10 +81,12 @@ static void svc1_do_other_thing_ult(hg_handle_t handle)
ABT_xstream
my_xstream
;
pthread_t
my_tid
;
ret
=
HG_G
et_input
(
handle
,
&
in
);
assert
(
ret
==
HG_SUCCESS
);
hgi
=
HG_G
et_info
(
handle
);
h
ret
=
margo_g
et_input
(
handle
,
&
in
);
assert
(
h
ret
==
HG_SUCCESS
);
hgi
=
margo_g
et_info
(
handle
);
assert
(
hgi
);
mid
=
margo_hg_info_get_instance
(
hgi
);
assert
(
mid
!=
MARGO_INSTANCE_NULL
);
ABT_xstream_self
(
&
my_xstream
);
ABT_thread_self
(
&
my_ult
);
...
...
@@ -102,25 +102,23 @@ static void svc1_do_other_thing_ult(hg_handle_t handle)
assert
(
buffer
);
/* register local target buffer for bulk access */
ret
=
HG_B
ulk_create
(
hgi
->
hg_class
,
1
,
&
buffer
,
h
ret
=
margo_b
ulk_create
(
mid
,
1
,
&
buffer
,
&
size
,
HG_BULK_WRITE_ONLY
,
&
bulk_handle
);
assert
(
ret
==
0
);
mid
=
margo_hg_handle_get_instance
(
handle
);
assert
(
hret
==
HG_SUCCESS
);
/* do bulk transfer from client to server */
ret
=
margo_bulk_transfer
(
mid
,
HG_BULK_PULL
,
h
ret
=
margo_bulk_transfer
(
mid
,
HG_BULK_PULL
,
hgi
->
addr
,
in
.
bulk_handle
,
0
,
bulk_handle
,
0
,
size
);
assert
(
ret
==
0
);
bulk_handle
,
0
,
size
,
HG_OP_ID_IGNORE
);
assert
(
h
ret
==
HG_SUCCESS
);
HG_F
ree_input
(
handle
,
&
in
);
margo_f
ree_input
(
handle
,
&
in
);
hret
=
HG_R
espond
(
handle
,
NULL
,
NULL
,
&
out
);
hret
=
margo_r
espond
(
mid
,
handle
,
&
out
);
assert
(
hret
==
HG_SUCCESS
);
HG_B
ulk_free
(
bulk_handle
);
HG_D
estroy
(
handle
);
margo_b
ulk_free
(
bulk_handle
);
margo_d
estroy
(
handle
);
free
(
buffer
);
return
;
...
...
@@ -131,10 +129,10 @@ int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
MARGO_REGISTER_MPLEX
(
mid
,
"svc1_do_thing"
,
svc1_do_thing_in_t
,
svc1_do_thing_out_t
,
svc1_do_thing_ult
,
mplex_id
,
pool
,
MARGO_RPC_ID_IGNORE
);
svc1_do_thing_ult
,
mplex_id
,
pool
);
MARGO_REGISTER_MPLEX
(
mid
,
"svc1_do_other_thing"
,
svc1_do_other_thing_in_t
,
svc1_do_other_thing_out_t
,
svc1_do_other_thing_ult
,
mplex_id
,
pool
,
MARGO_RPC_ID_IGNORE
);
svc1_do_other_thing_ult
,
mplex_id
,
pool
);
return
(
0
);
}
...
...
@@ -144,4 +142,3 @@ void svc1_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
/* TODO: undo what was done in svc1_register() */
return
;
}
examples/multiplex/svc2-client.c
View file @
7318e084
...
...
@@ -21,10 +21,10 @@ static hg_id_t svc2_do_other_thing_id = -1;
int
svc2_register_client
(
margo_instance_id
mid
)
{
MARGO_REGISTER
(
mid
,
"svc2_do_thing"
,
svc2_do_thing_in_t
,
svc2_do_thing_out_t
,
NULL
,
&
svc2_do_thing_id
);
MARGO_REGISTER
(
mid
,
"svc2_do_other_thing"
,
svc2_do_other_thing_in_t
,
svc2_do_other_thing_out_t
,
NULL
,
&
svc2_do_other_thing_id
);
svc2_do_thing_id
=
MARGO_REGISTER
(
mid
,
"svc2_do_thing"
,
svc2_do_thing_in_t
,
svc2_do_thing_out_t
,
NULL
);
svc2_do_other_thing_id
=
MARGO_REGISTER
(
mid
,
"svc2_do_other_thing"
,
svc2_do_other_thing_in_t
,
svc2_do_other_thing_out_t
,
NULL
);
return
(
0
);
}
...
...
@@ -34,10 +34,9 @@ void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
hg_handle_t
handle
;
svc2_do_thing_in_t
in
;
svc2_do_thing_out_t
out
;
in
t
ret
;
hg_return_
t
h
ret
;
hg_size_t
size
;
void
*
buffer
;
const
struct
hg_info
*
hgi
;