Commit b971c83b authored by Darius Buntinas's avatar Darius Buntinas
Browse files

[svn-r473] Fixed waitall scalability bug (3830)

parent 2d96925b
......@@ -552,3 +552,72 @@ fn_exit:
return mpi_error;
}
/* MPIR_Grequest_wait: Waits until all generalized requests have
completed. This routine groups grequests by class and calls the
wait_fn on the whole class. */
#undef FUNCNAME
#define FUNCNAME MPIR_Grequest_waitall
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIR_Grequest_waitall(int count,
MPID_Request **request_ptrs,
MPI_Status array_of_statuses[] )
{
MPIX_Grequest_wait_function *wait_fn = NULL;
void ** state_ptrs;
int i, n_greq;
int mpi_error = MPI_SUCCESS;
MPIX_Grequest_class curr_class;
MPIU_CHKLMEM_DECL(1);
MPIU_CHKLMEM_MALLOC(state_ptrs, void *, sizeof(void*)*count, mpi_error, "state_ptrs");
/* loop over all requests, group greqs with the same class and
call wait_fn on the groups. (Only consecutive greqs with the
same class are being grouped) */
n_greq = 0;
for (i = 0; i < count; ++i)
{
/* skip over requests we're not interested in */
if (request_ptrs[i] == NULL || *request_ptrs[i]->cc_ptr == 0 || request_ptrs[i]->kind != MPID_UREQUEST)
continue;
if (n_greq == 0 || request_ptrs[i]->greq_class == curr_class)
{
/* if this is the first grequest of a group, or if it's the
same class as the last one, add its state to the list */
curr_class = request_ptrs[i]->greq_class;
wait_fn = request_ptrs[i]->wait_fn;
state_ptrs[n_greq] = request_ptrs[i]->grequest_extra_state;
++n_greq;
}
else
{
/* greq with a new class: wait on the list of greqs we've
created, then start a new list*/
mpi_error = (wait_fn)(n_greq, state_ptrs, 0, NULL);
if (mpi_error) MPIU_ERR_POP(mpi_error);
curr_class = request_ptrs[i]->greq_class;
wait_fn = request_ptrs[i]->wait_fn;
state_ptrs[0] = request_ptrs[i]->grequest_extra_state;
n_greq = 1;
}
}
if (n_greq)
{
/* wait on the last group of greqs */
mpi_error = (wait_fn)(n_greq, state_ptrs, 0, NULL);
if (mpi_error) MPIU_ERR_POP(mpi_error);
}
fn_exit:
MPIU_CHKLMEM_FREEALL();
return mpi_error;
fn_fail:
goto fn_exit;
}
......@@ -76,8 +76,8 @@ int MPI_Waitall(int count, MPI_Request array_of_requests[],
MPID_Request ** request_ptrs = request_ptr_array;
MPI_Status * status_ptr;
MPID_Progress_state progress_state;
int i;
int n_completed, n_native=0;
int i, j;
int n_completed;
int active_flag;
int rc;
int mpi_errno = MPI_SUCCESS;
......@@ -161,42 +161,51 @@ int MPI_Waitall(int count, MPI_Request array_of_requests[],
MPID_Progress_start(&progress_state);
for(;;)
/* first complete all generalized requests */
mpi_errno = MPIR_Grequest_waitall(count, request_ptrs, array_of_statuses);
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
for (i = 0; i < count; i++)
{
mpi_errno = MPIR_Grequest_progress_poke(count,
request_ptrs, array_of_statuses);
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
if (request_ptrs[i] == NULL) continue;
for (i = 0; i < count; i++)
{
if (request_ptrs[i] != NULL && *request_ptrs[i]->cc_ptr == 0)
{
status_ptr = (array_of_statuses != MPI_STATUSES_IGNORE) ? &array_of_statuses[i] : MPI_STATUS_IGNORE;
rc = MPIR_Request_complete(&array_of_requests[i], request_ptrs[i], status_ptr, &active_flag);
if (rc == MPI_SUCCESS)
{
request_ptrs[i] = NULL;
}
else
{
mpi_errno = MPI_ERR_IN_STATUS;
if (status_ptr != MPI_STATUS_IGNORE)
{
status_ptr->MPI_ERROR = rc;
}
}
n_completed += 1;
}
}
if (mpi_errno == MPI_ERR_IN_STATUS)
{
if (array_of_statuses != MPI_STATUSES_IGNORE)
{
for (i = 0; i < count; i++)
{
if (request_ptrs[i] == NULL)
/* wait for ith request to complete */
while (*request_ptrs[i]->cc_ptr != 0)
{
/* generalized requests should already be finished */
MPIU_Assert(request_ptrs[i]->kind != MPID_UREQUEST);
mpi_errno = MPID_Progress_wait(&progress_state);
if (mpi_errno != MPI_SUCCESS)
{
/* --BEGIN ERROR HANDLING-- */
MPID_Progress_end(&progress_state);
goto fn_fail;
/* --END ERROR HANDLING-- */
}
}
/* complete the request and check the status */
status_ptr = (array_of_statuses != MPI_STATUSES_IGNORE) ? &array_of_statuses[i] : MPI_STATUS_IGNORE;
rc = MPIR_Request_complete(&array_of_requests[i], request_ptrs[i], status_ptr, &active_flag);
if (rc == MPI_SUCCESS)
{
request_ptrs[i] = NULL;
array_of_statuses[i].MPI_ERROR = MPI_SUCCESS;
}
else
{
/* req completed with an error */
mpi_errno = MPI_ERR_IN_STATUS;
if (status_ptr != MPI_STATUS_IGNORE)
{
/* set the error code for this request */
status_ptr->MPI_ERROR = rc;
/* set the error codes for the rest of the uncompleted requests to PENDING */
for (j = i+1; j < count; ++j)
{
if (request_ptrs[i] == NULL)
{
array_of_statuses[i].MPI_ERROR = MPI_SUCCESS;
}
......@@ -206,42 +215,19 @@ int MPI_Waitall(int count, MPI_Request array_of_requests[],
{
array_of_statuses[i].MPI_ERROR = MPI_ERR_PENDING;
}
}
}
}
break;
}
else if (n_completed == count)
{
break;
}
for (i=0; i<count; i++) {
if (request_ptrs[i] != NULL &&
request_ptrs[i]->kind != MPID_UREQUEST)
{
n_native=1;
break;
}
}
if (n_native > 0) {
mpi_errno = MPID_Progress_wait(&progress_state);
if (mpi_errno != MPI_SUCCESS)
{
/* --BEGIN ERROR HANDLING-- */
MPID_Progress_end(&progress_state);
goto fn_fail;
/* --END ERROR HANDLING-- */
}
}
}
}
}
break;
}
}
MPID_Progress_end(&progress_state);
/* ... end of body of routine ... */
fn_exit:
fn_exit:
if (count > MPID_REQUEST_PTR_ARRAY_SIZE)
{
MPIU_CHKLMEM_FREEALL();
......@@ -251,7 +237,7 @@ int MPI_Waitall(int count, MPI_Request array_of_requests[],
MPIU_THREAD_SINGLE_CS_EXIT("pt2pt");
return mpi_errno;
fn_fail:
fn_fail:
/* --BEGIN ERROR HANDLING-- */
#ifdef HAVE_ERROR_CHECKING
mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment