ch3u_rma_progress.c 30.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpidimpl.h"
#include "mpidrma.h"

/*
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===

cvars:
    - name        : MPIR_CVAR_CH3_RMA_ACTIVE_REQ_THRESHOLD
      category    : CH3
      type        : int
      default     : 2097152
      class       : none
      verbosity   : MPI_T_VERBOSITY_USER_BASIC
      scope       : MPI_T_SCOPE_ALL_EQ
      description : >-
         Threshold on the number of active requests that triggers
         blocking waits in operation routines. When the value is
         negative, operation routines never block. When the value
         is zero, operation routines always block until the number
         of active requests drops to zero. When the value is
         positive, operation routines block until the number of
         active requests is reduced to this value.

=== END_MPI_T_CVAR_INFO_BLOCK ===
*/

35
static inline int check_and_switch_target_state(MPID_Win * win_ptr, MPIDI_RMA_Target_t * target,
36 37
                                                int *is_able_to_issue, int *made_progress);
static inline int check_and_switch_window_state(MPID_Win * win_ptr, int *is_able_to_issue,
38
                                                int *made_progress);
39 40
static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t * target,
                                   int *made_progress);
41 42
static inline int issue_ops_win(MPID_Win * win_ptr, int *made_progress);

43
/* check if we can switch window-wide state: FENCE_ISSUED, PSCW_ISSUED, LOCK_ALL_ISSUED */
44
#undef FUNCNAME
45
#define FUNCNAME check_and_switch_window_state
46 47
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
48 49
static inline int check_and_switch_window_state(MPID_Win * win_ptr, int *is_able_to_issue,
                                                int *made_progress)
50
{
51
    MPID_Request *fence_req_ptr = NULL;
52
    int i, mpi_errno = MPI_SUCCESS;
53
    MPIDI_STATE_DECL(MPID_STATE_CHECK_AND_SWITCH_WINDOW_STATE);
54

55
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_CHECK_AND_SWITCH_WINDOW_STATE);
56 57

    (*made_progress) = 0;
58
    (*is_able_to_issue) = 0;
59

60 61
    switch (win_ptr->states.access_state) {
    case MPIDI_RMA_FENCE_ISSUED:
62 63 64
        MPID_Request_get_ptr(win_ptr->fence_sync_req, fence_req_ptr);
        if (MPID_Request_is_complete(fence_req_ptr)) {
            win_ptr->states.access_state = MPIDI_RMA_FENCE_GRANTED;
65 66
            (*is_able_to_issue) = 1;

67 68 69
            MPID_Request_release(fence_req_ptr);
            win_ptr->fence_sync_req = MPI_REQUEST_NULL;

70 71
            MPIDI_CH3I_num_active_issued_win--;
            MPIU_Assert(MPIDI_CH3I_num_active_issued_win >= 0);
72

73 74
            (*made_progress) = 1;
        }
75 76 77
        break;

    case MPIDI_RMA_PSCW_ISSUED:
78 79
        if (win_ptr->start_req == NULL) {
            /* for MPI_MODE_NOCHECK and all targets on SHM,
80
             * we do not create PSCW requests on window. */
81
            win_ptr->states.access_state = MPIDI_RMA_PSCW_GRANTED;
82
            (*is_able_to_issue) = 1;
83

84 85
            MPIDI_CH3I_num_active_issued_win--;
            MPIU_Assert(MPIDI_CH3I_num_active_issued_win >= 0);
86

87 88 89 90 91 92 93 94 95 96 97 98 99
            (*made_progress) = 1;
        }
        else {
            for (i = 0; i < win_ptr->start_grp_size; i++) {
                MPID_Request *start_req_ptr = NULL;
                if (win_ptr->start_req[i] == MPI_REQUEST_NULL)
                    continue;
                MPID_Request_get_ptr(win_ptr->start_req[i], start_req_ptr);
                if (MPID_Request_is_complete(start_req_ptr)) {
                    MPID_Request_release(start_req_ptr);
                    win_ptr->start_req[i] = MPI_REQUEST_NULL;
                }
                else {
100
                    break;
101 102 103
                }
            }

104 105
            if (i == win_ptr->start_grp_size) {
                win_ptr->states.access_state = MPIDI_RMA_PSCW_GRANTED;
106
                (*is_able_to_issue) = 1;
107

108 109
                MPIDI_CH3I_num_active_issued_win--;
                MPIU_Assert(MPIDI_CH3I_num_active_issued_win >= 0);
110

111 112 113 114 115
                (*made_progress) = 1;

                MPIU_Free(win_ptr->start_req);
                win_ptr->start_req = NULL;
            }
116
        }
117 118 119
        break;

    case MPIDI_RMA_LOCK_ALL_ISSUED:
120 121
        if (win_ptr->outstanding_locks == 0) {
            win_ptr->states.access_state = MPIDI_RMA_LOCK_ALL_GRANTED;
122 123
            (*is_able_to_issue) = 1;

124 125
            (*made_progress) = 1;
        }
126 127
        break;

128 129 130 131 132 133 134 135
    case MPIDI_RMA_PER_TARGET:
    case MPIDI_RMA_LOCK_ALL_CALLED:
    case MPIDI_RMA_FENCE_GRANTED:
    case MPIDI_RMA_PSCW_GRANTED:
    case MPIDI_RMA_LOCK_ALL_GRANTED:
        (*is_able_to_issue) = 1;
        break;

136 137
    default:
        break;
138
    }   /* end of switch */
139 140

  fn_exit:
141
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_CHECK_AND_SWITCH_WINDOW_STATE);
142 143 144 145 146 147 148 149 150
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}


#undef FUNCNAME
151
#define FUNCNAME check_and_switch_target_state
152 153
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
154
static inline int check_and_switch_target_state(MPID_Win * win_ptr, MPIDI_RMA_Target_t * target,
155
                                                int *is_able_to_issue, int *made_progress)
156 157 158 159 160
{
    int rank = win_ptr->comm_ptr->rank;
    int mpi_errno = MPI_SUCCESS;

    (*made_progress) = 0;
161
    (*is_able_to_issue) = 0;
162

163
    if (target == NULL)
164 165
        goto fn_exit;

166 167 168 169 170
    switch (target->access_state) {
    case MPIDI_RMA_LOCK_CALLED:
        if (target->sync.sync_flag == MPIDI_RMA_SYNC_NONE ||
            target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH_LOCAL ||
            target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH) {
171 172
            if (target->pending_op_list_head == NULL ||
                !target->pending_op_list_head->piggyback_lock_candidate) {
173 174 175 176
                /* issue lock request */
                target->access_state = MPIDI_RMA_LOCK_ISSUED;
                if (target->target_rank == rank) {
                    mpi_errno = acquire_local_lock(win_ptr, target->lock_type);
177 178
                    if (mpi_errno != MPI_SUCCESS)
                        MPIU_ERR_POP(mpi_errno);
179 180
                }
                else {
181 182 183
                    mpi_errno = send_lock_msg(target->target_rank, target->lock_type, win_ptr);
                    if (mpi_errno != MPI_SUCCESS)
                        MPIU_ERR_POP(mpi_errno);
184 185 186 187 188 189
                }

                (*made_progress) = 1;
            }
        }
        else if (target->sync.sync_flag == MPIDI_RMA_SYNC_UNLOCK) {
190
            if (target->pending_op_list_head == NULL) {
191
                /* No RMA operation has ever been posted to this target,
192 193
                 * finish issuing, no need to acquire the lock. Cleanup
                 * function will clean it up. */
194 195 196 197 198 199 200 201 202 203 204 205
                target->access_state = MPIDI_RMA_LOCK_GRANTED;

                target->sync.outstanding_acks--;
                MPIU_Assert(target->sync.outstanding_acks >= 0);

                /* We are done with ending synchronization, unset target's sync_flag. */
                target->sync.sync_flag = MPIDI_RMA_SYNC_NONE;

                (*made_progress) = 1;
            }
            else {
                /* if we reach WIN_UNLOCK and there is still operation existing
206 207
                 * in pending list, this operation must be the only operation
                 * and it is prepared to piggyback LOCK and UNLOCK. */
208 209
                MPIU_Assert(target->pending_op_list_head->next == NULL);
                MPIU_Assert(target->pending_op_list_head->piggyback_lock_candidate);
210 211 212 213 214 215 216
            }
        }
        break;

    case MPIDI_RMA_LOCK_GRANTED:
    case MPIDI_RMA_NONE:
        if (target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH) {
217
            if (target->pending_op_list_head == NULL) {
218 219 220 221 222 223 224
                if (target->target_rank == rank) {
                    target->sync.outstanding_acks--;
                    MPIU_Assert(target->sync.outstanding_acks >= 0);
                }
                else {
                    if (target->put_acc_issued) {
                        mpi_errno = send_flush_msg(target->target_rank, win_ptr);
225 226
                        if (mpi_errno != MPI_SUCCESS)
                            MPIU_ERR_POP(mpi_errno);
227 228
                    }
                    else {
229
                        /* We did not issue PUT/ACC since the last
230 231
                         * synchronization call, therefore here we
                         * don't need ACK back */
232 233
                        target->sync.outstanding_acks--;
                        MPIU_Assert(target->sync.outstanding_acks >= 0);
234 235
                    }
                }
236 237 238 239 240

                /* We are done with ending synchronization, unset target's sync_flag. */
                target->sync.sync_flag = MPIDI_RMA_SYNC_NONE;

                (*made_progress) = 1;
241
            }
242 243
        }
        else if (target->sync.sync_flag == MPIDI_RMA_SYNC_UNLOCK) {
244
            if (target->pending_op_list_head == NULL) {
245
                if (target->target_rank == rank) {
246
                    target->sync.outstanding_acks--;
247
                    MPIU_Assert(target->sync.outstanding_acks >= 0);
248

249
                    mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
250 251
                    if (mpi_errno != MPI_SUCCESS)
                        MPIU_ERR_POP(mpi_errno);
252
                }
253 254 255 256
                else {
                    MPIDI_CH3_Pkt_flags_t flag = MPIDI_CH3_PKT_FLAG_NONE;
                    if (!target->put_acc_issued) {
                        /* We did not issue PUT/ACC since the last
257 258
                         * synchronization call, therefore here we
                         * don't need ACK back */
259 260 261 262 263 264
                        target->sync.outstanding_acks--;
                        MPIU_Assert(target->sync.outstanding_acks >= 0);

                        flag = MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_NO_ACK;
                    }
                    mpi_errno = send_unlock_msg(target->target_rank, win_ptr, flag);
265 266
                    if (mpi_errno != MPI_SUCCESS)
                        MPIU_ERR_POP(mpi_errno);
267 268 269 270 271
                }

                /* We are done with ending synchronization, unset target's sync_flag. */
                target->sync.sync_flag = MPIDI_RMA_SYNC_NONE;

272 273 274
                (*made_progress) = 1;
            }
        }
275
        break;
276

277 278
    default:
        break;
279
    }   /* end of switch */
280

281 282 283 284
    if (target->pending_op_list_head != NULL && target->access_state != MPIDI_RMA_LOCK_ISSUED) {
        (*is_able_to_issue) = 1;
    }

285
  fn_exit:
286
    return mpi_errno;
287
  fn_fail:
288 289
    goto fn_exit;
}
290 291


292 293 294 295
#undef FUNCNAME
#define FUNCNAME issue_ops_target
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
296
static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t * target,
297 298 299 300 301
                                   int *made_progress)
{
    MPIDI_RMA_Op_t *curr_op = NULL;
    MPIDI_CH3_Pkt_flags_t flags;
    int first_op = 1, mpi_errno = MPI_SUCCESS;
302

303
    (*made_progress) = 0;
304

305 306
    if (win_ptr->non_empty_slots == 0 || target == NULL)
        goto fn_exit;
307

308 309 310 311
    /* Issue out operations in the list. */
    curr_op = target->next_op_to_issue;
    while (curr_op != NULL) {

312 313
        if (target->access_state == MPIDI_RMA_LOCK_ISSUED) {
            /* It is possible that the previous OP+LOCK changes
314
             * lock state to LOCK_ISSUED. */
315 316
            break;
        }
317 318

        if (curr_op->next == NULL &&
319
            target->sync.sync_flag == MPIDI_RMA_SYNC_NONE && curr_op->ureq == NULL) {
320
            /* Skip the last OP if sync_flag is NONE since we
321 322 323 324 325
             * want to leave it to the ending synchronization
             * so that we can piggyback LOCK / FLUSH.
             * However, if it is a request-based RMA, do not
             * skip it (otherwise a wait call before unlock
             * will be blocked). */
326
            break;
327 328
        }

329 330
        flags = MPIDI_CH3_PKT_FLAG_NONE;

331 332 333 334
        if (first_op) {
            /* piggyback on first OP. */
            if (target->access_state == MPIDI_RMA_LOCK_CALLED) {
                MPIU_Assert(curr_op->piggyback_lock_candidate);
335 336 337 338 339 340
                if (target->lock_type == MPI_LOCK_SHARED)
                    flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
                else {
                    MPIU_Assert(target->lock_type == MPI_LOCK_EXCLUSIVE);
                    flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
                }
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
                target->access_state = MPIDI_RMA_LOCK_ISSUED;
            }
            first_op = 0;
        }

        if (curr_op->next == NULL) {
            /* piggyback on last OP. */
            if (target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH) {
                flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH;
                if (target->win_complete_flag)
                    flags |= MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER;
            }
            else if (target->sync.sync_flag == MPIDI_RMA_SYNC_UNLOCK) {
                flags |= MPIDI_CH3_PKT_FLAG_RMA_UNLOCK;
            }
        }

        mpi_errno = issue_rma_op(curr_op, win_ptr, target, flags);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);

362 363
        (*made_progress) = 1;

364
        if (curr_op->pkt.type == MPIDI_CH3_PKT_PUT ||
365 366 367
            curr_op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED ||
            curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
            curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
368
            target->put_acc_issued = 1; /* set PUT_ACC_FLAG when sending
369
                                         * PUT/ACC operation. */
370 371
        }

372 373 374 375 376 377 378 379 380 381
        if ((curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
             curr_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM) && curr_op->issued_stream_count > 0) {
            /* For ACC-like operations, if not all stream units
             * are issued out, we stick to the current operation,
             * otherwise we move on to the next operation. */
            target->next_op_to_issue = curr_op;
        }
        else
            target->next_op_to_issue = curr_op->next;

382 383 384 385 386 387 388
        if (target->next_op_to_issue == NULL) {
            if (flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH || flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) {
                /* We are done with ending sync, unset target's sync_flag. */
                target->sync.sync_flag = MPIDI_RMA_SYNC_NONE;
            }
        }

389 390
        if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
            flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
391
            /* If this operation is piggybacked with LOCK,
392 393 394
             * do not move it out of pending list, and do
             * not complete the user request, because we
             * may need to re-transmit it. */
395 396 397
            break;
        }

398 399 400 401 402
        if (curr_op->ureq != NULL) {
            mpi_errno = set_user_req_after_issuing_op(curr_op);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
        }
403

404 405
        if (curr_op->reqs_size == 0) {
            MPIU_Assert(curr_op->reqs == NULL);
406
            /* Sending is completed immediately. */
407
            MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list_head),
408 409 410
                                         &(target->pending_op_list_tail), curr_op);
        }
        else {
411 412 413
            MPI_Datatype target_datatype;
            int is_derived = FALSE;

414
            /* Sending is not completed immediately. */
415

416
            MPIDI_CH3I_RMA_Ops_unlink(&(target->pending_op_list_head),
417
                                      &(target->pending_op_list_tail), curr_op);
418 419 420 421 422 423 424 425 426 427 428 429 430

            MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(curr_op->pkt, target_datatype, mpi_errno);

            if ((target_datatype != MPI_DATATYPE_NULL &&
                 !MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) ||
                (curr_op->origin_datatype != MPI_DATATYPE_NULL &&
                 !MPIR_DATATYPE_IS_PREDEFINED(curr_op->origin_datatype)) ||
                (curr_op->result_datatype != MPI_DATATYPE_NULL &&
                 !MPIR_DATATYPE_IS_PREDEFINED(curr_op->result_datatype))) {
                is_derived = TRUE;
            }

            if (is_derived) {
431 432
                MPIDI_CH3I_RMA_Ops_append(&(target->issued_dt_op_list_head),
                                          &(target->issued_dt_op_list_tail), curr_op);
433 434
            }
            else if (curr_op->pkt.type == MPIDI_CH3_PKT_PUT ||
435 436 437
                     curr_op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED ||
                     curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
                     curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
438 439
                MPIDI_CH3I_RMA_Ops_append(&(target->issued_write_op_list_head),
                                          &(target->issued_write_op_list_tail), curr_op);
440 441
            }
            else {
442 443
                MPIDI_CH3I_RMA_Ops_append(&(target->issued_read_op_list_head),
                                          &(target->issued_read_op_list_tail), curr_op);
444 445 446 447 448
            }
        }

        curr_op = target->next_op_to_issue;

449
    }   /* end of while loop */
450 451 452 453 454 455 456 457 458 459 460

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

#undef FUNCNAME
#define FUNCNAME issue_ops_win
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
461
static inline int issue_ops_win(MPID_Win * win_ptr, int *made_progress)
462 463
{
    int mpi_errno = MPI_SUCCESS;
464
    int start_slot, end_slot, i, idx;
465 466 467 468 469 470 471
    MPIDI_RMA_Target_t *target = NULL;

    (*made_progress) = 0;

    if (win_ptr->non_empty_slots == 0)
        goto fn_exit;

472
    /* FIXME: we should optimize the issuing pattern here. */
473 474 475 476

    start_slot = win_ptr->comm_ptr->rank % win_ptr->num_slots;
    end_slot = start_slot + win_ptr->num_slots;
    for (i = start_slot; i < end_slot; i++) {
477 478 479 480
        if (i < win_ptr->num_slots)
            idx = i;
        else
            idx = i - win_ptr->num_slots;
481

482
        target = win_ptr->slots[idx].target_list_head;
483
        while (target != NULL) {
484
            int temp_progress = 0;
485
            int is_able_to_issue = 0;
486 487

            /* check target state */
488 489
            mpi_errno = check_and_switch_target_state(win_ptr, target, &is_able_to_issue,
                                                      &temp_progress);
490 491 492 493
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
            if (temp_progress)
                (*made_progress) = 1;
494 495 496 497
            if (!is_able_to_issue) {
                target = target->next;
                continue;
            }
498

499 500
            /* issue operations to this target */
            mpi_errno = issue_ops_target(win_ptr, target, &temp_progress);
501 502 503 504
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
            if (temp_progress)
                (*made_progress) = 1;
505 506 507 508 509

            target = target->next;
        }
    }

510
  fn_exit:
511
    return mpi_errno;
512
  fn_fail:
513 514 515 516
    goto fn_exit;
}


517 518 519 520 521 522 523 524
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Free_ops_before_completion
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_RMA_Free_ops_before_completion(MPID_Win * win_ptr)
{
    MPIDI_RMA_Op_t *curr_op = NULL;
    MPIDI_RMA_Target_t *curr_target = NULL;
525
    MPIDI_RMA_Op_t **op_list_head = NULL, **op_list_tail = NULL;
526 527 528 529
    int read_flag = 0;
    int i, made_progress = 0;
    int mpi_errno = MPI_SUCCESS;

530 531 532
    /* If we are in an free_ops_before_completion, the window must be holding
     * up resources.  If it isn't, we are in the wrong window and
     * incorrectly entered this function. */
533
    MPIU_ERR_CHKANDJUMP(win_ptr->non_empty_slots == 0, mpi_errno, MPI_ERR_OTHER, "**rmanoop");
534 535

    /* make nonblocking progress once */
536
    mpi_errno = MPIDI_CH3I_RMA_Make_progress_win(win_ptr, &made_progress);
537 538
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
539

540
    if (win_ptr->states.access_state == MPIDI_RMA_FENCE_ISSUED ||
541 542
        win_ptr->states.access_state == MPIDI_RMA_PSCW_ISSUED ||
        win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED)
543 544 545 546
        goto fn_exit;

    /* find targets that have operations */
    for (i = 0; i < win_ptr->num_slots; i++) {
547 548
        if (win_ptr->slots[i].target_list_head != NULL) {
            curr_target = win_ptr->slots[i].target_list_head;
549
            while (curr_target != NULL) {
550 551
                if (curr_target->issued_read_op_list_head != NULL ||
                    curr_target->issued_write_op_list_head != NULL) {
552 553 554 555 556 557 558 559 560
                    if (win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
                        win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
                        if (curr_target->access_state == MPIDI_RMA_LOCK_GRANTED)
                            break;
                    }
                    else {
                        break;
                    }
                }
561
                curr_target = curr_target->next;
562
            }
563 564
            if (curr_target != NULL)
                break;
565 566
        }
    }
567

568 569
    if (curr_target == NULL)
        goto fn_exit;
570

571
    /* After we do this, all following Win_flush_local
572
     * must do a Win_flush instead. */
573
    curr_target->sync.upgrade_flush_local = 1;
574

575 576 577 578 579
    op_list_head = &curr_target->issued_read_op_list_head;
    op_list_tail = &curr_target->issued_read_op_list_tail;
    read_flag = 1;

    curr_op = *op_list_head;
580 581

    /* free all ops in the list since we do not need to maintain them anymore */
582 583 584 585 586 587 588 589 590 591
    while (1) {
        if (curr_op != NULL) {
            if (curr_op->reqs_size > 0) {
                MPIU_Assert(curr_op->reqs != NULL);
                for (i = 0; i < curr_op->reqs_size; i++) {
                    if (curr_op->reqs[i] != NULL) {
                        MPID_Request_release(curr_op->reqs[i]);
                        curr_op->reqs[i] = NULL;
                        win_ptr->active_req_cnt--;
                    }
592 593
                }

594 595 596 597 598 599 600
                /* free req array in this op */
                MPIU_Free(curr_op->reqs);
                curr_op->reqs = NULL;
                curr_op->reqs_size = 0;
            }
            MPL_LL_DELETE(*op_list_head, *op_list_tail, curr_op);
            MPIDI_CH3I_Win_op_free(win_ptr, curr_op);
601
        }
602
        else {
603
            if (read_flag == 1) {
604 605
                op_list_head = &curr_target->issued_write_op_list_head;
                op_list_head = &curr_target->issued_write_op_list_tail;
606 607
                read_flag = 0;
            }
608 609 610 611
            else {
                /* we reach the tail of write_op_list, break out. */
                break;
            }
612
        }
613
        curr_op = *op_list_head;
614
    }
615

616
  fn_exit:
617
    return mpi_errno;
618
  fn_fail:
619 620 621 622
    goto fn_exit;
}


623 624 625 626 627 628
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Cleanup_ops_aggressive
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_RMA_Cleanup_ops_aggressive(MPID_Win * win_ptr)
{
629
    int i, local_completed = 0, remote_completed ATTRIBUTE((unused)) = 0;
630 631 632 633 634 635 636
    int mpi_errno = MPI_SUCCESS;
    MPIDI_RMA_Target_t *curr_target = NULL;
    int made_progress = 0;

    /* If we are in an aggressive cleanup, the window must be holding
     * up resources.  If it isn't, we are in the wrong window and
     * incorrectly entered this function. */
637
    MPIU_ERR_CHKANDJUMP(win_ptr->non_empty_slots == 0, mpi_errno, MPI_ERR_OTHER, "**rmanoop");
638 639 640

    /* find the first target that has something to issue */
    for (i = 0; i < win_ptr->num_slots; i++) {
641 642 643
        if (win_ptr->slots[i].target_list_head != NULL) {
            curr_target = win_ptr->slots[i].target_list_head;
            while (curr_target != NULL && curr_target->pending_op_list_head == NULL)
644
                curr_target = curr_target->next;
645 646
            if (curr_target != NULL)
                break;
647 648 649
        }
    }

650 651
    if (curr_target == NULL)
        goto fn_exit;
652 653 654 655 656 657 658 659 660 661 662 663

    if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH_LOCAL)
        curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH_LOCAL;

    /* Issue out all operations. */
    mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, curr_target->target_rank,
                                                    &made_progress);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);

    /* Wait for local completion. */
    do {
664
        mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_target(win_ptr, curr_target);
665 666
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
667 668 669

        MPIDI_CH3I_RMA_ops_completion(win_ptr, curr_target, local_completed, remote_completed);

670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689
        if (!local_completed) {
            mpi_errno = wait_progress_engine();
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
        }
    } while (!local_completed);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Cleanup_target_aggressive
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Obtain a target element by completing and freeing an existing one.
 *
 * If the window is in LOCK_ALL_CALLED, it first switches to a window-wide
 * protocol. For every other rank on a different node (compared by VC
 * node_id) that has no target element yet, a shared LOCK is sent and
 * outstanding_locks is incremented. The window then moves to
 * LOCK_ALL_ISSUED.
 *
 * It then repeats the following until MPIDI_CH3I_Win_target_alloc()
 * succeeds:
 *   1. pick the first target of the first non-empty slot;
 *   2. force a FLUSH on it and issue its operations;
 *   3. block in the progress engine until they are remotely complete;
 *   4. dequeue and free that target.
 *
 * On success *target is the newly allocated element. Returns an error
 * ("**rmanotarget") if the window has no targets to reclaim.
 */
int MPIDI_CH3I_RMA_Cleanup_target_aggressive(MPID_Win * win_ptr, MPIDI_RMA_Target_t ** target)
{
    int i, local_completed ATTRIBUTE((unused)) = 0, remote_completed = 0;
    int made_progress = 0;
    MPIDI_RMA_Target_t *curr_target = NULL;
    int mpi_errno = MPI_SUCCESS;

    (*target) = NULL;

    /* If we are in an aggressive cleanup, the window must be holding
     * up resources.  If it isn't, we are in the wrong window and
     * incorrectly entered this function. */
    MPIU_ERR_CHKANDJUMP(win_ptr->non_empty_slots == 0, mpi_errno, MPI_ERR_OTHER, "**rmanotarget");

    if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
        /* switch to window-wide protocol */
        MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
        for (i = 0; i < win_ptr->comm_ptr->local_size; i++) {
            if (i == win_ptr->comm_ptr->rank)
                continue;
            MPIDI_Comm_get_vc(win_ptr->comm_ptr, i, &target_vc);
            if (orig_vc->node_id != target_vc->node_id) {
                mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, i, &curr_target);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);
                if (curr_target == NULL) {
                    win_ptr->outstanding_locks++;
                    mpi_errno = send_lock_msg(i, MPI_LOCK_SHARED, win_ptr);
                    if (mpi_errno != MPI_SUCCESS)
                        MPIU_ERR_POP(mpi_errno);
                }
            }
        }
        win_ptr->states.access_state = MPIDI_RMA_LOCK_ALL_ISSUED;
    }

    do {
        /* find a non-empty slot and set the FLUSH flag on the first
         * target */
        /* TODO: we should think about better strategies on selecting the target */
        for (i = 0; i < win_ptr->num_slots; i++) {
            if (win_ptr->slots[i].target_list_head != NULL)
                break;
        }
        /* If earlier iterations emptied every slot, there is nothing left
         * to reclaim; fail instead of reading past the end of the slot
         * array and dereferencing a NULL target. */
        MPIU_ERR_CHKANDJUMP(i == win_ptr->num_slots, mpi_errno, MPI_ERR_OTHER, "**rmanotarget");
        curr_target = win_ptr->slots[i].target_list_head;

        if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH) {
            curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
            curr_target->sync.outstanding_acks++;
        }

        /* Issue out all operations. */
        mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, curr_target->target_rank,
                                                        &made_progress);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);

        /* Wait for remote completion. */
        do {
            mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_target(win_ptr, curr_target);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);

            MPIDI_CH3I_RMA_ops_completion(win_ptr, curr_target, local_completed, remote_completed);

            if (!remote_completed) {
                mpi_errno = wait_progress_engine();
                if (mpi_errno != MPI_SUCCESS)
                    MPIU_ERR_POP(mpi_errno);
            }
        } while (!remote_completed);

        /* Cleanup the target. */
        mpi_errno = MPIDI_CH3I_Win_target_dequeue_and_free(win_ptr, curr_target);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);

        /* check if we got a target */
        (*target) = MPIDI_CH3I_Win_target_alloc(win_ptr);

    } while ((*target) == NULL);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


776 777 778 779 780 781
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Make_progress_target
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_RMA_Make_progress_target(MPID_Win * win_ptr, int target_rank, int *made_progress)
{
782
    int temp_progress = 0;
783
    int is_able_to_issue = 0;
784
    MPIDI_RMA_Target_t *target = NULL;
785 786 787 788
    int mpi_errno = MPI_SUCCESS;

    (*made_progress) = 0;

789
    /* check window state */
790
    mpi_errno = check_and_switch_window_state(win_ptr, &is_able_to_issue, &temp_progress);
791 792 793 794
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    if (temp_progress)
        (*made_progress) = 1;
795 796
    if (!is_able_to_issue)
        goto fn_exit;
797

798 799
    /* find target element */
    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
800 801
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
802

803
    /* check target state */
804
    mpi_errno = check_and_switch_target_state(win_ptr, target, &is_able_to_issue, &temp_progress);
805 806 807 808
    if (mpi_errno)
        MPIU_ERR_POP(mpi_errno);
    if (temp_progress)
        (*made_progress) = 1;
809 810
    if (!is_able_to_issue)
        goto fn_exit;
811

812 813
    /* issue operations to this target */
    mpi_errno = issue_ops_target(win_ptr, target, &temp_progress);
814 815 816 817
    if (mpi_errno)
        MPIU_ERR_POP(mpi_errno);
    if (temp_progress)
        (*made_progress) = 1;
818 819 820 821 822 823 824 825 826 827 828 829 830 831

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Make_progress_win
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Make one round of nonblocking RMA progress on every target of a window.
 *
 * The window-wide state is advanced first. If the window allows issuing,
 * issue_ops_win() then processes all targets.
 *
 * Sets *made_progress to 1 if any step made progress.
 */
int MPIDI_CH3I_RMA_Make_progress_win(MPID_Win * win_ptr, int *made_progress)
{
    int temp_progress = 0;
    int is_able_to_issue = 0;
    int mpi_errno = MPI_SUCCESS;

    (*made_progress) = 0;

    /* check window state */
    mpi_errno = check_and_switch_window_state(win_ptr, &is_able_to_issue, &temp_progress);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    if (temp_progress)
        (*made_progress) = 1;
    if (!is_able_to_issue)
        goto fn_exit;

    /* issue operations on window */
    mpi_errno = issue_ops_win(win_ptr, &temp_progress);
    if (mpi_errno)
        MPIU_ERR_POP(mpi_errno);
    if (temp_progress)
        (*made_progress) = 1;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Make_progress_global
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_RMA_Make_progress_global(int *made_progress)
{
    MPIDI_RMA_Win_list_t *win_elem = MPIDI_RMA_Win_list;
    int mpi_errno = MPI_SUCCESS;

    (*made_progress) = 0;

    for (win_elem = MPIDI_RMA_Win_list; win_elem; win_elem = win_elem->next) {
873
        int temp_progress = 0;
874

875 876 877 878
        if (win_elem->win_ptr->states.access_state == MPIDI_RMA_NONE ||
            win_elem->win_ptr->states.access_state == MPIDI_RMA_FENCE_GRANTED ||
            win_elem->win_ptr->states.access_state == MPIDI_RMA_PSCW_GRANTED)
            continue;
879

880
        mpi_errno = MPIDI_CH3I_RMA_Make_progress_win(win_elem->win_ptr, &temp_progress);
881 882 883 884
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
        if (temp_progress)
            (*made_progress) = 1;
885 886 887 888 889 890 891
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}