Commit e2db1e48 authored by Xin Zhao's avatar Xin Zhao
Browse files

Revert "netmod/mxm: Avoid calling mxm send req handling from mxm send completion callback"

This reverts commit 20f1f116.
parent c16466e3
......@@ -69,8 +69,6 @@ void MPID_nem_mxm_get_adi_msg(mxm_conn_h conn, mxm_imm_t imm, void *data,
void MPID_nem_mxm_anysource_posted(MPID_Request * req);
int MPID_nem_mxm_anysource_matched(MPID_Request * req);
int _mxm_handle_sreq(MPID_Request * req);
/* List type as queue
* Operations, initialization etc
*/
......@@ -176,25 +174,6 @@ typedef struct {
/* macro for mxm private in REQ */
#define REQ_BASE(reqp) ((reqp) ? (MPID_nem_mxm_req_area *)((reqp)->ch.netmod_area.padding) : NULL)
typedef GENERIC_Q_DECL(struct MPID_Request) MPID_nem_mxm_reqq_t;
#define MPID_nem_mxm_queue_empty(q) GENERIC_Q_EMPTY (q)
#define MPID_nem_mxm_queue_head(q) GENERIC_Q_HEAD (q)
#define MPID_nem_mxm_queue_enqueue(qp, ep) do { \
/* add refcount so req doesn't get freed before it's dequeued */ \
MPIR_Request_add_ref(ep); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
"MPID_nem_mxm_queue_enqueue req=%p (handle=%#x), queue=%p", \
ep, (ep)->handle, qp)); \
GENERIC_Q_ENQUEUE (qp, ep, dev.next); \
} while (0)
#define MPID_nem_mxm_queue_dequeue(qp, ep) do { \
GENERIC_Q_DEQUEUE (qp, ep, dev.next); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
"MPID_nem_mxm_queue_dequeuereq=%p (handle=%#x), queue=%p", \
*(ep), *(ep) ? (*(ep))->handle : -1, qp)); \
MPID_Request_release(*(ep)); \
} while (0)
typedef struct MPID_nem_mxm_module_t {
char *runtime_version;
const char *compiletime_version;
......@@ -209,7 +188,6 @@ typedef struct MPID_nem_mxm_module_t {
int mxm_np;
MPID_nem_mxm_ep_t *endpoint;
list_head_t free_queue;
MPID_nem_mxm_reqq_t sreq_queue;
struct {
int bulk_connect; /* use bulk connect */
int bulk_disconnect; /* use bulk disconnect */
......
......@@ -456,8 +456,6 @@ static int _mxm_init(int rank, int size)
list_grow_mxm_req(&_mxm_obj.free_queue);
MPIU_Assert(list_length(&_mxm_obj.free_queue) == MXM_MPICH_MAX_REQ);
_mxm_obj.sreq_queue.head = _mxm_obj.sreq_queue.tail = NULL;
mxm_obj = &_mxm_obj;
fn_exit:
......
......@@ -24,16 +24,10 @@ static int _mxm_process_rdtype(MPID_Request ** rreq_p, MPI_Datatype datatype,
int MPID_nem_mxm_poll(int in_blocking_progress)
{
int mpi_errno = MPI_SUCCESS;
MPID_Request *req = NULL;
MPIDI_STATE_DECL(MPID_STATE_MXM_POLL);
MPIDI_FUNC_ENTER(MPID_STATE_MXM_POLL);
while (!MPID_nem_mxm_queue_empty(mxm_obj->sreq_queue)) {
MPID_nem_mxm_queue_dequeue(&mxm_obj->sreq_queue, &req);
_mxm_handle_sreq(req);
}
mpi_errno = _mxm_poll();
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
......@@ -78,7 +72,6 @@ void MPID_nem_mxm_get_adi_msg(mxm_conn_h conn, mxm_imm_t imm, void *data,
vc = mxm_conn_ctx_get(conn);
_dbg_mxm_output(5, "========> Getting ADI msg (from=%d data_size %d) \n", vc->pg_rank, length);
_dbg_mxm_out_buf(data, (length > 16 ? 16 : length));
MPID_nem_handle_pkt(vc, data, (MPIDI_msg_sz_t) (length));
}
......@@ -151,10 +144,6 @@ int MPID_nem_mxm_anysource_matched(MPID_Request * req)
int MPID_nem_mxm_recv(MPIDI_VC_t * vc, MPID_Request * rreq)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_msg_sz_t data_sz;
int dt_contig;
MPI_Aint dt_true_lb;
MPID_Datatype *dt_ptr;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_MXM_RECV);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_MXM_RECV);
......@@ -163,15 +152,18 @@ int MPID_nem_mxm_recv(MPIDI_VC_t * vc, MPID_Request * rreq)
MPIU_Assert(((rreq->dev.match.parts.rank == MPI_ANY_SOURCE) && (vc == NULL)) ||
(vc && !vc->ch.is_local));
MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, dt_contig, data_sz,
dt_ptr, dt_true_lb);
{
MPIR_Context_id_t context_id = rreq->dev.match.parts.context_id;
int tag = rreq->dev.match.parts.tag;
MPIDI_msg_sz_t data_sz;
int dt_contig;
MPI_Aint dt_true_lb;
MPID_Datatype *dt_ptr;
MPID_nem_mxm_vc_area *vc_area = NULL;
MPID_nem_mxm_req_area *req_area = NULL;
MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, dt_contig, data_sz,
dt_ptr, dt_true_lb);
rreq->dev.OnDataAvail = NULL;
rreq->dev.tmpbuf = NULL;
rreq->ch.vc = vc;
......@@ -231,6 +223,7 @@ static int _mxm_handle_rreq(MPID_Request * req)
MPIDI_msg_sz_t userbuf_sz;
MPID_Datatype *dt_ptr;
MPIDI_msg_sz_t data_sz;
MPIDI_VC_t *vc = NULL;
MPID_nem_mxm_vc_area *vc_area ATTRIBUTE((unused)) = NULL;
MPID_nem_mxm_req_area *req_area = NULL;
void *tmp_buf = NULL;
......@@ -326,7 +319,7 @@ static int _mxm_handle_rreq(MPID_Request * req)
}
}
MPIDI_CH3U_Handle_recv_req(req->ch.vc, req, &complete);
MPIDI_CH3U_Handle_recv_req(vc, req, &complete);
MPIU_Assert(complete == TRUE);
if (tmp_buf) MPIU_Free(tmp_buf);
......
......@@ -15,6 +15,7 @@ enum {
};
static int _mxm_handle_sreq(MPID_Request * req);
static void _mxm_send_completion_cb(void *context);
static int _mxm_isend(MPID_nem_mxm_ep_t * ep, MPID_nem_mxm_req_area * req,
int type, mxm_mq_h mxm_mq, int mxm_rank, int id, mxm_tag_t tag, int block);
......@@ -234,7 +235,6 @@ int MPID_nem_mxm_send(MPIDI_VC_t * vc, const void *buf, int count, MPI_Datatype
MPIDI_Request_create_sreq(sreq, mpi_errno, goto fn_exit);
MPIU_Assert(sreq != NULL);
MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_SEND);
MPIDI_VC_FAI_send_seqnum(vc, seqnum);
MPIDI_Request_set_seqnum(sreq, seqnum);
if (HANDLE_GET_KIND(datatype) != HANDLE_KIND_BUILTIN) {
......@@ -336,8 +336,7 @@ int MPID_nem_mxm_ssend(MPIDI_VC_t * vc, const void *buf, int count, MPI_Datatype
/* create a request */
MPIDI_Request_create_sreq(sreq, mpi_errno, goto fn_exit);
MPIU_Assert(sreq != NULL);
MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_SSEND);
MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_SEND);
MPIDI_VC_FAI_send_seqnum(vc, seqnum);
MPIDI_Request_set_seqnum(sreq, seqnum);
if (HANDLE_GET_KIND(datatype) != HANDLE_KIND_BUILTIN) {
......@@ -440,7 +439,6 @@ int MPID_nem_mxm_isend(MPIDI_VC_t * vc, const void *buf, int count, MPI_Datatype
MPIDI_Request_create_sreq(sreq, mpi_errno, goto fn_exit);
MPIU_Assert(sreq != NULL);
MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_SEND);
MPIDI_VC_FAI_send_seqnum(vc, seqnum);
MPIDI_Request_set_seqnum(sreq, seqnum);
if (HANDLE_GET_KIND(datatype) != HANDLE_KIND_BUILTIN) {
......@@ -543,8 +541,7 @@ int MPID_nem_mxm_issend(MPIDI_VC_t * vc, const void *buf, int count, MPI_Datatyp
/* create a request */
MPIDI_Request_create_sreq(sreq, mpi_errno, goto fn_exit);
MPIU_Assert(sreq != NULL);
MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_SSEND);
MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_SEND);
MPIDI_VC_FAI_send_seqnum(vc, seqnum);
MPIDI_Request_set_seqnum(sreq, seqnum);
if (HANDLE_GET_KIND(datatype) != HANDLE_KIND_BUILTIN) {
......@@ -622,9 +619,10 @@ int MPID_nem_mxm_issend(MPIDI_VC_t * vc, const void *buf, int count, MPI_Datatyp
}
int _mxm_handle_sreq(MPID_Request * req)
static int _mxm_handle_sreq(MPID_Request * req)
{
int complete = FALSE;
int (*reqFn) (MPIDI_VC_t *, MPID_Request *, int *);
MPID_nem_mxm_vc_area *vc_area = NULL;
MPID_nem_mxm_req_area *req_area = NULL;
......@@ -636,10 +634,8 @@ int _mxm_handle_sreq(MPID_Request * req)
16 ? 16 : req_area->iov_buf[0].length));
vc_area->pending_sends -= 1;
if (req->dev.tmpbuf) {
if (req->dev.datatype_ptr || req->ch.noncontig) {
MPIU_Free(req->dev.tmpbuf);
}
if (((req->dev.datatype_ptr != NULL) && (req->dev.tmpbuf != NULL))) {
MPIU_Free(req->dev.tmpbuf);
}
if (req_area->iov_count > MXM_MPICH_MAX_IOV) {
......@@ -648,8 +644,19 @@ int _mxm_handle_sreq(MPID_Request * req)
req_area->iov_count = 0;
}
MPIDI_CH3U_Handle_send_req(req->ch.vc, req, &complete);
MPIU_Assert(complete == TRUE);
reqFn = req->dev.OnDataAvail;
if (!reqFn) {
MPIDI_CH3U_Request_complete(req);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
}
else {
MPIDI_VC_t *vc = req->ch.vc;
reqFn(vc, req, &complete);
if (!complete) {
MPIU_Assert(complete == TRUE);
}
}
return complete;
}
......@@ -676,7 +683,7 @@ static void _mxm_send_completion_cb(void *context)
req, req->status.MPI_ERROR);
if (likely(!MPIR_STATUS_GET_CANCEL_BIT(req->status))) {
MPID_nem_mxm_queue_enqueue(&mxm_obj->sreq_queue, req);
_mxm_handle_sreq(req);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment