Commit 421f4359 authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Reset flags for stream unit within one RMA operation.



For all stream units within one RMA operation, we only
needs to piggyback flags for the first operation to the
first stream unit, and piggyback flags for the last
operation to the last stream unit.

Note that for operations piggybacked with LOCK flag, we
should just issue the first stream unit, and wait until
we receive ACK from the target to decide if we continue
to issue the following units, or  re-transmit the first
unit.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 0641e2f1
......@@ -712,6 +712,16 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
accum_pkt->flags |= flags;
if (j != 0) {
accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
}
if (j != stream_unit_count - 1) {
accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_UNLOCK;
accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_FLUSH;
accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER;
}
stream_offset = j * stream_elem_count * predefined_dtp_size;
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
......@@ -735,6 +745,14 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
rma_op->reqs[j] = curr_req;
win_ptr->active_req_cnt++;
}
if (accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
/* if piggybacked with LOCK flag, we
* only issue the first streaming unit */
MPIU_Assert(j == 0);
break;
}
} /* end of for loop */
fn_exit:
......@@ -866,6 +884,16 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
get_accum_pkt->flags |= flags;
if (j != 0) {
get_accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
get_accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
}
if (j != stream_unit_count - 1) {
get_accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_UNLOCK;
get_accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_FLUSH;
get_accum_pkt->flags &= ~MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER;
}
/* Create a request for the GACC response. Store the response buf, count, and
* datatype in it, and pass the request's handle in the GACC packet. When the
* response comes from the target, it will contain the request handle. */
......@@ -930,6 +958,14 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
rma_op->reqs[j] = curr_req;
win_ptr->active_req_cnt++;
if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
/* if piggybacked with LOCK flag, we
* only issue the first streaming unit */
MPIU_Assert(j == 0);
break;
}
} /* end of for loop */
fn_exit:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment