Commit b73778ea authored by Xin Zhao's avatar Xin Zhao
Browse files

Decrement Active Target counter at target side.



During PSCW, when there are active-message operations
to be issued in Win_complete, we piggback a AT_COMPLETE
flag with it so that when target receives it, it can
decrement a counter on target side and detect completion
when target counter reaches zero.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 6578785d
......@@ -116,6 +116,7 @@ static inline MPIDI_RMA_Target_t *MPIDI_CH3I_Win_target_alloc(MPID_Win * win_ptr
e->lock_mode = 0;
e->outstanding_lock = 0;
e->disable_flush_local = 0;
e->win_complete_flag = 0;
e->sync.sync_flag = MPIDI_RMA_NONE;
e->sync.outstanding_acks = 0;
......
......@@ -96,6 +96,7 @@ typedef struct MPIDI_RMA_Target {
int lock_mode; /* e.g., MODE_NO_CHECK */
int outstanding_lock;
int disable_flush_local;
int win_complete_flag;
/* The target structure is free to be cleaned up when all of the
* following conditions hold true:
......
......@@ -1815,6 +1815,8 @@ int MPIDI_CH3_PktHandler_Flush( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_FlushAck( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_DecrAtCnt( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_LockPutUnlock( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_LockAccumUnlock( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
......
......@@ -89,6 +89,7 @@ typedef enum {
MPIDI_CH3_PKT_UNLOCK,
MPIDI_CH3_PKT_FLUSH,
MPIDI_CH3_PKT_FLUSH_ACK,
MPIDI_CH3_PKT_DECR_AT_COUNTER,
MPIDI_CH3_PKT_LOCK_PUT_UNLOCK, /* optimization for single puts */
MPIDI_CH3_PKT_LOCK_GET_UNLOCK, /* optimization for single gets */
MPIDI_CH3_PKT_LOCK_ACCUM_UNLOCK, /* optimization for single accumulates */
......@@ -121,7 +122,7 @@ typedef enum {
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK = 2,
MPIDI_CH3_PKT_FLAG_RMA_FLUSH = 4,
MPIDI_CH3_PKT_FLAG_RMA_REQ_ACK = 8,
MPIDI_CH3_PKT_FLAG_RMA_AT_COMPLETE = 16,
MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER = 16,
MPIDI_CH3_PKT_FLAG_RMA_NOCHECK = 32,
MPIDI_CH3_PKT_FLAG_RMA_SHARED = 64,
MPIDI_CH3_PKT_FLAG_RMA_EXCLUSIVE = 128,
......@@ -432,6 +433,11 @@ typedef struct MPIDI_CH3_Pkt_flush_ack {
* target state at the origin. */
} MPIDI_CH3_Pkt_flush_ack_t;
typedef struct MPIDI_CH3_Pkt_decr_at_counter {
MPIDI_CH3_Pkt_type_t type;
MPI_Win target_win_handle;
} MPIDI_CH3_Pkt_decr_at_counter_t;
typedef struct MPIDI_CH3_Pkt_lock_put_unlock {
MPIDI_CH3_Pkt_type_t type;
MPIDI_CH3_Pkt_flags_t flags;
......@@ -506,6 +512,7 @@ typedef union MPIDI_CH3_Pkt {
MPIDI_CH3_Pkt_unlock_t unlock;
MPIDI_CH3_Pkt_flush_t flush;
MPIDI_CH3_Pkt_flush_ack_t flush_ack;
MPIDI_CH3_Pkt_decr_at_counter_t decr_at_cnt;
MPIDI_CH3_Pkt_lock_put_unlock_t lock_put_unlock;
MPIDI_CH3_Pkt_lock_get_unlock_t lock_get_unlock;
MPIDI_CH3_Pkt_lock_accum_unlock_t lock_accum_unlock;
......
......@@ -186,6 +186,46 @@ static inline int MPIDI_CH3I_Send_flush_ack_pkt(MPIDI_VC_t *vc, MPID_Win *win_pt
}
#undef FUNCNAME
#define FUNCNAME send_decr_at_cnt_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int send_decr_at_cnt_msg(int dst, MPID_Win * win_ptr)
{
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_decr_at_counter_t *decr_at_cnt_pkt = &upkt.decr_at_cnt;
MPIDI_VC_t * vc;
MPID_Request *request = NULL;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_SEND_DECR_AT_CNT_MSG);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_DECR_AT_CNT_MSG);
MPIDI_Pkt_init(decr_at_cnt_pkt, MPIDI_CH3_PKT_DECR_AT_COUNTER);
decr_at_cnt_pkt->target_win_handle = win_ptr->all_win_handles[dst];
MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dst, &vc);
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, decr_at_cnt_pkt,
sizeof(*decr_at_cnt_pkt), &request);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**ch3|rmamsg" );
}
if (request != NULL) {
MPID_Request_release(request);
}
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_DECR_AT_CNT_MSG);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME acquire_local_lock
#undef FCNAME
......
......@@ -588,6 +588,8 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
MPIDI_CH3_PktHandler_Flush;
pktArray[MPIDI_CH3_PKT_FLUSH_ACK] =
MPIDI_CH3_PktHandler_FlushAck;
pktArray[MPIDI_CH3_PKT_DECR_AT_COUNTER] =
MPIDI_CH3_PktHandler_DecrAtCnt;
pktArray[MPIDI_CH3_PKT_LOCK_PUT_UNLOCK] =
MPIDI_CH3_PktHandler_LockPutUnlock;
pktArray[MPIDI_CH3_PKT_LOCK_ACCUM_UNLOCK] =
......
......@@ -93,6 +93,13 @@ int MPIDI_CH3_ReqHandler_PutRecvComplete( MPIDI_VC_t *vc,
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) {
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
/* Signal the local process when the op counter reaches 0. */
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
mpi_errno = MPIDI_CH3_Finish_rma_op_target(vc, win_ptr, TRUE, rreq->dev.flags,
rreq->dev.source_win_handle);
......@@ -145,6 +152,13 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete( MPIDI_VC_t *vc,
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) {
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
/* Signal the local process when the op counter reaches 0. */
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
mpi_errno = MPIDI_CH3_Finish_rma_op_target(vc, win_ptr, TRUE, rreq->dev.flags,
rreq->dev.source_win_handle);
......@@ -720,6 +734,14 @@ int MPIDI_CH3_ReqHandler_FOPComplete( MPIDI_VC_t *vc,
}
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) {
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
/* Signal the local process when the op counter reaches 0. */
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
/* There are additional steps to take if this is a passive
target RMA or the last operation from the source */
mpi_errno = MPIDI_CH3_Finish_rma_op_target(vc, win_ptr, TRUE, rreq->dev.flags,
......
......@@ -52,6 +52,14 @@ int MPIDI_CH3_ReqHandler_GetSendComplete( MPIDI_VC_t *vc ATTRIBUTE((unused)),
MPID_Win_get_ptr(sreq->dev.target_win_handle, win_ptr);
if (sreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) {
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
/* Signal the local process when the op counter reaches 0. */
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
mpi_errno = MPIDI_CH3_Finish_rma_op_target(NULL, win_ptr, FALSE, sreq->dev.flags, MPI_WIN_NULL);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
......@@ -96,6 +104,14 @@ int MPIDI_CH3_ReqHandler_GaccumLikeSendComplete( MPIDI_VC_t *vc,
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) {
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
/* Signal the local process when the op counter reaches 0. */
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
MPIDI_CH3U_Request_complete(rreq);
*complete = TRUE;
fn_exit:
......
......@@ -36,24 +36,6 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received put pkt");
if (put_pkt->count == 0) {
/* it's a 0-byte message sent just to decrement the
* completion counter. This happens only in
* post/start/complete/wait sync model; therefore, no need
* to check lock queue. */
if (put_pkt->target_win_handle != MPI_WIN_NULL) {
MPID_Win_get_ptr(put_pkt->target_win_handle, win_ptr);
mpi_errno =
MPIDI_CH3_Finish_rma_op_target(NULL, win_ptr, TRUE, put_pkt->flags, MPI_WIN_NULL);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
}
*buflen = sizeof(MPIDI_CH3_Pkt_t);
*rreqp = NULL;
goto fn_exit;
}
MPIU_Assert(put_pkt->target_win_handle != MPI_WIN_NULL);
MPID_Win_get_ptr(put_pkt->target_win_handle, win_ptr);
mpi_errno = MPIDI_CH3_Start_rma_op_target(win_ptr, put_pkt->flags);
......@@ -757,6 +739,13 @@ int MPIDI_CH3_PktHandler_CAS(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Request_release(req);
}
if (cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) {
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
/* Signal the local process when the op counter reaches 0. */
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
/* There are additional steps to take if this is a passive
* target RMA or the last operation from the source */
......@@ -1604,6 +1593,38 @@ int MPIDI_CH3_PktHandler_FlushAck(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_DecrAtCnt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_DecrAtCnt(MPIDI_VC_t * vc ATTRIBUTE((unused)),
MPIDI_CH3_Pkt_t * pkt,
MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
MPIDI_CH3_Pkt_decr_at_counter_t *decr_at_cnt_pkt = &pkt->decr_at_cnt;
MPID_Win *win_ptr;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_DECRATCNT);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_DECRATCNT);
MPID_Win_get_ptr(decr_at_cnt_pkt->target_win_handle, win_ptr);
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
*buflen = sizeof(MPIDI_CH3_Pkt_t);
*rreqp = NULL;
MPIDI_CH3_Progress_signal_completion();
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_DECRATCNT);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Unlock
#undef FCNAME
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment