Commit c986b927 authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Add counter in op struct to remember number of stream units issued.



Add a counter in op struct to remember number of stream units
that have already been issued. For example, when the first stream
unit piggybacked with LOCK is issued out, we temporarily stop
issuing the following units. After the origin receives the ACK
from the target, it can continue to issue the following units.
This counter helps avoid issuing the first unit again.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 421f4359
......@@ -706,10 +706,14 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIU_Assert(stream_elem_count > 0 && stream_unit_count > 0);
rest_len = total_len;
MPIU_Assert(rma_op->issued_stream_count >= 0);
for (j = 0; j < stream_unit_count; j++) {
MPIDI_msg_sz_t stream_offset, stream_size;
MPID_Request *curr_req = NULL;
if (j < rma_op->issued_stream_count)
continue;
accum_pkt->flags |= flags;
if (j != 0) {
......@@ -746,6 +750,8 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
win_ptr->active_req_cnt++;
}
rma_op->issued_stream_count++;
if (accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
/* if piggybacked with LOCK flag, we
......@@ -755,6 +761,9 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
}
} /* end of for loop */
/* Mark that all stream units have been issued */
rma_op->issued_stream_count = -1;
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_ISSUE_ACC_OP);
return mpi_errno;
......@@ -877,11 +886,16 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
MPIU_Assert(rma_op->issued_stream_count >= 0);
for (j = 0; j < stream_unit_count; j++) {
MPIDI_msg_sz_t stream_offset, stream_size;
MPID_Request *resp_req = NULL;
MPID_Request *curr_req = NULL;
if (j < rma_op->issued_stream_count)
continue;
get_accum_pkt->flags |= flags;
if (j != 0) {
......@@ -959,6 +973,8 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
rma_op->reqs[j] = curr_req;
win_ptr->active_req_cnt++;
rma_op->issued_stream_count++;
if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
/* if piggybacked with LOCK flag, we
......@@ -968,6 +984,9 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
}
} /* end of for loop */
/* Mark that all stream units have been issued */
rma_op->issued_stream_count = -1;
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_ISSUE_GET_ACC_OP);
return mpi_errno;
......
......@@ -52,6 +52,7 @@ static inline MPIDI_RMA_Op_t *MPIDI_CH3I_Win_op_alloc(MPID_Win * win_ptr)
e->ureq = NULL;
e->is_dt = 0;
e->piggyback_lock_candidate = 0;
e->issued_stream_count = 0;
return e;
}
......
......@@ -73,6 +73,9 @@ typedef struct MPIDI_RMA_Op {
int is_dt;
int piggyback_lock_candidate;
int issued_stream_count; /* when >= 0, it specifies number of stream units that have been issued;
* when < 0, it means all stream units of this operation haven been issued. */
MPID_Request *ureq;
} MPIDI_RMA_Op_t;
......
......@@ -655,6 +655,9 @@ static inline int adjust_op_piggybacked_with_lock(MPID_Win * win_ptr,
MPIDI_CH3_PKT_RMA_ERASE_FLAGS(op->pkt, mpi_errno);
target->next_op_to_issue = op;
op->issued_stream_count = 0;
if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
else if (op_flags & MPIDI_RMA_SYNC_UNLOCK)
......
......@@ -370,7 +370,16 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t * targ
* PUT/ACC operation. */
}
target->next_op_to_issue = curr_op->next;
if ((curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
curr_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM) && curr_op->issued_stream_count > 0) {
/* For ACC-like operations, if not all stream units
* are issued out, we stick to the current operation,
* otherwise we move on to the next operation. */
target->next_op_to_issue = curr_op;
}
else
target->next_op_to_issue = curr_op->next;
if (target->next_op_to_issue == NULL) {
if (flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH || flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) {
/* We are done with ending sync, unset target's sync_flag. */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment