Commit f38b5131 authored by Igor Ivanov's avatar Igor Ivanov
Browse files

netmode/mxm: Add mprobe/imrecv support


Signed-off-by: default avatarIgor Ivanov <Igor.Ivanov@itseez.com>
parent 696eb4aa
......@@ -265,6 +265,11 @@ static inline void _mxm_req_wait(mxm_req_base_t * req)
mxm_wait(&mxm_wreq);
}
static inline int _mxm_eager_threshold(void)
{
return 262144;
}
/*
* Tag management section
*/
......
......@@ -70,7 +70,7 @@ static MPIDI_Comm_ops_t comm_ops = {
MPID_nem_mxm_ssend, /* ssend */
MPID_nem_mxm_isend, /* isend */
MPID_nem_mxm_isend, /* irsend */
MPID_nem_mxm_issend, /* issend */
MPID_nem_mxm_issend,/* issend */
NULL, /* send_init */
NULL, /* bsend_init */
......@@ -142,6 +142,8 @@ int MPID_nem_mxm_init(MPIDI_PG_t * pg_p, int pg_rank, char **bc_val_p, int *val_
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPIDI_Anysource_improbe_fn = MPID_nem_mxm_anysource_improbe;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MXM_INIT);
return mpi_errno;
......@@ -266,8 +268,12 @@ int MPID_nem_mxm_vc_init(MPIDI_VC_t * vc)
vc_area->pending_sends = 0;
vc->rndvSend_fn = NULL;
vc->rndvRecv_fn = NULL;
/* Use default rendezvous functions */
vc->eager_max_msg_sz = _mxm_eager_threshold();
vc->ready_eager_max_msg_sz = vc->eager_max_msg_sz;
vc->rndvSend_fn = MPID_nem_lmt_RndvSend;
vc->rndvRecv_fn = MPID_nem_lmt_RndvRecv;
vc->sendNoncontig_fn = MPID_nem_mxm_SendNoncontig;
vc->comm_ops = &comm_ops;
......
......@@ -107,11 +107,76 @@ int MPID_nem_mxm_improbe(MPIDI_VC_t * vc, int source, int tag, MPID_Comm * comm,
int *flag, MPID_Request ** message, MPI_Status * status)
{
int mpi_errno = MPI_SUCCESS;
mxm_error_t err;
mxm_recv_req_t mxm_req;
mxm_message_h mxm_msg;
MPID_nem_mxm_vc_area *vc_area = (vc ? VC_BASE(vc) : NULL);
MPIDI_STATE_DECL(MPID_STATE_MXM_IMPROBE);
MPIDI_FUNC_ENTER(MPID_STATE_MXM_IMPROBE);
MPIU_Assert(0 && "not currently implemented");
mxm_req.base.state = MXM_REQ_NEW;
mxm_req.base.mq = (mxm_mq_h) comm->dev.ch.netmod_priv;
mxm_req.base.conn = (vc_area ? vc_area->mxm_ep->mxm_conn : 0);
mxm_req.tag = _mxm_tag_mpi2mxm(tag, comm->context_id + context_offset);
mxm_req.tag_mask = _mxm_tag_mask(tag);
err = mxm_req_mprobe(&mxm_req, &mxm_msg);
if (MXM_OK == err) {
MPID_Request *req;
*flag = 1;
req = MPID_Request_create();
MPIU_Object_set_ref(req, 2);
req->kind = MPID_REQUEST_MPROBE;
req->comm = comm;
MPIR_Comm_add_ref(comm);
req->ch.vc = vc;
// MPIDI_Request_set_sync_send_flag(req, 1); /* set this flag in case MXM_REQ_OP_SEND_SYNC*/
MPIDI_Request_set_msg_type(req, MPIDI_REQUEST_EAGER_MSG);
req->dev.recv_pending_count = 1;
_mxm_to_mpi_status(mxm_req.base.error, &req->status);
req->status.MPI_TAG = _mxm_tag_mxm2mpi(mxm_req.completion.sender_tag);
req->status.MPI_SOURCE = mxm_req.completion.sender_imm;
req->dev.recv_data_sz = mxm_req.completion.sender_len;
MPIR_STATUS_SET_COUNT(req->status, req->dev.recv_data_sz);
req->dev.tmpbuf = MPIU_Malloc(req->dev.recv_data_sz);
MPIU_Assert(req->dev.tmpbuf);
mxm_req.base.completed_cb = NULL;
mxm_req.base.context = req;
mxm_req.base.data_type = MXM_REQ_DATA_BUFFER;
mxm_req.base.data.buffer.ptr = req->dev.tmpbuf;
mxm_req.base.data.buffer.length = req->dev.recv_data_sz;
err = mxm_message_recv(&mxm_req, mxm_msg);
_mxm_req_wait(&mxm_req.base);
MPIDI_CH3U_Request_complete(req);
*message = req;
/* TODO: Should we change status
_mxm_to_mpi_status(mxm_req.base.error, status);
*/
status->MPI_SOURCE = req->status.MPI_SOURCE;
status->MPI_TAG = req->status.MPI_TAG;
MPIR_STATUS_SET_COUNT(*status, req->dev.recv_data_sz);
_dbg_mxm_output(8,
"imProbe ========> Found USER msg (context %d from %d tag %d size %d) \n",
comm->context_id + context_offset, status->MPI_SOURCE, status->MPI_TAG, MPIR_STATUS_GET_COUNT(*status));
}
else if (MXM_ERR_NO_MESSAGE == err) {
*flag = 0;
*message = NULL;
}
else {
mpi_errno = MPI_ERR_INTERN;
}
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MXM_IMPROBE);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment