Commit 7e3e73a2 authored by James Dinan's avatar James Dinan
Browse files

[svn-r10426] MPI-3 RMA Flush implementation

This commit implements MPI-3 RMA's flush and flush_all operations.

Reviewer: buntinas
parent a3545fff
......@@ -1803,6 +1803,8 @@ int MPIDI_CH3_PktHandler_LockGranted( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Unlock( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Flush( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_PtRMADone( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_LockPutUnlock( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
......
......@@ -76,6 +76,7 @@ typedef enum MPIDI_CH3_Pkt_type
MPIDI_CH3_PKT_LOCK,
MPIDI_CH3_PKT_LOCK_GRANTED,
MPIDI_CH3_PKT_UNLOCK,
MPIDI_CH3_PKT_FLUSH,
MPIDI_CH3_PKT_PT_RMA_DONE,
MPIDI_CH3_PKT_LOCK_PUT_UNLOCK, /* optimization for single puts */
MPIDI_CH3_PKT_LOCK_GET_UNLOCK, /* optimization for single gets */
......@@ -338,6 +339,7 @@ MPIDI_CH3_Pkt_lock_granted_t;
typedef MPIDI_CH3_Pkt_lock_granted_t MPIDI_CH3_Pkt_pt_rma_done_t;
typedef MPIDI_CH3_Pkt_lock_t MPIDI_CH3_Pkt_unlock_t;
typedef MPIDI_CH3_Pkt_lock_t MPIDI_CH3_Pkt_flush_t;
typedef struct MPIDI_CH3_Pkt_lock_put_unlock
{
......@@ -408,6 +410,7 @@ typedef union MPIDI_CH3_Pkt
MPIDI_CH3_Pkt_lock_t lock;
MPIDI_CH3_Pkt_lock_granted_t lock_granted;
MPIDI_CH3_Pkt_unlock_t unlock;
MPIDI_CH3_Pkt_flush_t flush;
MPIDI_CH3_Pkt_pt_rma_done_t pt_rma_done;
MPIDI_CH3_Pkt_lock_put_unlock_t lock_put_unlock;
MPIDI_CH3_Pkt_lock_get_unlock_t lock_get_unlock;
......
......@@ -185,7 +185,8 @@ typedef struct MPIDI_VC * MPID_VCR;
enum MPIDI_CH3_Lock_states_e {
MPIDI_CH3_WIN_LOCK_NONE = 0,
MPIDI_CH3_WIN_LOCK_REQUESTED,
MPIDI_CH3_WIN_LOCK_GRANTED
MPIDI_CH3_WIN_LOCK_GRANTED,
MPIDI_CH3_WIN_LOCK_FLUSH
};
#define MPIDI_DEV_WIN_DECL \
......
......@@ -580,6 +580,8 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
MPIDI_CH3_PktHandler_LockGranted;
pktArray[MPIDI_CH3_PKT_UNLOCK] =
MPIDI_CH3_PktHandler_Unlock;
pktArray[MPIDI_CH3_PKT_FLUSH] =
MPIDI_CH3_PktHandler_Flush;
pktArray[MPIDI_CH3_PKT_PT_RMA_DONE] =
MPIDI_CH3_PktHandler_PtRMADone;
pktArray[MPIDI_CH3_PKT_LOCK_PUT_UNLOCK] =
......
......@@ -81,26 +81,6 @@ int MPIDI_Rget_accumulate(const void *origin_addr, int origin_count,
}
#undef FUNCNAME
#define FUNCNAME MPIDI_Win_flush
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Win_flush(int rank, MPID_Win *win)
{
MPIDI_FUNC_NOTIMPL(WIN_FLUSH)
}
#undef FUNCNAME
#define FUNCNAME MPIDI_Win_flush_all
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Win_flush_all(MPID_Win *win)
{
MPIDI_FUNC_NOTIMPL(WIN_FLUSH_ALL)
}
#undef FUNCNAME
#define FUNCNAME MPIDI_Win_flush_local
#undef FCNAME
......
......@@ -98,6 +98,7 @@ static MPIU_INSTR_Duration_count *list_block; /* Inner; while waiting */
static int MPIDI_CH3I_Send_lock_msg(int dest, int lock_type, MPID_Win *win_ptr);
static int MPIDI_CH3I_Send_unlock_msg(int dest, MPID_Win *win_ptr);
static int MPIDI_CH3I_Send_flush_msg(int dest, MPID_Win *win_ptr);
static int MPIDI_CH3I_Wait_for_lock_granted(MPID_Win *win_ptr);
static int MPIDI_CH3I_Send_rma_msg(MPIDI_RMA_ops * rma_op, MPID_Win * win_ptr,
MPI_Win source_win_handle,
......@@ -113,7 +114,7 @@ static int MPIDI_CH3I_Send_contig_acc_msg(MPIDI_RMA_ops *, MPID_Win *,
MPI_Win, MPI_Win, MPID_Request ** );
static int MPIDI_CH3I_Send_immed_rmw_msg(MPIDI_RMA_ops *, MPID_Win *,
MPI_Win, MPI_Win, MPID_Request ** );
static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *, int *);
static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *, int *, int);
static int MPIDI_CH3I_Send_lock_put_or_acc(MPID_Win *);
static int MPIDI_CH3I_Send_lock_get(MPID_Win *);
static int MPIDI_CH3I_RMAListComplete(MPID_Win *);
......@@ -1988,8 +1989,8 @@ int MPIDI_Win_unlock(int dest, MPID_Win *win_ptr)
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
/* Now do all the RMA operations */
mpi_errno = MPIDI_CH3I_Do_passive_target_rma(win_ptr,
&wait_for_rma_done_pkt);
mpi_errno = MPIDI_CH3I_Do_passive_target_rma(win_ptr, &wait_for_rma_done_pkt,
1 /* unlock the target */);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
}
......@@ -2036,6 +2037,122 @@ int MPIDI_Win_unlock(int dest, MPID_Win *win_ptr)
}
#undef FUNCNAME
#define FUNCNAME MPIDI_Win_flush_all
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Win_flush_all(MPID_Win *win_ptr)
{
/* TODO: Currently only one rank can be locked. */
return MPIDI_Win_flush(win_ptr->lockRank, win_ptr);
}
#undef FUNCNAME
#define FUNCNAME MPIDI_Win_flush
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Win_flush(int rank, MPID_Win *win_ptr)
{
int mpi_errno = MPI_SUCCESS;
int wait_for_rma_done_pkt = 0;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_FLUSH_ALL);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_WIN_FLUSH_ALL);
/* Local flush: ops are performed immediately on the local process */
if (rank == win_ptr->comm_ptr->rank) {
MPIU_Assert(win_ptr->remote_lock_state == MPIDI_CH3_WIN_LOCK_GRANTED);
MPIU_Assert(win_ptr->rma_ops_list_head == NULL && win_ptr->rma_ops_list_tail == NULL);
goto fn_exit;
}
/* MT: If another thread is performing a flush, wait for them to finish. */
if (win_ptr->remote_lock_state == MPIDI_CH3_WIN_LOCK_FLUSH)
{
MPID_Progress_state progress_state;
MPID_Progress_start(&progress_state);
while (win_ptr->remote_lock_state != MPIDI_CH3_WIN_LOCK_GRANTED)
{
mpi_errno = MPID_Progress_wait(&progress_state);
/* --BEGIN ERROR HANDLING-- */
if (mpi_errno != MPI_SUCCESS) {
MPID_Progress_end(&progress_state);
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**winnoprogress");
}
/* --END ERROR HANDLING-- */
}
MPID_Progress_end(&progress_state);
}
/* Send a lock packet over to the target, wait for the lock_granted
reply, and perform the RMA ops. */
if (win_ptr->remote_lock_state == MPIDI_CH3_WIN_LOCK_NONE) {
/* Ensure that win_lock is waiting at the head of the ops list */
MPIU_ERR_CHKANDJUMP(win_ptr->rma_ops_list_head == NULL ||
win_ptr->rma_ops_list_head->type != MPIDI_RMA_LOCK,
mpi_errno, MPI_ERR_OTHER, "**rmasync");
mpi_errno = MPIDI_CH3I_Send_lock_msg(win_ptr->rma_ops_list_head->target_rank,
win_ptr->rma_ops_list_head->lock_type, win_ptr);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
}
if (win_ptr->remote_lock_state != MPIDI_CH3_WIN_LOCK_GRANTED) {
mpi_errno = MPIDI_CH3I_Wait_for_lock_granted(win_ptr);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
}
win_ptr->remote_lock_state = MPIDI_CH3_WIN_LOCK_FLUSH;
mpi_errno = MPIDI_CH3I_Do_passive_target_rma(win_ptr, &wait_for_rma_done_pkt,
0 /* don't unlock the target */);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
/* If the lock is a shared lock or we have done the single op optimization,
we need to wait until the target informs us that all operations are done
on the target. This ensures that third-party communication can be done
safely. */
if (wait_for_rma_done_pkt == 1) {
/* wait until the "pt rma done" packet is received from the target.
This packet resets the win_ptr->remote_lock_state flag. */
MPIDI_CH3I_Send_flush_msg(rank, win_ptr);
/* poke the progress engine until remote_lock_state flag is reset */
if (win_ptr->remote_lock_state != MPIDI_CH3_WIN_LOCK_GRANTED)
{
MPID_Progress_state progress_state;
MPID_Progress_start(&progress_state);
while (win_ptr->remote_lock_state != MPIDI_CH3_WIN_LOCK_GRANTED)
{
mpi_errno = MPID_Progress_wait(&progress_state);
/* --BEGIN ERROR HANDLING-- */
if (mpi_errno != MPI_SUCCESS) {
MPID_Progress_end(&progress_state);
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**winnoprogress");
}
/* --END ERROR HANDLING-- */
}
MPID_Progress_end(&progress_state);
}
}
else {
win_ptr->remote_lock_state = MPIDI_CH3_WIN_LOCK_GRANTED;
}
fn_exit:
MPIU_Assert( win_ptr->rma_ops_list_head == NULL && win_ptr->rma_ops_list_tail == NULL );
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_FLUSH_ALL);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_Win_lock_all
#undef FCNAME
......@@ -2129,8 +2246,8 @@ int MPIDI_Win_sync(MPID_Win *win_ptr)
#define FUNCNAME MPIDI_CH3I_Do_passive_target_rma
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *win_ptr,
int *wait_for_rma_done_pkt)
static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *win_ptr,
int *wait_for_rma_done_pkt, int unlock_target)
{
int mpi_errno = MPI_SUCCESS, nops;
MPIDI_RMA_ops *curr_ptr;
......@@ -2141,7 +2258,8 @@ static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *win_ptr,
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_DO_PASSIVE_TARGET_RMA);
MPIU_Assert(win_ptr->remote_lock_state == MPIDI_CH3_WIN_LOCK_GRANTED);
MPIU_Assert(win_ptr->remote_lock_state == MPIDI_CH3_WIN_LOCK_GRANTED ||
win_ptr->remote_lock_state == MPIDI_CH3_WIN_LOCK_FLUSH);
if (win_ptr->remote_lock_mode == MPI_LOCK_EXCLUSIVE) {
/* Exclusive lock -- no need to wait for rma done pkt at the end. This
......@@ -2239,8 +2357,7 @@ static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *win_ptr,
MPIU_Assert(nops > 0);
MPIU_Assert(curr_ptr->target_rank == win_ptr->lockRank);
/* Could also be curr_ptr->next == NULL */
if (!curr_ptr->next)
if (curr_ptr->next == NULL && unlock_target)
source_win_handle = win_ptr->handle;
else
source_win_handle = MPI_WIN_NULL;
......@@ -2319,7 +2436,7 @@ static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *win_ptr,
MPIU_INSTR_STMT(list_block=MPIU_INSTR_GET_VAR(winunlock_block));
mpi_errno = MPIDI_CH3I_RMAListComplete(win_ptr);
}
else {
else if (unlock_target) {
/* No communication operations were left to process, but the RMA epoch
is open. Send an unlock message to release the lock at the target. */
mpi_errno = MPIDI_CH3I_Send_unlock_msg(win_ptr->lockRank, win_ptr);
......@@ -2475,6 +2592,46 @@ static int MPIDI_CH3I_Send_unlock_msg(int dest, MPID_Win *win_ptr) {
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Send_flush_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Send_flush_msg(int dest, MPID_Win *win_ptr) {
int mpi_errno = MPI_SUCCESS;
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_flush_t *flush_pkt = &upkt.flush;
MPID_Request *req=NULL;
MPIDI_VC_t *vc;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_SEND_FLUSH_MSG);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_SEND_FLUSH_MSG);
MPIU_Assert(win_ptr->remote_lock_state == MPIDI_CH3_WIN_LOCK_FLUSH);
MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc);
MPIDI_Pkt_init(flush_pkt, MPIDI_CH3_PKT_FLUSH);
flush_pkt->target_win_handle = win_ptr->all_win_handles[dest];
flush_pkt->source_win_handle = win_ptr->handle;
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, flush_pkt, sizeof(*flush_pkt), &req);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**winRMAmessage");
/* Release the request returned by iStartMsg */
if (req != NULL) {
MPID_Request_release(req);
}
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_SEND_FLUSH_MSG);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Send_lock_put_or_acc
......@@ -4319,8 +4476,11 @@ int MPIDI_CH3_PktHandler_PtRMADone( MPIDI_VC_t *vc ATTRIBUTE((unused)),
*buflen = sizeof(MPIDI_CH3_Pkt_t);
MPID_Win_get_ptr(pt_rma_done_pkt->source_win_handle, win_ptr);
/* reset the lock_granted flag in the window */
win_ptr->remote_lock_state = MPIDI_CH3_WIN_LOCK_NONE;
if (win_ptr->remote_lock_state == MPIDI_CH3_WIN_LOCK_FLUSH)
win_ptr->remote_lock_state = MPIDI_CH3_WIN_LOCK_GRANTED;
else
win_ptr->remote_lock_state = MPIDI_CH3_WIN_LOCK_NONE;
*rreqp = NULL;
MPIDI_CH3_Progress_signal_completion();
......@@ -4365,6 +4525,59 @@ int MPIDI_CH3_PktHandler_Unlock( MPIDI_VC_t *vc ATTRIBUTE((unused)),
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Flush
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_Flush( MPIDI_VC_t *vc ATTRIBUTE((unused)),
MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp )
{
int mpi_errno = MPI_SUCCESS;
MPIDI_CH3_Pkt_flush_t * flush_pkt = &pkt->flush;
MPID_Win *win_ptr = NULL;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_FLUSH);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_FLUSH);
MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received flush pkt");
*buflen = sizeof(MPIDI_CH3_Pkt_t);
*rreqp = NULL;
/* This is a flush request packet */
if (flush_pkt->target_win_handle != MPI_WIN_NULL) {
MPID_Request *req=NULL;
flush_pkt->target_win_handle = MPI_WIN_NULL;
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, flush_pkt, sizeof(*flush_pkt), &req);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**winRMAmessage");
/* Release the request returned by iStartMsg */
if (req != NULL) {
MPID_Request_release(req);
}
}
/* This is a flush response packet */
else {
MPID_Win_get_ptr(flush_pkt->source_win_handle, win_ptr);
win_ptr->remote_lock_state = MPIDI_CH3_WIN_LOCK_GRANTED;
MPIDI_CH3_Progress_signal_completion();
}
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_FLUSH);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
goto fn_exit;
/* --END ERROR HANDLING-- */
}
/* ------------------------------------------------------------------------ */
/* list_complete and list_block defined above */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment