Commit 333b06cf authored by Antonio J. Pena, committed by Pavan Balaji

netmod/portals4: Enforce ordering in RMA

The delivery order of RMA packets to CH3 (eager and rendezvous packets at the netmod
level) was not enforced: Portals 4 guarantees matching order but not the order in
which events are delivered. This patch introduces a reordering queue to enforce
in-order delivery to CH3.
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent 2c090612
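
For readers unfamiliar with the pattern the commit message describes, here is a minimal, standalone C sketch of a reordering queue driven by an "expected next" marker. It is purely illustrative and not part of the patch: the names (pkt, on_event, dequeue_expected, expected_seq, deliver) are hypothetical, a sequence number stands in for the expected receive-buffer pointer (expected_recv_ptr), deliver() stands in for MPID_nem_handle_pkt(), and a simple linked list stands in for the GENERIC_Q reorder queue.

/* reorder_sketch.c -- hypothetical, standalone illustration of the
 * reordering-queue idea: hand packets to the upper layer in order even
 * when completion events arrive out of order. */
#include <stdio.h>
#include <stdlib.h>

struct pkt {
    unsigned seq;            /* stands in for the expected receive pointer */
    const char *payload;
    struct pkt *next;
};

static struct pkt *reorder_queue;   /* packets that completed too early */
static unsigned expected_seq;       /* next packet allowed to be delivered */

/* Stands in for MPID_nem_handle_pkt(): hand one packet to the upper layer. */
static void deliver(unsigned seq, const char *payload)
{
    printf("delivered seq=%u payload=%s\n", seq, payload);
}

/* Remove and return the queued packet matching expected_seq, or NULL. */
static struct pkt *dequeue_expected(void)
{
    struct pkt **pp;
    for (pp = &reorder_queue; *pp != NULL; pp = &(*pp)->next) {
        if ((*pp)->seq == expected_seq) {
            struct pkt *p = *pp;
            *pp = p->next;
            return p;
        }
    }
    return NULL;
}

/* Completion-event callback; events may fire out of order. */
static void on_event(unsigned seq, const char *payload)
{
    struct pkt *p;

    if (seq != expected_seq) {
        /* Too early: stash the packet until its predecessors are delivered. */
        p = malloc(sizeof *p);
        if (p == NULL) abort();
        p->seq = seq;
        p->payload = payload;
        p->next = reorder_queue;
        reorder_queue = p;
        return;
    }

    /* In order: deliver immediately, then drain any queued successors. */
    deliver(seq, payload);
    ++expected_seq;
    while ((p = dequeue_expected()) != NULL) {
        deliver(p->seq, p->payload);
        ++expected_seq;
        free(p);
    }
}

int main(void)
{
    /* Events complete out of order; delivery still happens as 0, 1, 2, 3. */
    on_event(1, "one");
    on_event(0, "zero");
    on_event(3, "three");
    on_event(2, "two");
    return 0;
}

In the patch itself the comparison key is an address inside the overflow receive buffers (expected_recv_ptr, advanced by incr_expected_recv_ptr()), and an out-of-order packet is copied into a temporary buffer attached to an MPID_Request before being enqueued on reorder_queue, as shown in the diff below.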
@@ -49,6 +49,7 @@ typedef struct {
     ptl_handle_me_t *get_me_p;
     int num_gets;
     int put_done;
+    void *recv_ptr;  /* used for reordering in ptl_nm */
     void *chunk_buffer[MPID_NEM_PTL_NUM_CHUNK_BUFFERS];
     MPIDI_msg_sz_t bytes_put;
     int found;  /* used in probes with PtlMESearch() */
@@ -17,13 +17,75 @@
 #define SENDBUF(req_) REQ_PTL(req_)->chunk_buffer[0]
 #define TMPBUF(req_) REQ_PTL(req_)->chunk_buffer[1]
-static char *recvbufs;
+static char *recvbufs[NUM_RECV_BUFS];
 static ptl_me_t overflow_me;
 static ptl_me_t get_me;
 static ptl_handle_me_t me_handles[NUM_RECV_BUFS];
 static ptl_handle_me_t get_me_handle;
+
+/* AUX STUFF FOR REORDERING LOGIC */
+static GENERIC_Q_DECL(struct MPID_Request) reorder_queue;
+static char *expected_recv_ptr, *max_recv_ptr[NUM_RECV_BUFS];
+static int expected_recv_idx;
+
+#undef FUNCNAME
+#define FUNCNAME incr_expected_recv_ptr
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static inline void incr_expected_recv_ptr(size_t size)
+{
+    expected_recv_ptr += size > PTL_MAX_EAGER ? PTL_MAX_EAGER : size;
+    if (expected_recv_ptr > max_recv_ptr[expected_recv_idx]) {
+        ++expected_recv_idx;
+        if (expected_recv_idx == NUM_RECV_BUFS)
+            expected_recv_idx = 0;
+        expected_recv_ptr = recvbufs[expected_recv_idx];
+    }
+}
+
+#undef FUNCNAME
+#define FUNCNAME handle_request
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static inline int handle_request(MPID_Request *req)
+{
+    int mpi_errno = MPID_nem_handle_pkt(req->ch.vc, TMPBUF(req), REQ_PTL(req)->bytes_put);
+    incr_expected_recv_ptr(REQ_PTL(req)->bytes_put);
+
+    /* Free resources */
+    MPIU_Free(TMPBUF(req));
+    MPID_Request_release(req);
+
+    return mpi_errno;
+}
+
+#undef FUNCNAME
+#define FUNCNAME progress_reorder
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static inline int progress_reorder(void)
+{
+    MPID_Request *req;
+    int mpi_errno = MPI_SUCCESS;
+
+    GENERIC_Q_SEARCH_REMOVE(&reorder_queue,
+                            REQ_PTL(_e)->recv_ptr == expected_recv_ptr,
+                            &req, MPID_Request, dev.next);
+    while (req) {
+        mpi_errno = handle_request(req);
+        if (mpi_errno)
+            MPIU_ERR_POP(mpi_errno);
+        GENERIC_Q_SEARCH_REMOVE(&reorder_queue,
+                                REQ_PTL(_e)->recv_ptr == expected_recv_ptr,
+                                &req, MPID_Request, dev.next);
+    }
+
+ fn_exit:
+    return mpi_errno;
+ fn_fail:
+    goto fn_exit;
+}
+/* END AUX STUFF FOR REORDERING LOGIC */
+
 #undef FUNCNAME
 #define FUNCNAME MPID_nem_ptl_nm_init
 #undef FCNAME
@@ -33,6 +95,7 @@ int MPID_nem_ptl_nm_init(void)
     int mpi_errno = MPI_SUCCESS;
     int i;
     int ret;
+    char *tmp_ptr;
     MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_NM_INIT);

     MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_NM_INIT);
@@ -50,14 +113,19 @@ int MPID_nem_ptl_nm_init(void)
     overflow_me.min_free = PTL_MAX_EAGER;

     /* allocate all overflow space at once */
-    recvbufs = MPIU_Malloc(NUM_RECV_BUFS * BUFSIZE);
+    tmp_ptr = MPIU_Malloc(NUM_RECV_BUFS * BUFSIZE);
+    expected_recv_ptr = tmp_ptr;
+    expected_recv_idx = 0;
     for (i = 0; i < NUM_RECV_BUFS; ++i) {
-        overflow_me.start = recvbufs + (i * BUFSIZE);
+        recvbufs[i] = tmp_ptr;
+        overflow_me.start = tmp_ptr;
         ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &overflow_me,
                           PTL_OVERFLOW_LIST, (void *)(size_t)i, &me_handles[i]);
         MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
                              MPID_nem_ptl_strerror(ret));
+        tmp_ptr += BUFSIZE;
+        max_recv_ptr[i] = tmp_ptr - overflow_me.min_free;
     }

     /* register persistent ME for GET operations */
@@ -76,6 +144,9 @@ int MPID_nem_ptl_nm_init(void)
                       &get_me_handle);
     MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));

+    /* init the reorder queue */
+    reorder_queue.head = reorder_queue.tail = NULL;
+
 fn_exit:
     MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_INIT);
     return mpi_errno;
@@ -106,7 +177,8 @@ int MPID_nem_ptl_nm_finalize(void)
     MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink", "**ptlmeunlink %s",
                          MPID_nem_ptl_strerror(ret));

-    MPIU_Free(recvbufs);
+    /* Freeing first element because the allocation was a single contiguous buffer */
+    MPIU_Free(recvbufs[0]);

 fn_exit:
     MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_FINALIZE);
@@ -393,9 +465,26 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
        vc_ptl = VC_PTL(vc);

        if (remaining == 0) {
-           mpi_errno = MPID_nem_handle_pkt(vc, e->start, packet_sz);
-           if (mpi_errno)
-               MPIU_ERR_POP(mpi_errno);
+           if (e->start == expected_recv_ptr) {
+               incr_expected_recv_ptr(packet_sz);
+               mpi_errno = MPID_nem_handle_pkt(vc, e->start, packet_sz);
+               if (mpi_errno)
+                   MPIU_ERR_POP(mpi_errno);
+               mpi_errno = progress_reorder();
+               if (mpi_errno)
+                   MPIU_ERR_POP(mpi_errno);
+           }
+           else {
+               MPID_Request *req = MPID_Request_create();
+               /* This request is actually complete; just needs to wait to enforce ordering */
+               TMPBUF(req) = MPIU_Malloc(packet_sz);
+               MPIU_Assert(TMPBUF(req));
+               MPIU_Memcpy(TMPBUF(req), e->start, packet_sz);
+               REQ_PTL(req)->bytes_put = packet_sz;
+               req->ch.vc = vc;
+               REQ_PTL(req)->recv_ptr = e->start;
+               GENERIC_Q_ENQUEUE(&reorder_queue, req, dev.next);
+           }
        }
        else {
            int incomplete;
@@ -411,7 +500,7 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
            TMPBUF(req) = MPIU_Malloc(REQ_PTL(req)->bytes_put);
            MPIU_Assert(TMPBUF(req) != NULL);
            MPIU_Memcpy(TMPBUF(req), e->start, packet_sz);
+           REQ_PTL(req)->recv_ptr = e->start;
            req->ch.vc = vc;

            size = remaining < MPIDI_nem_ptl_ni_limits.max_msg_size ? remaining : MPIDI_nem_ptl_ni_limits.max_msg_size;
@@ -477,13 +566,16 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
            MPIDI_CH3U_Request_decrement_cc(req, &incomplete);
            if (!incomplete) {
-               mpi_errno = MPID_nem_handle_pkt(req->ch.vc, TMPBUF(req), REQ_PTL(req)->bytes_put);
-               if (mpi_errno)
-                   MPIU_ERR_POP(mpi_errno);
-
-               /* Free resources */
-               MPIU_Free(TMPBUF(req));
-               MPID_Request_release(req);
+               if (REQ_PTL(req)->recv_ptr == expected_recv_ptr) {
+                   mpi_errno = handle_request(req);
+                   if (mpi_errno)
+                       MPIU_ERR_POP(mpi_errno);
+                   mpi_errno = progress_reorder();
+                   if (mpi_errno)
+                       MPIU_ERR_POP(mpi_errno);
+               }
+               else
+                   GENERIC_Q_ENQUEUE(&reorder_queue, req, dev.next);
            }
        }
        break;
@@ -493,7 +585,7 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
            size_t buf_idx = (size_t)e->user_ptr;
            int ret;

-           overflow_me.start = recvbufs + (buf_idx * BUFSIZE);
+           overflow_me.start = recvbufs[buf_idx];
            ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &overflow_me,
                              PTL_OVERFLOW_LIST, e->user_ptr, &me_handles[buf_idx]);