Commit 81e2b274 authored by Xin Zhao's avatar Xin Zhao
Browse files

Remove lock_type and origin_rank areas from RMA packet.



Originally we added lock_type and origin_rank areas
in RMA packet, in order to piggyback passive lock request
with RMA operations. However, those areas potentially
enlarged the packet union size, and actually they are
not necessary and can be completetly avoided.

"Lock_type" is used to remember what types of lock (shared or
exclusive) the origin wants to acquire on the target. To remove
it from RMA packet, we use flags (already exists in RMA packet)
to remember such information.

"Origin_rank" is used to remember which origin has sent lock
request to the target, so that when the lock is granted to this
origin later, the target can send ack to that origin. Actually
the target does not need to store origin_rank but can only store
origin_vc, which is known from progress engine on target side.
Therefore, we can completely remove origin_rank from RMA packet.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent d46b848a
......@@ -339,8 +339,6 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win *win_ptr,
rma_op->request = NULL;
put_pkt->flags |= flags;
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
put_pkt->lock_type = target_ptr->lock_type;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, rma_op->origin_count * origin_type_size, size_t);
......@@ -413,9 +411,6 @@ static int issue_acc_op(MPIDI_RMA_Op_t *rma_op, MPID_Win *win_ptr,
accum_pkt->flags |= flags;
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
accum_pkt->lock_type = target_ptr->lock_type;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, rma_op->origin_count * origin_type_size, size_t);
......@@ -516,9 +511,6 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t *rma_op, MPID_Win *win_ptr,
get_accum_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP;
}
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
get_accum_pkt->lock_type = target_ptr->lock_type;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, rma_op->origin_count * origin_type_size, size_t);
......@@ -650,9 +642,6 @@ static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
get_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP;
}
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
get_pkt->lock_type = target_ptr->lock_type;
comm_ptr = win_ptr->comm_ptr;
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
......@@ -750,8 +739,6 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
cas_pkt->request_handle = rma_op->request->handle;
cas_pkt->flags |= flags;
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
cas_pkt->lock_type = target_ptr->lock_type;
MPIU_Memcpy((void *) &cas_pkt->origin_data, rma_op->origin_addr, len);
MPIU_Memcpy((void *) &cas_pkt->compare_data, rma_op->compare_addr, len);
......@@ -821,8 +808,6 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
fop_pkt->request_handle = resp_req->handle;
fop_pkt->flags |= flags;
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
fop_pkt->lock_type = target_ptr->lock_type;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, rma_op->origin_count * origin_type_size, size_t);
......
......@@ -32,6 +32,7 @@ static inline MPIDI_RMA_Lock_entry_t *MPIDI_CH3I_Win_lock_entry_alloc(MPID_Win *
if (new_ptr != NULL) {
new_ptr->next = NULL;
new_ptr->pkt = (*pkt);
new_ptr->vc = NULL;
new_ptr->data = NULL;
new_ptr->data_size = 0;
new_ptr->all_data_recved = 0;
......
......@@ -131,6 +131,7 @@ extern MPIDI_RMA_Win_list_t *MPIDI_RMA_Win_list, *MPIDI_RMA_Win_list_tail;
typedef struct MPIDI_RMA_Lock_entry {
struct MPIDI_RMA_Lock_entry *next;
MPIDI_CH3_Pkt_t pkt; /* all information for this request packet */
MPIDI_VC_t *vc;
void *data; /* for queued PUTs / ACCs / GACCs, data is copied here */
int data_size;
int all_data_recved; /* indicate if all data has been received */
......
......@@ -109,21 +109,20 @@ typedef enum {
/* These flags can be "OR'ed" together */
typedef enum {
MPIDI_CH3_PKT_FLAG_NONE = 0,
MPIDI_CH3_PKT_FLAG_RMA_LOCK = 1,
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK = 2,
MPIDI_CH3_PKT_FLAG_RMA_FLUSH = 4,
MPIDI_CH3_PKT_FLAG_RMA_REQ_ACK = 8,
MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER = 16,
MPIDI_CH3_PKT_FLAG_RMA_NOCHECK = 32,
MPIDI_CH3_PKT_FLAG_RMA_SHARED = 64,
MPIDI_CH3_PKT_FLAG_RMA_EXCLUSIVE = 128,
MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK = 256,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED = 512,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED = 1024,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED = 2048,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED = 4096,
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_NO_ACK = 8192,
MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP = 16384
MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED = 1,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE = 2,
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK = 4,
MPIDI_CH3_PKT_FLAG_RMA_FLUSH = 8,
MPIDI_CH3_PKT_FLAG_RMA_REQ_ACK = 16,
MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER = 32,
MPIDI_CH3_PKT_FLAG_RMA_NOCHECK = 64,
MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK = 128,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED = 256,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED = 512,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED = 1024,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED = 2048,
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_NO_ACK = 4096,
MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP = 8192
} MPIDI_CH3_Pkt_flags_t;
typedef struct MPIDI_CH3_Pkt_send {
......@@ -330,72 +329,6 @@ MPIDI_CH3_PKT_DEFS
} \
}
#define MPIDI_CH3_PKT_RMA_GET_LOCK_TYPE(pkt_, lock_type_, err_) \
{ \
/* This macro returns lock type in RMA operation \
packets (PUT, GET, ACC, GACC, FOP, CAS) and RMA control \
packets (LOCK). */ \
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
lock_type_ = (pkt_).put.lock_type; \
break; \
case (MPIDI_CH3_PKT_GET): \
lock_type_ = (pkt_).get.lock_type; \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
lock_type_ = (pkt_).accum.lock_type; \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
lock_type_ = (pkt_).get_accum.lock_type; \
break; \
case (MPIDI_CH3_PKT_CAS): \
lock_type_ = (pkt_).cas.lock_type; \
break; \
case (MPIDI_CH3_PKT_FOP): \
lock_type_ = (pkt_).fop.lock_type; \
break; \
case (MPIDI_CH3_PKT_LOCK): \
lock_type_ = (pkt_).lock.lock_type; \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
} \
}
#define MPIDI_CH3_PKT_RMA_GET_ORIGIN_RANK(pkt_, origin_rank_, err_) \
{ \
/* This macro returns origin rank (used in acquiring lock) in \
RMA operation packets (PUT, GET, ACC, GACC, FOP, CAS) and \
RMA control packets (LOCK). */ \
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
origin_rank_ = (pkt_).put.origin_rank; \
break; \
case (MPIDI_CH3_PKT_GET): \
origin_rank_ = (pkt_).get.origin_rank; \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
origin_rank_ = (pkt_).accum.origin_rank; \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
origin_rank_ = (pkt_).get_accum.origin_rank; \
break; \
case (MPIDI_CH3_PKT_CAS): \
origin_rank_ = (pkt_).cas.origin_rank; \
break; \
case (MPIDI_CH3_PKT_FOP): \
origin_rank_ = (pkt_).fop.origin_rank; \
break; \
case (MPIDI_CH3_PKT_LOCK): \
origin_rank_ = (pkt_).lock.origin_rank; \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
} \
}
#define MPIDI_CH3_PKT_RMA_GET_FLAGS(pkt_, flags_, err_) \
{ \
/* This macro returns flags in RMA operation packets (PUT, GET, \
......@@ -435,6 +368,9 @@ MPIDI_CH3_PKT_DEFS
case (MPIDI_CH3_PKT_CAS_RESP): \
flags_ = (pkt_).cas_resp.flags; \
break; \
case (MPIDI_CH3_PKT_LOCK): \
flags_ = (pkt_).lock.flags; \
break; \
case (MPIDI_CH3_PKT_UNLOCK): \
flags_ = (pkt_).unlock.flags; \
break; \
......@@ -488,6 +424,9 @@ MPIDI_CH3_PKT_DEFS
case (MPIDI_CH3_PKT_CAS_RESP): \
(pkt_).cas_resp.flags = MPIDI_CH3_PKT_FLAG_NONE; \
break; \
case (MPIDI_CH3_PKT_LOCK): \
(pkt_).lock.flags = MPIDI_CH3_PKT_FLAG_NONE; \
break; \
case (MPIDI_CH3_PKT_UNLOCK): \
(pkt_).unlock.flags = MPIDI_CH3_PKT_FLAG_NONE; \
break; \
......@@ -646,8 +585,6 @@ typedef struct MPIDI_CH3_Pkt_put {
* with shared locks. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
int immed_len;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_put_t;
typedef struct MPIDI_CH3_Pkt_get {
......@@ -665,8 +602,6 @@ typedef struct MPIDI_CH3_Pkt_get {
MPI_Win source_win_handle; /* Used in the last RMA operation in an
* epoch in the case of passive target rma
* with shared locks. Otherwise set to NULL*/
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_get_t;
typedef struct MPIDI_CH3_Pkt_get_resp {
......@@ -698,8 +633,6 @@ typedef struct MPIDI_CH3_Pkt_accum {
* with shared locks. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
int immed_len;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_accum_t;
typedef struct MPIDI_CH3_Pkt_get_accum {
......@@ -720,8 +653,6 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
* with shared locks. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
int immed_len;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_get_accum_t;
typedef struct MPIDI_CH3_Pkt_get_accum_resp {
......@@ -749,8 +680,6 @@ typedef struct MPIDI_CH3_Pkt_cas {
* in passive target rma. Otherwise set to NULL*/
MPIDI_CH3_CAS_Immed_u origin_data;
MPIDI_CH3_CAS_Immed_u compare_data;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_cas_t;
typedef struct MPIDI_CH3_Pkt_cas_resp {
......@@ -777,8 +706,6 @@ typedef struct MPIDI_CH3_Pkt_fop {
* in passive target rma. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
int immed_len;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_fop_t;
typedef struct MPIDI_CH3_Pkt_fop_resp {
......@@ -794,10 +721,9 @@ typedef struct MPIDI_CH3_Pkt_fop_resp {
typedef struct MPIDI_CH3_Pkt_lock {
MPIDI_CH3_Pkt_type_t type;
int lock_type;
MPIDI_CH3_Pkt_flags_t flags;
MPI_Win target_win_handle;
MPI_Win source_win_handle;
int origin_rank;
} MPIDI_CH3_Pkt_lock_t;
typedef struct MPIDI_CH3_Pkt_unlock {
......
......@@ -32,8 +32,13 @@ static inline int send_lock_msg(int dest, int lock_type, MPID_Win * win_ptr)
MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
lock_pkt->target_win_handle = win_ptr->all_win_handles[dest];
lock_pkt->source_win_handle = win_ptr->handle;
lock_pkt->lock_type = lock_type;
lock_pkt->origin_rank = win_ptr->comm_ptr->rank;
lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (lock_type == MPI_LOCK_SHARED)
lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
else {
MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
}
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_pkt, sizeof(*lock_pkt), &req);
......@@ -321,6 +326,7 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, pkt);
if (new_ptr != NULL) {
MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
new_ptr->vc = vc;
}
else {
lock_discarded = 1;
......@@ -386,18 +392,16 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPIDI_CH3_Pkt_t new_pkt;
MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock;
MPI_Win target_win_handle;
int lock_type, origin_rank;
MPIDI_CH3_Pkt_flags_t flags;
MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_ORIGIN_RANK((*pkt), origin_rank, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_LOCK_TYPE((*pkt), lock_type, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
lock_pkt->target_win_handle = target_win_handle;
lock_pkt->source_win_handle = source_win_handle;
lock_pkt->lock_type = lock_type;
lock_pkt->origin_rank = origin_rank;
lock_pkt->flags = flags;
/* replace original pkt with lock pkt */
new_ptr->pkt = new_pkt;
......@@ -571,7 +575,8 @@ static inline int adjust_op_piggybacked_with_lock (MPID_Win *win_ptr,
op = target->pending_op_list;
if (op != NULL) MPIDI_CH3_PKT_RMA_GET_FLAGS(op->pkt, op_flags, mpi_errno);
if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK) {
if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED ||
flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED) {
if (!op->request) {
......@@ -664,10 +669,16 @@ static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
MPIDI_CH3_Pkt_t pkt;
MPIDI_CH3_Pkt_lock_t *lock_pkt = &pkt.lock;
MPIDI_RMA_Lock_entry_t *new_ptr = NULL;
MPIDI_VC_t *my_vc;
MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
lock_pkt->lock_type = lock_type;
lock_pkt->origin_rank = win_ptr->comm_ptr->rank;
lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (lock_type == MPI_LOCK_SHARED)
lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
else {
MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
}
new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, &pkt);
if (new_ptr == NULL) {
......@@ -677,6 +688,8 @@ static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
goto fn_exit;
}
MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &my_vc);
new_ptr->vc = my_vc;
new_ptr->all_data_recved = 1;
}
......@@ -834,9 +847,16 @@ static inline int check_piggyback_lock(MPID_Win *win_ptr, MPIDI_VC_t *vc,
(*reqp) = NULL;
MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_LOCK_TYPE((*pkt), lock_type, mpi_errno);
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED)
lock_type = MPI_LOCK_SHARED;
else {
MPIU_Assert(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
lock_type = MPI_LOCK_EXCLUSIVE;
}
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK) {
if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 0) {
/* cannot acquire the lock, queue up this operation. */
mpi_errno = enqueue_lock_origin(win_ptr, vc, pkt, buflen, reqp);
......@@ -859,7 +879,8 @@ static inline int finish_op_on_target(MPID_Win *win_ptr, MPIDI_VC_t *vc,
if (type == MPIDI_CH3_PKT_PUT || type == MPIDI_CH3_PKT_ACCUMULATE) {
/* This is PUT or ACC */
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK) {
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
MPIDI_CH3_Pkt_flags_t pkt_flags = MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if ((flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
(flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
......@@ -871,7 +892,8 @@ static inline int finish_op_on_target(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPIDI_CH3_Progress_signal_completion();
}
if (flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) {
if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)) {
if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)) {
/* If op is piggybacked with both LOCK and FLUSH,
we only send LOCK ACK back, do not send FLUSH ACK. */
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr,
......@@ -888,7 +910,8 @@ static inline int finish_op_on_target(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPIDI_CH3_Progress_signal_completion();
}
if (flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) {
if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)) {
if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)) {
/* If op is piggybacked with both LOCK and UNLOCK,
we only send LOCK ACK back, do not send FLUSH (UNLOCK) ACK. */
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr,
......
......@@ -238,7 +238,8 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_accum_resp_pkt->source_win_handle = rreq->dev.source_win_handle;
get_accum_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if ((rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
(rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
......@@ -595,7 +596,8 @@ int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete( MPIDI_VC_t *vc,
get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_resp_pkt->source_win_handle = rreq->dev.source_win_handle;
get_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if ((rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
(rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
......@@ -831,7 +833,6 @@ static int create_derived_datatype(MPID_Request *req, MPID_Datatype **dtp)
static inline int perform_put_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_entry_t *lock_entry)
{
MPIDI_CH3_Pkt_put_t *put_pkt = &((lock_entry->pkt).put);
MPIDI_VC_t *vc = NULL;
int mpi_errno = MPI_SUCCESS;
if (lock_entry->data == NULL) {
......@@ -846,11 +847,8 @@ static inline int perform_put_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
/* get vc object */
MPIDI_Comm_get_vc(win_ptr->comm_ptr, put_pkt->origin_rank, &vc);
/* do final action */
mpi_errno = finish_op_on_target(win_ptr, vc, MPIDI_CH3_PKT_PUT,
mpi_errno = finish_op_on_target(win_ptr, lock_entry->vc, MPIDI_CH3_PKT_PUT,
put_pkt->flags, put_pkt->source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -866,7 +864,6 @@ static inline int perform_get_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
MPIDI_CH3_Pkt_get_resp_t *get_resp_pkt = &upkt.get_resp;
MPIDI_CH3_Pkt_get_t *get_pkt = &((lock_entry->pkt).get);
MPID_Request *sreq = NULL;
MPIDI_VC_t *vc = NULL;
MPI_Aint type_size;
size_t len;
int iovcnt;
......@@ -894,7 +891,8 @@ static inline int perform_get_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
get_resp_pkt->request_handle = get_pkt->request_handle;
get_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if ((get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
(get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
......@@ -938,10 +936,7 @@ static inline int perform_get_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
iovcnt = 2;
}
/* get vc object */
MPIDI_Comm_get_vc(win_ptr->comm_ptr, get_pkt->origin_rank, &vc);
mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, iovcnt);
mpi_errno = MPIDI_CH3_iSendv(lock_entry->vc, sreq, iov, iovcnt);
if (mpi_errno != MPI_SUCCESS) {
MPID_Request_release(sreq);
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
......@@ -957,7 +952,6 @@ static inline int perform_get_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
static inline int perform_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_entry_t *lock_entry)
{
MPIDI_CH3_Pkt_accum_t *acc_pkt = &((lock_entry->pkt).accum);
MPIDI_VC_t *vc = NULL;
int mpi_errno = MPI_SUCCESS;
MPIU_Assert(lock_entry->all_data_recved == 1);
......@@ -980,10 +974,7 @@ static inline int perform_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
/* get vc object */
MPIDI_Comm_get_vc(win_ptr->comm_ptr, acc_pkt->origin_rank, &vc);
mpi_errno = finish_op_on_target(win_ptr, vc, MPIDI_CH3_PKT_ACCUMULATE,
mpi_errno = finish_op_on_target(win_ptr, lock_entry->vc, MPIDI_CH3_PKT_ACCUMULATE,
acc_pkt->flags, acc_pkt->source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -1000,7 +991,6 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc
MPIDI_CH3_Pkt_get_accum_resp_t *get_accum_resp_pkt = &upkt.get_accum_resp;
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &((lock_entry->pkt).get_accum);
MPID_Request *sreq = NULL;
MPIDI_VC_t *vc = NULL;
MPI_Aint type_size;
size_t len;
int iovcnt;
......@@ -1065,7 +1055,8 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc
MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP);
get_accum_resp_pkt->request_handle = get_accum_pkt->request_handle;
get_accum_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if ((get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
(get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
......@@ -1108,10 +1099,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc
iovcnt = 2;
}
/* get vc object */
MPIDI_Comm_get_vc(win_ptr->comm_ptr, get_accum_pkt->origin_rank, &vc);
mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, iovcnt);
mpi_errno = MPIDI_CH3_iSendv(lock_entry->vc, sreq, iov, iovcnt);
if (mpi_errno != MPI_SUCCESS) {
MPID_Request_release(sreq);
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
......@@ -1130,21 +1118,18 @@ static inline int perform_fop_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
MPIDI_CH3_Pkt_fop_t *fop_pkt = &((lock_entry->pkt).fop);
MPID_Request *resp_req = NULL;
MPIDI_VC_t *vc = NULL;
int mpi_errno = MPI_SUCCESS;
/* FIXME: this function is same with PktHandler_FOP(), should
do code refactoring on both of them. */
/* get vc object */
MPIDI_Comm_get_vc(win_ptr->comm_ptr, fop_pkt->origin_rank, &vc);
MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
fop_resp_pkt->request_handle = fop_pkt->request_handle;
fop_resp_pkt->source_win_handle = fop_pkt->source_win_handle;
fop_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
fop_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
if (fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
fop_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if ((fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
(fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
......@@ -1170,9 +1155,9 @@ static inline int perform_fop_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
/* send back the original data */
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_THREAD_CS_ENTER(CH3COMM,lock_entry->vc);
mpi_errno = MPIDI_CH3_iStartMsg(lock_entry->vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
MPIU_THREAD_CS_EXIT(CH3COMM,lock_entry->vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (resp_req != NULL) {
......@@ -1196,7 +1181,7 @@ static inline int perform_fop_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
}
/* do final action */
mpi_errno = finish_op_on_target(win_ptr, vc, MPIDI_CH3_PKT_FOP,
mpi_errno = finish_op_on_target(win_ptr, lock_entry->vc, MPIDI_CH3_PKT_FOP,
fop_pkt->flags, fop_pkt->source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -1213,19 +1198,16 @@ static inline int perform_cas_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
MPIDI_CH3_Pkt_cas_resp_t *cas_resp_pkt = &upkt.cas_resp;
MPIDI_CH3_Pkt_cas_t *cas_pkt = &((lock_entry->pkt).cas);
MPID_Request *send_req = NULL;
MPIDI_VC_t *vc = NULL;
MPI_Aint len;
int mpi_errno = MPI_SUCCESS;
/* get vc object */
MPIDI_Comm_get_vc(win_ptr->comm_ptr, cas_pkt->origin_rank, &vc);
MPIDI_Pkt_init(cas_resp_pkt, MPIDI_CH3_PKT_CAS_RESP);
cas_resp_pkt->request_handle = cas_pkt->request_handle;
cas_resp_pkt->source_win_handle = cas_pkt->source_win_handle;
cas_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
cas_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
if (cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
cas_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if ((cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
(cas_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
......@@ -1249,9 +1231,9 @@ static inline int perform_cas_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
/* Send the response packet */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_resp_pkt, sizeof(*cas_resp_pkt), &send_req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_THREAD_CS_ENTER(CH3COMM, lock_entry->vc);
mpi_errno = MPIDI_CH3_iStartMsg(lock_entry->vc, cas_resp_pkt, sizeof(*cas_resp_pkt), &send_req);
MPIU_THREAD_CS_EXIT(CH3COMM, lock_entry->vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
......@@ -1275,7 +1257,7 @@ static inline int perform_cas_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
}
/* do final action */
mpi_errno = finish_op_on_target(win_ptr, vc, MPIDI_CH3_PKT_CAS,
mpi_errno = finish_op_on_target(win_ptr, lock_entry->vc, MPIDI_CH3_PKT_CAS,
cas_pkt->flags, cas_pkt->source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -1295,16 +1277,17 @@ static inline int perform_op_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_ent
/* single LOCK request */
MPIDI_CH3_Pkt_lock_t *lock_pkt = &(lock_entry->pkt.lock);
if (lock_pkt->origin_rank == win_ptr->comm_ptr->rank) {
mpi_errno = handle_lock_ack(win_ptr, lock_pkt->origin_rank,
MPIDI_VC_t *my_vc = NULL;
MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &my_vc);
if (lock_entry->vc == my_vc) {
mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
else {