/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

7 8
#if !defined(MPID_RMA_H_INCLUDED)
#define MPID_RMA_H_INCLUDED
9

10 11 12
#include "mpid_rma_types.h"
#include "mpid_rma_oplist.h"
#include "mpid_rma_shm.h"
13
#include "mpid_rma_issue.h"
14
#include "mpid_rma_lockqueue.h"
15

16
#undef FUNCNAME
17
#define FUNCNAME send_lock_msg
18 19
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
20
static inline int send_lock_msg(int dest, int lock_type, MPID_Win * win_ptr)
21
{
22 23 24 25 26 27 28
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_t *lock_pkt = &upkt.lock;
    MPID_Request *req = NULL;
    MPIDI_VC_t *vc;
    MPIDI_STATE_DECL(MPID_STATE_SEND_LOCK_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_LOCK_MSG);
29

30
    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc);
31

32 33 34
    MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
    lock_pkt->target_win_handle = win_ptr->all_win_handles[dest];
    lock_pkt->source_win_handle = win_ptr->handle;
35
    lock_pkt->request_handle = MPI_REQUEST_NULL;
36 37 38 39 40 41 42
    lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (lock_type == MPI_LOCK_SHARED)
        lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
    else {
        MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
        lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
    }
43

44 45 46 47
    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_pkt, sizeof(*lock_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
48

49 50 51 52
    /* release the request returned by iStartMsg */
    if (req != NULL) {
        MPID_Request_release(req);
    }
53

54
  fn_exit:
55
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_LOCK_MSG);
56
    return mpi_errno;
57
    /* --BEGIN ERROR HANDLING-- */
58
  fn_fail:
59
    goto fn_exit;
60
    /* --END ERROR HANDLING-- */
61 62
}

63
#undef FUNCNAME
64
#define FUNCNAME send_unlock_msg
65 66
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
67
static inline int send_unlock_msg(int dest, MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags)
68 69
{
    int mpi_errno = MPI_SUCCESS;
70 71 72 73 74 75
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_unlock_t *unlock_pkt = &upkt.unlock;
    MPID_Request *req = NULL;
    MPIDI_VC_t *vc;
    MPIDI_STATE_DECL(MPID_STATE_SEND_UNLOCK_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_UNLOCK_MSG);
76

77
    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc);
78

79 80
    /* Send a lock packet over to the target. wait for the lock_granted
     * reply. Then do all the RMA ops. */
81

82 83
    MPIDI_Pkt_init(unlock_pkt, MPIDI_CH3_PKT_UNLOCK);
    unlock_pkt->target_win_handle = win_ptr->all_win_handles[dest];
84
    unlock_pkt->source_win_handle = win_ptr->handle;
85
    unlock_pkt->flags = flags;
86

87 88 89 90
    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, unlock_pkt, sizeof(*unlock_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
91

92 93 94
    /* Release the request returned by iStartMsg */
    if (req != NULL) {
        MPID_Request_release(req);
95
    }
96

97
  fn_exit:
98
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_UNLOCK_MSG);
99 100
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
101
  fn_fail:
102 103 104 105
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

106

107
#undef FUNCNAME
108
#define FUNCNAME MPIDI_CH3I_Send_lock_ack_pkt
109 110
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
111 112
static inline int MPIDI_CH3I_Send_lock_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                               MPIDI_CH3_Pkt_flags_t flags,
113 114
                                               MPI_Win source_win_handle,
                                               MPI_Request request_handle)
115
{
116
    MPIDI_CH3_Pkt_t upkt;
117
    MPIDI_CH3_Pkt_lock_ack_t *lock_ack_pkt = &upkt.lock_ack;
118 119
    MPID_Request *req = NULL;
    int mpi_errno;
120
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
121

122
    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
123

124 125
    MPIU_Assert(!(source_win_handle != MPI_WIN_NULL && request_handle != MPI_REQUEST_NULL));

126 127 128
    /* send lock ack packet */
    MPIDI_Pkt_init(lock_ack_pkt, MPIDI_CH3_PKT_LOCK_ACK);
    lock_ack_pkt->source_win_handle = source_win_handle;
129
    lock_ack_pkt->request_handle = request_handle;
130
    lock_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
131
    lock_ack_pkt->flags = flags;
132 133

    MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
134 135
                     (MPIU_DBG_FDEST, "sending lock ack pkt on vc=%p, source_win_handle=%#08x",
                      vc, lock_ack_pkt->source_win_handle));
136 137

    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
138
    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_ack_pkt, sizeof(*lock_ack_pkt), &req);
139 140 141
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    if (mpi_errno) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
142 143
    }

144 145
    if (req != NULL) {
        MPID_Request_release(req);
146
    }
147

148
  fn_fail:
149
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
150 151

    return mpi_errno;
152 153
}

154 155 156 157 158 159
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Send_lock_op_ack_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int MPIDI_CH3I_Send_lock_op_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                                  MPIDI_CH3_Pkt_flags_t flags,
160 161
                                                  MPI_Win source_win_handle,
                                                  MPI_Request request_handle)
162 163 164 165 166 167 168 169 170
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_op_ack_t *lock_op_ack_pkt = &upkt.lock_op_ack;
    MPID_Request *req = NULL;
    int mpi_errno;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

171 172
    MPIU_Assert(!(source_win_handle != MPI_WIN_NULL && request_handle != MPI_REQUEST_NULL));

173 174 175
    /* send lock ack packet */
    MPIDI_Pkt_init(lock_op_ack_pkt, MPIDI_CH3_PKT_LOCK_OP_ACK);
    lock_op_ack_pkt->source_win_handle = source_win_handle;
176
    lock_op_ack_pkt->request_handle = request_handle;
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
    lock_op_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
    lock_op_ack_pkt->flags = flags;

    MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
                     (MPIU_DBG_FDEST, "sending lock op ack pkt on vc=%p, source_win_handle=%#08x",
                      vc, lock_op_ack_pkt->source_win_handle));

    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_op_ack_pkt, sizeof(*lock_op_ack_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    if (mpi_errno) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    if (req != NULL) {
        MPID_Request_release(req);
    }

  fn_fail:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);
    return mpi_errno;
}

200 201

#undef FUNCNAME
Xin Zhao's avatar
Xin Zhao committed
202
#define FUNCNAME MPIDI_CH3I_Send_flush_ack_pkt
203 204
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
205 206
static inline int MPIDI_CH3I_Send_flush_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                                MPI_Win source_win_handle)
207
{
208
    MPIDI_CH3_Pkt_t upkt;
Xin Zhao's avatar
Xin Zhao committed
209
    MPIDI_CH3_Pkt_flush_ack_t *flush_ack_pkt = &upkt.flush_ack;
210
    MPID_Request *req;
211
    int mpi_errno = MPI_SUCCESS;
Xin Zhao's avatar
Xin Zhao committed
212
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_FLUSH_ACK_PKT);
213

Xin Zhao's avatar
Xin Zhao committed
214
    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_FLUSH_ACK_PKT);
215

Xin Zhao's avatar
Xin Zhao committed
216 217 218
    MPIDI_Pkt_init(flush_ack_pkt, MPIDI_CH3_PKT_FLUSH_ACK);
    flush_ack_pkt->source_win_handle = source_win_handle;
    flush_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
219

220
    /* Because this is in a packet handler, it is already within a critical section */
221
    /* MPIU_THREAD_CS_ENTER(CH3COMM,vc); */
Xin Zhao's avatar
Xin Zhao committed
222
    mpi_errno = MPIDI_CH3_iStartMsg(vc, flush_ack_pkt, sizeof(*flush_ack_pkt), &req);
223 224
    /* MPIU_THREAD_CS_EXIT(CH3COMM,vc); */
    if (mpi_errno != MPI_SUCCESS) {
225
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
226 227
    }

228
    if (req != NULL) {
229
        MPID_Request_release(req);
230 231
    }

232
  fn_fail:
Xin Zhao's avatar
Xin Zhao committed
233
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_FLUSH_ACK_PKT);
234 235
    return mpi_errno;
}
236

237

238 239 240 241 242 243 244 245
#undef FUNCNAME
#define FUNCNAME send_decr_at_cnt_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Send an MPIDI_CH3_PKT_DECR_AT_COUNTER packet to rank dst of the window's
 * communicator.  The packet carries only the target's window handle.
 *
 *   dst     - rank (in win_ptr->comm_ptr) the packet is sent to
 *   win_ptr - window the packet refers to
 *
 * Returns MPI_SUCCESS, or an error code if starting the message failed. */
static inline int send_decr_at_cnt_msg(int dst, MPID_Win * win_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *sreq = NULL;
    MPIDI_VC_t *target_vc;
    MPIDI_CH3_Pkt_t pkt_storage;
    MPIDI_CH3_Pkt_decr_at_counter_t *pkt = &pkt_storage.decr_at_cnt;
    MPIDI_STATE_DECL(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_DECR_AT_CNT_MSG);

    MPIDI_Pkt_init(pkt, MPIDI_CH3_PKT_DECR_AT_COUNTER);
    pkt->target_win_handle = win_ptr->all_win_handles[dst];

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dst, &target_vc);

    MPIU_THREAD_CS_ENTER(CH3COMM, target_vc);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, pkt, sizeof(*pkt), &sreq);
    MPIU_THREAD_CS_EXIT(CH3COMM, target_vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* iStartMsg may hand back a request; this function does not keep it. */
    if (sreq != NULL) {
        MPID_Request_release(sreq);
    }

  fn_exit:
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

277

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
#undef FUNCNAME
#define FUNCNAME send_flush_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Build an MPIDI_CH3_PKT_FLUSH packet for win_ptr and start sending it to
 * rank dest of the window's communicator.
 *
 *   dest    - rank (in win_ptr->comm_ptr) the flush message is sent to
 *   win_ptr - window being flushed
 *
 * Returns MPI_SUCCESS, or an error code if starting the message failed. */
static inline int send_flush_msg(int dest, MPID_Win * win_ptr)
{
    MPIDI_VC_t *target_vc;
    MPID_Request *sreq = NULL;
    MPIDI_CH3_Pkt_t pkt_storage;
    MPIDI_CH3_Pkt_flush_t *pkt = &pkt_storage.flush;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_SEND_FLUSH_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_FLUSH_MSG);

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &target_vc);

    /* The flush packet identifies the window on both sides. */
    MPIDI_Pkt_init(pkt, MPIDI_CH3_PKT_FLUSH);
    pkt->source_win_handle = win_ptr->handle;
    pkt->target_win_handle = win_ptr->all_win_handles[dest];

    MPIU_THREAD_CS_ENTER(CH3COMM, target_vc);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, pkt, sizeof(*pkt), &sreq);
    MPIU_THREAD_CS_EXIT(CH3COMM, target_vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");

    /* iStartMsg may hand back a request; this function does not keep it. */
    if (sreq != NULL) {
        MPID_Request_release(sreq);
    }

  fn_exit:
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_FLUSH_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

317 318

/* enqueue an unsatisfied origin in passive target at target side. */
319 320 321
static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
                                      MPIDI_CH3_Pkt_t * pkt,
                                      MPIDI_msg_sz_t * buflen, MPID_Request ** reqp)
322
{
323
    MPIDI_RMA_Lock_entry_t *new_ptr = NULL;
324 325
    MPIDI_CH3_Pkt_flags_t flag;
    MPI_Win source_win_handle;
326
    MPI_Request request_handle;
327
    int lock_discarded = 0, data_discarded = 0;
328 329
    int mpi_errno = MPI_SUCCESS;

330
    (*reqp) = NULL;
331

332
    new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, pkt);
333 334
    if (new_ptr != NULL) {
        MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
335
        new_ptr->vc = vc;
336 337 338
    }
    else {
        lock_discarded = 1;
339
    }
340

341
    if (pkt->type == MPIDI_CH3_PKT_LOCK ||
342 343
        pkt->type == MPIDI_CH3_PKT_PUT_IMMED ||
        pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED ||
344
        pkt->type == MPIDI_CH3_PKT_GET ||
345
        pkt->type == MPIDI_CH3_PKT_GET_ACCUM_IMMED ||
346
        pkt->type == MPIDI_CH3_PKT_FOP_IMMED || pkt->type == MPIDI_CH3_PKT_CAS_IMMED) {
347

348 349
        /* return bytes of data processed in this pkt handler */
        (*buflen) = sizeof(MPIDI_CH3_Pkt_t);
350 351 352 353 354

        if (new_ptr != NULL)
            new_ptr->all_data_recved = 1;

        goto issue_ack;
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373
    }
    else {
        MPI_Aint type_size = 0;
        MPIDI_msg_sz_t recv_data_sz = 0;
        MPID_Request *req = NULL;
        MPI_Datatype target_dtp;
        int target_count;
        int complete = 0;
        MPIDI_msg_sz_t data_len;
        char *data_buf = NULL;

        /* This is PUT, ACC, GACC */

        MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno);

        MPID_Datatype_get_size_macro(target_dtp, type_size);
        recv_data_sz = type_size * target_count;

374
        if (new_ptr != NULL) {
375
            if (win_ptr->current_lock_data_bytes + recv_data_sz < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
376 377 378 379 380 381 382 383 384 385 386 387 388
                new_ptr->data = MPIU_Malloc(recv_data_sz);
            }

            if (new_ptr->data == NULL) {
                /* Note that there are two possible reasons to make new_ptr->data to be NULL:
                 * (1) win_ptr->current_lock_data_bytes + recv_data_sz >= MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES;
                 * (2) MPIU_Malloc(recv_data_sz) failed.
                 * In such cases, we cannot allocate memory for lock data, so we give up
                 * buffering lock data, however, we still buffer lock request.
                 */
                MPIDI_CH3_Pkt_t new_pkt;
                MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock;
                MPI_Win target_win_handle;
389
                MPIDI_CH3_Pkt_flags_t flags;
390 391

                MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno);
392
                MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
393

394 395 396 397 398 399 400 401 402
                if (pkt->type == MPIDI_CH3_PKT_PUT || pkt->type == MPIDI_CH3_PKT_ACCUMULATE) {
                    MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
                    request_handle = MPI_REQUEST_NULL;
                }
                else {
                    source_win_handle = MPI_WIN_NULL;
                    MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
                }

403 404 405
                MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
                lock_pkt->target_win_handle = target_win_handle;
                lock_pkt->source_win_handle = source_win_handle;
406
                lock_pkt->request_handle = request_handle;
407
                lock_pkt->flags = flags;
408 409 410 411 412 413 414 415 416 417 418

                /* replace original pkt with lock pkt */
                new_ptr->pkt = new_pkt;
                new_ptr->all_data_recved = 1;

                data_discarded = 1;
            }
            else {
                win_ptr->current_lock_data_bytes += recv_data_sz;
                new_ptr->data_size = recv_data_sz;
            }
419 420 421 422 423 424 425
        }

        /* create request to receive upcoming requests */
        req = MPID_Request_create();
        MPIU_Object_set_ref(req, 1);

        /* fill in area in req that will be used in Receive_data_found() */
426
        if (lock_discarded || data_discarded) {
427
            req->dev.drop_data = TRUE;
428 429 430 431 432 433
            req->dev.user_buf = NULL;
            req->dev.user_count = target_count;
            req->dev.datatype = target_dtp;
            req->dev.recv_data_sz = recv_data_sz;
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
            req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
434
            req->dev.lock_queue_entry = new_ptr;
435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451

            data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
            data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
            MPIU_Assert(req->dev.recv_data_sz > 0);
        }
        else {
            req->dev.user_buf = new_ptr->data;
            req->dev.user_count = target_count;
            req->dev.datatype = target_dtp;
            req->dev.recv_data_sz = recv_data_sz;
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
            req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
            req->dev.lock_queue_entry = new_ptr;

            data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
            data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
            MPIU_Assert(req->dev.recv_data_sz > 0);
452 453 454
        }

        mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
455 456
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
457 458 459 460 461 462

        /* return bytes of data processed in this pkt handler */
        (*buflen) = sizeof(MPIDI_CH3_Pkt_t) + data_len;

        if (complete) {
            mpi_errno = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(vc, req, &complete);
463 464
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
465
            if (complete) {
466
                goto issue_ack;
467 468 469 470 471 472
            }
        }

        (*reqp) = req;
    }

473
  issue_ack:
474
    if (pkt->type == MPIDI_CH3_PKT_LOCK) {
475 476 477 478
        if (lock_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
        else
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
479

480 481 482
        MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);

483 484 485 486
        mpi_errno =
            MPIDI_CH3I_Send_lock_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
487 488
    }
    else {
489 490 491 492 493 494
        if (lock_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
        else if (data_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED;
        else
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
495

496 497 498 499 500 501 502 503 504 505
        if (pkt->type == MPIDI_CH3_PKT_PUT || pkt->type == MPIDI_CH3_PKT_PUT_IMMED ||
            pkt->type == MPIDI_CH3_PKT_ACCUMULATE || pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
            MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
            request_handle = MPI_REQUEST_NULL;
        }
        else {
            source_win_handle = MPI_WIN_NULL;
            MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
        }

506 507 508 509
        mpi_errno =
            MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
510 511
    }

512
  fn_exit:
513
    return mpi_errno;
514
  fn_fail:
515 516 517 518
    goto fn_exit;
}


519
/* Process a lock acknowledgement from target_rank on win_ptr.
 *
 * Depending on the window's access state the ack is accounted for either on
 * the window (win_ptr->outstanding_locks; a discarded lock is re-requested
 * as MPI_LOCK_SHARED) or on the per-target object (its access_state is set
 * to MPIDI_RMA_LOCK_GRANTED or reset to MPIDI_RMA_LOCK_CALLED).
 *
 *   win_ptr     - window the ack belongs to
 *   target_rank - rank that answered the lock request
 *   flags       - ack flags; GRANTED and DISCARDED are the ones examined
 *
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int handle_lock_ack(MPID_Win * win_ptr, int target_rank, MPIDI_CH3_Pkt_flags_t flags)
{
    int mpi_errno = MPI_SUCCESS;
    int tracked_on_window = 0;
    MPIDI_RMA_Target_t *t = NULL;

    MPIU_Assert(win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED);

    /* Decide whether this ack is counted on the window itself. */
    if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED) {
        tracked_on_window = 1;
    }
    else if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
        MPIDI_VC_t *my_vc = NULL, *peer_vc = NULL;

        MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &my_vc);
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &peer_vc);

        /* Only the local rank, or a rank on the same node when the window
         * is SHM-allocated, is handled on the window in this state. */
        if (win_ptr->comm_ptr->rank == target_rank ||
            (win_ptr->shm_allocated == TRUE && my_vc->node_id == peer_vc->node_id)) {
            tracked_on_window = 1;
        }
    }

    if (tracked_on_window) {
        if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
            win_ptr->outstanding_locks--;
            MPIU_Assert(win_ptr->outstanding_locks >= 0);
        }
        else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
            /* re-send lock request message. */
            mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
        }
        goto fn_exit;
    }

    /* Otherwise record the outcome on the per-target object. */
    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    MPIU_Assert(t != NULL);

    if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED)
        t->access_state = MPIDI_RMA_LOCK_GRANTED;

    if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED)
        t->access_state = MPIDI_RMA_LOCK_CALLED;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

578 579 580
/* Update the first pending operation of target_rank after the ack for the
 * lock request piggybacked on it has arrived.
 *
 * Nothing happens unless that operation's packet carries a LOCK_SHARED or
 * LOCK_EXCLUSIVE flag.  If the ack says the lock was granted (or lock and
 * data were queued), the operation leaves the pending list: it is freed when
 * it has no internal request, otherwise it is moved to the dt/write/read
 * list of the target.  If the ack says lock or data were discarded, the
 * internal request is destroyed, the packet flags are erased and the
 * operation is scheduled to be issued again.
 *
 *   win_ptr     - window the operation belongs to
 *   target_rank - rank whose pending operation list is examined
 *   flags       - flags carried by the received ack
 *
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int adjust_op_piggybacked_with_lock(MPID_Win * win_ptr,
                                                  int target_rank, MPIDI_CH3_Pkt_flags_t flags)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_flags_t op_flags = MPIDI_CH3_PKT_FLAG_NONE;
    MPIDI_RMA_Op_t *op = NULL;
    MPIDI_RMA_Target_t *target = NULL;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    MPIU_Assert(target != NULL);

    op = target->pending_op_list;
    if (op != NULL) {
        MPIDI_CH3_PKT_RMA_GET_FLAGS(op->pkt, op_flags, mpi_errno);
    }

    /* Only an operation that carried a lock request needs adjusting. */
    if (!(op_flags & (MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED | MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)))
        goto fn_exit;

    if (flags & (MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED |
                 MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED)) {
        if (op->request) {
            int is_write_op;

            /* Move the operation from the pending list to the list that
             * matches its kind. */
            MPIDI_CH3I_RMA_Ops_unlink(&(target->pending_op_list),
                                      &(target->pending_op_list_tail), op);

            is_write_op = (op->pkt.type == MPIDI_CH3_PKT_PUT ||
                           op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED ||
                           op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
                           op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED);

            if (op->is_dt) {
                MPIDI_CH3I_RMA_Ops_append(&(target->dt_op_list), &(target->dt_op_list_tail), op);
            }
            else if (is_write_op) {
                MPIDI_CH3I_RMA_Ops_append(&(target->write_op_list),
                                          &(target->write_op_list_tail), op);
            }
            else {
                MPIDI_CH3I_RMA_Ops_append(&(target->read_op_list),
                                          &(target->read_op_list_tail), op);
            }

            if (op->ureq) {
                if (!MPID_Request_is_complete(op->request)) {
                    /* Increase ref for completion handler */
                    MPIU_Object_add_ref(op->ureq);
                    op->request->dev.request_handle = op->ureq->handle;
                    if (op->request->dev.OnDataAvail == NULL) {
                        op->request->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReqOpsComplete;
                    }
                    op->request->dev.OnFinal = MPIDI_CH3_ReqHandler_ReqOpsComplete;
                }
                else {
                    /* Complete user request, let cleanup function to release
                     * ch3 ref */
                    MPID_Request_set_completed(op->ureq);
                }
            }
        }
        else {
            /* No internal request: the operation can be retired right away. */
            if (op->ureq) {
                /* Complete user request and release the ch3 ref */
                MPID_Request_set_completed(op->ureq);
                MPID_Request_release(op->ureq);
            }

            MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list),
                                         &(target->pending_op_list_tail), op);
        }
    }
    else if (flags & (MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED |
                      MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED)) {
        /* We need to re-transmit this operation, so we destroy
         * the internal request and erase all flags in current
         * operation. */
        if (op->request) {
            MPIDI_CH3_Request_destroy(op->request);
            op->request = NULL;
            win_ptr->active_req_cnt--;
        }
        MPIDI_CH3_PKT_RMA_ERASE_FLAGS(op->pkt, mpi_errno);

        target->next_op_to_issue = op;
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


669 670 671 672
#undef FUNCNAME
#define FUNCNAME acquire_local_lock
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
673
static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
674 675 676 677 678
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_ACQUIRE_LOCAL_LOCK);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_ACQUIRE_LOCAL_LOCK);

Xin Zhao's avatar
Xin Zhao committed
679 680
    MPIR_T_PVAR_TIMER_START(RMA, rma_winlock_getlocallock);

681
    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 1) {
682
        mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
683 684 685
                                    MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
        if (mpi_errno)
            MPIU_ERR_POP(mpi_errno);
686 687 688 689 690
    }
    else {
        /* Queue the lock information. */
        MPIDI_CH3_Pkt_t pkt;
        MPIDI_CH3_Pkt_lock_t *lock_pkt = &pkt.lock;
691
        MPIDI_RMA_Lock_entry_t *new_ptr = NULL;
692
        MPIDI_VC_t *my_vc;
693 694

        MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
695 696 697 698 699 700 701
        lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
        if (lock_type == MPI_LOCK_SHARED)
            lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
        else {
            MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
            lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
        }
702

703 704
        new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, &pkt);
        if (new_ptr == NULL) {
705
            mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
706 707 708
                                        MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
709
            goto fn_exit;
710
        }
711
        MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
712 713
        MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &my_vc);
        new_ptr->vc = my_vc;
714 715

        new_ptr->all_data_recved = 1;
716 717
    }

718
  fn_exit:
Xin Zhao's avatar
Xin Zhao committed
719
    MPIR_T_PVAR_TIMER_END(RMA, rma_winlock_getlocallock);
720
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_ACQUIRE_LOCAL_LOCK);
721 722
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
723
  fn_fail:
724 725 726 727 728
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}


729 730 731 732 733 734 735 736 737 738
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Handle_flush_ack
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Account for a flush acknowledgement received from target_rank.
 *
 * Looks up the per-target object for target_rank on win_ptr, decrements its
 * count of outstanding acks and clears its put_acc_issued flag.
 *
 *   win_ptr     - window the ack belongs to
 *   target_rank - rank that sent the flush ack
 *
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int MPIDI_CH3I_RMA_Handle_flush_ack(MPID_Win * win_ptr, int target_rank)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_RMA_Target_t *t = NULL;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    /* Same check as the other MPIDI_CH3I_Win_find_target() callers in this
     * file: the target object must exist before it is dereferenced. */
    MPIU_Assert(t != NULL);

    t->sync.outstanding_acks--;
    MPIU_Assert(t->sync.outstanding_acks >= 0);

    t->put_acc_issued = 0;      /* reset PUT_ACC_FLAG after FLUSH is completed */

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


754
#undef FUNCNAME
755
#define FUNCNAME do_accumulate_op
756 757
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
758 759
static inline int do_accumulate_op(void *source_buf, void *target_buf,
                                   int acc_count, MPI_Datatype acc_dtp, MPI_Op acc_op)
760 761
{
    int mpi_errno = MPI_SUCCESS;
762 763 764 765 766
    MPI_User_function *uop;
    MPIDI_STATE_DECL(MPID_STATE_DO_ACCUMULATE_OP);

    MPIDI_FUNC_ENTER(MPID_STATE_DO_ACCUMULATE_OP);

767
    if (acc_op == MPI_REPLACE) {
768
        /* simply copy the data */
769
        mpi_errno = MPIR_Localcopy(source_buf, acc_count, acc_dtp, target_buf, acc_count, acc_dtp);
770
        if (mpi_errno) {
771 772
            MPIU_ERR_POP(mpi_errno);
        }
773 774 775
        goto fn_exit;
    }

776
    if (HANDLE_GET_KIND(acc_op) == HANDLE_KIND_BUILTIN) {
777
        /* get the function by indexing into the op table */
778
        uop = MPIR_OP_HDL_TO_FN(acc_op);
779
    }
780 781 782 783 784
    else {
        /* --BEGIN ERROR HANDLING-- */
        mpi_errno =
            MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OP,
                                 "**opnotpredefined", "**opnotpredefined %d", acc_op);
785
        return mpi_errno;
786
        /* --END ERROR HANDLING-- */
787 788
    }

789 790
    if (MPIR_DATATYPE_IS_PREDEFINED(acc_dtp)) {
        (*uop) (source_buf, target_buf, &acc_count, &acc_dtp);
791
    }
792 793
    else {
        /* derived datatype */
794 795 796
        MPID_Segment *segp;
        DLOOP_VECTOR *dloop_vec;
        MPI_Aint first, last;
797 798
        int vec_len, i, count;
        MPI_Aint type_size;
799
        MPI_Datatype type;
800 801 802
        MPID_Datatype *dtp;

        segp = MPID_Segment_alloc();
803 804 805 806 807 808
        /* --BEGIN ERROR HANDLING-- */
        if (!segp) {
            mpi_errno =
                MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
                                     MPI_ERR_OTHER, "**nomem", 0);
            MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
809
            return mpi_errno;
810
        }
811 812
        /* --END ERROR HANDLING-- */
        MPID_Segment_init(NULL, acc_count, acc_dtp, segp, 0);
813
        first = 0;
814
        last = SEGMENT_IGNORE_LAST;
815

816 817
        MPID_Datatype_get_ptr(acc_dtp, dtp);
        vec_len = dtp->max_contig_blocks * acc_count + 1;
818 819 820
        /* +1 needed because Rob says so */
        dloop_vec = (DLOOP_VECTOR *)
            MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
821 822 823 824 825 826
        /* --BEGIN ERROR HANDLING-- */
        if (!dloop_vec) {
            mpi_errno =
                MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
                                     MPI_ERR_OTHER, "**nomem", 0);
            MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
827
            return mpi_errno;
828
        }
829
        /* --END ERROR HANDLING-- */
830 831 832 833 834

        MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);

        type = dtp->eltype;
        MPID_Datatype_get_size_macro(type, type_size);
835 836 837 838 839
        for (i = 0; i < vec_len; i++) {
            MPIU_Assign_trunc(count, (dloop_vec[i].DLOOP_VECTOR_LEN) / type_size, int);
            (*uop) ((char *) source_buf + MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF),
                    (char *) target_buf + MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF),
                    &count, &type);
840 841
        }

842 843
        MPID_Segment_free(segp);
        MPIU_Free(dloop_vec);
844 845
    }

846
  fn_exit:
847
    MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
848 849

    return mpi_errno;
850
  fn_fail:
851 852 853
    goto fn_exit;
}

854

855 856 857 858 859
/*
 * Handle a lock request piggybacked on an incoming RMA packet.
 *
 *   win_ptr            - window the packet targets
 *   vc                 - connection the packet arrived on
 *   pkt                - the received packet; its flags are inspected
 *   buflen             - forwarded to enqueue_lock_origin when queuing
 *   acquire_lock_fail  - out: set to 1 when the lock could not be taken and
 *                        the operation was queued, 0 otherwise
 *   reqp               - out: reset to NULL here, then forwarded to
 *                        enqueue_lock_origin when queuing
 *
 * Packets carrying neither the LOCK_SHARED nor the LOCK_EXCLUSIVE flag are
 * left alone.  Returns MPI_SUCCESS or a propagated MPI error code.
 */
static inline int check_piggyback_lock(MPID_Win * win_ptr, MPIDI_VC_t * vc,
                                       MPIDI_CH3_Pkt_t * pkt,
                                       MPIDI_msg_sz_t * buflen,
                                       int *acquire_lock_fail, MPID_Request ** reqp)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_flags_t flags;
    int lock_type;

    *acquire_lock_fail = 0;
    *reqp = NULL;

    MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);

    /* nothing to do unless a lock request rides on this packet */
    if (!(flags & (MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED | MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE))) {
        goto fn_exit;
    }

    if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED) {
        lock_type = MPI_LOCK_SHARED;
    }
    else {
        MPIU_Assert(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
        lock_type = MPI_LOCK_EXCLUSIVE;
    }

    /* lock obtained: the caller may process the operation right away */
    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) != 0) {
        goto fn_exit;
    }

    /* cannot acquire the lock, queue up this operation. */
    mpi_errno = enqueue_lock_origin(win_ptr, vc, pkt, buflen, reqp);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }
    *acquire_lock_fail = 1;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

892
static inline int finish_op_on_target(MPID_Win * win_ptr, MPIDI_VC_t * vc,
893
                                      int has_response_data,
894 895
                                      MPIDI_CH3_Pkt_flags_t flags, MPI_Win source_win_handle)
{
896 897
    int mpi_errno = MPI_SUCCESS;

898
    if (!has_response_data) {
899
        /* This is PUT or ACC */
900 901
        if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
            flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
902
            MPIDI_CH3_Pkt_flags_t pkt_flags = MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
903
            if ((flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) || (flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
904
                pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
905
            MPIU_Assert(source_win_handle != MPI_WIN_NULL);
906 907
            mpi_errno = MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr,
                                                        pkt_flags,
908 909 910
                                                        source_win_handle, MPI_REQUEST_NULL);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
911
            MPIDI_CH3_Progress_signal_completion();
912 913
        }
        if (flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) {
914 915
            if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
                  flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)) {
916
                /* If op is piggybacked with both LOCK and FLUSH,
917 918 919 920
                 * we only send LOCK ACK back, do not send FLUSH ACK. */
                mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, source_win_handle);
                if (mpi_errno)
                    MPIU_ERR_POP(mpi_errno);
921
            }
922 923 924 925 926 927 928 929