Commit 45afd1fd authored by Xin Zhao
Browse files

Support handling different LOCK ACKs

No reviewer.
parent b8c9f31b
......@@ -2160,7 +2160,8 @@ int MPID_nem_ib_PktHandler_GetResp(MPIDI_VC_t * vc,
/* decrement ack_counter on target */
if (get_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
mpi_errno = set_lock_sync_counter(win_ptr, target_rank);
mpi_errno = set_lock_sync_counter(win_ptr, target_rank,
get_resp_pkt->flags);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
if (get_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK) {
......
......@@ -1826,6 +1826,8 @@ int MPIDI_CH3_PktHandler_Lock( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_LockAck( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_LockOpAck( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Unlock( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Flush( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
......
......@@ -90,6 +90,7 @@ typedef enum {
MPIDI_CH3_PKT_FLUSH,
MPIDI_CH3_PKT_DECR_AT_COUNTER,
MPIDI_CH3_PKT_LOCK_ACK,
MPIDI_CH3_PKT_LOCK_OP_ACK,
MPIDI_CH3_PKT_FLUSH_ACK,
/* RMA Packets end here */
MPIDI_CH3_PKT_FLOW_CNTL_UPDATE, /* FIXME: Unused */
......@@ -378,6 +379,63 @@ MPIDI_CH3_PKT_DEFS
} \
}
/* MPIDI_CH3_PKT_RMA_ERASE_FLAGS - reset the flags field of an RMA operation
 * packet to MPIDI_CH3_PKT_FLAG_NONE.
 *
 * pkt_ - packet object with a .type member and per-operation members (.put,
 *        .get, .accum, .get_accum, .cas, .fop).  It is expanded more than
 *        once, so pass an expression without side effects.
 * err_ - lvalue that is set to MPI_SUCCESS first.  For any packet type other
 *        than PUT, GET, ACCUMULATE, GET_ACCUM, CAS or FOP it is passed to
 *        MPIU_ERR_SETANDJUMP1 together with MPI_ERR_OTHER and "**invalidpkt".
 *
 * NOTE(review): MPIU_ERR_SETANDJUMP1 is defined elsewhere; it presumably jumps
 * to a fn_fail label in the expanding function -- confirm before using this
 * macro in a function that has no such label.
 * The expansion is a bare brace block, not a do { } while (0) statement. */
#define MPIDI_CH3_PKT_RMA_ERASE_FLAGS(pkt_, err_) \
{ \
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
(pkt_).put.flags = MPIDI_CH3_PKT_FLAG_NONE; \
break; \
case (MPIDI_CH3_PKT_GET): \
(pkt_).get.flags = MPIDI_CH3_PKT_FLAG_NONE; \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
(pkt_).accum.flags = MPIDI_CH3_PKT_FLAG_NONE; \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
(pkt_).get_accum.flags = MPIDI_CH3_PKT_FLAG_NONE; \
break; \
case (MPIDI_CH3_PKT_CAS): \
(pkt_).cas.flags = MPIDI_CH3_PKT_FLAG_NONE; \
break; \
case (MPIDI_CH3_PKT_FOP): \
(pkt_).fop.flags = MPIDI_CH3_PKT_FLAG_NONE; \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
} \
}
/* MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE - copy the source_win_handle field
 * out of an RMA packet, selecting the member that matches the packet type.
 *
 * pkt_     - packet object with a .type member and per-type members (.put,
 *            .get, .accum, .get_accum, .cas, .fop, .lock).  It is expanded
 *            more than once, so pass an expression without side effects.
 * win_hdl_ - lvalue that receives the source window handle.  It is left
 *            unassigned when the packet type is not one of the handled ones.
 * err_     - lvalue that is set to MPI_SUCCESS first.  For any packet type
 *            other than PUT, GET, ACCUMULATE, GET_ACCUM, CAS, FOP or LOCK it
 *            is passed to MPIU_ERR_SETANDJUMP1 together with MPI_ERR_OTHER and
 *            "**invalidpkt".
 *
 * NOTE(review): MPIU_ERR_SETANDJUMP1 is defined elsewhere; it presumably jumps
 * to a fn_fail label in the expanding function -- confirm before using this
 * macro in a function that has no such label.
 * The expansion is a bare brace block, not a do { } while (0) statement. */
#define MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE(pkt_, win_hdl_, err_) \
{ \
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
win_hdl_ = (pkt_).put.source_win_handle; \
break; \
case (MPIDI_CH3_PKT_GET): \
win_hdl_ = (pkt_).get.source_win_handle; \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
win_hdl_ = (pkt_).accum.source_win_handle; \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
win_hdl_ = (pkt_).get_accum.source_win_handle; \
break; \
case (MPIDI_CH3_PKT_CAS): \
win_hdl_ = (pkt_).cas.source_win_handle; \
break; \
case (MPIDI_CH3_PKT_FOP): \
win_hdl_ = (pkt_).fop.source_win_handle; \
break; \
case (MPIDI_CH3_PKT_LOCK): \
win_hdl_ = (pkt_).lock.source_win_handle; \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
} \
}
#define MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE(pkt_, win_hdl_, err_) \
{ \
err_ = MPI_SUCCESS; \
......@@ -641,7 +699,7 @@ typedef struct MPIDI_CH3_Pkt_decr_at_counter {
/*********************************************************************************/
/* RMA control response packet (from target to origin, including LOCK_ACK, */
/* FLUSH_ACK) */
/* LOCK_OP_ACK, FLUSH_ACK) */
/*********************************************************************************/
typedef struct MPIDI_CH3_Pkt_lock_ack {
......@@ -651,6 +709,13 @@ typedef struct MPIDI_CH3_Pkt_lock_ack {
int target_rank;
} MPIDI_CH3_Pkt_lock_ack_t;
/* Payload of a MPIDI_CH3_PKT_LOCK_OP_ACK packet.  It is filled in and sent by
 * MPIDI_CH3I_Send_lock_op_ack_pkt. */
typedef struct MPIDI_CH3_Pkt_lock_op_ack {
/* Packet type; the sender initializes the packet with
 * MPIDI_Pkt_init(..., MPIDI_CH3_PKT_LOCK_OP_ACK), which presumably stores
 * the type here -- confirm against MPIDI_Pkt_init. */
MPIDI_CH3_Pkt_type_t type;
/* Flags passed by the caller of the send routine (lock granted / queued /
 * discarded indications, optionally combined with FLUSH_ACK). */
MPIDI_CH3_Pkt_flags_t flags;
/* Source window handle passed by the caller of the send routine. */
MPI_Win source_win_handle;
/* Rank of the sending process in the window's communicator
 * (win_ptr->comm_ptr->rank at send time). */
int target_rank;
} MPIDI_CH3_Pkt_lock_op_ack_t;
typedef struct MPIDI_CH3_Pkt_flush_ack {
MPIDI_CH3_Pkt_type_t type;
MPI_Win source_win_handle;
......@@ -699,6 +764,7 @@ typedef union MPIDI_CH3_Pkt {
MPIDI_CH3_Pkt_flush_t flush;
MPIDI_CH3_Pkt_decr_at_counter_t decr_at_cnt;
MPIDI_CH3_Pkt_lock_ack_t lock_ack;
MPIDI_CH3_Pkt_lock_op_ack_t lock_op_ack;
MPIDI_CH3_Pkt_flush_ack_t flush_ack;
/* RMA packets end here */
MPIDI_CH3_Pkt_close_t close;
......
......@@ -142,6 +142,48 @@ static inline int MPIDI_CH3I_Send_lock_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_p
return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Send_lock_op_ack_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Build a MPIDI_CH3_PKT_LOCK_OP_ACK packet and start sending it on vc.
 *
 * vc                - virtual connection the packet is sent on
 * win_ptr           - window; its communicator rank is reported as target_rank
 * flags             - flags copied verbatim into the packet
 * source_win_handle - window handle copied verbatim into the packet
 *
 * Returns the value produced by MPIDI_CH3_iStartMsg on success; on failure the
 * error is rewritten through MPIU_ERR_SETANDJUMP with "**ch3|rmamsg".  A
 * request handed back by MPIDI_CH3_iStartMsg is released immediately. */
static inline int MPIDI_CH3I_Send_lock_op_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                                  MPIDI_CH3_Pkt_flags_t flags,
                                                  MPI_Win source_win_handle)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *sreq = NULL;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_op_ack_t *ack = &upkt.lock_op_ack;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

    /* populate the lock-op-ack packet */
    MPIDI_Pkt_init(ack, MPIDI_CH3_PKT_LOCK_OP_ACK);
    ack->flags = flags;
    ack->target_rank = win_ptr->comm_ptr->rank;
    ack->source_win_handle = source_win_handle;

    MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
                     (MPIU_DBG_FDEST, "sending lock op ack pkt on vc=%p, source_win_handle=%#08x",
                      vc, ack->source_win_handle));

    /* the send itself is issued inside the CH3COMM critical section */
    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, ack, sizeof(*ack), &sreq);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);

    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    /* nobody waits on this request, so drop our reference right away */
    if (sreq != NULL) {
        MPID_Request_release(sreq);
    }

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Send_flush_ack_pkt
......@@ -269,25 +311,33 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPID_Request **reqp)
{
MPIDI_RMA_Lock_entry_t *new_ptr = NULL;
MPIDI_CH3_Pkt_flags_t flag;
MPI_Win source_win_handle;
int lock_discarded = 0, data_discarded = 0;
int mpi_errno = MPI_SUCCESS;
(*reqp) = NULL;
new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, pkt);
if (new_ptr == NULL) {
/* FIXME: we run out of resources of lock requests, needs to
send LOCK DISCARDED packet back to origin */
if (new_ptr != NULL) {
MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
}
else {
lock_discarded = 1;
}
MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
if (pkt->type == MPIDI_CH3_PKT_LOCK ||
pkt->type == MPIDI_CH3_PKT_GET ||
pkt->type == MPIDI_CH3_PKT_FOP ||
pkt->type == MPIDI_CH3_PKT_CAS) {
new_ptr->all_data_recved = 1;
/* return bytes of data processed in this pkt handler */
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
goto fn_exit;
if (new_ptr != NULL)
new_ptr->all_data_recved = 1;
goto issue_ack;
}
else {
MPI_Aint type_size = 0;
......@@ -310,18 +360,55 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
recv_data_sz = type_size * target_count;
if (recv_data_sz <= MPIDI_RMA_IMMED_BYTES) {
/* all data fits in packet header */
new_ptr->all_data_recved = 1;
/* return bytes of data processed in this pkt handler */
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
goto fn_exit;
if (new_ptr != NULL)
new_ptr->all_data_recved = 1;
goto issue_ack;
}
/* allocate tmp buffer to receive data. */
new_ptr->data = MPIU_Malloc(recv_data_sz);
if (new_ptr->data == NULL) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
recv_data_sz);
if (new_ptr != NULL) {
if (win_ptr->current_lock_data_bytes + recv_data_sz
< MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
new_ptr->data = MPIU_Malloc(recv_data_sz);
}
if (new_ptr->data == NULL) {
/* Note that new_ptr->data can be NULL here for two possible reasons:
* (1) win_ptr->current_lock_data_bytes + recv_data_sz >= MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES;
* (2) MPIU_Malloc(recv_data_sz) failed.
* In either case we cannot allocate memory for the lock data, so we give up
* buffering the lock data; however, we still buffer the lock request.
*/
MPIDI_CH3_Pkt_t new_pkt;
MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock;
MPI_Win target_win_handle;
int lock_type, origin_rank;
MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_ORIGIN_RANK((*pkt), origin_rank, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_LOCK_TYPE((*pkt), lock_type, mpi_errno);
MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
lock_pkt->target_win_handle = target_win_handle;
lock_pkt->source_win_handle = source_win_handle;
lock_pkt->lock_type = lock_type;
lock_pkt->origin_rank = origin_rank;
/* replace original pkt with lock pkt */
new_ptr->pkt = new_pkt;
new_ptr->all_data_recved = 1;
data_discarded = 1;
}
else {
win_ptr->current_lock_data_bytes += recv_data_sz;
new_ptr->data_size = recv_data_sz;
}
}
/* create request to receive upcoming requests */
......@@ -329,27 +416,44 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPIU_Object_set_ref(req, 1);
/* fill in area in req that will be used in Receive_data_found() */
req->dev.user_buf = new_ptr->data;
req->dev.user_count = target_count;
req->dev.datatype = target_dtp;
req->dev.recv_data_sz = recv_data_sz;
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
req->dev.lock_queue_entry = new_ptr;
MPIDI_CH3_PKT_RMA_GET_IMMED_LEN((*pkt), immed_len, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_IMMED_DATA_PTR((*pkt), immed_data, mpi_errno);
if (immed_len > 0) {
/* see if we can receive some data from packet header */
MPIU_Memcpy(req->dev.user_buf, immed_data, (size_t)immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + immed_len);
req->dev.recv_data_sz -= immed_len;
if (lock_discarded || data_discarded) {
req->dev.user_buf = NULL;
req->dev.user_count = target_count;
req->dev.datatype = target_dtp;
req->dev.recv_data_sz = recv_data_sz;
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
MPIDI_CH3_PKT_RMA_GET_IMMED_LEN((*pkt), immed_len, mpi_errno);
if (immed_len > 0) {
req->dev.recv_data_sz -= immed_len;
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
MPIU_Assert(req->dev.recv_data_sz > 0);
}
else {
req->dev.user_buf = new_ptr->data;
req->dev.user_count = target_count;
req->dev.datatype = target_dtp;
req->dev.recv_data_sz = recv_data_sz;
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
req->dev.lock_queue_entry = new_ptr;
MPIDI_CH3_PKT_RMA_GET_IMMED_LEN((*pkt), immed_len, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_IMMED_DATA_PTR((*pkt), immed_data, mpi_errno);
if (immed_len > 0) {
/* see if we can receive some data from packet header */
MPIU_Memcpy(req->dev.user_buf, immed_data, (size_t)immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + immed_len);
req->dev.recv_data_sz -= immed_len;
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
MPIU_Assert(req->dev.recv_data_sz > 0);
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -361,13 +465,31 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
mpi_errno = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(vc, req, &complete);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (complete) {
goto fn_exit;
goto issue_ack;
}
}
(*reqp) = req;
}
issue_ack:
MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
if (pkt->type == MPIDI_CH3_PKT_LOCK) {
if (lock_discarded) flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
else flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
mpi_errno = MPIDI_CH3I_Send_lock_ack_pkt(vc, win_ptr, flag, source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
else {
if (lock_discarded) flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
else if (data_discarded) flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED;
else flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
mpi_errno = MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, flag, source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
fn_exit:
return mpi_errno;
fn_fail:
......@@ -375,7 +497,8 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
}
static inline int set_lock_sync_counter(MPID_Win *win_ptr, int target_rank)
static inline int set_lock_sync_counter(MPID_Win *win_ptr, int target_rank,
MPIDI_CH3_Pkt_flags_t flags)
{
MPIDI_RMA_Target_t *t = NULL;
int mpi_errno = MPI_SUCCESS;
......@@ -390,21 +513,125 @@ static inline int set_lock_sync_counter(MPID_Win *win_ptr, int target_rank)
MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc);
if (win_ptr->comm_ptr->rank == target_rank ||
(win_ptr->shm_allocated == TRUE && orig_vc->node_id == target_vc->node_id)) {
win_ptr->outstanding_locks--;
MPIU_Assert(win_ptr->outstanding_locks >= 0);
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
win_ptr->outstanding_locks--;
MPIU_Assert(win_ptr->outstanding_locks >= 0);
}
else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
/* re-send lock request message. */
mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
goto fn_exit;
}
}
else if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED) {
win_ptr->outstanding_locks--;
MPIU_Assert(win_ptr->outstanding_locks >= 0);
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
win_ptr->outstanding_locks--;
MPIU_Assert(win_ptr->outstanding_locks >= 0);
}
else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
/* re-send lock request message. */
mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
goto fn_exit;
}
mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIU_Assert(t != NULL);
t->access_state = MPIDI_RMA_LOCK_GRANTED;
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED)
t->access_state = MPIDI_RMA_LOCK_GRANTED;
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED)
t->access_state = MPIDI_RMA_LOCK_CALLED;
fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
}
static inline int adjust_op_piggybacked_with_lock (MPID_Win *win_ptr,
int target_rank,
MPIDI_CH3_Pkt_flags_t flags) {
MPIDI_RMA_Target_t *target = NULL;
MPIDI_RMA_Op_t *op = NULL;
MPIDI_CH3_Pkt_flags_t op_flags = MPIDI_CH3_PKT_FLAG_NONE;
int mpi_errno = MPI_SUCCESS;
mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIU_Assert(target != NULL);
op = target->pending_op_list;
if (op != NULL) MPIDI_CH3_PKT_RMA_GET_FLAGS(op->pkt, op_flags, mpi_errno);
if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK) {
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED ||
flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED) {
if (!op->request) {
if (op->ureq) {
/* Complete user request and release the ch3 ref */
MPID_Request_set_completed(op->ureq);
MPID_Request_release(op->ureq);
}
MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list),
&(target->pending_op_list_tail), op);
}
else {
MPIDI_CH3I_RMA_Ops_unlink(&(target->pending_op_list),
&(target->pending_op_list_tail), op);
if (op->is_dt) {
MPIDI_CH3I_RMA_Ops_append(&(target->dt_op_list),
&(target->dt_op_list_tail), op);
}
else if (op->pkt.type == MPIDI_CH3_PKT_PUT ||
op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE) {
MPIDI_CH3I_RMA_Ops_append(&(target->write_op_list),
&(target->write_op_list_tail), op);
}
else {
MPIDI_CH3I_RMA_Ops_append(&(target->read_op_list),
&(target->read_op_list_tail), op);
}
if (op->ureq) {
if (MPID_Request_is_complete(op->request)) {
/* Complete user request, let cleanup function to release
ch3 ref */
MPID_Request_set_completed(op->ureq);
}
else {
/* Increase ref for completion handler */
MPIU_Object_add_ref(op->ureq);
op->request->dev.request_handle = op->ureq->handle;
if (op->request->dev.OnDataAvail == NULL) {
op->request->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReqOpsComplete;
}
op->request->dev.OnFinal = MPIDI_CH3_ReqHandler_ReqOpsComplete;
}
}
}
}
else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED ||
flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
/* We need to re-transmit this operation, so we destroy
the internal request and erase all flags in current
operation. */
if (op->request) {
MPIDI_CH3_Request_destroy(op->request);
op->request = NULL;
win_ptr->active_req_cnt--;
}
MPIDI_CH3_PKT_RMA_ERASE_FLAGS(op->pkt, mpi_errno);
target->next_op_to_issue = op;
}
}
fn_exit:
return mpi_errno;
......@@ -426,7 +653,8 @@ static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
MPIR_T_PVAR_TIMER_START(RMA, rma_winlock_getlocallock);
if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 1) {
mpi_errno = set_lock_sync_counter(win_ptr, win_ptr->comm_ptr->rank);
mpi_errno = set_lock_sync_counter(win_ptr, win_ptr->comm_ptr->rank,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
else {
......@@ -441,8 +669,10 @@ static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, &pkt);
if (new_ptr == NULL) {
/* FIXME: we run out of resources of lock requests, needs to
send LOCK DISCARDED packet back to origin */
mpi_errno = set_lock_sync_counter(win_ptr, win_ptr->comm_ptr->rank,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
goto fn_exit;
}
MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
......@@ -609,7 +839,6 @@ static inline int check_piggyback_lock(MPID_Win *win_ptr, MPIDI_VC_t *vc,
/* cannot acquire the lock, queue up this operation. */
mpi_errno = enqueue_lock_origin(win_ptr, vc, pkt, buflen, reqp);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
(*acquire_lock_fail) = 1;
}
}
......@@ -633,9 +862,9 @@ static inline int finish_op_on_target(MPID_Win *win_ptr, MPIDI_VC_t *vc,
if ((flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
(flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
mpi_errno = MPIDI_CH3I_Send_lock_ack_pkt(vc, win_ptr,
pkt_flags,
source_win_handle);
mpi_errno = MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr,
pkt_flags,
source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
......
......@@ -581,6 +581,8 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
MPIDI_CH3_PktHandler_Lock;
pktArray[MPIDI_CH3_PKT_LOCK_ACK] =
MPIDI_CH3_PktHandler_LockAck;
pktArray[MPIDI_CH3_PKT_LOCK_OP_ACK] =
MPIDI_CH3_PktHandler_LockOpAck;
pktArray[MPIDI_CH3_PKT_UNLOCK] =
MPIDI_CH3_PktHandler_Unlock;
pktArray[MPIDI_CH3_PKT_FLUSH] =
......
......@@ -1292,7 +1292,8 @@ static inline int perform_op_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_ent
MPIDI_CH3_Pkt_lock_t *lock_pkt = &(lock_entry->pkt.lock);
if (lock_pkt->origin_rank == win_ptr->comm_ptr->rank) {
mpi_errno = set_lock_sync_counter(win_ptr, lock_pkt->origin_rank);
mpi_errno = set_lock_sync_counter(win_ptr, lock_pkt->origin_rank,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
else {
......
......@@ -355,12 +355,25 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *targe
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
(*made_progress) = 1;
if (curr_op->request != NULL)
win_ptr->active_req_cnt++;
if (curr_op->pkt.type == MPIDI_CH3_PKT_PUT ||
curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE) {
target->put_acc_issued = 1; /* set PUT_ACC_FLAG when sending
PUT/ACC operation. */
}
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK) {
/* If this operation is piggybacked with LOCK,
do not move it out of pending list, and do
not complete the user request, because we
may need to re-transmit it. */
break;
}
if (!curr_op->request) {
if (curr_op->ureq) {
/* Complete user request and release the ch3 ref */
......@@ -374,6 +387,7 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *targe
}
else {
/* Sending is not completed immediately. */
MPIDI_CH3I_RMA_Ops_unlink(&(target->pending_op_list),
&(target->pending_op_list_tail), curr_op);
if (curr_op->is_dt) {
......@@ -418,13 +432,10 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *targe
}
curr_op->request->dev.OnFinal = MPIDI_CH3_ReqHandler_ReqOpsComplete;