Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
margo
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
13
Issues
13
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
sds
margo
Commits
8bdfc111
Commit
8bdfc111
authored
Aug 28, 2017
by
Matthieu Dorier
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'dev-wrap-hg-iface' into 'margo-registered-data'
wrap hg interface and update margo api See merge request
!2
parents
81337644
ce7a0794
Changes
21
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
21 changed files
with
1384 additions
and
1337 deletions
+1384
-1337
examples/composition/composed-benchmark.c
examples/composition/composed-benchmark.c
+24
-59
examples/composition/composed-client-lib.c
examples/composition/composed-client-lib.c
+35
-40
examples/composition/composed-svc-daemon.c
examples/composition/composed-svc-daemon.c
+20
-60
examples/composition/data-xfer-service.c
examples/composition/data-xfer-service.c
+14
-16
examples/composition/delegator-service.c
examples/composition/delegator-service.c
+19
-20
examples/margo-example-client.c
examples/margo-example-client.c
+35
-69
examples/margo-example-server.c
examples/margo-example-server.c
+21
-58
examples/multiplex/margo-example-mp-client.c
examples/multiplex/margo-example-mp-client.c
+20
-50
examples/multiplex/margo-example-mp-server.c
examples/multiplex/margo-example-mp-server.c
+23
-61
examples/multiplex/svc1-client.c
examples/multiplex/svc1-client.c
+30
-35
examples/multiplex/svc1-server.c
examples/multiplex/svc1-server.c
+28
-31
examples/multiplex/svc2-client.c
examples/multiplex/svc2-client.c
+30
-35
examples/multiplex/svc2-server.c
examples/multiplex/svc2-server.c
+28
-31
examples/my-rpc.c
examples/my-rpc.c
+22
-18
include/margo.h
include/margo.h
+428
-92
src/margo.c
src/margo.c
+455
-329
tests/margo-test-client-timeout.c
tests/margo-test-client-timeout.c
+39
-76
tests/margo-test-client.c
tests/margo-test-client.c
+37
-76
tests/margo-test-server.c
tests/margo-test-server.c
+32
-106
tests/margo-test-sleep.c
tests/margo-test-sleep.c
+9
-43
tests/my-rpc.c
tests/my-rpc.c
+35
-32
No files found.
examples/composition/composed-benchmark.c
View file @
8bdfc111
...
...
@@ -8,7 +8,6 @@
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "composed-client-lib.h"
...
...
@@ -22,8 +21,7 @@ int main(int argc, char **argv)
int
i
;
int
ret
;
margo_instance_id
mid
;
hg_context_t
*
hg_context
;
hg_class_t
*
hg_class
;
hg_return_t
hret
;
hg_addr_t
delegator_svr_addr
=
HG_ADDR_NULL
;
hg_addr_t
data_xfer_svr_addr
=
HG_ADDR_NULL
;
hg_handle_t
handle
;
...
...
@@ -40,64 +38,36 @@ int main(int argc, char **argv)
ret
=
sscanf
(
argv
[
3
],
"%d"
,
&
iterations
);
assert
(
ret
==
1
);
/* boilerplate HG initialization steps */
/***************************************/
/* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present)
*/
for
(
i
=
0
;
i
<
11
&&
argv
[
1
][
i
]
!=
'\0'
&&
argv
[
1
][
i
]
!=
':'
;
i
++
)
proto
[
i
]
=
argv
[
1
][
i
];
/* TODO: this is a hack for now; I don't really want this to operate in server mode,
* but it seems like it needs to for now for sub-service to be able to get back to it
*/
hg_class
=
HG_Init
(
proto
,
HG_TRUE
);
if
(
!
hg_class
)
{
fprintf
(
stderr
,
"Error: HG_Init()
\n
"
);
return
(
-
1
);
}
hg_context
=
HG_Context_create
(
hg_class
);
if
(
!
hg_context
)
{
fprintf
(
stderr
,
"Error: HG_Context_create()
\n
"
);
HG_Finalize
(
hg_class
);
return
(
-
1
);
}
/* set up argobots */
/* actually start margo */
/***************************************/
ret
=
ABT_init
(
argc
,
argv
);
if
(
ret
!=
0
)
mid
=
margo_init
(
proto
,
MARGO_SERVER_MODE
,
0
,
-
1
);
if
(
mid
==
MARGO_INSTANCE_NULL
)
{
fprintf
(
stderr
,
"Error:
ABT
_init()
\n
"
);
fprintf
(
stderr
,
"Error:
margo
_init()
\n
"
);
return
(
-
1
);
}
/* set primary ES to idle without polling */
ret
=
ABT_snoozer_xstream_self_set
();
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_snoozer_xstream_self_set()
\n
"
);
return
(
-
1
);
}
/* actually start margo */
/***************************************/
mid
=
margo_init
(
0
,
0
,
hg_context
);
/* register core RPC */
MARGO_REGISTER
(
hg_class
,
"my_shutdown_rpc"
,
void
,
void
,
NULL
,
&
my_rpc_shutdown_id
);
my_rpc_shutdown_id
=
MARGO_REGISTER
(
mid
,
"my_shutdown_rpc"
,
void
,
void
,
NULL
);
/* register service APIs */
data_xfer_register_client
(
mid
);
composed_register_client
(
mid
);
/* find addrs for servers */
ret
=
margo_addr_lookup
(
mid
,
argv
[
2
],
&
data_xfer_svr_addr
);
assert
(
ret
==
0
);
ret
=
margo_addr_lookup
(
mid
,
argv
[
1
],
&
delegator_svr_addr
);
assert
(
ret
==
0
);
h
ret
=
margo_addr_lookup
(
mid
,
argv
[
2
],
&
data_xfer_svr_addr
);
assert
(
hret
==
HG_SUCCESS
);
h
ret
=
margo_addr_lookup
(
mid
,
argv
[
1
],
&
delegator_svr_addr
);
assert
(
hret
==
HG_SUCCESS
);
buffer
=
calloc
(
1
,
buffer_sz
);
assert
(
buffer
);
...
...
@@ -150,33 +120,28 @@ int main(int argc, char **argv)
/* send rpc(s) to shut down server(s) */
sleep
(
3
);
printf
(
"Shutting down delegator server.
\n
"
);
ret
=
HG_Create
(
hg_context
,
delegator_svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
ret
==
0
);
margo_forward
(
mid
,
handle
,
NULL
);
HG_Destroy
(
handle
);
hret
=
margo_create
(
mid
,
delegator_svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
hret
==
HG_SUCCESS
);
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
if
(
strcmp
(
argv
[
1
],
argv
[
2
]))
{
sleep
(
3
);
printf
(
"Shutting down data_xfer server.
\n
"
);
ret
=
HG_Create
(
hg_context
,
data_xfer_svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
ret
==
0
);
margo_forward
(
mid
,
handle
,
NULL
);
HG_Destroy
(
handle
);
hret
=
margo_create
(
mid
,
data_xfer_svr_addr
,
my_rpc_shutdown_id
,
&
handle
);
assert
(
hret
==
HG_SUCCESS
);
hret
=
margo_forward
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
margo_destroy
(
handle
);
}
HG_Addr_free
(
hg_class
,
delegator_svr_addr
);
HG_Addr_free
(
hg_class
,
data_xfer_svr_addr
);
margo_addr_free
(
mid
,
delegator_svr_addr
);
margo_addr_free
(
mid
,
data_xfer_svr_addr
);
/* shut down everything */
margo_finalize
(
mid
);
ABT_finalize
();
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
free
(
buffer
);
return
(
0
);
}
examples/composition/composed-client-lib.c
View file @
8bdfc111
...
...
@@ -22,8 +22,8 @@ static hg_id_t data_xfer_read_id = -1;
int
composed_register_client
(
margo_instance_id
mid
)
{
MARGO_REGISTER
(
mid
,
"delegator_read"
,
delegator_read_in_t
,
delegator_read_out_t
,
NULL
,
&
delegator_read_id
);
delegator_read_id
=
MARGO_REGISTER
(
mid
,
"delegator_read"
,
delegator_read_in_t
,
delegator_read_out_t
,
NULL
);
return
(
0
);
}
...
...
@@ -31,8 +31,8 @@ int composed_register_client(margo_instance_id mid)
int
data_xfer_register_client
(
margo_instance_id
mid
)
{
MARGO_REGISTER
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
NULL
,
&
data_xfer_read_id
);
data_xfer_read_id
=
MARGO_REGISTER
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
NULL
);
return
(
0
);
}
...
...
@@ -42,39 +42,37 @@ void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_s
hg_handle_t
handle
;
delegator_read_in_t
in
;
delegator_read_out_t
out
;
int
ret
;
const
struct
hg_info
*
hgi
;
hg_return_t
hret
;
/* create handle */
ret
=
HG_Create
(
margo_get_context
(
mid
)
,
svr_addr
,
delegator_read_id
,
&
handle
);
assert
(
ret
==
0
);
hret
=
margo_create
(
mid
,
svr_addr
,
delegator_read_id
,
&
handle
);
assert
(
hret
==
HG_SUCCESS
);
/* register buffer for rdma/bulk access by server */
hgi
=
HG_Get_info
(
handle
);
assert
(
hgi
);
ret
=
HG_Bulk_create
(
hgi
->
hg_class
,
1
,
&
buffer
,
&
buffer_sz
,
hret
=
margo_bulk_create
(
mid
,
1
,
&
buffer
,
&
buffer_sz
,
HG_BULK_WRITE_ONLY
,
&
in
.
bulk_handle
);
assert
(
ret
==
0
);
assert
(
hret
==
HG_SUCCESS
);
in
.
data_xfer_svc_addr
=
data_xfer_svc_addr_string
;
#if 0
HG_S
et_target_id(handle, mplex_id);
margo_s
et_target_id(handle, mplex_id);
#endif
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
margo_forward
(
mid
,
handle
,
&
in
);
hret
=
margo_forward
(
mid
,
handle
,
&
in
);
assert
(
hret
==
HG_SUCCESS
);
/* decode response */
ret
=
HG_G
et_output
(
handle
,
&
out
);
assert
(
ret
==
0
);
hret
=
margo_g
et_output
(
handle
,
&
out
);
assert
(
hret
==
HG_SUCCESS
);
/* clean up resources consumed by this rpc */
HG_Bulk_free
(
in
.
bulk_handle
);
HG_Free_output
(
handle
,
&
out
);
HG_D
estroy
(
handle
);
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_d
estroy
(
handle
);
return
;
}
...
...
@@ -84,51 +82,48 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_
hg_handle_t
handle
;
data_xfer_read_in_t
in
;
data_xfer_read_out_t
out
;
int
ret
;
const
struct
hg_info
*
hgi
;
hg_return_t
hret
;
hg_addr_t
addr_self
;
char
addr_self_string
[
128
];
hg_size_t
addr_self_string_sz
=
128
;
/* create handle */
ret
=
HG_Create
(
margo_get_context
(
mid
)
,
svr_addr
,
data_xfer_read_id
,
&
handle
);
assert
(
ret
==
0
);
hret
=
margo_create
(
mid
,
svr_addr
,
data_xfer_read_id
,
&
handle
);
assert
(
hret
==
HG_SUCCESS
);
/* register buffer for rdma/bulk access by server */
hgi
=
HG_Get_info
(
handle
);
assert
(
hgi
);
ret
=
HG_Bulk_create
(
hgi
->
hg_class
,
1
,
&
buffer
,
&
buffer_sz
,
hret
=
margo_bulk_create
(
mid
,
1
,
&
buffer
,
&
buffer_sz
,
HG_BULK_WRITE_ONLY
,
&
in
.
bulk_handle
);
assert
(
ret
==
0
);
assert
(
hret
==
HG_SUCCESS
);
/* figure out local address */
ret
=
HG_Addr_self
(
margo_get_class
(
mid
)
,
&
addr_self
);
assert
(
ret
==
HG_SUCCESS
);
hret
=
margo_addr_self
(
mid
,
&
addr_self
);
assert
(
h
ret
==
HG_SUCCESS
);
ret
=
HG_Addr_to_string
(
margo_get_class
(
mid
)
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
assert
(
ret
==
HG_SUCCESS
);
hret
=
margo_addr_to_string
(
mid
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
assert
(
h
ret
==
HG_SUCCESS
);
in
.
client_addr
=
addr_self_string
;
#if 0
HG_S
et_target_id(handle, mplex_id);
margo_s
et_target_id(handle, mplex_id);
#endif
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
margo_forward
(
mid
,
handle
,
&
in
);
hret
=
margo_forward
(
mid
,
handle
,
&
in
);
assert
(
hret
==
HG_SUCCESS
);
/* decode response */
ret
=
HG_G
et_output
(
handle
,
&
out
);
assert
(
ret
==
0
);
hret
=
margo_g
et_output
(
handle
,
&
out
);
assert
(
hret
==
HG_SUCCESS
);
/* clean up resources consumed by this rpc */
HG_Bulk_free
(
in
.
bulk_handle
);
HG_Free_output
(
handle
,
&
out
);
HG_D
estroy
(
handle
);
HG_Addr_free
(
margo_get_class
(
mid
)
,
addr_self
);
margo_free_output
(
handle
,
&
out
);
margo_bulk_free
(
in
.
bulk_handle
);
margo_d
estroy
(
handle
);
margo_addr_free
(
mid
,
addr_self
);
return
;
}
examples/composition/composed-svc-daemon.c
View file @
8bdfc111
...
...
@@ -11,7 +11,6 @@
#include <stdlib.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "data-xfer-service.h"
...
...
@@ -27,19 +26,17 @@
static
void
my_rpc_shutdown_ult
(
hg_handle_t
handle
)
{
hg_return_t
hret
;
const
struct
hg_info
*
hgi
;
margo_instance_id
mid
;
//printf("Got RPC request to shutdown\n");
hgi
=
HG_Get_info
(
handle
);
assert
(
hgi
);
mid
=
margo_hg_handle_get_instance
(
handle
);
assert
(
mid
!=
MARGO_INSTANCE_NULL
);
hret
=
margo_respond
(
mid
,
handle
,
NULL
);
assert
(
hret
==
HG_SUCCESS
);
HG_D
estroy
(
handle
);
margo_d
estroy
(
handle
);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
...
...
@@ -54,10 +51,8 @@ DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
int
main
(
int
argc
,
char
**
argv
)
{
int
ret
;
hg_return_t
h
ret
;
margo_instance_id
mid
;
hg_context_t
*
hg_context
;
hg_class_t
*
hg_class
;
hg_addr_t
addr_self
;
char
addr_self_string
[
128
];
hg_size_t
addr_self_string_sz
=
128
;
...
...
@@ -75,73 +70,44 @@ int main(int argc, char **argv)
svc_list
=
strdup
(
argv
[
2
]);
assert
(
svc_list
);
/* boilerplate HG initialization steps */
/* actually start margo -- this step encapsulates the Mercury and
* Argobots initialization and must precede their use */
/* Use the calling xstream to drive progress and execute handlers. */
/***************************************/
hg_class
=
HG_Init
(
argv
[
1
],
HG_TRUE
);
if
(
!
hg_class
)
mid
=
margo_init
(
argv
[
1
],
MARGO_SERVER_MODE
,
0
,
-
1
);
if
(
mid
==
MARGO_INSTANCE_NULL
)
{
fprintf
(
stderr
,
"Error: HG_Init()
\n
"
);
return
(
-
1
);
}
hg_context
=
HG_Context_create
(
hg_class
);
if
(
!
hg_context
)
{
fprintf
(
stderr
,
"Error: HG_Context_create()
\n
"
);
HG_Finalize
(
hg_class
);
fprintf
(
stderr
,
"Error: margo_init()
\n
"
);
return
(
-
1
);
}
/* figure out what address this server is listening on */
ret
=
HG_Addr_self
(
hg_class
,
&
addr_self
);
if
(
ret
!=
HG_SUCCESS
)
hret
=
margo_addr_self
(
mid
,
&
addr_self
);
if
(
h
ret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"Error: HG_Addr_self()
\n
"
);
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
fprintf
(
stderr
,
"Error: margo_addr_self()
\n
"
);
margo_finalize
(
mid
);
return
(
-
1
);
}
ret
=
HG_Addr_to_string
(
hg_class
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
if
(
ret
!=
HG_SUCCESS
)
hret
=
margo_addr_to_string
(
mid
,
addr_self_string
,
&
addr_self_string_sz
,
addr_self
);
if
(
h
ret
!=
HG_SUCCESS
)
{
fprintf
(
stderr
,
"Error: HG_Addr_self()
\n
"
);
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
HG_Addr_free
(
hg_class
,
addr_self
);
fprintf
(
stderr
,
"Error: margo_addr_to_string()
\n
"
);
margo_addr_free
(
mid
,
addr_self
);
margo_finalize
(
mid
);
return
(
-
1
);
}
HG_Addr_free
(
hg_class
,
addr_self
);
margo_addr_free
(
mid
,
addr_self
);
printf
(
"# accepting RPCs on address
\"
%s
\"\n
"
,
addr_self_string
);
/* set up argobots */
/***************************************/
ret
=
ABT_init
(
argc
,
argv
);
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_init()
\n
"
);
return
(
-
1
);
}
/* set primary ES to idle without polling */
ret
=
ABT_snoozer_xstream_self_set
();
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_snoozer_xstream_self_set()
\n
"
);
return
(
-
1
);
}
/* actually start margo */
/***************************************/
mid
=
margo_init
(
0
,
0
,
hg_context
);
assert
(
mid
);
/* register RPCs and services */
/***************************************/
/* register a shutdown RPC as just a generic handler; not part of a
* multiplexed service
*/
MARGO_REGISTER
(
mid
,
"my_shutdown_rpc"
,
void
,
void
,
my_rpc_shutdown_ult
,
MARGO_RPC_ID_IGNORE
);
MARGO_REGISTER
(
mid
,
"my_shutdown_rpc"
,
void
,
void
,
my_rpc_shutdown_ult
);
handler_pool
=
margo_get_handler_pool
(
mid
);
svc
=
strtok
(
svc_list
,
","
);
...
...
@@ -178,11 +144,5 @@ int main(int argc, char **argv)
svc2_deregister(mid, *handler_pool, 3);
#endif
ABT_finalize
();
HG_Context_destroy
(
hg_context
);
HG_Finalize
(
hg_class
);
return
(
0
);
}
examples/composition/data-xfer-service.c
View file @
8bdfc111
...
...
@@ -18,7 +18,6 @@ static void data_xfer_read_ult(hg_handle_t handle)
hg_return_t
hret
;
data_xfer_read_out_t
out
;
data_xfer_read_in_t
in
;
int
ret
;
const
struct
hg_info
*
hgi
;
margo_instance_id
mid
;
hg_addr_t
client_addr
;
...
...
@@ -28,10 +27,12 @@ static void data_xfer_read_ult(hg_handle_t handle)
pthread_t my_tid;
#endif
ret
=
HG_G
et_input
(
handle
,
&
in
);
assert
(
ret
==
HG_SUCCESS
);
hgi
=
HG_G
et_info
(
handle
);
hret
=
margo_g
et_input
(
handle
,
&
in
);
assert
(
h
ret
==
HG_SUCCESS
);
hgi
=
margo_g
et_info
(
handle
);
assert
(
hgi
);
mid
=
margo_hg_info_get_instance
(
hgi
);
assert
(
mid
!=
MARGO_INSTANCE_NULL
);
#if 0
ABT_xstream_self(&my_xstream);
...
...
@@ -43,8 +44,6 @@ static void data_xfer_read_ult(hg_handle_t handle)
out
.
ret
=
0
;
mid
=
margo_hg_handle_get_instance
(
handle
);
if
(
!
in
.
client_addr
)
client_addr
=
hgi
->
addr
;
else
...
...
@@ -54,20 +53,20 @@ static void data_xfer_read_ult(hg_handle_t handle)
}
/* do bulk transfer from client to server */
ret
=
margo_bulk_transfer
(
mid
,
HG_BULK_PUSH
,
h
ret
=
margo_bulk_transfer
(
mid
,
HG_BULK_PUSH
,
client_addr
,
in
.
bulk_handle
,
0
,
g_buffer_bulk_handle
,
0
,
g_buffer_size
);
assert
(
ret
==
0
);
assert
(
hret
==
HG_SUCCESS
);
if
(
in
.
client_addr
)
HG_Addr_free
(
margo_get_class
(
mid
)
,
client_addr
);
margo_addr_free
(
mid
,
client_addr
);
HG_F
ree_input
(
handle
,
&
in
);
margo_f
ree_input
(
handle
,
&
in
);
hret
=
HG_Respond
(
handle
,
NULL
,
NULL
,
&
out
);
hret
=
margo_respond
(
mid
,
handle
,
&
out
);
assert
(
hret
==
HG_SUCCESS
);
HG_D
estroy
(
handle
);
margo_d
estroy
(
handle
);
return
;
}
...
...
@@ -82,24 +81,23 @@ int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp
assert
(
g_buffer
);
/* register local target buffer for bulk access */
hret
=
HG_Bulk_create
(
margo_get_class
(
mid
)
,
1
,
&
g_buffer
,
hret
=
margo_bulk_create
(
mid
,
1
,
&
g_buffer
,
&
g_buffer_size
,
HG_BULK_READ_ONLY
,
&
g_buffer_bulk_handle
);
assert
(
hret
==
HG_SUCCESS
);
/* register RPC handler */
MARGO_REGISTER_MPLEX
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
data_xfer_read_ult
,
mplex_id
,
pool
,
MARGO_RPC_ID_IGNORE
);
data_xfer_read_ult
,
mplex_id
,
pool
);
return
(
0
);
}
void
data_xfer_deregister
(
margo_instance_id
mid
,
ABT_pool
pool
,
uint32_t
mplex_id
)
{
HG_B
ulk_free
(
g_buffer_bulk_handle
);
margo_b
ulk_free
(
g_buffer_bulk_handle
);
free
(
g_buffer
);
/* TODO: undo what was done in data_xfer_register() */
return
;
}
examples/composition/delegator-service.c
View file @
8bdfc111
...
...
@@ -19,7 +19,6 @@ static void delegator_read_ult(hg_handle_t handle)
delegator_read_in_t
in
;
data_xfer_read_in_t
in_relay
;
data_xfer_read_out_t
out_relay
;
int
ret
;
const
struct
hg_info
*
hgi
;
margo_instance_id
mid
;
hg_addr_t
data_xfer_svc_addr
;
...
...
@@ -33,10 +32,12 @@ static void delegator_read_ult(hg_handle_t handle)
pthread_t my_tid;
#endif
ret
=
HG_G
et_input
(
handle
,
&
in
);
assert
(
ret
==
HG_SUCCESS
);
hgi
=
HG_G
et_info
(
handle
);
hret
=
margo_g
et_input
(
handle
,
&
in
);
assert
(
h
ret
==
HG_SUCCESS
);
hgi
=
margo_g
et_info
(
handle
);
assert
(
hgi
);
mid
=
margo_hg_info_get_instance
(
hgi
);
assert
(
mid
!=
MARGO_INSTANCE_NULL
);
#if 0
ABT_xstream_self(&my_xstream);
...
...
@@ -48,34 +49,33 @@ static void delegator_read_ult(hg_handle_t handle)
out
.
ret
=
0
;
mid
=
margo_hg_handle_get_instance
(
handle
);
hret
=
margo_addr_lookup
(
mid
,
in
.
data_xfer_svc_addr
,
&
data_xfer_svc_addr
);
assert
(
hret
==
HG_SUCCESS
);
/* relay to microservice */
hret
=
HG_Create
(
margo_get_context
(
mid
)
,
data_xfer_svc_addr
,
g_data_xfer_read_id
,
&
handle_relay
);
hret
=
margo_create
(
mid
,
data_xfer_svc_addr
,
g_data_xfer_read_id
,
&
handle_relay
);
assert
(
hret
==
HG_SUCCESS
);
/* pass through bulk handle */
in_relay
.
bulk_handle
=
in
.
bulk_handle
;
/* get addr of client to relay to data_xfer service */
hret
=
HG_Addr_to_string
(
margo_get_class
(
mid
)
,
client_addr_string
,
&
client_addr_string_sz
,
hgi
->
addr
);
hret
=
margo_addr_to_string
(
mid
,
client_addr_string
,
&
client_addr_string_sz
,
hgi
->
addr
);
assert
(
hret
==
HG_SUCCESS
);
in_relay
.
client_addr
=
client_addr_string
;
margo_forward
(
mid
,
handle_relay
,
&
in_relay
);
hret
=
margo_forward
(
mid
,
handle_relay
,
&
in_relay
);
assert
(
hret
==
HG_SUCCESS
);
hret
=
HG_G
et_output
(
handle_relay
,
&
out_relay
);
hret
=
margo_g
et_output
(
handle_relay
,
&
out_relay
);
assert
(
hret
==
HG_SUCCESS
);
HG_F
ree_input
(
handle
,
&
in
);
HG_F
ree_output
(
handle_relay
,
&
out
);
margo_f
ree_input
(
handle
,
&
in
);
margo_f
ree_output
(
handle_relay
,
&
out
);
hret
=
HG_Respond
(
handle
,
NULL
,
NULL
,
&
out
);
hret
=
margo_respond
(
mid
,
handle
,
&
out
);
assert
(
hret
==
HG_SUCCESS
);
HG_Addr_free
(
margo_get_class
(
mid
)
,
data_xfer_svc_addr
);
HG_D
estroy
(
handle
);
HG_D
estroy
(
handle_relay
);
margo_addr_free
(
mid
,
data_xfer_svc_addr
);
margo_d
estroy
(
handle
);
margo_d
estroy
(
handle_relay
);
return
;
}
...
...
@@ -87,13 +87,13 @@ int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp
/* NOTE: this RPC may already be registered if this process has already registered a
* data-xfer service
*/
MARGO_REGISTER
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
NULL
,
&
g_data_xfer_read_id
);
g_data_xfer_read_id
=
MARGO_REGISTER
(
mid
,
"data_xfer_read"
,
data_xfer_read_in_t
,
data_xfer_read_out_t
,
NULL
);
/* register RPC handler */
MARGO_REGISTER_MPLEX
(
mid
,
"delegator_read"
,
delegator_read_in_t
,
delegator_read_out_t
,
delegator_read_ult
,
mplex_id
,
pool
,
MARGO_RPC_ID_IGNORE
);
delegator_read_ult
,
mplex_id
,
pool
);
return
(
0
);
}
...
...
@@ -103,4 +103,3 @@ void delegator_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_i
/* TODO: undo what was done in delegator_register() */
return
;
}
examples/margo-example-client.c
View file @
8bdfc111
...
...
@@ -7,8 +7,8 @@
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <mercury.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "my-rpc.h"
...
...
@@ -24,8 +24,6 @@ struct run_my_rpc_args
{
int
val
;
margo_instance_id
mid
;
hg_context_t
*
hg_context
;
hg_class_t
*
hg_class
;
hg_addr_t
svr_addr
;
};
...
...
@@ -40,11 +38,10 @@ int main(int argc, char **argv)
ABT_thread
threads
[
4
];
int
i
;
int
ret
;
hg_return_t
hret
;
ABT_xstream
xstream
;
ABT_pool
pool
;
margo_instance_id
mid
;
hg_context_t
*
hg_context
;
hg_class_t
*
hg_class
;
hg_addr_t
svr_addr
=
HG_ADDR_NULL
;
hg_handle_t
handle
;
char
proto
[
12
]
=
{
0
};
...
...
@@ -55,44 +52,27 @@ int main(int argc, char **argv)
return
(
-
1
);
}
/* boilerplate HG initialization steps */
/***************************************/
/* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present)
*/
for
(
i
=
0
;
i
<
11
&&
argv
[
1
][
i
]
!=
'\0'
&&
argv
[
1
][
i
]
!=
':'
;
i
++
)
proto
[
i
]
=
argv
[
1
][
i
];
hg_class
=
HG_Init
(
proto
,
HG_FALSE
);
if
(
!
hg_class
)
{
fprintf
(
stderr
,
"Error: HG_Init()
\n
"
);
return
(
-
1
);
}
hg_context
=
HG_Context_create
(
hg_class
);
if
(
!
hg_context
)
{
fprintf
(
stderr
,
"Error: HG_Context_create()
\n
"
);
HG_Finalize
(
hg_class
);
return
(
-
1
);
}
/* set up argobots */
/* actually start margo -- margo_init() encapsulates the Mercury &
* Argobots initialization, so this step must precede their use. */
/* Use main process to drive progress (it will relinquish control to
* Mercury during blocking communication calls). No RPC threads are
* used because this is a pure client that will not be servicing
* rpc requests.
*/
/***************************************/
ret
=
ABT_init
(
argc
,
argv
);
if
(
ret
!=
0
)
mid
=
margo_init
(
proto
,
MARGO_CLIENT_MODE
,
0
,
0
);
if
(
mid
==
MARGO_INSTANCE_NULL
)
{
fprintf
(
stderr
,
"Error: ABT_init()
\n
"
);
return
(
-
1
);
}
/* set primary ES to idle without polling */
ret
=
ABT_snoozer_xstream_self_set
();
if
(
ret
!=
0
)
{
fprintf
(
stderr
,
"Error: ABT_snoozer_xstream_self_set()
\n
"
);
fprintf
(
stderr
,
"Error: margo_init()
\n
"
);
return
(
-
1
);
}
margo_diag_start
(
mid
);
/* retrieve current pool to use for ULT creation */
ret
=
ABT_xstream_self
(
&
xstream
);
...
...
@@ -108,26 +88,18 @@ int main(int argc, char **argv)
return
(
-
1
);
}
/* actually start margo */
/***************************************/
mid
=
margo_init
(
0
,
0
,
hg_context
);
assert
(
mid
);
margo_diag_start
(
mid
);
/* register RPC */