Commit 42c52518 authored by Xin Zhao's avatar Xin Zhao
Browse files

Bug-fix: when encounter mpich failure, unlock inter-process lock.


Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent c8ecef8d
...@@ -268,6 +268,8 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc, ...@@ -268,6 +268,8 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
void *src = (void *)(rreq->dev.real_user_buf), *dest = (void *)(get_accum_resp_pkt->info.data); void *src = (void *)(rreq->dev.real_user_buf), *dest = (void *)(get_accum_resp_pkt->info.data);
mpi_errno = immed_copy(src, dest, rreq->dev.user_count * type_size); mpi_errno = immed_copy(src, dest, rreq->dev.user_count * type_size);
if (mpi_errno != MPI_SUCCESS) { if (mpi_errno != MPI_SUCCESS) {
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
MPIU_ERR_POP(mpi_errno); MPIU_ERR_POP(mpi_errno);
} }
} }
...@@ -279,6 +281,10 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc, ...@@ -279,6 +281,10 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
MPID_Segment *seg = MPID_Segment_alloc(); MPID_Segment *seg = MPID_Segment_alloc();
MPI_Aint last = type_size * rreq->dev.user_count; MPI_Aint last = type_size * rreq->dev.user_count;
if (seg == NULL) {
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
MPIU_ERR_CHKANDJUMP1(seg == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment"); MPIU_ERR_CHKANDJUMP1(seg == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment");
MPID_Segment_init(rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype, seg, 0); MPID_Segment_init(rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype, seg, 0);
MPID_Segment_pack(seg, 0, &last, resp_req->dev.user_buf); MPID_Segment_pack(seg, 0, &last, resp_req->dev.user_buf);
...@@ -289,13 +295,12 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc, ...@@ -289,13 +295,12 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
/* accumulate data from tmp_buf into user_buf */ /* accumulate data from tmp_buf into user_buf */
mpi_errno = do_accumulate_op(rreq->dev.user_buf, rreq->dev.real_user_buf, mpi_errno = do_accumulate_op(rreq->dev.user_buf, rreq->dev.real_user_buf,
rreq->dev.user_count, rreq->dev.datatype, rreq->dev.op); rreq->dev.user_count, rreq->dev.datatype, rreq->dev.op);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (win_ptr->shm_allocated == TRUE) if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr); MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete; resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete;
resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete; resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete;
resp_req->dev.target_win_handle = rreq->dev.target_win_handle; resp_req->dev.target_win_handle = rreq->dev.target_win_handle;
...@@ -403,14 +408,13 @@ int MPIDI_CH3_ReqHandler_FOPRecvComplete( MPIDI_VC_t *vc, ...@@ -403,14 +408,13 @@ int MPIDI_CH3_ReqHandler_FOPRecvComplete( MPIDI_VC_t *vc,
if (rreq->dev.op != MPI_NO_OP) { if (rreq->dev.op != MPI_NO_OP) {
mpi_errno = do_accumulate_op(rreq->dev.user_buf, rreq->dev.real_user_buf, mpi_errno = do_accumulate_op(rreq->dev.user_buf, rreq->dev.real_user_buf,
1, rreq->dev.datatype, rreq->dev.op); 1, rreq->dev.datatype, rreq->dev.op);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
} }
if (win_ptr->shm_allocated == TRUE) if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr); MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* Send back data */ /* Send back data */
MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP); MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
fop_resp_pkt->request_handle = rreq->dev.resp_request_handle; fop_resp_pkt->request_handle = rreq->dev.resp_request_handle;
...@@ -1074,19 +1078,19 @@ static inline int perform_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en ...@@ -1074,19 +1078,19 @@ static inline int perform_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
/* All data fits in packet header */ /* All data fits in packet header */
mpi_errno = do_accumulate_op(acc_pkt->info.data, acc_pkt->addr, mpi_errno = do_accumulate_op(acc_pkt->info.data, acc_pkt->addr,
acc_pkt->count, acc_pkt->datatype, acc_pkt->op); acc_pkt->count, acc_pkt->datatype, acc_pkt->op);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
} }
else { else {
MPIU_Assert(acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE); MPIU_Assert(acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
mpi_errno = do_accumulate_op(lock_entry->data, acc_pkt->addr, mpi_errno = do_accumulate_op(lock_entry->data, acc_pkt->addr,
acc_pkt->count, acc_pkt->datatype, acc_pkt->op); acc_pkt->count, acc_pkt->datatype, acc_pkt->op);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
} }
if (win_ptr->shm_allocated == TRUE) if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr); MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
mpi_errno = finish_op_on_target(win_ptr, lock_entry->vc, FALSE /* has no response data */, mpi_errno = finish_op_on_target(win_ptr, lock_entry->vc, FALSE /* has no response data */,
acc_pkt->flags, acc_pkt->source_win_handle); acc_pkt->flags, acc_pkt->source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno); if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
...@@ -1148,7 +1152,11 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc ...@@ -1148,7 +1152,11 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc
if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) { if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
void *src = (void *)(get_accum_pkt->addr), *dest = (void *)(get_accum_resp_pkt->info.data); void *src = (void *)(get_accum_pkt->addr), *dest = (void *)(get_accum_resp_pkt->info.data);
mpi_errno = immed_copy(src, dest, len); mpi_errno = immed_copy(src, dest, len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno); if (mpi_errno != MPI_SUCCESS) {
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
MPIU_ERR_POP(mpi_errno);
}
} }
else { else {
if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) { if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
...@@ -1158,6 +1166,10 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc ...@@ -1158,6 +1166,10 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc
MPID_Segment *seg = MPID_Segment_alloc(); MPID_Segment *seg = MPID_Segment_alloc();
MPI_Aint last = type_size * get_accum_pkt->count; MPI_Aint last = type_size * get_accum_pkt->count;
if (seg == NULL) {
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
MPIU_ERR_CHKANDJUMP1(seg == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment"); MPIU_ERR_CHKANDJUMP1(seg == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment");
MPID_Segment_init(get_accum_pkt->addr, get_accum_pkt->count, MPID_Segment_init(get_accum_pkt->addr, get_accum_pkt->count,
get_accum_pkt->datatype, seg, 0); get_accum_pkt->datatype, seg, 0);
...@@ -1170,19 +1182,19 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc ...@@ -1170,19 +1182,19 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc
/* All data fits in packet header */ /* All data fits in packet header */
mpi_errno = do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->addr, mpi_errno = do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->addr,
get_accum_pkt->count, get_accum_pkt->datatype, get_accum_pkt->op); get_accum_pkt->count, get_accum_pkt->datatype, get_accum_pkt->op);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
} }
else { else {
MPIU_Assert(get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM); MPIU_Assert(get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
mpi_errno = do_accumulate_op(lock_entry->data, get_accum_pkt->addr, mpi_errno = do_accumulate_op(lock_entry->data, get_accum_pkt->addr,
get_accum_pkt->count, get_accum_pkt->datatype, get_accum_pkt->op); get_accum_pkt->count, get_accum_pkt->datatype, get_accum_pkt->op);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
} }
if (win_ptr->shm_allocated == TRUE) if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr); MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* here we increment the Active Target counter to guarantee the GET-like /* here we increment the Active Target counter to guarantee the GET-like
operation are completed when counter reaches zero. */ operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++; win_ptr->at_completion_counter++;
...@@ -1295,7 +1307,11 @@ static inline int perform_fop_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en ...@@ -1295,7 +1307,11 @@ static inline int perform_fop_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
/* copy data to resp pkt header */ /* copy data to resp pkt header */
void *src = fop_pkt->addr, *dest = fop_resp_pkt->info.data; void *src = fop_pkt->addr, *dest = fop_resp_pkt->info.data;
mpi_errno = immed_copy(src, dest, type_size); mpi_errno = immed_copy(src, dest, type_size);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno); if (mpi_errno != MPI_SUCCESS) {
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
MPIU_ERR_POP(mpi_errno);
}
} }
else { else {
MPIU_Memcpy(resp_req->dev.user_buf, fop_pkt->addr, MPIU_Memcpy(resp_req->dev.user_buf, fop_pkt->addr,
...@@ -1312,12 +1328,13 @@ static inline int perform_fop_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en ...@@ -1312,12 +1328,13 @@ static inline int perform_fop_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
mpi_errno = do_accumulate_op(lock_entry->data, fop_pkt->addr, mpi_errno = do_accumulate_op(lock_entry->data, fop_pkt->addr,
1, fop_pkt->datatype, fop_pkt->op); 1, fop_pkt->datatype, fop_pkt->op);
} }
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
} }
if (win_ptr->shm_allocated == TRUE) if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr); MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) { if (fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
/* send back the original data */ /* send back the original data */
MPIU_THREAD_CS_ENTER(CH3COMM,lock_entry->vc); MPIU_THREAD_CS_ENTER(CH3COMM,lock_entry->vc);
......
...@@ -812,7 +812,11 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, ...@@ -812,7 +812,11 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
/* copy data from target buffer to response packet header */ /* copy data from target buffer to response packet header */
src = (void *) (get_accum_pkt->addr), dest = (void *) (get_accum_resp_pkt->info.data); src = (void *) (get_accum_pkt->addr), dest = (void *) (get_accum_resp_pkt->info.data);
mpi_errno = immed_copy(src, dest, len); mpi_errno = immed_copy(src, dest, len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno); if (mpi_errno != MPI_SUCCESS) {
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
MPIU_ERR_POP(mpi_errno);
}
} }
else { else {
MPIU_Memcpy(resp_req->dev.user_buf, get_accum_pkt->addr, MPIU_Memcpy(resp_req->dev.user_buf, get_accum_pkt->addr,
...@@ -823,13 +827,12 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, ...@@ -823,13 +827,12 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
mpi_errno = do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->addr, mpi_errno = do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->addr,
get_accum_pkt->count, get_accum_pkt->datatype, get_accum_pkt->count, get_accum_pkt->datatype,
get_accum_pkt->op); get_accum_pkt->op);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (win_ptr->shm_allocated == TRUE) if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr); MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (get_accum_resp_pkt->type == MPIDI_CH3_PKT_GET_ACCUM_RESP_IMMED) { if (get_accum_resp_pkt->type == MPIDI_CH3_PKT_GET_ACCUM_RESP_IMMED) {
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt; iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt); iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
...@@ -1209,20 +1212,23 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, ...@@ -1209,20 +1212,23 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
/* copy data to resp pkt header */ /* copy data to resp pkt header */
void *src = fop_pkt->addr, *dest = fop_resp_pkt->info.data; void *src = fop_pkt->addr, *dest = fop_resp_pkt->info.data;
mpi_errno = immed_copy(src, dest, type_size); mpi_errno = immed_copy(src, dest, type_size);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno); if (mpi_errno != MPI_SUCCESS) {
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
MPIU_ERR_POP(mpi_errno);
}
/* Apply the op */ /* Apply the op */
if (fop_pkt->op != MPI_NO_OP) { if (fop_pkt->op != MPI_NO_OP) {
mpi_errno = do_accumulate_op(fop_pkt->info.data, fop_pkt->addr, mpi_errno = do_accumulate_op(fop_pkt->info.data, fop_pkt->addr,
1, fop_pkt->datatype, fop_pkt->op); 1, fop_pkt->datatype, fop_pkt->op);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_POP(mpi_errno);
}
} }
if (win_ptr->shm_allocated == TRUE) if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr); MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* send back the original data */ /* send back the original data */
MPIU_THREAD_CS_ENTER(CH3COMM,vc); MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req); mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment