Commit 5df08702 authored by Antonio Pena Monferrer's avatar Antonio Pena Monferrer Committed by Antonio J. Pena
Browse files

netmod/portals4: Use persistent ME in RMA



rma/manyget was failing because we reached the maximum number of MEs
allowed: we were posting an ME for every remote get. This patch
implements a single persistent ME instead for the remote gets.

Fixes #2264
Signed-off-by: Kenneth Raffenetti's avatarKen Raffenetti <raffenet@mcs.anl.gov>
parent 3752b556
......@@ -19,7 +19,32 @@
static char *recvbufs;
static ptl_me_t overflow_me;
static ptl_me_t get_me;
static ptl_handle_me_t me_handles[NUM_RECV_BUFS];
static ptl_handle_me_t get_me_handle;
struct pending_gets_st {
void *ptr;
MPID_Request *req;
UT_hash_handle hh;
};
static struct pending_gets_st *pending_gets = NULL;
#undef FUNCNAME
#define FUNCNAME find_req
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static inline MPID_Request * find_req(void *ptr)
{
struct pending_gets_st *pending_get;
MPID_Request *req;
HASH_FIND_PTR(pending_gets, &ptr, pending_get);
req = pending_get->req;
HASH_DEL(pending_gets, pending_get);
MPIU_Free(pending_get);
return req;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_nm_init
......@@ -57,6 +82,22 @@ int MPID_nem_ptl_nm_init(void)
MPID_nem_ptl_strerror(ret));
}
/* register persistent ME for GET operations */
get_me.start = NULL;
get_me.length = PTL_SIZE_MAX;
get_me.ct_handle = PTL_CT_NONE;
get_me.uid = PTL_UID_ANY;
get_me.options = ( PTL_ME_OP_GET | PTL_ME_IS_ACCESSIBLE | PTL_ME_NO_TRUNCATE |
PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE );
get_me.match_id.phys.pid = PTL_PID_ANY;
get_me.match_id.phys.nid = PTL_NID_ANY;
get_me.match_bits = NPTL_MATCH(GET_TAG, 0, MPI_ANY_SOURCE);
get_me.ignore_bits = NPTL_MATCH_IGNORE;
get_me.min_free = 0;
ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &get_me, PTL_PRIORITY_LIST, NULL,
&get_me_handle);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s", MPID_nem_ptl_strerror(ret));
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_NM_INIT);
return mpi_errno;
......@@ -83,6 +124,10 @@ int MPID_nem_ptl_nm_finalize(void)
MPID_nem_ptl_strerror(ret));
}
ret = PtlMEUnlink(get_me_handle);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink", "**ptlmeunlink %s",
MPID_nem_ptl_strerror(ret));
MPIU_Free(recvbufs);
fn_exit:
......@@ -92,55 +137,6 @@ int MPID_nem_ptl_nm_finalize(void)
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME meappend_large
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static inline int meappend_large(ptl_process_t id, MPID_Request *req, ptl_match_bits_t match_bits, void *buf, size_t remaining)
{
int mpi_errno = MPI_SUCCESS;
int ret;
ptl_me_t me;
MPIDI_STATE_DECL(MPID_STATE_MEAPPEND_LARGE);
MPIDI_FUNC_ENTER(MPID_STATE_MEAPPEND_LARGE);
me.start = buf;
me.length = remaining < MPIDI_nem_ptl_ni_limits.max_msg_size ?
remaining : MPIDI_nem_ptl_ni_limits.max_msg_size;
me.ct_handle = PTL_CT_NONE;
me.uid = PTL_UID_ANY;
me.options = ( PTL_ME_OP_GET | PTL_ME_USE_ONCE | PTL_ME_IS_ACCESSIBLE |
PTL_ME_EVENT_LINK_DISABLE | PTL_ME_EVENT_UNLINK_DISABLE );
me.match_id = id;
me.match_bits = match_bits;
me.ignore_bits = NPTL_MATCH_IGNORE;
me.min_free = 0;
while (remaining) {
ptl_handle_me_t foo_me_handle;
++REQ_PTL(req)->num_gets;
ret = PtlMEAppend(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt, &me, PTL_PRIORITY_LIST, req,
&foo_me_handle);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeappend", "**ptlmeappend %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlMEAppend(req=%p tag=%#lx)", req, NPTL_MATCH_GET_TAG(match_bits)));
me.start = (char *)me.start + me.length;
remaining -= me.length;
if (remaining < MPIDI_nem_ptl_ni_limits.max_msg_size)
me.length = remaining;
}
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MEAPPEND_LARGE);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME send_pkt
#undef FCNAME
......@@ -151,10 +147,10 @@ static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_
int mpi_errno = MPI_SUCCESS;
MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
int ret;
char *sendbuf;
const size_t sent_sz = data_sz < (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz) ? data_sz : (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz);
const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz+sreq->dev.ext_hdr_sz);
char *sendbuf, *sendbuf_ptr;
const size_t sent_sz = data_sz < (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz) ? data_sz : (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz-sizeof(ptl_size_t));
const size_t remaining = data_sz - sent_sz;
const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz+sreq->dev.ext_hdr_sz+(remaining?sizeof(ptl_size_t):0));
ptl_match_bits_t match_bits = NPTL_MATCH(CTL_TAG, 0, MPIDI_Process.my_pg_rank);
MPIDI_STATE_DECL(MPID_STATE_SEND_PKT);
......@@ -163,11 +159,12 @@ static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_
sendbuf = MPIU_Malloc(sendbuf_sz);
MPIU_Assert(sendbuf != NULL);
MPIU_Memcpy(sendbuf, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
sendbuf_ptr = sendbuf + sizeof(MPIDI_CH3_Pkt_t);
if (sreq->dev.ext_hdr_sz > 0) {
/* copy extended packet header to send buf */
MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t),
sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz);
MPIU_Memcpy(sendbuf_ptr, sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz);
sendbuf_ptr += sreq->dev.ext_hdr_sz;
}
TMPBUF(sreq) = NULL;
......@@ -175,12 +172,23 @@ static inline int send_pkt(MPIDI_VC_t *vc, void *hdr_p, void *data_p, MPIDI_msg_
REQ_PTL(sreq)->put_done = 0;
if (data_sz) {
MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz, data_p, sent_sz);
if (remaining) /* Post MEs for the remote gets */
mpi_errno = meappend_large(vc_ptl->id, sreq, NPTL_MATCH(GET_TAG, 0, MPIDI_Process.my_pg_rank),
(char *)data_p + sent_sz, remaining);
if (mpi_errno)
goto fn_fail;
MPIU_Memcpy(sendbuf_ptr, data_p, sent_sz);
sendbuf_ptr += sent_sz;
if (remaining) {
char *tmp_ptr = (char *)data_p + sent_sz;
const char *endbuf = tmp_ptr + remaining;
/* The address/offset for the remote side to do the get is last in the buffer */
ptl_size_t *offset = (ptl_size_t *)sendbuf_ptr;
*offset = (ptl_size_t)tmp_ptr;
do {
struct pending_gets_st *pending_get = MPIU_Malloc(sizeof(struct pending_gets_st));
++REQ_PTL(sreq)->num_gets;
pending_get->ptr = tmp_ptr;
pending_get->req = sreq;
HASH_ADD_PTR(pending_gets, ptr, pending_get);
tmp_ptr += MPIDI_nem_ptl_ni_limits.max_msg_size;
} while (tmp_ptr < endbuf);
}
}
SENDBUF(sreq) = sendbuf;
......@@ -213,11 +221,11 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
int mpi_errno = MPI_SUCCESS;
MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
int ret;
char *sendbuf;
char *sendbuf, *sendbuf_ptr;
const size_t data_sz = sreq->dev.segment_size - sreq->dev.segment_first;
const size_t sent_sz = data_sz < (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz) ? data_sz : (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz);
const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz+sreq->dev.ext_hdr_sz);
const size_t sent_sz = data_sz < (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz) ? data_sz : (PAYLOAD_SIZE-sreq->dev.ext_hdr_sz-sizeof(ptl_size_t));
const size_t remaining = data_sz - sent_sz;
const size_t sendbuf_sz = SENDBUF_SIZE(sent_sz+sreq->dev.ext_hdr_sz+(remaining?sizeof(ptl_size_t):0));
ptl_match_bits_t match_bits = NPTL_MATCH(CTL_TAG, 0, MPIDI_Process.my_pg_rank);
MPIDI_STATE_DECL(MPID_STATE_SEND_NONCONTIG_PKT);
MPIDI_FUNC_ENTER(MPID_STATE_SEND_NONCONTIG_PKT);
......@@ -225,11 +233,12 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
sendbuf = MPIU_Malloc(sendbuf_sz);
MPIU_Assert(sendbuf != NULL);
MPIU_Memcpy(sendbuf, hdr_p, sizeof(MPIDI_CH3_Pkt_t));
sendbuf_ptr = sendbuf + sizeof(MPIDI_CH3_Pkt_t);
if (sreq->dev.ext_hdr_sz > 0) {
/* copy extended packet header to send buf */
MPIU_Memcpy(sendbuf + sizeof(MPIDI_CH3_Pkt_t),
sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz);
MPIU_Memcpy(sendbuf_ptr, sreq->dev.ext_hdr_ptr, sreq->dev.ext_hdr_sz);
sendbuf_ptr += sreq->dev.ext_hdr_sz;
}
TMPBUF(sreq) = NULL;
......@@ -239,18 +248,29 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void *hdr_p)
if (data_sz) {
MPIDI_msg_sz_t first = sreq->dev.segment_first;
MPIDI_msg_sz_t last = sreq->dev.segment_first + sent_sz;
MPID_Segment_pack(sreq->dev.segment_ptr, first, &last, sendbuf + sizeof(MPIDI_CH3_Pkt_t) + sreq->dev.ext_hdr_sz);
MPID_Segment_pack(sreq->dev.segment_ptr, first, &last, sendbuf_ptr);
sendbuf_ptr += sent_sz;
if (remaining) { /* Post MEs for the remote gets */
char *tmp_ptr, *endbuf;
ptl_size_t *offset = (ptl_size_t *)sendbuf_ptr;
TMPBUF(sreq) = MPIU_Malloc(remaining);
tmp_ptr = TMPBUF(sreq);
endbuf = (char *)TMPBUF(sreq) + remaining;
*offset = (ptl_size_t)TMPBUF(sreq);
first = last;
last = sreq->dev.segment_size;
MPID_Segment_pack(sreq->dev.segment_ptr, first, &last, TMPBUF(sreq));
MPIU_Assert(last == sreq->dev.segment_size);
mpi_errno = meappend_large(vc_ptl->id, sreq, NPTL_MATCH(GET_TAG, 0, MPIDI_Process.my_pg_rank), TMPBUF(sreq), remaining);
if (mpi_errno)
goto fn_fail;
do {
struct pending_gets_st *pending_get = MPIU_Malloc(sizeof(struct pending_gets_st));
++REQ_PTL(sreq)->num_gets;
pending_get->ptr = tmp_ptr;
pending_get->req = sreq;
HASH_ADD_PTR(pending_gets, ptr, pending_get);
tmp_ptr += MPIDI_nem_ptl_ni_limits.max_msg_size;
} while (tmp_ptr < endbuf);
}
}
......@@ -419,12 +439,13 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
int incomplete;
size_t size;
char *buf_ptr;
ptl_size_t target_offset;
MPID_Request *req = MPID_Request_create();
MPIU_Assert(req != NULL);
MPIDI_CH3U_Request_decrement_cc(req, &incomplete); /* We'll increment it below */
REQ_PTL(req)->event_handler = MPID_nem_ptl_nm_ctl_event_handler;
REQ_PTL(req)->bytes_put = packet_sz + remaining;
REQ_PTL(req)->bytes_put = packet_sz + remaining - sizeof(ptl_size_t);
TMPBUF(req) = MPIU_Malloc(REQ_PTL(req)->bytes_put);
MPIU_Assert(TMPBUF(req) != NULL);
MPIU_Memcpy(TMPBUF(req), e->start, packet_sz);
......@@ -432,11 +453,12 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
req->ch.vc = vc;
size = remaining < MPIDI_nem_ptl_ni_limits.max_msg_size ? remaining : MPIDI_nem_ptl_ni_limits.max_msg_size;
buf_ptr = (char *)TMPBUF(req) + packet_sz;
while (remaining) {
buf_ptr = (char *)TMPBUF(req) + packet_sz - sizeof(ptl_size_t);
target_offset = *((ptl_size_t *)buf_ptr);
do {
MPIDI_CH3U_Request_increment_cc(req, &incomplete); /* Will be decremented - and eventually freed in REPLY */
ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)buf_ptr,
size, vc_ptl->id, vc_ptl->ptc, NPTL_MATCH(GET_TAG, 0, MPIDI_Process.my_pg_rank), 0, req);
size, vc_ptl->id, vc_ptl->ptc, NPTL_MATCH(GET_TAG, 0, MPIDI_Process.my_pg_rank), target_offset, req);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s",
MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST,
......@@ -445,9 +467,10 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
vc_ptl->id.phys.pid, vc_ptl->ptc, GET_TAG));
buf_ptr += size;
remaining -= size;
target_offset += size;
if (remaining < MPIDI_nem_ptl_ni_limits.max_msg_size)
size = remaining;
}
} while (remaining);
}
/* FIXME: this search/delete not be necessary if we set PTL_ME_UNEXPECTED_HDR_DISABLE
......@@ -462,7 +485,7 @@ int MPID_nem_ptl_nm_ctl_event_handler(const ptl_event_t *e)
case PTL_EVENT_GET:
{
MPID_Request *const req = e->user_ptr;
MPID_Request *const req = find_req(e->start);
if (--REQ_PTL(req)->num_gets == 0) {
MPIU_Free(TMPBUF(req));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment