Commit b56f4f1d authored by Kenneth Raffenetti's avatar Kenneth Raffenetti
Browse files

portals4: implement cancel send



All MPI_Sends in the Portals4 netmod will cause some or all of the data to be
sent eagerly to the receiver. Canceling a send means having to find the data in
the unexpected message queue and removing it in order to preserve matching.
Because the message queues exist at the netmod level, it needs its own cancel
protocol.

The protocol is modeled on a similar case in CH3, but with its own method
for searching the unexpected queue. Custom netmod packet handlers are used to
receive and process the control messages.

Known Issue:
  Because we are using different PTs for the send and cancel message, it is
  possible the cancel request could arrive before the message being canceled.
Signed-off-by: default avatarAntonio Pena Monferrer <apenya@mcs.anl.gov>
parent b0f5772f
......@@ -200,6 +200,36 @@ int MPID_nem_ptl_lmt_handle_cookie(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV s
int MPID_nem_ptl_lmt_done_send(MPIDI_VC_t *vc, MPID_Request *req);
int MPID_nem_ptl_lmt_done_recv(MPIDI_VC_t *vc, MPID_Request *req);
/* packet handlers */
int MPID_nem_ptl_pkt_cancel_send_req_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp);
int MPID_nem_ptl_pkt_cancel_send_resp_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp);
/* local packet types */
typedef enum MPIDI_nem_ptl_pkt_type {
MPIDI_NEM_PTL_PKT_CANCEL_SEND_REQ,
MPIDI_NEM_PTL_PKT_CANCEL_SEND_RESP,
MPIDI_NEM_TCP_PKT_INVALID = -1 /* force signed, to avoid warnings */
} MPIDI_nem_ptl_pkt_type_t;
typedef struct MPIDI_nem_ptl_pkt_cancel_send_req
{
MPIDI_CH3_Pkt_type_t type;
unsigned subtype;
MPIDI_Message_match match;
MPI_Request sender_req_id;
} MPIDI_nem_ptl_pkt_cancel_send_req_t;
typedef struct MPIDI_nem_ptl_pkt_cancel_send_resp
{
MPIDI_CH3_Pkt_type_t type;
unsigned subtype;
MPI_Request sender_req_id;
int ack;
} MPIDI_nem_ptl_pkt_cancel_send_resp_t;
/* debugging */
const char *MPID_nem_ptl_strerror(int ret);
......
......@@ -76,6 +76,7 @@ static MPIDI_Comm_ops_t comm_ops = {
MPID_nem_ptl_improbe /* improbe */
};
static MPIDI_CH3_PktHandler_Fcn *MPID_nem_ptl_pkt_handlers[2]; /* for CANCEL_SEND_REQ and CANCEL_SEND_RESP */
#undef FUNCNAME
#define FUNCNAME get_target_info
......@@ -412,6 +413,13 @@ static int vc_init(MPIDI_VC_t *vc)
vc_ch->iStartContigMsg = MPID_nem_ptl_iStartContigMsg;
vc_ch->iSendContig = MPID_nem_ptl_iSendContig;
vc_ch->num_pkt_handlers = 2;
vc_ch->pkt_handler = MPID_nem_ptl_pkt_handlers;
MPID_nem_ptl_pkt_handlers[MPIDI_NEM_PTL_PKT_CANCEL_SEND_REQ] =
MPID_nem_ptl_pkt_cancel_send_req_handler;
MPID_nem_ptl_pkt_handlers[MPIDI_NEM_PTL_PKT_CANCEL_SEND_RESP] =
MPID_nem_ptl_pkt_cancel_send_resp_handler;
vc_ch->lmt_initiate_lmt = MPID_nem_ptl_lmt_initiate_lmt;
vc_ch->lmt_start_recv = MPID_nem_ptl_lmt_start_recv;
vc_ch->lmt_start_send = MPID_nem_ptl_lmt_start_send;
......
......@@ -292,3 +292,116 @@ int MPID_nem_ptl_anysource_improbe(int tag, MPID_Comm * comm, int context_offset
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_pkt_cancel_send_req_handler
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_pkt_cancel_send_req_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp)
{
int ret, mpi_errno = MPI_SUCCESS;
MPIDI_nem_ptl_pkt_cancel_send_req_t *req_pkt = (MPIDI_nem_ptl_pkt_cancel_send_req_t *)pkt;
MPID_PKT_DECL_CAST(upkt, MPIDI_nem_ptl_pkt_cancel_send_resp_t, resp_pkt);
MPID_Request *search_req, *resp_req;
ptl_me_t me;
MPID_nem_ptl_vc_area *const vc_ptl = VC_PTL(vc);
MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
"received cancel send req pkt, sreq=0x%08x, rank=%d, tag=%d, context=%d",
req_pkt->sender_req_id, req_pkt->match.parts.rank,
req_pkt->match.parts.tag, req_pkt->match.parts.context_id));
/* create a dummy request and search for the message */
/* create a request */
search_req = MPID_Request_create();
MPID_nem_ptl_init_req(search_req);
MPIU_ERR_CHKANDJUMP1(!search_req, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Request_create");
MPIU_Object_set_ref(search_req, 2); /* 1 ref for progress engine and 1 ref for us */
search_req->kind = MPID_REQUEST_MPROBE;
/* create a dummy ME to use for searching the list */
me.start = NULL;
me.length = 0;
me.ct_handle = PTL_CT_NONE;
me.uid = PTL_UID_ANY;
me.options = ( PTL_ME_OP_PUT | PTL_ME_USE_ONCE );
me.min_free = 0;
me.match_bits = NPTL_MATCH(req_pkt->match.parts.tag, req_pkt->match.parts.context_id, req_pkt->match.parts.rank);
me.match_id = vc_ptl->id;
me.ignore_bits = NPTL_MATCH_IGNORE;
/* FIXME: this should use a custom handler that throws the data away inline */
REQ_PTL(search_req)->event_handler = handle_mprobe;
/* submit a search request */
ret = PtlMESearch(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_pt, &me, PTL_SEARCH_DELETE, search_req);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmesearch", "**ptlmesearch %s", MPID_nem_ptl_strerror(ret));
DBG_MSG_MESearch("REG", vc ? vc->pg_rank : 0, me, search_req);
/* wait for search request to complete */
do {
mpi_errno = MPID_nem_ptl_poll(FALSE);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
} while (!MPID_Request_is_complete(search_req));
/* send response */
resp_pkt->type = MPIDI_NEM_PKT_NETMOD;
resp_pkt->subtype = MPIDI_NEM_PTL_PKT_CANCEL_SEND_RESP;
resp_pkt->ack = REQ_PTL(search_req)->found;
resp_pkt->sender_req_id = req_pkt->sender_req_id;
MPID_nem_ptl_iStartContigMsg(vc, resp_pkt, sizeof(*resp_pkt), NULL,
0, &resp_req);
/* if the message was found, free the temporary buffer used to copy the data */
if (REQ_PTL(search_req)->found)
MPIU_Free(search_req->dev.tmpbuf);
MPID_Request_release(search_req);
if (resp_req != NULL)
MPID_Request_release(resp_req);
fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_pkt_cancel_send_resp_handler
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_pkt_cancel_send_resp_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp)
{
int mpi_errno = MPI_SUCCESS;
MPID_Request *sreq;
MPIDI_nem_ptl_pkt_cancel_send_resp_t *resp_pkt = (MPIDI_nem_ptl_pkt_cancel_send_resp_t *)pkt;
int i, ret;
MPID_Request_get_ptr(resp_pkt->sender_req_id, sreq);
if (resp_pkt->ack) {
MPIR_STATUS_SET_CANCEL_BIT(sreq->status, TRUE);
/* remove any remaining get MEs */
for (i = 0; i < REQ_PTL(sreq)->num_gets; i++) {
ret = PtlMEUnlink(REQ_PTL(sreq)->get_me_p[i]);
MPIU_ERR_CHKANDJUMP1(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink", "**ptlmeunlink %s", MPID_nem_ptl_strerror(ret));
}
MPIU_DBG_MSG(CH3_OTHER,TYPICAL,"message cancelled");
} else {
MPIR_STATUS_SET_CANCEL_BIT(sreq->status, FALSE);
MPIU_DBG_MSG(CH3_OTHER,TYPICAL,"unable to cancel message");
}
MPIDI_CH3U_Request_complete(sreq);
*rreqp = NULL;
fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
}
......@@ -87,7 +87,7 @@ static int handler_recv_dequeue_complete(const ptl_event_t *e)
MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_DEQUEUE_COMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_DEQUEUE_COMPLETE);
MPIU_Assert(e->type == PTL_EVENT_PUT || e->type == PTL_EVENT_PUT_OVERFLOW);
dequeue_req(e);
......
......@@ -248,15 +248,17 @@ static int send_msg(ptl_hdr_data_t ssend_flag, struct MPIDI_VC *vc, const void *
MPID_nem_ptl_request_create_sreq(sreq, mpi_errno, comm);
sreq->dev.match.parts.rank = dest;
sreq->dev.match.parts.tag = tag;
sreq->dev.match.parts.context_id = comm->context_id + context_offset;
if (!vc_ptl->id_initialized) {
mpi_errno = MPID_nem_ptl_init_id(vc);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
MPIDI_Datatype_get_info(count, datatype, dt_contig, data_sz, dt_ptr, dt_true_lb);
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "count=%d datatype=%#x contig=%d data_sz=%lu", count, datatype, dt_contig, data_sz));
if (data_sz <= PTL_LARGE_THRESHOLD) {
/* Small message. Send all data eagerly */
if (dt_contig) {
......@@ -492,12 +494,37 @@ int MPID_nem_ptl_issend(struct MPIDI_VC *vc, const void *buf, int count, MPI_Dat
int MPID_nem_ptl_cancel_send(struct MPIDI_VC *vc, struct MPID_Request *sreq)
{
int mpi_errno = MPI_SUCCESS;
MPID_PKT_DECL_CAST(upkt, MPIDI_nem_ptl_pkt_cancel_send_req_t, csr_pkt);
MPID_Request *csr_sreq;
int was_incomplete;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_CANCEL_SEND);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_CANCEL_SEND);
/* portals4 has no way of cancelling a send */
MPIU_ERR_SETFATAL(mpi_errno, MPI_ERR_OTHER, "**notimpl");
/* The completion counter and reference count are incremented to keep
the request around long enough to receive a
response regardless of what the user does (free the request before
waiting, etc.). */
MPIDI_CH3U_Request_increment_cc(sreq, &was_incomplete);
if (!was_incomplete) {
/* The reference count is incremented only if the request was
complete before the increment. */
MPIR_Request_add_ref(sreq);
}
csr_pkt->type = MPIDI_NEM_PKT_NETMOD;
csr_pkt->subtype = MPIDI_NEM_PTL_PKT_CANCEL_SEND_REQ;
csr_pkt->match.parts.rank = sreq->dev.match.parts.rank;
csr_pkt->match.parts.tag = sreq->dev.match.parts.tag;
csr_pkt->match.parts.context_id = sreq->dev.match.parts.context_id;
csr_pkt->sender_req_id = sreq->handle;
MPID_nem_ptl_iStartContigMsg(vc, csr_pkt, sizeof(*csr_pkt), NULL,
0, &csr_sreq);
if (csr_sreq != NULL)
MPID_Request_release(csr_sreq);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_CANCEL_SEND);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment