Commit 41a365ec authored by Xin Zhao's avatar Xin Zhao
Browse files

Add blocking ops / targets aggressively cleanup functions.



When we run out of resources for operations and targets,
we need to make the runtime to complete some operations
so that it can free some resources.

For RMA operations, we implement by doing an internal
FLUSH_LOCAL for one target and waiting for operation
resources; for RMA targets, we implement by doing an
internal FLUSH operation for one target and wait for
target resources.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent ab058906
......@@ -396,6 +396,8 @@ be in the range 0 to %d
**rmaattach:Memory cannot be attached
**rmashared:Memory cannot be shared
**rmaflavor:Incorrect window flavor
**rmanoop:No RMA operation resources can be freed from the window
**rmanotarget:No RMA target resources can be freed from the window
**assert:Invalid assert argument
**lockassertval:Invalid assert argument passed to MPI_Win_lock
**lockassertval %d: Invalid assert argument (%d) passed to MPI_Win_lock
......
......@@ -10,6 +10,8 @@
#include "mpl_utlist.h"
#include "mpid_rma_types.h"
int MPIDI_CH3I_RMA_Cleanup_ops_aggressive(MPID_Win * win_ptr);
int MPIDI_CH3I_RMA_Cleanup_target_aggressive(MPID_Win * win_ptr, MPIDI_RMA_Target_t ** target);
int MPIDI_CH3I_RMA_Make_progress_target(MPID_Win * win_ptr, int target_rank, int *made_progress);
int MPIDI_CH3I_RMA_Make_progress_win(MPID_Win * win_ptr, int *made_progress);
......@@ -166,7 +168,10 @@ static inline int MPIDI_CH3I_Win_create_target(MPID_Win * win_ptr, int target_ra
slot = &(win_ptr->slots[target_rank]);
t = MPIDI_CH3I_Win_target_alloc(win_ptr);
MPIU_ERR_CHKANDJUMP(t == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
if (t == NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_target_aggressive(win_ptr, &t);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
t->target_rank = target_rank;
......@@ -580,7 +585,10 @@ static inline int MPIDI_CH3I_RMA_Ops_alloc_tail(MPID_Win * win_ptr, MPIDI_RMA_Op
MPIDI_RMA_Op_t *tmp_ptr;
tmp_ptr = MPIDI_CH3I_Win_op_alloc(win_ptr);
MPIU_ERR_CHKANDJUMP(tmp_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
if (tmp_ptr == NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_aggressive(win_ptr, &tmp_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
MPL_LL_APPEND(*list, *list_tail, tmp_ptr);
......
......@@ -422,6 +422,156 @@ static inline int issue_ops_win(MPID_Win *win_ptr, int *made_progress)
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Cleanup_ops_aggressive
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_RMA_Cleanup_ops_aggressive(MPID_Win * win_ptr)
{
int i, local_completed = 0, remote_completed = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_RMA_Target_t *curr_target = NULL;
int made_progress = 0;
/* If we are in an aggressive cleanup, the window must be holding
* up resources. If it isn't, we are in the wrong window and
* incorrectly entered this function. */
MPIU_ERR_CHKANDJUMP(win_ptr->non_empty_slots == 0, mpi_errno, MPI_ERR_OTHER,
"**rmanoop");
/* find the first target that has something to issue */
for (i = 0; i < win_ptr->num_slots; i++) {
if (win_ptr->slots[i].target_list != NULL) {
curr_target = win_ptr->slots[i].target_list;
while (curr_target != NULL && curr_target->pending_op_list == NULL)
curr_target = curr_target->next;
if (curr_target != NULL) break;
}
}
if (curr_target == NULL) goto fn_exit;
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH_LOCAL)
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH_LOCAL;
/* Issue out all operations. */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, curr_target->target_rank,
&made_progress);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* Wait for local completion. */
do {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_target(win_ptr, curr_target,
&local_completed,
&remote_completed);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (!local_completed) {
mpi_errno = wait_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
} while (!local_completed);
fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Cleanup_target_aggressive
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_RMA_Cleanup_target_aggressive(MPID_Win * win_ptr, MPIDI_RMA_Target_t ** target)
{
int i, local_completed = 0, remote_completed = 0;
int made_progress = 0;
MPIDI_RMA_Target_t *curr_target = NULL;
int mpi_errno = MPI_SUCCESS;
(*target) = NULL;
/* If we are in an aggressive cleanup, the window must be holding
* up resources. If it isn't, we are in the wrong window and
* incorrectly entered this function. */
MPIU_ERR_CHKANDJUMP(win_ptr->non_empty_slots == 0, mpi_errno, MPI_ERR_OTHER,
"**rmanotarget");
if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
/* switch to window-wide protocol */
MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
for (i = 0; i < win_ptr->comm_ptr->local_size; i++) {
if (i == win_ptr->comm_ptr->rank)
continue;
MPIDI_Comm_get_vc(win_ptr->comm_ptr, i, &target_vc);
if (orig_vc->node_id != target_vc->node_id) {
mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, i, &curr_target);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (curr_target == NULL) {
win_ptr->outstanding_locks++;
mpi_errno = send_lock_msg(i, MPI_LOCK_SHARED, win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
}
}
win_ptr->states.access_state = MPIDI_RMA_LOCK_ALL_ISSUED;
}
do {
/* find a non-empty slot and set the FLUSH flag on the first
* target */
/* TODO: we should think about better strategies on selecting the target */
for (i = 0; i < win_ptr->num_slots; i++)
if (win_ptr->slots[i].target_list != NULL)
break;
curr_target = win_ptr->slots[i].target_list;
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
curr_target->sync.have_remote_incomplete_ops = 0;
curr_target->sync.outstanding_acks++;
}
/* Issue out all operations. */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, curr_target->target_rank,
&made_progress);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* Wait for remote completion. */
do {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_target(win_ptr, curr_target,
&local_completed,
&remote_completed);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (!remote_completed) {
mpi_errno = wait_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
} while (!remote_completed);
/* Cleanup the target. */
mpi_errno = MPIDI_CH3I_RMA_Cleanup_single_target(win_ptr, curr_target);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* check if we got a target */
(*target) = MPIDI_CH3I_Win_target_alloc(win_ptr);
} while ((*target) == NULL);
fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Make_progress_target
#undef FCNAME
......
......@@ -73,7 +73,10 @@ int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
/* queue it up */
new_ptr = MPIDI_CH3I_Win_op_alloc(win_ptr);
MPIU_ERR_CHKANDJUMP(new_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
if (new_ptr == NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_aggressive(win_ptr, &new_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
put_pkt = &(new_ptr->pkt.put);
MPIDI_Pkt_init(put_pkt, MPIDI_CH3_PKT_PUT);
......@@ -183,7 +186,10 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
/* queue it up */
new_ptr = MPIDI_CH3I_Win_op_alloc(win_ptr);
MPIU_ERR_CHKANDJUMP(new_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
if (new_ptr == NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_aggressive(win_ptr, &new_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
get_pkt = &(new_ptr->pkt.get);
MPIDI_Pkt_init(get_pkt, MPIDI_CH3_PKT_GET);
......@@ -294,7 +300,10 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
/* queue it up */
new_ptr = MPIDI_CH3I_Win_op_alloc(win_ptr);
MPIU_ERR_CHKANDJUMP(new_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
if (new_ptr == NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_aggressive(win_ptr, &new_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
/* If predefined and contiguous, use a simplified element */
if (MPIR_DATATYPE_IS_PREDEFINED(origin_datatype) &&
......@@ -442,7 +451,10 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
/* Append the operation to the window's RMA ops queue */
new_ptr = MPIDI_CH3I_Win_op_alloc(win_ptr);
MPIU_ERR_CHKANDJUMP(new_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
if (new_ptr == NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_aggressive(win_ptr, &new_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
/* TODO: Can we use the MPIDI_RMA_ACC_CONTIG optimization? */
......@@ -578,7 +590,10 @@ int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
/* Append this operation to the RMA ops queue */
new_ptr = MPIDI_CH3I_Win_op_alloc(win_ptr);
MPIU_ERR_CHKANDJUMP(new_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
if (new_ptr == NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_aggressive(win_ptr, &new_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
cas_pkt = &(new_ptr->pkt.cas);
MPIDI_Pkt_init(cas_pkt, MPIDI_CH3_PKT_CAS);
......@@ -670,7 +685,10 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
/* Append this operation to the RMA ops queue */
new_ptr = MPIDI_CH3I_Win_op_alloc(win_ptr);
MPIU_ERR_CHKANDJUMP(new_ptr == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");
if (new_ptr == NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_aggressive(win_ptr, &new_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
fop_pkt = &(new_ptr->pkt.fop);
MPIDI_Pkt_init(fop_pkt, MPIDI_CH3_PKT_FOP);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment