Commit 48d797a9, authored by Wesley Bland, committed by Huiwei Lu
Browse files

Convert scheduler functions to use MPIC



The scheduler functions now use MPIC_* functions to handle their
communication instead of directly calling the MPID_* functions. This
helps to simplify code related to error handling and allows the
collectives to complete even if a failure is detected because the error
will be tracked via the errflag inside the request object.

Fixes #2222
Signed-off-by: Huiwei Lu <huiweilu@mcs.anl.gov>
parent 67ec0ab1
......@@ -124,8 +124,7 @@ fn_fail:
* performing reductions, calling callbacks, etc. */
static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, size_t idx, struct MPIDU_Sched_entry *e)
{
int mpi_errno = MPI_SUCCESS;
int context_offset;
int mpi_errno = MPI_SUCCESS, ret_errno = MPI_SUCCESS;
MPID_Request *r = s->req;
MPID_Comm *comm;
......@@ -134,43 +133,62 @@ static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, size_t idx, struct MPI
switch (e->type) {
case MPIDU_SCHED_ENTRY_SEND:
comm = e->u.send.comm;
context_offset = (comm->comm_kind == MPID_INTRACOMM) ?
MPID_CONTEXT_INTRA_COLL : MPID_CONTEXT_INTER_COLL;
MPIU_DBG_MSG_D(COMM, VERBOSE, "starting SEND entry %d\n", (int) idx);
if (e->u.send.count_p) {
/* deferred send */
/* originally there was no branch and send.count_p was set to
* &send.count, but this requires patching up the pointers
* during realloc of entries, so this is easier */
mpi_errno = MPID_Isend(e->u.send.buf, *e->u.send.count_p, e->u.send.datatype,
e->u.send.dest, s->tag, comm, context_offset,
&e->u.send.sreq);
ret_errno = MPIC_Isend(e->u.send.buf, *e->u.send.count_p, e->u.send.datatype,
e->u.send.dest, s->tag, comm, &e->u.send.sreq, &r->dev.errflag);
}
else {
if (e->u.send.is_sync) {
mpi_errno = MPID_Issend(e->u.send.buf, e->u.send.count, e->u.send.datatype,
e->u.send.dest, s->tag, comm, context_offset,
&e->u.send.sreq);
ret_errno = MPIC_Issend(e->u.send.buf, e->u.send.count, e->u.send.datatype,
e->u.send.dest, s->tag, comm, &e->u.send.sreq, &r->dev.errflag);
}
else {
mpi_errno = MPID_Isend(e->u.send.buf, e->u.send.count, e->u.send.datatype,
e->u.send.dest, s->tag, comm, context_offset,
&e->u.send.sreq);
ret_errno = MPIC_Isend(e->u.send.buf, e->u.send.count, e->u.send.datatype,
e->u.send.dest, s->tag, comm, &e->u.send.sreq, &r->dev.errflag);
}
}
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
/* Check if the error is actually fatal to the NBC or we can continue. */
if (unlikely(ret_errno)) {
if (MPIR_ERR_NONE == r->dev.errflag) {
if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
r->dev.errflag = MPIR_ERR_PROC_FAILED;
} else {
r->dev.errflag = MPIR_ERR_OTHER;
}
}
e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
MPIU_DBG_MSG_D(COMM, VERBOSE, "Sched SEND failed. Errflag: %d\n", (int) r->dev.errflag);
} else {
e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
}
break;
case MPIDU_SCHED_ENTRY_RECV:
MPIU_DBG_MSG_D(COMM, VERBOSE, "starting RECV entry %d\n", (int) idx);
comm = e->u.recv.comm;
context_offset = (comm->comm_kind == MPID_INTRACOMM) ?
MPID_CONTEXT_INTRA_COLL : MPID_CONTEXT_INTER_COLL;
mpi_errno = MPID_Irecv(e->u.recv.buf, e->u.recv.count, e->u.recv.datatype,
e->u.recv.src, s->tag, comm, context_offset,
&e->u.recv.rreq);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
ret_errno = MPIC_Irecv(e->u.recv.buf, e->u.recv.count, e->u.recv.datatype,
e->u.recv.src, s->tag, comm, &e->u.recv.rreq);
/* Check if the error is actually fatal to the NBC or we can continue. */
if (unlikely(ret_errno)) {
if (MPIR_ERR_NONE == r->dev.errflag) {
if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
r->dev.errflag = MPIR_ERR_PROC_FAILED;
} else {
r->dev.errflag = MPIR_ERR_OTHER;
}
}
e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
MPIU_DBG_MSG_D(COMM, VERBOSE, "Sched SEND failed. Errflag: %d\n", (int) r->dev.errflag);
} else {
e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
}
break;
case MPIDU_SCHED_ENTRY_REDUCE:
MPIU_DBG_MSG_D(COMM, VERBOSE, "starting REDUCE entry %d\n", (int) idx);
mpi_errno = MPIR_Reduce_local_impl(e->u.reduce.inbuf, e->u.reduce.inoutbuf, e->u.reduce.count,
e->u.reduce.datatype, e->u.reduce.op);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
......@@ -183,6 +201,7 @@ static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, size_t idx, struct MPI
e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
break;
case MPIDU_SCHED_ENTRY_COPY:
MPIU_DBG_MSG_D(COMM, VERBOSE, "starting COPY entry %d\n", (int) idx);
mpi_errno = MPIR_Localcopy(e->u.copy.inbuf, e->u.copy.incount, e->u.copy.intype,
e->u.copy.outbuf, e->u.copy.outcount, e->u.copy.outtype);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
......@@ -191,30 +210,48 @@ static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, size_t idx, struct MPI
e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
break;
case MPIDU_SCHED_ENTRY_NOP:
MPIU_DBG_MSG_D(COMM, VERBOSE, "starting NOOP entry %d\n", (int) idx);
/* nothing to be done */
break;
case MPIDU_SCHED_ENTRY_CB:
MPIU_DBG_MSG_D(COMM, VERBOSE, "starting CB entry %d\n", (int) idx);
if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_1) {
mpi_errno = e->u.cb.u.cb_p(r->comm, s->tag, e->u.cb.cb_state);
if (mpi_errno) {
ret_errno = e->u.cb.u.cb_p(r->comm, s->tag, e->u.cb.cb_state);
if (unlikely(ret_errno)) {
if (MPIR_ERR_NONE == r->dev.errflag) {
if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
r->dev.errflag = MPIR_ERR_PROC_FAILED;
} else {
r->dev.errflag = MPIR_ERR_OTHER;
}
}
e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
MPIU_ERR_POP(mpi_errno);
} else {
e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
}
}
else if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_2) {
mpi_errno = e->u.cb.u.cb2_p(r->comm, s->tag, e->u.cb.cb_state, e->u.cb.cb_state2);
if (mpi_errno) {
ret_errno = e->u.cb.u.cb2_p(r->comm, s->tag, e->u.cb.cb_state, e->u.cb.cb_state2);
if (unlikely(ret_errno)) {
if (MPIR_ERR_NONE == r->dev.errflag) {
if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) {
r->dev.errflag = MPIR_ERR_PROC_FAILED;
} else {
r->dev.errflag = MPIR_ERR_OTHER;
}
}
e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
MPIU_ERR_POP(mpi_errno);
} else {
e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED;
}
}
else {
MPIU_Assert_fmt_msg(FALSE, ("unknown callback type, e->u.cb.cb_type=%d", e->u.cb.cb_type));
MPIU_DBG_MSG_D(COMM, TYPICAL, "unknown callback type, e->u.cb.cb_type=%d", e->u.cb.cb_type);
}
e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
break;
default:
MPIU_Assert_fmt_msg(FALSE, ("unknown entry type, e->type=%d", e->type));
MPIU_DBG_MSG_D(COMM, TYPICAL, "unknown entry type, e->type=%d", e->type);
break;
}
......@@ -248,13 +285,13 @@ static int MPIDU_Sched_continue(struct MPIDU_Sched *s)
}
/* _start_entry may have completed the operation, but won't update s->idx */
if (i == s->idx && e->status == MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
if (i == s->idx && e->status >= MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
++s->idx; /* this is valid even for barrier entries */
}
/* watch the indexing, s->idx might have been incremented above, so
* ||-short-circuit matters here */
if (e->is_barrier && (e->status != MPIDU_SCHED_ENTRY_STATUS_COMPLETE || (s->idx != i+1))) {
if (e->is_barrier && (e->status < MPIDU_SCHED_ENTRY_STATUS_COMPLETE || (s->idx != i+1))) {
/* we've hit a barrier but outstanding operations before this
* barrier remain, so we cannot proceed past the barrier */
break;
......@@ -368,7 +405,7 @@ int MPID_Sched_start(MPID_Sched_t *sp, MPID_Comm *comm, int tag, MPID_Request **
* progress engine can make progress on it */
MPL_DL_APPEND(all_schedules.head, s);
dprintf(stderr, "started schedule s=%p\n", s);
MPIU_DBG_MSG_P(COMM, TYPICAL, "started schedule s=%p\n", s);
MPIDU_Sched_dump(s);
fn_exit:
......@@ -795,7 +832,6 @@ static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made
*made_progress = FALSE;
MPL_DL_FOREACH_SAFE(state->head, s, tmp) {
dprintf(stderr, "making progress on s=%p\n", s);
/*MPIDU_Sched_dump(s);*/
for (i = s->idx; i < s->num_entries; ++i) {
......@@ -804,38 +840,38 @@ static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made
switch (e->type) {
case MPIDU_SCHED_ENTRY_SEND:
if (e->u.send.sreq != NULL && MPID_Request_is_complete(e->u.send.sreq)) {
dprintf(stderr, "completed SEND entry %d, sreq=%p\n", i, e->u.send.sreq);
MPIU_DBG_MSG_FMT(COMM, VERBOSE, (MPIU_DBG_FDEST, "completed SEND entry %d, sreq=%p\n", (int) i, e->u.send.sreq));
e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
mpi_errno = e->u.send.sreq->status.MPI_ERROR;
/* This wait call won't enter the progress engine.
* It's just a convinient way to pull out the error
* information from the tag. */
MPIR_Process_status(&e->u.send.sreq->status, &s->req->dev.errflag);
MPID_Request_release(e->u.send.sreq);
e->u.send.sreq = NULL;
MPIR_Comm_release(e->u.send.comm, /*isDisconnect=*/FALSE);
dtype_release_if_not_builtin(e->u.send.datatype);
if (mpi_errno) {
if (e->u.send.sreq->status.MPI_ERROR)
e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
MPIU_ERR_POP(mpi_errno);
}
else {
else
e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
}
}
break;
case MPIDU_SCHED_ENTRY_RECV:
if (e->u.recv.rreq != NULL && MPID_Request_is_complete(e->u.recv.rreq)) {
dprintf(stderr, "completed RECV entry %d, rreq=%p\n", i, e->u.recv.rreq);
mpi_errno = e->u.recv.rreq->status.MPI_ERROR;
MPIU_DBG_MSG_FMT(COMM, VERBOSE, (MPIU_DBG_FDEST, "completed RECV entry %d, rreq=%p\n", (int) i, e->u.recv.rreq));
/* This wait call won't enter the progress engine.
* It's just a convinient way to pull out the error
* information from the tag. */
MPIR_Process_status(&e->u.recv.rreq->status, &s->req->dev.errflag);
MPIR_Request_extract_status(e->u.recv.rreq, e->u.recv.status);
MPID_Request_release(e->u.recv.rreq);
e->u.recv.rreq = NULL;
MPIR_Comm_release(e->u.recv.comm, /*isDisconnect=*/FALSE);
dtype_release_if_not_builtin(e->u.recv.datatype);
if (mpi_errno) {
if (e->u.recv.rreq->status.MPI_ERROR)
e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
MPIU_ERR_POP(mpi_errno);
}
else {
else
e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
}
}
break;
default:
......@@ -844,28 +880,40 @@ static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made
break;
}
if (i == s->idx && e->status == MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
if (i == s->idx && e->status >= MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
++s->idx;
MPIU_DBG_MSG_D(COMM, VERBOSE, "completed OTHER entry %d\n", (int) i);
if (e->is_barrier) {
dprintf(stderr, "completed barrier in entry %d\n", i);
/* post/perform the next round of operations */
mpi_errno = MPIDU_Sched_continue(s);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
}
else if (e->is_barrier && e->status != MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
else if (e->is_barrier && e->status < MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
/* don't process anything after this barrier entry */
break;
}
}
if (s->idx == s->num_entries) {
dprintf(stderr, "completing and dequeuing s=%p r=%p\n", s, s->req);
MPIU_DBG_MSG_FMT(COMM, VERBOSE, (MPIU_DBG_FDEST, "completing and dequeuing s=%p r=%p\n", s, s->req));
/* dequeue this schedule from the state, it's complete */
MPL_DL_DELETE(state->head, s);
/* TODO refactor into a sched_complete routine? */
switch (s->req->dev.errflag) {
case MPIR_ERR_PROC_FAILED:
MPIU_ERR_SET(s->req->status.MPI_ERROR, MPIX_ERR_PROC_FAILED, "**comm");
break;
case MPIR_ERR_OTHER:
MPIU_ERR_SET(s->req->status.MPI_ERROR, MPI_ERR_OTHER, "**comm");
break;
case MPIR_ERR_NONE:
default:
break;
}
MPID_REQUEST_SET_COMPLETED(s->req);
MPID_Request_release(s->req);
s->req = NULL;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment