Commit f4253c38 authored by Pavan Balaji, committed by Kenneth Raffenetti
Browse files

Initial draft of flow-control in the portals4 netmod.



Portals4 by itself does not provide any flow control.  This needs to
be managed by an upper layer, such as MPICH.  Before this patch we
were relying on a set of unexpected buffers that were posted to the
portals library to absorb unexpected messages.  However, since portals
asynchronously pulls messages off the network, if the application
is delayed, the unexpected buffers might fill up and the portal would
then be disabled.  This would cause MPICH to abort.

In this patch, we implement an initial version of flow-control that
allows us to reenable the portal when it gets disabled.  All this is
done in the context of the "rportals" wrappers that are implemented in
the rptl.* files.  We create an extra control portal that is only used
by rportals.  When the primary data portal gets disabled, the target
sends PAUSE messages to all other processes.  Once each process
confirms that it has no outstanding packets on the wire (i.e., all
packets have either been ACKed or NACKed), it sends a PAUSE-ACK
message.  When the target receives PAUSE-ACK messages from all
processes (thus confirming that the network traffic to itself has been
quiesced), it reenables the portal and sends an UNPAUSE message to all
processes.

This patch still does not deal with origin-side resource exhaustion.
This can happen, for example, if we run out of space on the event
queue on the origin side.
Signed-off-by: Ken Raffenetti <raffenet@mcs.anl.gov>
parent 28f6a689
......@@ -15,10 +15,12 @@ mpi_core_sources += \
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_recv.c \
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_nm.c \
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_send.c \
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_lmt.c
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_lmt.c \
src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.c
noinst_HEADERS += \
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h
src/mpid/ch3/channels/nemesis/netmod/portals4/ptl_impl.h \
src/mpid/ch3/channels/nemesis/netmod/portals4/rptl.h
endif BUILD_NEMESIS_NETMOD_PORTALS4
......@@ -18,6 +18,7 @@ extern ptl_handle_ni_t MPIDI_nem_ptl_ni;
extern ptl_pt_index_t MPIDI_nem_ptl_pt;
extern ptl_pt_index_t MPIDI_nem_ptl_get_pt; /* portal for gets by receiver */
extern ptl_pt_index_t MPIDI_nem_ptl_control_pt; /* portal for MPICH control messages */
extern ptl_pt_index_t MPIDI_nem_ptl_rpt_pt; /* portal for rportals control messages */
extern ptl_handle_eq_t MPIDI_nem_ptl_eq;
extern ptl_handle_md_t MPIDI_nem_ptl_global_md;
......@@ -88,6 +89,7 @@ typedef struct {
ptl_pt_index_t pt;
ptl_pt_index_t ptg;
ptl_pt_index_t ptc;
ptl_pt_index_t ptr;
int id_initialized; /* TRUE iff id and pt have been initialized */
MPIDI_msg_sz_t num_queued_sends; /* number of reqs for this vc in sendq */
} MPID_nem_ptl_vc_area;
......@@ -154,7 +156,7 @@ int MPID_nem_ptl_poll_finalize(void);
int MPID_nem_ptl_poll(int is_blocking_poll);
int MPID_nem_ptl_vc_terminated(MPIDI_VC_t *vc);
int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg,
ptl_pt_index_t *ptc);
ptl_pt_index_t *ptc, ptl_pt_index_t *ptr);
void MPI_nem_ptl_pack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
MPID_nem_ptl_pack_overflow_t *overflow);
int MPID_nem_ptl_unpack_byte(MPID_Segment *segment, MPI_Aint first, MPI_Aint last, void *buf,
......@@ -197,7 +199,7 @@ const char *MPID_nem_ptl_strnifail(ptl_ni_fail_t ni_fail);
const char *MPID_nem_ptl_strlist(ptl_list_t list);
#define DBG_MSG_PUT(md_, data_sz_, pg_rank_, match_, header_) do { \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut: md=%s data_sz=%lu pg_rank=%d", md_, data_sz_, pg_rank_)); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put: md=%s data_sz=%lu pg_rank=%d", md_, data_sz_, pg_rank_)); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, " tag=%#lx ctx=%#lx rank=%ld match=%#lx", \
NPTL_MATCH_GET_TAG(match_), NPTL_MATCH_GET_CTX(match_), NPTL_MATCH_GET_RANK(match_), match_)); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, " flags=%c%c%c data_sz=%ld header=%#lx", \
......
......@@ -6,6 +6,7 @@
#include "ptl_impl.h"
#include <pmi.h>
#include "rptl.h"
#ifdef ENABLE_CHECKPOINTING
#error Checkpointing not implemented
......@@ -18,11 +19,13 @@
#define PTI_KEY "PTI"
#define PTIG_KEY "PTIG"
#define PTIC_KEY "PTIC"
#define PTIR_KEY "PTIR"
ptl_handle_ni_t MPIDI_nem_ptl_ni;
ptl_pt_index_t MPIDI_nem_ptl_pt;
ptl_pt_index_t MPIDI_nem_ptl_get_pt; /* portal for gets by receiver */
ptl_pt_index_t MPIDI_nem_ptl_control_pt; /* portal for MPICH control messages */
ptl_pt_index_t MPIDI_nem_ptl_rpt_pt; /* portal for rportals control messages */
ptl_handle_eq_t MPIDI_nem_ptl_eq;
ptl_handle_md_t MPIDI_nem_ptl_global_md;
ptl_ni_limits_t MPIDI_nem_ptl_ni_limits;
......@@ -73,6 +76,54 @@ static MPIDI_Comm_ops_t comm_ops = {
};
#undef FUNCNAME
#define FUNCNAME get_target_info
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * get_target_info - translate a (rank, local portal) pair into the remote
 * addressing information for that peer.
 *
 * rank              - rank of the peer within MPIDI_Process.my_pg
 * id                - [out] portals process id of the peer
 * local_data_pt     - one of this process's data portals (primary, get or
 *                     control); anything else trips the assertion below
 * target_data_pt    - [out] the peer's portal index matching local_data_pt
 * target_control_pt - [out] the peer's rportals portal when local_data_pt is
 *                     the primary portal, PTL_PT_ANY as a placeholder otherwise
 *
 * Returns MPI_SUCCESS, or the error reported by MPID_nem_ptl_init_id().
 */
static int get_target_info(int rank, ptl_process_t *id, ptl_pt_index_t local_data_pt, ptl_pt_index_t *target_data_pt,
                           ptl_pt_index_t *target_control_pt)
{
    int mpi_errno = MPI_SUCCESS;
    struct MPIDI_VC *peer_vc;
    MPID_nem_ptl_vc_area *peer;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_GET_TARGET_INFO);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_GET_TARGET_INFO);

    MPIDI_PG_Get_vc(MPIDI_Process.my_pg, rank, &peer_vc);
    peer = VC_PTL(peer_vc);

    /* the peer's id and portal indices are filled in on first use */
    if (!peer->id_initialized) {
        mpi_errno = MPID_nem_ptl_init_id(peer_vc);
        if (mpi_errno) {
            MPIU_ERR_POP(mpi_errno);
        }
    }

    *id = peer->id;

    MPIU_Assert(local_data_pt == MPIDI_nem_ptl_pt || local_data_pt == MPIDI_nem_ptl_get_pt ||
                local_data_pt == MPIDI_nem_ptl_control_pt);

    if (local_data_pt == MPIDI_nem_ptl_pt) {
        /* primary data portal: the only one paired with an rportals portal */
        *target_data_pt = peer->pt;
        *target_control_pt = peer->ptr;
    }
    else if (local_data_pt == MPIDI_nem_ptl_get_pt) {
        /* get portal: no rportals portal, report the placeholder */
        *target_data_pt = peer->ptg;
        *target_control_pt = PTL_PT_ANY;
    }
    else if (local_data_pt == MPIDI_nem_ptl_control_pt) {
        /* MPICH control portal: no rportals portal, report the placeholder */
        *target_data_pt = peer->ptc;
        *target_control_pt = PTL_PT_ANY;
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_GET_TARGET_INFO);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME ptl_init
#undef FCNAME
......@@ -145,6 +196,11 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
PTL_PT_ANY, &MPIDI_nem_ptl_control_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allocate portal for rportals control messages */
ret = PtlPTAlloc(MPIDI_nem_ptl_ni, PTL_PT_ONLY_USE_ONCE | PTL_PT_ONLY_TRUNCATE | PTL_PT_FLOWCTRL, MPIDI_nem_ptl_eq,
PTL_PT_ANY, &MPIDI_nem_ptl_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* create an MD that covers all of memory */
md.start = 0;
md.length = (ptl_size_t)-1;
......@@ -154,6 +210,24 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
ret = PtlMDBind(MPIDI_nem_ptl_ni, &md, &MPIDI_nem_ptl_global_md);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdbind", "**ptlmdbind %s", MPID_nem_ptl_strerror(ret));
/* currently, rportals only works with a single NI and EQ */
ret = MPID_nem_ptl_rptl_init(MPIDI_Process.my_pg->size, 5, get_target_info);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlniinit", "**ptlniinit %s", MPID_nem_ptl_strerror(ret));
/* allow rportal to manage the primary portal and retransmit if needed */
ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_eq, MPIDI_nem_ptl_pt, MPIDI_nem_ptl_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* allow rportal to manage the get and control portals, but we
* don't expect retransmission to be needed on these portals, so
* we pass PTL_PT_ANY as the dummy portal. unfortunately, portals
* does not have an "invalid" PT constant, which would have been
* more appropriate to pass over here. */
ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_eq, MPIDI_nem_ptl_get_pt, PTL_PT_ANY);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
ret = MPID_nem_ptl_rptl_ptinit(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_eq, MPIDI_nem_ptl_control_pt, PTL_PT_ANY);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptalloc", "**ptlptalloc %s", MPID_nem_ptl_strerror(ret));
/* create business card */
mpi_errno = get_business_card(pg_rank, bc_val_p, val_max_sz_p);
......@@ -192,15 +266,30 @@ static int ptl_finalize(void)
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* shut down portals */
ret = MPID_nem_ptl_rptl_drain_eq(1, &MPIDI_nem_ptl_eq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = MPID_nem_ptl_rptl_ptfini(MPIDI_nem_ptl_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = MPID_nem_ptl_rptl_ptfini(MPIDI_nem_ptl_get_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_get_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = MPID_nem_ptl_rptl_ptfini(MPIDI_nem_ptl_control_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_control_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = PtlPTFree(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_rpt_pt);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlptfree", "**ptlptfree %s", MPID_nem_ptl_strerror(ret));
ret = PtlNIFini(MPIDI_nem_ptl_ni);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlnifini", "**ptlnifini %s", MPID_nem_ptl_strerror(ret));
......@@ -262,6 +351,12 @@ static int get_business_card(int my_rank, char **bc_val_p, int *val_max_sz_p)
MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
}
str_errno = MPIU_Str_add_binary_arg(bc_val_p, val_max_sz_p, PTIR_KEY, (char *)&MPIDI_nem_ptl_rpt_pt,
sizeof(MPIDI_nem_ptl_rpt_pt));
if (str_errno) {
MPIU_ERR_CHKANDJUMP(str_errno == MPIU_STR_NOMEM, mpi_errno, MPI_ERR_OTHER, "**buscard_len");
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**buscard");
}
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_GET_BUSINESS_CARD);
......@@ -345,7 +440,7 @@ static int vc_destroy(MPIDI_VC_t *vc)
#define FUNCNAME MPID_nem_ptl_get_id_from_bc
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc)
int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, ptl_pt_index_t *pt, ptl_pt_index_t *ptg, ptl_pt_index_t *ptc, ptl_pt_index_t *ptr)
{
int mpi_errno = MPI_SUCCESS;
int ret;
......@@ -369,6 +464,9 @@ int MPID_nem_ptl_get_id_from_bc(const char *business_card, ptl_process_t *id, pt
/* Pass the size of the destination object, not of the pointer to it:
 * sizeof(ptc) / sizeof(ptr) is the pointer width, which overstates the room
 * available in the ptl_pt_index_t being filled in (the fourth argument is
 * presumably the destination capacity -- confirm against
 * MPIU_Str_get_binary_arg).  This also matches the length checks below. */
ret = MPIU_Str_get_binary_arg(business_card, PTIC_KEY, (char *)ptc, sizeof(*ptc), &len);
MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptc), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
ret = MPIU_Str_get_binary_arg(business_card, PTIR_KEY, (char *)ptr, sizeof(*ptr), &len);
MPIU_ERR_CHKANDJUMP(ret != MPIU_STR_SUCCESS || len != sizeof(*ptr), mpi_errno, MPI_ERR_OTHER, "**badbusinesscard");
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_GET_ID_FROM_BC);
return mpi_errno;
......@@ -461,7 +559,7 @@ int MPID_nem_ptl_init_id(MPIDI_VC_t *vc)
mpi_errno = vc->pg->getConnInfo(vc->pg_rank, bc, val_max_sz, vc->pg);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc);
mpi_errno = MPID_nem_ptl_get_id_from_bc(bc, &vc_ptl->id, &vc_ptl->pt, &vc_ptl->ptg, &vc_ptl->ptc, &vc_ptl->ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
vc_ptl->id_initialized = TRUE;
......
......@@ -6,6 +6,7 @@
#include "ptl_impl.h"
#include <mpl_utlist.h>
#include "rptl.h"
#define NUM_SEND_BUFS 20
#define NUM_RECV_BUFS 20
......@@ -197,10 +198,10 @@ static inline int send_pkt(MPIDI_VC_t *vc, void **vhdr_p, void **vdata_p, MPIDI_
if (len > PTL_MAX_EAGER)
len = PTL_MAX_EAGER;
MPIU_Memcpy(sb->buf.hp.payload, *data_p, len);
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank);
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
sizeof(sb->buf.hp.hdr) + len, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
vc_ptl->ptc, sb));
*hdr_p = NULL;
......@@ -214,9 +215,9 @@ static inline int send_pkt(MPIDI_VC_t *vc, void **vhdr_p, void **vdata_p, MPIDI_
if (len > BUFLEN)
len = BUFLEN;
MPIU_Memcpy(sb->buf.p, *data_p, len);
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb, MPIDI_Process.my_pg_rank);
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, len, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb, MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x) sb=%p", len,
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p", len,
vc_ptl->id.phys.nid, vc_ptl->id.phys.pid, vc_ptl->ptc, sb));
*data_p += len;
*data_sz_p -= len;
......@@ -265,10 +266,10 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void **vhdr_p,
if (last > PTL_MAX_EAGER)
last = PTL_MAX_EAGER;
MPI_nem_ptl_pack_byte(sreq->dev.segment_ptr, 0, last, sb->buf.hp.payload, &REQ_PTL(sreq)->overflow[0]);
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + last, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank);
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, sizeof(sb->buf.hp.hdr) + last, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
sizeof(sb->buf.hp.hdr) + last, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
vc_ptl->ptc, sb));
*vhdr_p = NULL;
......@@ -290,10 +291,10 @@ static int send_noncontig_pkt(MPIDI_VC_t *vc, MPID_Request *sreq, void **vhdr_p,
MPI_nem_ptl_pack_byte(sreq->dev.segment_ptr, sreq->dev.segment_first, last, sb->buf.p, &REQ_PTL(sreq)->overflow[0]);
/* NOTE(review): segment_first is advanced to last here, before the put that
 * follows computes its length as (last - sreq->dev.segment_first); that
 * expression therefore evaluates to 0.  Looks like the length should be
 * captured before this assignment -- confirm. */
sreq->dev.segment_first = last;
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, last - sreq->dev.segment_first, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank);
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, last - sreq->dev.segment_first, PTL_NO_ACK_REQ, vc_ptl->id, vc_ptl->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "PtlPut(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "MPID_nem_ptl_rptl_put(size=%lu id=(%#x,%#x) pt=%#x) sb=%p",
last - sreq->dev.segment_first, vc_ptl->id.phys.nid, vc_ptl->id.phys.pid,
vc_ptl->ptc, sb));
......@@ -561,8 +562,8 @@ static int send_queued(void)
send_len += last - sreq->dev.segment_first;
sreq->dev.segment_first = last;
}
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, send_len, PTL_NO_ACK_REQ, VC_PTL(sreq->ch.vc)->id, VC_PTL(sreq->ch.vc)->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank);
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)sb->buf.p, send_len, PTL_NO_ACK_REQ, VC_PTL(sreq->ch.vc)->id, VC_PTL(sreq->ch.vc)->ptc, 0, 0, sb,
MPIDI_Process.my_pg_rank, 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
if (!complete)
......
......@@ -5,6 +5,7 @@
*/
#include "ptl_impl.h"
#include "rptl.h"
#define OVERFLOW_LENGTH (1024*1024)
#define NUM_OVERFLOW_ME 8
......@@ -130,7 +131,7 @@ int MPID_nem_ptl_poll(int is_blocking_poll)
/* MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_POLL); */
while (1) {
ret = PtlEQGet(MPIDI_nem_ptl_eq, &event);
ret = MPID_nem_ptl_rptl_eqget(MPIDI_nem_ptl_eq, &event);
if (ret == PTL_EQ_EMPTY)
break;
MPIU_ERR_CHKANDJUMP(ret == PTL_EQ_DROPPED, mpi_errno, MPI_ERR_OTHER, "**eqdropped");
......
......@@ -5,6 +5,7 @@
*/
#include "ptl_impl.h"
#include "rptl.h"
#undef FUNCNAME
#define FUNCNAME dequeue_req
......@@ -233,7 +234,7 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
if (dt_contig) {
/* recv buffer is contig */
REQ_PTL(rreq)->event_handler = handler_recv_complete;
ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)rreq->dev.user_buf + dt_true_lb + PTL_LARGE_THRESHOLD),
ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)rreq->dev.user_buf + dt_true_lb + PTL_LARGE_THRESHOLD),
data_sz - PTL_LARGE_THRESHOLD, vc_ptl->id, vc_ptl->ptg, e->match_bits, 0, rreq);
DBG_MSG_GET("global", data_sz - PTL_LARGE_THRESHOLD, vc->pg_rank, e->match_bits);
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, " buf=%p", (char *)rreq->dev.user_buf + dt_true_lb + PTL_LARGE_THRESHOLD);
......@@ -260,7 +261,7 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdbind", "**ptlmdbind %s", MPID_nem_ptl_strerror(ret));
REQ_PTL(rreq)->event_handler = handler_recv_complete;
ret = PtlGet(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->ptg,
ret = MPID_nem_ptl_rptl_get(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->ptg,
e->match_bits, 0, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s", MPID_nem_ptl_strerror(ret));
goto fn_exit;
......@@ -271,7 +272,7 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size - rreq->dev.segment_first, mpi_errno, "chunk_buffer");
REQ_PTL(rreq)->event_handler = handler_recv_unpack_complete;
ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->ptg, e->match_bits, 0, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s", MPID_nem_ptl_strerror(ret));
......@@ -331,7 +332,7 @@ static int handler_recv_dequeue_unpack_large(const ptl_event_t *e)
MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size - rreq->dev.segment_first, mpi_errno, "chunk_buffer");
REQ_PTL(rreq)->event_handler = handler_recv_unpack_complete;
ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->ptg, e->match_bits, 0, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s", MPID_nem_ptl_strerror(ret));
......@@ -645,7 +646,7 @@ int MPID_nem_ptl_lmt_start_recv(MPIDI_VC_t *vc, MPID_Request *rreq, MPID_IOV s_
void * real_user_buf = (char *)rreq->dev.user_buf + dt_true_lb;
REQ_PTL(rreq)->event_handler = handler_recv_complete;
ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)real_user_buf + PTL_LARGE_THRESHOLD),
ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)real_user_buf + PTL_LARGE_THRESHOLD),
data_sz - PTL_LARGE_THRESHOLD, vc_ptl->id, vc_ptl->ptg, match_bits, 0, rreq);
DBG_MSG_GET("global", data_sz - PTL_LARGE_THRESHOLD, vc->pg_rank, match_bits);
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, " buf=%p", (char *)real_user_buf + PTL_LARGE_THRESHOLD);
......@@ -686,7 +687,7 @@ int MPID_nem_ptl_lmt_start_recv(MPIDI_VC_t *vc, MPID_Request *rreq, MPID_IOV s_
MPID_nem_ptl_strerror(ret));
REQ_PTL(rreq)->event_handler = handler_recv_complete;
ret = PtlGet(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size, vc_ptl->id, vc_ptl->ptg,
ret = MPID_nem_ptl_rptl_get(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size, vc_ptl->id, vc_ptl->ptg,
match_bits, PTL_LARGE_THRESHOLD, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s",
MPID_nem_ptl_strerror(ret));
......@@ -697,7 +698,7 @@ int MPID_nem_ptl_lmt_start_recv(MPIDI_VC_t *vc, MPID_Request *rreq, MPID_IOV s_
MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size,
mpi_errno, "chunk_buffer");
REQ_PTL(rreq)->event_handler = handler_recv_unpack_complete;
ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
ret = MPID_nem_ptl_rptl_get(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
rreq->dev.segment_size, vc_ptl->id, vc_ptl->ptg, match_bits,
PTL_LARGE_THRESHOLD, rreq);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget", "**ptlget %s",
......
......@@ -5,6 +5,7 @@
*/
#include "ptl_impl.h"
#include "rptl.h"
#undef FUNCNAME
#define FUNCNAME handler_send_complete
......@@ -101,7 +102,7 @@ static int handler_pack_chunk(const ptl_event_t *e)
sreq->dev.segment_first += PTL_LARGE_THRESHOLD;
/* notify receiver */
ret = PtlPut(MPIDI_nem_ptl_global_md, 0, 0, PTL_ACK_REQ, vc_ptl->id,
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, 0, 0, PTL_ACK_REQ, vc_ptl->id,
vc_ptl->pt, ?????, 0, sreq,
NPTL_HEADER(?????, MPIDI_Process.my_pg_rank, me.match_bits));
......@@ -208,9 +209,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Small contig message");
REQ_PTL(sreq)->event_handler = handler_send_complete;
MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "&REQ_PTL(sreq)->event_handler = %p", &(REQ_PTL(sreq)->event_handler));
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
NPTL_HEADER(ssend_flag, data_sz));
NPTL_HEADER(ssend_flag, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", data_sz, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag, data_sz));
MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "id.nid = %#x", vc_ptl->id.phys.nid);
......@@ -245,9 +246,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdbind", "**ptlmdbind %s", MPID_nem_ptl_strerror(ret));
REQ_PTL(sreq)->event_handler = handler_send_complete;
ret = PtlPut(REQ_PTL(sreq)->md, 0, data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
ret = MPID_nem_ptl_rptl_put(REQ_PTL(sreq)->md, 0, data_sz, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
NPTL_HEADER(ssend_flag, data_sz));
NPTL_HEADER(ssend_flag, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("sreq", data_sz, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag, data_sz));
goto fn_exit;
......@@ -262,9 +263,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
MPID_Segment_pack(sreq->dev.segment_ptr, sreq->dev.segment_first, &last, REQ_PTL(sreq)->chunk_buffer[0]);
MPIU_Assert(last == sreq->dev.segment_size);
REQ_PTL(sreq)->event_handler = handler_send_complete;
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], data_sz, PTL_ACK_REQ,
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], data_sz, PTL_ACK_REQ,
vc_ptl->id, vc_ptl->pt, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
NPTL_HEADER(ssend_flag, data_sz));
NPTL_HEADER(ssend_flag, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", data_sz, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag, data_sz));
goto fn_exit;
......@@ -292,9 +293,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->large = TRUE;
REQ_PTL(sreq)->event_handler = handler_large;
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)((char *)buf + dt_true_lb), PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", PTL_LARGE_THRESHOLD, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
goto fn_exit;
......@@ -359,9 +360,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->large = TRUE;
REQ_PTL(sreq)->event_handler = handler_large;
ret = PtlPut(REQ_PTL(sreq)->md, 0, PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
ret = MPID_nem_ptl_rptl_put(REQ_PTL(sreq)->md, 0, PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id, vc_ptl->pt,
NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("req", PTL_LARGE_THRESHOLD, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
goto fn_exit;
......@@ -397,9 +398,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->large = TRUE;
REQ_PTL(sreq)->event_handler = handler_large;
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], PTL_LARGE_THRESHOLD, PTL_ACK_REQ,
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq)->chunk_buffer[0], PTL_LARGE_THRESHOLD, PTL_ACK_REQ,
vc_ptl->id, vc_ptl->pt, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_PUT("global", PTL_LARGE_THRESHOLD, vc->pg_rank, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), NPTL_HEADER(ssend_flag | NPTL_LARGE, data_sz));
goto fn_exit;
......@@ -438,9 +439,9 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
REQ_PTL(sreq)->large = TRUE;
REQ_PTL(sreq)->event_handler = handler_large_multi;
ret = PtlPut(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq_)->chunk_buffer[0], PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id,
ret = MPID_nem_ptl_rptl_put(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(sreq_)->chunk_buffer[0], PTL_LARGE_THRESHOLD, PTL_ACK_REQ, vc_ptl->id,
vc_ptl->pt, NPTL_MATCH(tag, comm->context_id + context_offset, comm->rank), 0, sreq,
NPTL_HEADER(ssend_flag | NPTL_LARGE | NPTL_MULTIPLE, data_sz));
NPTL_HEADER(ssend_flag | NPTL_LARGE | NPTL_MULTIPLE, data_sz), 1);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlput", "**ptlput %s", MPID_nem_ptl_strerror(ret));
#endif
......
This diff is collapsed.
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#if !defined RPTL_H_INCLUDED
#define RPTL_H_INCLUDED
#if !defined HAVE_MACRO_VA_ARGS
#error "portals requires VA_ARGS support"
#endif /* HAVE_MACRO_VA_ARGS */
#if defined HAVE__FUNC__
#define RPTLU_FUNC __func__
#elif defined HAVE_CAP__FUNC__
#define RPTLU_FUNC __FUNC__
#elif defined HAVE__FUNCTION__
#define RPTLU_FUNC __FUNCTION__
#else
#define RPTLU_FUNC "Unknown"
#endif
/*
 * RPTLU_ERR_POP - bail out of the current function on error.
 *
 * If ret is non-zero, prints "<function> (<line>): " followed by the
 * printf-style message given in the variadic arguments (at least a format
 * string is required) via MPIU_Error_printf, then jumps to the enclosing
 * function's fn_fail label.  Does nothing when ret is zero.
 *
 * Wrapped in do { } while (0) so that the macro plus its trailing semicolon
 * forms exactly one statement and is safe inside an unbraced if/else.
 */
#define RPTLU_ERR_POP(ret, ...)                                         \
    do {                                                                \
        if ((ret)) {                                                    \
            MPIU_Error_printf("%s (%d): ", RPTLU_FUNC, __LINE__);       \
            MPIU_Error_printf(__VA_ARGS__);                             \
            goto fn_fail;                                               \
        }                                                               \
    } while (0)
struct rptl_op {
enum {
RPTL_OP_PUT,
RPTL_OP_GET
} op_type;
enum {
RPTL_OP_STATE_QUEUED,
RPTL_OP_STATE_ISSUED,
RPTL_OP_STATE_NACKED
} state;
union {
struct {
ptl_handle_md_t md_handle;
ptl_size_t local_offset;
ptl_size_t length;
ptl_ack_req_t ack_req;
ptl_process_t target_id;
ptl_pt_index_t pt_index;
ptl_match_bits_t match_bits;
ptl_size_t remote_offset;
void *user_ptr;
ptl_hdr_data_t hdr_data;
/* internal variables store events */
ptl_event_t *send;
ptl_event_t *ack;
int flow_control;
} put;
struct {
ptl_handle_md_t md_handle;
ptl_size_t local_offset;
ptl_size_t length;
ptl_process_t target_id;
ptl_pt_index_t pt_index;
ptl_match_bits_t match_bits;
ptl_size_t remote_offset;
void *user_ptr;
} get;
} u;
int events_ready;
struct rptl_op *next;
struct rptl_op *prev;