Commit cbc53593 authored by Norio Yamaguchi's avatar Norio Yamaguchi
Browse files

Fix the implementation of RMA in netmod-IB

Corresponding to the implementations of RMA in the upper layer.
parent 8790bf36
......@@ -462,6 +462,7 @@ int MPID_nem_ib_drain_scq(int dont_call_progress)
req->dev.recv_data_sz = type_size * req->dev.user_count;
int complete = 0;
int (*reqFn) (MPIDI_VC_t *, MPID_Request *, int *);
mpi_errno =
MPIDI_CH3U_Receive_data_found(req, REQ_FIELD(req, lmt_pack_buf), &data_len,
&complete);
......@@ -472,7 +473,12 @@ int MPID_nem_ib_drain_scq(int dont_call_progress)
MPIU_Free(REQ_FIELD(req, lmt_pack_buf));
MPID_nem_ib_lmt_send_PKT_LMT_DONE(req->ch.vc, req);
MPIDI_CH3U_Request_complete(req);
reqFn = req->dev.OnFinal;
if (reqFn) {
reqFn(req->ch.vc, req, &complete);
} else {
MPIDI_CH3U_Request_complete(req);
}
}
/* decrement the number of entries in IB command queue */
......@@ -498,7 +504,7 @@ int MPID_nem_ib_drain_scq(int dont_call_progress)
MPIU_Free(REQ_FIELD(req, lmt_pack_buf));
complete = 0;
mpi_errno = MPIDI_CH3_ReqHandler_PutAccumRespComplete(req->ch.vc, req, &complete); // call MPIDI_CH3U_Request_complete()
mpi_errno = MPIDI_CH3_ReqHandler_PutRecvComplete(req->ch.vc, req, &complete); // call MPIDI_CH3U_Request_complete()
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPIU_Assert(complete == TRUE);
......@@ -530,7 +536,7 @@ int MPID_nem_ib_drain_scq(int dont_call_progress)
/* All dtype data has been received, call req handler */
mpi_errno =
MPIDI_CH3_ReqHandler_PutRespDerivedDTComplete(req->ch.vc, req, &complete);
MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(req->ch.vc, req, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_PUT");
/* return 'complete == FALSE' */
......@@ -571,7 +577,7 @@ int MPID_nem_ib_drain_scq(int dont_call_progress)
MPIU_Free(REQ_FIELD(req, lmt_pack_buf));
complete = 0;
mpi_errno = MPIDI_CH3_ReqHandler_PutAccumRespComplete(req->ch.vc, req, &complete); // call MPIDI_CH3U_Request_complete()
mpi_errno = MPIDI_CH3_ReqHandler_AccumRecvComplete(req->ch.vc, req, &complete); // call MPIDI_CH3U_Request_complete()
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPIU_Assert(complete == TRUE);
......@@ -603,7 +609,7 @@ int MPID_nem_ib_drain_scq(int dont_call_progress)
/* All dtype data has been received, call req handler */
mpi_errno =
MPIDI_CH3_ReqHandler_AccumRespDerivedDTComplete(req->ch.vc, req, &complete);
MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(req->ch.vc, req, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_ACCUMULATE");
/* return 'complete == FALSE' */
......@@ -1819,7 +1825,6 @@ int MPID_nem_ib_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_Assert(put_pkt->target_win_handle != MPI_WIN_NULL);
MPID_Win_get_ptr(put_pkt->target_win_handle, win_ptr);
mpi_errno = MPIDI_CH3_Start_rma_op_target(win_ptr, put_pkt->flags);
req = MPID_Request_create();
MPIU_Object_set_ref(req, 1); /* decrement only in drain_scq ? */
......@@ -1831,6 +1836,7 @@ int MPID_nem_ib_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.target_win_handle = put_pkt->target_win_handle;
req->dev.source_win_handle = put_pkt->source_win_handle;
req->dev.flags = put_pkt->flags;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_PutRecvComplete;
if (MPIR_DATATYPE_IS_PREDEFINED(put_pkt->datatype)) {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_PUT_RESP);
......@@ -1838,6 +1844,12 @@ int MPID_nem_ib_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Datatype_get_size_macro(put_pkt->datatype, type_size);
req->dev.recv_data_sz = type_size * put_pkt->count;
if (put_pkt->immed_len > 0) {
/* See if we can receive some data from packet header. */
MPIU_Memcpy(req->dev.user_buf, put_pkt->data, put_pkt->immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + put_pkt->immed_len);
req->dev.recv_data_sz -= put_pkt->immed_len;
}
}
else {
/* derived datatype */
......@@ -1945,7 +1957,6 @@ int MPID_nem_ib_PktHandler_Accumulate(MPIDI_VC_t * vc,
MPIU_Assert(accum_pkt->target_win_handle != MPI_WIN_NULL);
MPID_Win_get_ptr(accum_pkt->target_win_handle, win_ptr);
mpi_errno = MPIDI_CH3_Start_rma_op_target(win_ptr, accum_pkt->flags);
req = MPID_Request_create();
MPIU_Object_set_ref(req, 1);
......@@ -1960,12 +1971,8 @@ int MPID_nem_ib_PktHandler_Accumulate(MPIDI_VC_t * vc,
req->dev.source_win_handle = accum_pkt->source_win_handle;
req->dev.flags = accum_pkt->flags;
if (accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM) {
req->dev.resp_request_handle = accum_pkt->request_handle;
}
else {
req->dev.resp_request_handle = MPI_REQUEST_NULL;
}
req->dev.resp_request_handle = MPI_REQUEST_NULL;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RESP);
......@@ -1978,13 +1985,22 @@ int MPID_nem_ib_PktHandler_Accumulate(MPIDI_VC_t * vc,
MPIU_Assert(true_lb == 0);
req->dev.user_buf = MPIU_Malloc(accum_pkt->count * (MPIR_MAX(extent, true_extent)));
req->dev.final_user_buf = req->dev.user_buf;
MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
req->dev.recv_data_sz = type_size * accum_pkt->count;
if (accum_pkt->immed_len > 0) {
/* See if we can receive some data from packet header. */
MPIU_Memcpy(req->dev.user_buf, accum_pkt->data, accum_pkt->immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + accum_pkt->immed_len);
req->dev.recv_data_sz -= accum_pkt->immed_len;
}
}
else {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RESP_DERIVED_DT);
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumRespDerivedDTComplete;
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete;
req->dev.datatype = MPI_DATATYPE_NULL;
req->dev.dtype_info = (MPIDI_RMA_dtype_info *) MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
......@@ -2137,6 +2153,25 @@ int MPID_nem_ib_PktHandler_GetResp(MPIDI_VC_t * vc,
MPID_Request_get_ptr(get_resp_pkt->request_handle, req);
MPID_Win *win_ptr;
int target_rank = get_resp_pkt->target_rank;
MPID_Win_get_ptr(get_resp_pkt->source_win_handle, win_ptr);
/* decrement ack_counter on target */
if (get_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
mpi_errno = set_lock_sync_counter(win_ptr, target_rank);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
if (get_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK) {
mpi_errno = MPIDI_CH3I_RMA_Handle_flush_ack(win_ptr, target_rank);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
if (get_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK) {
mpi_errno = MPIDI_CH3I_RMA_Handle_flush_ack(win_ptr, target_rank);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
void *write_to_buf;
req->ch.lmt_data_sz = s_cookie_buf->len;
......
......@@ -302,9 +302,6 @@ static int MPID_nem_ib_iSendContig_core(MPIDI_VC_t * vc, MPID_Request * sreq, vo
if (((MPIDI_CH3_Pkt_t *) hdr)->type == MPIDI_CH3_PKT_GET) {
//printf("isendcontig_core,MPIDI_CH3_PKT_GET,ref_count=%d\n", sreq->ref_count);
}
if (hdr && ((MPIDI_CH3_Pkt_t *) hdr)->type == MPIDI_CH3_PKT_ACCUM_IMMED) {
dprintf("isendcontig_core,MPIDI_CH3_PKT_ACCUM_IMMED,ref_count=%d\n", sreq->ref_count);
}
if (hdr && ((MPIDI_CH3_Pkt_t *) hdr)->type == MPIDI_CH3_PKT_ACCUMULATE) {
dprintf("isendcontig_core,MPIDI_CH3_PKT_ACCUMULATE,ref_count=%d\n", sreq->ref_count);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment