Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
margo
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
12
Issues
12
List
Boards
Labels
Milestones
Merge Requests
1
Merge Requests
1
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Commits
Issue Boards
Open sidebar
sds
margo
Commits
4d212f8a
Commit
4d212f8a
authored
Sep 28, 2017
by
Matthieu Dorier
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added non-blocking functions
parent
70ef3d41
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
120 additions
and
26 deletions
+120
-26
margo.h
include/margo.h
+60
-0
margo.c
src/margo.c
+60
-26
No files found.
include/margo.h
View file @
4d212f8a
...
@@ -28,8 +28,10 @@ extern "C" {
...
@@ -28,8 +28,10 @@ extern "C" {
struct
margo_instance
;
struct
margo_instance
;
typedef
struct
margo_instance
*
margo_instance_id
;
typedef
struct
margo_instance
*
margo_instance_id
;
typedef
struct
margo_data
*
margo_data_ptr
;
typedef
struct
margo_data
*
margo_data_ptr
;
typedef
ABT_eventual
margo_request
;
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
#define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL
#define MARGO_CLIENT_MODE 0
#define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1
#define MARGO_SERVER_MODE 1
#define MARGO_DEFAULT_MPLEX_ID 0
#define MARGO_DEFAULT_MPLEX_ID 0
...
@@ -379,6 +381,27 @@ hg_return_t margo_forward(
...
@@ -379,6 +381,27 @@ hg_return_t margo_forward(
hg_handle_t
handle
,
hg_handle_t
handle
,
void
*
in_struct
);
void
*
in_struct
);
/**
* Forward (without blocking) an RPC request to a remote host
* @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC
* @param [out] req request to wait on using margo_wait
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t
margo_iforward
(
hg_handle_t
handle
,
void
*
in_struct
,
margo_request
*
req
);
/**
* Wait for an operation initiated by a non-blocking
* margo function (margo_iforward, margo_irespond, etc.)
* @param [in] req request to wait on
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t
margo_wait
(
margo_request
req
);
/**
/**
* Forward an RPC request to a remote host with a user-defined timeout
* Forward an RPC request to a remote host with a user-defined timeout
* @param [in] handle identifier for the RPC to be sent
* @param [in] handle identifier for the RPC to be sent
...
@@ -407,6 +430,19 @@ hg_return_t margo_respond(
...
@@ -407,6 +430,19 @@ hg_return_t margo_respond(
hg_handle_t
handle
,
hg_handle_t
handle
,
void
*
out_struct
);
void
*
out_struct
);
/**
* Send an RPC response without blocking.
* @param [in] handle identifier for the RPC for which a response is being
* sent
* @param [in] out_struct output argument struct for response
* @param [out] req request on which to wait using margo_wait
* @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
*/
hg_return_t
margo_irespond
(
hg_handle_t
handle
,
void
*
out_struct
,
margo_request
*
req
);
/**
/**
* Create an abstract bulk handle from specified memory segments.
* Create an abstract bulk handle from specified memory segments.
* Memory allocated is then freed when margo_bulk_free() is called.
* Memory allocated is then freed when margo_bulk_free() is called.
...
@@ -554,6 +590,30 @@ hg_return_t margo_bulk_transfer(
...
@@ -554,6 +590,30 @@ hg_return_t margo_bulk_transfer(
size_t
local_offset
,
size_t
local_offset
,
size_t
size
);
size_t
size
);
/**
* Asynchronously performs a bulk transfer
* @param [in] mid Margo instance
* @param [in] op type of operation to perform
* @param [in] origin_addr remote Mercury address
* @param [in] origin_handle remote Mercury bulk memory handle
* @param [in] origin_offset offset into remote bulk memory to access
* @param [in] local_handle local bulk memory handle
* @param [in] local_offset offset into local bulk memory to access
* @param [in] size size (in bytes) of transfer
* @param [out] req request to wait on using margo_wait
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t
margo_bulk_itransfer
(
margo_instance_id
mid
,
hg_bulk_op_t
op
,
hg_addr_t
origin_addr
,
hg_bulk_t
origin_handle
,
size_t
origin_offset
,
hg_bulk_t
local_handle
,
size_t
local_offset
,
size_t
size
,
margo_request
*
req
);
/**
/**
* Suspends the calling ULT for a specified time duration
* Suspends the calling ULT for a specified time duration
* @param [in] mid Margo instance
* @param [in] mid Margo instance
...
...
src/margo.c
View file @
4d212f8a
...
@@ -625,11 +625,23 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
...
@@ -625,11 +625,23 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
hg_return_t
margo_forward
(
hg_return_t
margo_forward
(
hg_handle_t
handle
,
hg_handle_t
handle
,
void
*
in_struct
)
void
*
in_struct
)
{
hg_return_t
hret
;
margo_request
req
;
hret
=
margo_iforward
(
handle
,
in_struct
,
&
req
);
if
(
hret
!=
HG_SUCCESS
)
return
hret
;
return
margo_wait
(
req
);
}
hg_return_t
margo_iforward
(
hg_handle_t
handle
,
void
*
in_struct
,
margo_request
*
req
)
{
{
hg_return_t
hret
=
HG_TIMEOUT
;
hg_return_t
hret
=
HG_TIMEOUT
;
ABT_eventual
eventual
;
ABT_eventual
eventual
;
int
ret
;
int
ret
;
hg_return_t
*
waited_hret
;
struct
margo_cb_arg
arg
;
struct
margo_cb_arg
arg
;
ret
=
ABT_eventual_create
(
sizeof
(
hret
),
&
eventual
);
ret
=
ABT_eventual_create
(
sizeof
(
hret
),
&
eventual
);
...
@@ -639,16 +651,20 @@ hg_return_t margo_forward(
...
@@ -639,16 +651,20 @@ hg_return_t margo_forward(
}
}
arg
.
eventual
=
&
eventual
;
arg
.
eventual
=
&
eventual
;
*
req
=
eventual
;
hret
=
HG_Forward
(
handle
,
margo_cb
,
&
arg
,
in_struct
);
return
HG_Forward
(
handle
,
margo_cb
,
&
arg
,
in_struct
);
if
(
hret
==
HG_SUCCESS
)
}
{
ABT_eventual_wait
(
eventual
,
(
void
**
)
&
waited_hret
);
hret
=
*
waited_hret
;
}
ABT_eventual_free
(
&
eventual
);
hg_return_t
margo_wait
(
margo_request
req
)
{
hg_return_t
*
waited_hret
;
hg_return_t
hret
;
ABT_eventual_wait
(
req
,
(
void
**
)
&
waited_hret
);
hret
=
*
waited_hret
;
ABT_eventual_free
(
&
req
);
return
(
hret
);
return
(
hret
);
}
}
...
@@ -720,11 +736,23 @@ hg_return_t margo_forward_timed(
...
@@ -720,11 +736,23 @@ hg_return_t margo_forward_timed(
hg_return_t
margo_respond
(
hg_return_t
margo_respond
(
hg_handle_t
handle
,
hg_handle_t
handle
,
void
*
out_struct
)
void
*
out_struct
)
{
hg_return_t
hret
;
margo_request
req
;
hret
=
margo_irespond
(
handle
,
out_struct
,
&
req
);
if
(
hret
!=
HG_SUCCESS
)
return
hret
;
return
margo_wait
(
req
);
}
hg_return_t
margo_irespond
(
hg_handle_t
handle
,
void
*
out_struct
,
margo_request
*
req
)
{
{
hg_return_t
hret
=
HG_TIMEOUT
;
hg_return_t
hret
=
HG_TIMEOUT
;
ABT_eventual
eventual
;
ABT_eventual
eventual
;
int
ret
;
int
ret
;
hg_return_t
*
waited_hret
;
struct
margo_cb_arg
arg
;
struct
margo_cb_arg
arg
;
ret
=
ABT_eventual_create
(
sizeof
(
hret
),
&
eventual
);
ret
=
ABT_eventual_create
(
sizeof
(
hret
),
&
eventual
);
...
@@ -734,17 +762,9 @@ hg_return_t margo_respond(
...
@@ -734,17 +762,9 @@ hg_return_t margo_respond(
}
}
arg
.
eventual
=
&
eventual
;
arg
.
eventual
=
&
eventual
;
*
req
=
eventual
;
hret
=
HG_Respond
(
handle
,
margo_cb
,
&
arg
,
out_struct
);
return
HG_Respond
(
handle
,
margo_cb
,
&
arg
,
out_struct
);
if
(
hret
==
HG_SUCCESS
)
{
ABT_eventual_wait
(
eventual
,
(
void
**
)
&
waited_hret
);
hret
=
*
waited_hret
;
}
ABT_eventual_free
(
&
eventual
);
return
(
hret
);
}
}
hg_return_t
margo_bulk_create
(
hg_return_t
margo_bulk_create
(
...
@@ -796,6 +816,26 @@ hg_return_t margo_bulk_transfer(
...
@@ -796,6 +816,26 @@ hg_return_t margo_bulk_transfer(
hg_bulk_t
local_handle
,
hg_bulk_t
local_handle
,
size_t
local_offset
,
size_t
local_offset
,
size_t
size
)
size_t
size
)
{
margo_request
req
;
hg_return_t
hret
=
margo_bulk_itransfer
(
mid
,
op
,
origin_addr
,
origin_handle
,
origin_offset
,
local_handle
,
local_offset
,
size
,
req
);
if
(
hret
!=
HG_SUCCESS
)
return
hret
;
return
margo_wait
(
req
);
}
hg_return_t
margo_bulk_itransfer
(
margo_instance_id
mid
,
hg_bulk_op_t
op
,
hg_addr_t
origin_addr
,
hg_bulk_t
origin_handle
,
size_t
origin_offset
,
hg_bulk_t
local_handle
,
size_t
local_offset
,
size_t
size
,
margo_request
*
req
)
{
{
hg_return_t
hret
=
HG_TIMEOUT
;
hg_return_t
hret
=
HG_TIMEOUT
;
hg_return_t
*
waited_hret
;
hg_return_t
*
waited_hret
;
...
@@ -810,17 +850,11 @@ hg_return_t margo_bulk_transfer(
...
@@ -810,17 +850,11 @@ hg_return_t margo_bulk_transfer(
}
}
arg
.
eventual
=
&
eventual
;
arg
.
eventual
=
&
eventual
;
*
req
=
eventual
;
hret
=
HG_Bulk_transfer
(
mid
->
hg_context
,
margo_bulk_transfer_cb
,
hret
=
HG_Bulk_transfer
(
mid
->
hg_context
,
margo_bulk_transfer_cb
,
&
arg
,
op
,
origin_addr
,
origin_handle
,
origin_offset
,
local_handle
,
&
arg
,
op
,
origin_addr
,
origin_handle
,
origin_offset
,
local_handle
,
local_offset
,
size
,
HG_OP_ID_IGNORE
);
local_offset
,
size
,
HG_OP_ID_IGNORE
);
if
(
hret
==
HG_SUCCESS
)
{
ABT_eventual_wait
(
eventual
,
(
void
**
)
&
waited_hret
);
hret
=
*
waited_hret
;
}
ABT_eventual_free
(
&
eventual
);
return
(
hret
);
return
(
hret
);
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment