Commit 6578785d authored by Xin Zhao's avatar Xin Zhao
Browse files

Detect remote completion by FLUSH / FLUSH_ACK messages.



When the origin wants to do a FLUSH sync, if there are
active-message operations that are going to be issued,
we piggback the FLUSH message with the last operation;
if no such operations, we just send a single FLUSH packet.

If the last operation is a write op (PUT, ACC) or only
a single FLUSH packet is sent, after target recieves it,
target will send back a single FLUSH_ACK packet;
if the last operation contains a read action (GET, GACC, FOP,
CAS), after target receiveds it, target will piggback a
FLUSH_ACK flag with the response packet.

After origin receives the FLUSH_ACK packet or response packet
with FLUSH_ACK flag, it will decrement the counter which
indicates number of outgoing sync messages (FLUSH / UNLOCK).
When that counter reaches zero, origin can know that remote
completion is achieved.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent fe15ea26
......@@ -124,7 +124,8 @@ typedef enum {
MPIDI_CH3_PKT_FLAG_RMA_AT_COMPLETE = 16,
MPIDI_CH3_PKT_FLAG_RMA_NOCHECK = 32,
MPIDI_CH3_PKT_FLAG_RMA_SHARED = 64,
MPIDI_CH3_PKT_FLAG_RMA_EXCLUSIVE = 128
MPIDI_CH3_PKT_FLAG_RMA_EXCLUSIVE = 128,
MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK = 256
} MPIDI_CH3_Pkt_flags_t;
typedef struct MPIDI_CH3_Pkt_send {
......@@ -268,6 +269,10 @@ typedef struct MPIDI_CH3_Pkt_get {
typedef struct MPIDI_CH3_Pkt_get_resp {
MPIDI_CH3_Pkt_type_t type;
MPI_Request request_handle;
/* followings are used to decrement ack_counter at origin */
int target_rank;
MPI_Win source_win_handle;
MPIDI_CH3_Pkt_flags_t flags;
} MPIDI_CH3_Pkt_get_resp_t;
typedef struct MPIDI_CH3_Pkt_accum {
......@@ -308,6 +313,10 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
typedef struct MPIDI_CH3_Pkt_get_accum_resp {
MPIDI_CH3_Pkt_type_t type;
MPI_Request request_handle;
/* followings are used to decrement ack_counter at origin */
int target_rank;
MPI_Win source_win_handle;
MPIDI_CH3_Pkt_flags_t flags;
} MPIDI_CH3_Pkt_get_accum_resp_t;
typedef struct MPIDI_CH3_Pkt_accum_immed {
......@@ -348,6 +357,10 @@ typedef struct MPIDI_CH3_Pkt_cas_resp {
MPIDI_CH3_Pkt_type_t type;
MPI_Request request_handle;
MPIDI_CH3_CAS_Immed_u data;
/* followings are used to decrement ack_counter at orign */
int target_rank;
MPI_Win source_win_handle;
MPIDI_CH3_Pkt_flags_t flags;
} MPIDI_CH3_Pkt_cas_resp_t;
typedef struct MPIDI_CH3_Pkt_fop {
......@@ -369,6 +382,10 @@ typedef struct MPIDI_CH3_Pkt_fop_resp {
MPIDI_CH3_Pkt_type_t type;
MPI_Request request_handle;
int data[MPIDI_RMA_FOP_RESP_IMMED_INTS];
/* followings are used to decrement ack_counter at orign */
int target_rank;
MPI_Win source_win_handle;
MPIDI_CH3_Pkt_flags_t flags;
} MPIDI_CH3_Pkt_fop_resp_t;
typedef struct MPIDI_CH3_Pkt_lock {
......
......@@ -226,6 +226,34 @@ static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Handle_flush_ack
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int MPIDI_CH3I_RMA_Handle_flush_ack(MPID_Win * win_ptr, int target_rank)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_RMA_Target_t *t;
mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (t == NULL) {
win_ptr->outstanding_unlocks--;
MPIU_Assert(win_ptr->outstanding_unlocks >= 0);
}
else {
t->sync.outstanding_acks--;
MPIU_Assert(t->sync.outstanding_acks >= 0);
}
fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME do_accumulate_op
#undef FCNAME
......
......@@ -88,6 +88,12 @@ int MPIDI_CH3_ReqHandler_PutRecvComplete( MPIDI_VC_t *vc,
MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) {
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, rreq->dev.source_win_handle);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
mpi_errno = MPIDI_CH3_Finish_rma_op_target(vc, win_ptr, TRUE, rreq->dev.flags,
rreq->dev.source_win_handle);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
......@@ -134,6 +140,12 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete( MPIDI_VC_t *vc,
MPIU_ERR_POP(mpi_errno);
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) {
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, rreq->dev.source_win_handle);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
mpi_errno = MPIDI_CH3_Finish_rma_op_target(vc, win_ptr, TRUE, rreq->dev.flags,
rreq->dev.source_win_handle);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
......@@ -178,6 +190,9 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
get_accum_resp_pkt->request_handle = rreq->dev.resp_request_handle;
get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_accum_resp_pkt->source_win_handle = rreq->dev.source_win_handle;
get_accum_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
MPID_Datatype_get_size_macro(rreq->dev.datatype, type_size);
......@@ -459,10 +474,13 @@ int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete( MPIDI_VC_t *vc,
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_get_resp_t * get_resp_pkt = &upkt.get_resp;
MPID_Request * sreq;
MPID_Win *win_ptr;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GETDERIVEDDTRECVCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GETDERIVEDDTRECVCOMPLETE);
MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);
/* create derived datatype */
create_derived_datatype(rreq, &new_dtp);
MPIU_Free(rreq->dev.dtype_info);
......@@ -485,6 +503,11 @@ int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete( MPIDI_VC_t *vc,
MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
get_resp_pkt->request_handle = rreq->dev.request_handle;
get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_resp_pkt->source_win_handle = rreq->dev.source_win_handle;
get_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
sreq->dev.segment_ptr = MPID_Segment_alloc( );
MPIU_ERR_CHKANDJUMP1((sreq->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
......@@ -628,6 +651,11 @@ int MPIDI_CH3_ReqHandler_FOPComplete( MPIDI_VC_t *vc,
fop_resp_pkt->request_handle = rreq->dev.request_handle;
MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);
fop_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
fop_resp_pkt->source_win_handle = rreq->dev.source_win_handle;
fop_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
fop_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
/* Copy original data into the send buffer. If data will fit in the
header, use that. Otherwise allocate a temporary buffer. */
......
......@@ -219,6 +219,11 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
get_resp_pkt->request_handle = get_pkt->request_handle;
get_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_resp_pkt->source_win_handle = get_pkt->source_win_handle;
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
......@@ -703,6 +708,11 @@ int MPIDI_CH3_PktHandler_CAS(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Pkt_init(cas_resp_pkt, MPIDI_CH3_PKT_CAS_RESP);
cas_resp_pkt->request_handle = cas_pkt->request_handle;
cas_resp_pkt->source_win_handle = cas_pkt->source_win_handle;
cas_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
cas_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
cas_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
/* Copy old value into the response packet */
MPID_Datatype_get_size_macro(cas_pkt->datatype, len);
......@@ -777,12 +787,22 @@ int MPIDI_CH3_PktHandler_CASResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
MPIDI_CH3_Pkt_cas_resp_t *cas_resp_pkt = &pkt->cas_resp;
MPID_Request *req;
MPI_Aint len;
MPID_Win *win_ptr;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_CASRESP);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_CASRESP);
MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received CAS response pkt");
MPID_Win_get_ptr(cas_resp_pkt->source_win_handle, win_ptr);
/* decrement ack_counter on this target */
if (cas_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK) {
int target_rank = cas_resp_pkt->target_rank;
mpi_errno = MPIDI_CH3I_RMA_Handle_flush_ack(win_ptr, target_rank);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
MPID_Request_get_ptr(cas_resp_pkt->request_handle, req);
MPID_Datatype_get_size_macro(req->dev.datatype, len);
......@@ -837,6 +857,10 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.target_win_handle = fop_pkt->target_win_handle;
req->dev.request_handle = fop_pkt->request_handle;
req->dev.flags = fop_pkt->flags;
/* fop_pkt->source_win_handle is set in MPIDI_Fetch_and_op,
here we pass it to receiving request, so that after receiving
is finished, we can pass it to sending back pkt. */
req->dev.source_win_handle = fop_pkt->source_win_handle;
MPID_Datatype_get_size_macro(req->dev.datatype, len);
MPIU_Assert(len <= sizeof(MPIDI_CH3_FOP_Immed_u));
......@@ -906,12 +930,22 @@ int MPIDI_CH3_PktHandler_FOPResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
MPID_Request *req;
int complete = 0;
MPI_Aint len;
MPID_Win *win_ptr;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOPRESP);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOPRESP);
MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received FOP response pkt");
MPID_Win_get_ptr(fop_resp_pkt->source_win_handle, win_ptr);
/* decrement ack_counter */
if (fop_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK) {
int target_rank = fop_resp_pkt->target_rank;
mpi_errno = MPIDI_CH3I_RMA_Handle_flush_ack(win_ptr, target_rank);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
MPID_Request_get_ptr(fop_resp_pkt->request_handle, req);
MPID_Datatype_get_size_macro(req->dev.datatype, len);
......@@ -967,12 +1001,22 @@ int MPIDI_CH3_PktHandler_Get_AccumResp(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_msg_sz_t data_len;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
MPID_Win *win_ptr;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET_ACCUM_RESP);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET_ACCUM_RESP);
MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received Get-Accumulate response pkt");
MPID_Win_get_ptr(get_accum_resp_pkt->source_win_handle, win_ptr);
/* decrement ack_counter on target */
if (get_accum_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK) {
int target_rank = get_accum_resp_pkt->target_rank;
mpi_errno = MPIDI_CH3I_RMA_Handle_flush_ack(win_ptr, target_rank);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
......@@ -1444,12 +1488,22 @@ int MPIDI_CH3_PktHandler_GetResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
MPIDI_msg_sz_t data_len;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
MPID_Win *win_ptr;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETRESP);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETRESP);
MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received get response pkt");
MPID_Win_get_ptr(get_resp_pkt->source_win_handle, win_ptr);
/* decrement ack_counter on target */
if (get_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK) {
int target_rank = get_resp_pkt->target_rank;
mpi_errno = MPIDI_CH3I_RMA_Handle_flush_ack(win_ptr, target_rank);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
......@@ -1513,6 +1567,8 @@ int MPIDI_CH3_PktHandler_FlushAck(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_flush_ack_t *flush_ack_pkt = &pkt->flush_ack;
MPID_Win *win_ptr = NULL;
int mpi_errno = MPI_SUCCESS;
int target_rank = flush_ack_pkt->target_rank;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_FLUSHACK);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_FLUSHACK);
......@@ -1522,6 +1578,11 @@ int MPIDI_CH3_PktHandler_FlushAck(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
*buflen = sizeof(MPIDI_CH3_Pkt_t);
MPID_Win_get_ptr(flush_ack_pkt->source_win_handle, win_ptr);
/* decrement ack_counter on target */
mpi_errno = MPIDI_CH3I_RMA_Handle_flush_ack(win_ptr, target_rank);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIU_Assert(win_ptr->targets[flush_ack_pkt->target_rank].remote_lock_state !=
MPIDI_CH3_WIN_LOCK_NONE);
......@@ -1538,6 +1599,8 @@ int MPIDI_CH3_PktHandler_FlushAck(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_FLUSHACK);
fn_exit:
return MPI_SUCCESS;
fn_fail:
goto fn_exit;
}
......@@ -1594,6 +1657,11 @@ int MPIDI_CH3_PktHandler_Flush(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
*buflen = sizeof(MPIDI_CH3_Pkt_t);
*rreqp = NULL;
MPID_Win_get_ptr(flush_pkt->target_win_handle, win_ptr);
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, flush_pkt->source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* This is a flush request packet */
if (flush_pkt->target_win_handle != MPI_WIN_NULL) {
MPID_Request *req = NULL;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment