mpidrma.h 37.9 KB
Newer Older
1
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 3 4 5 6
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

7 8
#if !defined(MPID_RMA_H_INCLUDED)
#define MPID_RMA_H_INCLUDED
9

10 11 12
#include "mpid_rma_types.h"
#include "mpid_rma_oplist.h"
#include "mpid_rma_shm.h"
13
#include "mpid_rma_issue.h"
14
#include "mpid_rma_lockqueue.h"
15

16
#undef FUNCNAME
17
#define FUNCNAME send_lock_msg
18 19
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
20
static inline int send_lock_msg(int dest, int lock_type, MPID_Win * win_ptr)
21
{
22 23 24 25 26 27 28
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_t *lock_pkt = &upkt.lock;
    MPID_Request *req = NULL;
    MPIDI_VC_t *vc;
    MPIDI_STATE_DECL(MPID_STATE_SEND_LOCK_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_LOCK_MSG);
29

30
    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc);
31

32
    MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
33
    lock_pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle;
34
    lock_pkt->source_win_handle = win_ptr->handle;
35
    lock_pkt->request_handle = MPI_REQUEST_NULL;
36 37 38 39 40 41 42
    lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (lock_type == MPI_LOCK_SHARED)
        lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
    else {
        MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
        lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
    }
43

44 45 46 47
    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_pkt, sizeof(*lock_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
48

49 50 51 52
    /* release the request returned by iStartMsg */
    if (req != NULL) {
        MPID_Request_release(req);
    }
53

54
  fn_exit:
55
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_LOCK_MSG);
56
    return mpi_errno;
57
    /* --BEGIN ERROR HANDLING-- */
58
  fn_fail:
59
    goto fn_exit;
60
    /* --END ERROR HANDLING-- */
61 62
}

63
#undef FUNCNAME
64
#define FUNCNAME send_unlock_msg
65 66
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
67
static inline int send_unlock_msg(int dest, MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags)
68 69
{
    int mpi_errno = MPI_SUCCESS;
70 71 72 73 74 75
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_unlock_t *unlock_pkt = &upkt.unlock;
    MPID_Request *req = NULL;
    MPIDI_VC_t *vc;
    MPIDI_STATE_DECL(MPID_STATE_SEND_UNLOCK_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_UNLOCK_MSG);
76

77
    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc);
78

79 80
    /* Send a lock packet over to the target. wait for the lock_granted
     * reply. Then do all the RMA ops. */
81

82
    MPIDI_Pkt_init(unlock_pkt, MPIDI_CH3_PKT_UNLOCK);
83
    unlock_pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle;
84
    unlock_pkt->source_win_handle = win_ptr->handle;
85
    unlock_pkt->flags = flags;
86

87 88 89 90
    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, unlock_pkt, sizeof(*unlock_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
91

92 93 94
    /* Release the request returned by iStartMsg */
    if (req != NULL) {
        MPID_Request_release(req);
95
    }
96

97
  fn_exit:
98
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_UNLOCK_MSG);
99 100
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
101
  fn_fail:
102 103 104 105
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

106

107
#undef FUNCNAME
108
#define FUNCNAME MPIDI_CH3I_Send_lock_ack_pkt
109 110
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
111 112
static inline int MPIDI_CH3I_Send_lock_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                               MPIDI_CH3_Pkt_flags_t flags,
113 114
                                               MPI_Win source_win_handle,
                                               MPI_Request request_handle)
115
{
116
    MPIDI_CH3_Pkt_t upkt;
117
    MPIDI_CH3_Pkt_lock_ack_t *lock_ack_pkt = &upkt.lock_ack;
118 119
    MPID_Request *req = NULL;
    int mpi_errno;
120
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
121

122
    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
123

124 125
    MPIU_Assert(!(source_win_handle != MPI_WIN_NULL && request_handle != MPI_REQUEST_NULL));

126 127 128
    /* send lock ack packet */
    MPIDI_Pkt_init(lock_ack_pkt, MPIDI_CH3_PKT_LOCK_ACK);
    lock_ack_pkt->source_win_handle = source_win_handle;
129
    lock_ack_pkt->request_handle = request_handle;
130
    lock_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
131
    lock_ack_pkt->flags = flags;
132 133

    MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
134 135
                     (MPIU_DBG_FDEST, "sending lock ack pkt on vc=%p, source_win_handle=%#08x",
                      vc, lock_ack_pkt->source_win_handle));
136 137

    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
138
    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_ack_pkt, sizeof(*lock_ack_pkt), &req);
139 140 141
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    if (mpi_errno) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
142 143
    }

144 145
    if (req != NULL) {
        MPID_Request_release(req);
146
    }
147

148
  fn_fail:
149
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
150 151

    return mpi_errno;
152 153
}

154 155 156 157 158 159
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Send_lock_op_ack_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int MPIDI_CH3I_Send_lock_op_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                                  MPIDI_CH3_Pkt_flags_t flags,
160 161
                                                  MPI_Win source_win_handle,
                                                  MPI_Request request_handle)
162 163 164 165 166 167 168 169 170
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_op_ack_t *lock_op_ack_pkt = &upkt.lock_op_ack;
    MPID_Request *req = NULL;
    int mpi_errno;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

171 172
    MPIU_Assert(!(source_win_handle != MPI_WIN_NULL && request_handle != MPI_REQUEST_NULL));

173 174 175
    /* send lock ack packet */
    MPIDI_Pkt_init(lock_op_ack_pkt, MPIDI_CH3_PKT_LOCK_OP_ACK);
    lock_op_ack_pkt->source_win_handle = source_win_handle;
176
    lock_op_ack_pkt->request_handle = request_handle;
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
    lock_op_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
    lock_op_ack_pkt->flags = flags;

    MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
                     (MPIU_DBG_FDEST, "sending lock op ack pkt on vc=%p, source_win_handle=%#08x",
                      vc, lock_op_ack_pkt->source_win_handle));

    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_op_ack_pkt, sizeof(*lock_op_ack_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    if (mpi_errno) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    if (req != NULL) {
        MPID_Request_release(req);
    }

  fn_fail:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);
    return mpi_errno;
}

200 201

#undef FUNCNAME
Xin Zhao's avatar
Xin Zhao committed
202
#define FUNCNAME MPIDI_CH3I_Send_flush_ack_pkt
203 204
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
205 206
static inline int MPIDI_CH3I_Send_flush_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                                MPI_Win source_win_handle)
207
{
208
    MPIDI_CH3_Pkt_t upkt;
Xin Zhao's avatar
Xin Zhao committed
209
    MPIDI_CH3_Pkt_flush_ack_t *flush_ack_pkt = &upkt.flush_ack;
210
    MPID_Request *req;
211
    int mpi_errno = MPI_SUCCESS;
Xin Zhao's avatar
Xin Zhao committed
212
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_FLUSH_ACK_PKT);
213

Xin Zhao's avatar
Xin Zhao committed
214
    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_FLUSH_ACK_PKT);
215

Xin Zhao's avatar
Xin Zhao committed
216 217 218
    MPIDI_Pkt_init(flush_ack_pkt, MPIDI_CH3_PKT_FLUSH_ACK);
    flush_ack_pkt->source_win_handle = source_win_handle;
    flush_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
219

220
    /* Because this is in a packet handler, it is already within a critical section */
221
    /* MPIU_THREAD_CS_ENTER(CH3COMM,vc); */
Xin Zhao's avatar
Xin Zhao committed
222
    mpi_errno = MPIDI_CH3_iStartMsg(vc, flush_ack_pkt, sizeof(*flush_ack_pkt), &req);
223 224
    /* MPIU_THREAD_CS_EXIT(CH3COMM,vc); */
    if (mpi_errno != MPI_SUCCESS) {
225
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
226 227
    }

228
    if (req != NULL) {
229
        MPID_Request_release(req);
230 231
    }

232
  fn_fail:
Xin Zhao's avatar
Xin Zhao committed
233
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_FLUSH_ACK_PKT);
234 235
    return mpi_errno;
}
236

237

238 239 240 241 242 243 244 245
#undef FUNCNAME
#define FUNCNAME send_decr_at_cnt_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Send a DECR_AT_COUNTER packet to rank `dst` of the window's communicator,
 * addressed to that rank's window handle.  Any send request returned by
 * iStartMsg is released immediately.
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int send_decr_at_cnt_msg(int dst, MPID_Win * win_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_decr_at_counter_t *pkt = &upkt.decr_at_cnt;
    MPID_Request *sreq = NULL;
    MPIDI_VC_t *dst_vc;
    MPIDI_STATE_DECL(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_DECR_AT_CNT_MSG);

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dst, &dst_vc);

    MPIDI_Pkt_init(pkt, MPIDI_CH3_PKT_DECR_AT_COUNTER);
    pkt->target_win_handle = win_ptr->basic_info_table[dst].win_handle;

    MPIU_THREAD_CS_ENTER(CH3COMM, dst_vc);
    mpi_errno = MPIDI_CH3_iStartMsg(dst_vc, pkt, sizeof(*pkt), &sreq);
    MPIU_THREAD_CS_EXIT(CH3COMM, dst_vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* Completion of the send is not tracked; drop our reference. */
    if (sreq != NULL)
        MPID_Request_release(sreq);

  fn_exit:
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

277

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
#undef FUNCNAME
#define FUNCNAME send_flush_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Send a FLUSH packet to rank `dest` of the window's communicator.  The packet
 * carries both the target's window handle and our own window handle.  Any send
 * request returned by iStartMsg is released immediately.
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int send_flush_msg(int dest, MPID_Win * win_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_flush_t *pkt = &upkt.flush;
    MPID_Request *sreq = NULL;
    MPIDI_VC_t *target_vc;
    MPIDI_STATE_DECL(MPID_STATE_SEND_FLUSH_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_FLUSH_MSG);

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &target_vc);

    MPIDI_Pkt_init(pkt, MPIDI_CH3_PKT_FLUSH);
    pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle;
    pkt->source_win_handle = win_ptr->handle;

    MPIU_THREAD_CS_ENTER(CH3COMM, target_vc);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, pkt, sizeof(*pkt), &sreq);
    MPIU_THREAD_CS_EXIT(CH3COMM, target_vc);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
    }

    /* Completion of the send is not tracked; drop our reference. */
    if (sreq != NULL)
        MPID_Request_release(sreq);

  fn_exit:
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_FLUSH_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

317 318

/* enqueue an unsatisfied origin in passive target at target side. */
319 320 321
static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
                                      MPIDI_CH3_Pkt_t * pkt,
                                      MPIDI_msg_sz_t * buflen, MPID_Request ** reqp)
322
{
323
    MPIDI_RMA_Lock_entry_t *new_ptr = NULL;
324 325
    MPIDI_CH3_Pkt_flags_t flag;
    MPI_Win source_win_handle;
326
    MPI_Request request_handle;
327
    int lock_discarded = 0, data_discarded = 0;
328 329
    int mpi_errno = MPI_SUCCESS;

330
    (*reqp) = NULL;
331

332
    new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, pkt);
333 334
    if (new_ptr != NULL) {
        MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
335
        new_ptr->vc = vc;
336 337 338
    }
    else {
        lock_discarded = 1;
339
    }
340

341
    if (pkt->type == MPIDI_CH3_PKT_LOCK ||
342 343
        pkt->type == MPIDI_CH3_PKT_PUT_IMMED ||
        pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED ||
344
        pkt->type == MPIDI_CH3_PKT_GET ||
345
        pkt->type == MPIDI_CH3_PKT_GET_ACCUM_IMMED ||
346
        pkt->type == MPIDI_CH3_PKT_FOP_IMMED || pkt->type == MPIDI_CH3_PKT_CAS_IMMED) {
347

348 349
        /* return bytes of data processed in this pkt handler */
        (*buflen) = sizeof(MPIDI_CH3_Pkt_t);
350 351 352 353 354

        if (new_ptr != NULL)
            new_ptr->all_data_recved = 1;

        goto issue_ack;
355 356 357
    }
    else {
        MPI_Aint type_size = 0;
358
        MPI_Aint type_extent;
359
        MPIDI_msg_sz_t recv_data_sz = 0;
360
        MPIDI_msg_sz_t buf_size;
361 362 363 364 365 366 367 368 369 370 371 372
        MPID_Request *req = NULL;
        MPI_Datatype target_dtp;
        int target_count;
        int complete = 0;
        MPIDI_msg_sz_t data_len;
        char *data_buf = NULL;

        /* This is PUT, ACC, GACC */

        MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno);

373
        MPID_Datatype_get_extent_macro(target_dtp, type_extent);
374 375
        MPID_Datatype_get_size_macro(target_dtp, type_size);
        recv_data_sz = type_size * target_count;
376
        buf_size = type_extent * target_count;
377

378
        if (new_ptr != NULL) {
379 380
            if (win_ptr->current_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
                new_ptr->data = MPIU_Malloc(buf_size);
381 382 383 384
            }

            if (new_ptr->data == NULL) {
                /* Note that there are two possible reasons to make new_ptr->data to be NULL:
385 386
                 * (1) win_ptr->current_lock_data_bytes + buf_size >= MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES;
                 * (2) MPIU_Malloc(buf_size) failed.
387 388 389 390 391 392
                 * In such cases, we cannot allocate memory for lock data, so we give up
                 * buffering lock data, however, we still buffer lock request.
                 */
                MPIDI_CH3_Pkt_t new_pkt;
                MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock;
                MPI_Win target_win_handle;
393
                MPIDI_CH3_Pkt_flags_t flags;
394 395

                MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno);
396
                MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
397

398 399 400 401 402 403 404 405 406
                if (pkt->type == MPIDI_CH3_PKT_PUT || pkt->type == MPIDI_CH3_PKT_ACCUMULATE) {
                    MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
                    request_handle = MPI_REQUEST_NULL;
                }
                else {
                    source_win_handle = MPI_WIN_NULL;
                    MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
                }

407 408 409
                MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
                lock_pkt->target_win_handle = target_win_handle;
                lock_pkt->source_win_handle = source_win_handle;
410
                lock_pkt->request_handle = request_handle;
411
                lock_pkt->flags = flags;
412 413 414 415 416 417 418 419

                /* replace original pkt with lock pkt */
                new_ptr->pkt = new_pkt;
                new_ptr->all_data_recved = 1;

                data_discarded = 1;
            }
            else {
420
                win_ptr->current_lock_data_bytes += buf_size;
421
                new_ptr->buf_size = buf_size;
422
            }
423 424 425 426 427 428 429
        }

        /* create request to receive upcoming requests */
        req = MPID_Request_create();
        MPIU_Object_set_ref(req, 1);

        /* fill in area in req that will be used in Receive_data_found() */
430
        if (lock_discarded || data_discarded) {
431
            req->dev.drop_data = TRUE;
432 433 434 435 436 437
            req->dev.user_buf = NULL;
            req->dev.user_count = target_count;
            req->dev.datatype = target_dtp;
            req->dev.recv_data_sz = recv_data_sz;
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
            req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
438
            req->dev.lock_queue_entry = new_ptr;
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455

            data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
            data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
            MPIU_Assert(req->dev.recv_data_sz > 0);
        }
        else {
            req->dev.user_buf = new_ptr->data;
            req->dev.user_count = target_count;
            req->dev.datatype = target_dtp;
            req->dev.recv_data_sz = recv_data_sz;
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
            req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
            req->dev.lock_queue_entry = new_ptr;

            data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
            data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
            MPIU_Assert(req->dev.recv_data_sz > 0);
456 457 458
        }

        mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
459 460
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
461 462 463 464 465 466

        /* return bytes of data processed in this pkt handler */
        (*buflen) = sizeof(MPIDI_CH3_Pkt_t) + data_len;

        if (complete) {
            mpi_errno = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(vc, req, &complete);
467 468
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
469
            if (complete) {
470
                goto issue_ack;
471 472 473 474 475 476
            }
        }

        (*reqp) = req;
    }

477
  issue_ack:
478
    if (pkt->type == MPIDI_CH3_PKT_LOCK) {
479 480 481 482
        if (lock_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
        else
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
483

484 485 486
        MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);

487 488 489 490
        mpi_errno =
            MPIDI_CH3I_Send_lock_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
491 492
    }
    else {
493 494 495 496 497 498
        if (lock_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
        else if (data_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED;
        else
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
499

500 501 502 503 504 505 506 507 508 509
        if (pkt->type == MPIDI_CH3_PKT_PUT || pkt->type == MPIDI_CH3_PKT_PUT_IMMED ||
            pkt->type == MPIDI_CH3_PKT_ACCUMULATE || pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
            MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
            request_handle = MPI_REQUEST_NULL;
        }
        else {
            source_win_handle = MPI_WIN_NULL;
            MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
        }

510 511 512 513
        mpi_errno =
            MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
514 515
    }

516
  fn_exit:
517
    return mpi_errno;
518
  fn_fail:
519 520 521 522
    goto fn_exit;
}


523
#undef FUNCNAME
#define FUNCNAME handle_lock_ack
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Process a lock acknowledgement from target_rank on the origin side.
 *
 * There are two cases.
 *
 * In a lock_all epoch, the ack is handled at window level when either:
 *  - access_state is MPIDI_RMA_LOCK_ALL_ISSUED, or
 *  - access_state is MPIDI_RMA_LOCK_ALL_CALLED and the target is this process
 *    or an shm-reachable process on the same node.
 * At window level:
 *  - a GRANTED ack decrements win_ptr->outstanding_locks;
 *  - a DISCARDED ack re-sends a shared lock request to target_rank.
 *
 * Otherwise the per-target state is updated:
 *  - GRANTED   -> MPIDI_RMA_LOCK_GRANTED;
 *  - DISCARDED -> back to MPIDI_RMA_LOCK_CALLED.
 *
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int handle_lock_ack(MPID_Win * win_ptr, int target_rank, MPIDI_CH3_Pkt_flags_t flags)
{
    MPIDI_RMA_Target_t *t = NULL;
    int handle_at_window_level = 0;
    int mpi_errno = MPI_SUCCESS;

    MPIU_Assert(win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED);

    if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED) {
        handle_at_window_level = 1;
    }
    else if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
        MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc);
        if (win_ptr->comm_ptr->rank == target_rank ||
            (win_ptr->shm_allocated == TRUE && orig_vc->node_id == target_vc->node_id)) {
            handle_at_window_level = 1;
        }
    }

    if (handle_at_window_level) {
        if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
            win_ptr->outstanding_locks--;
            MPIU_Assert(win_ptr->outstanding_locks >= 0);
        }
        else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
            /* re-send lock request message. */
            mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
        }
        goto fn_exit;
    }

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    MPIU_Assert(t != NULL);

    if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED)
        t->access_state = MPIDI_RMA_LOCK_GRANTED;

    if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED)
        t->access_state = MPIDI_RMA_LOCK_CALLED;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

582 583 584
#undef FUNCNAME
#define FUNCNAME adjust_op_piggybacked_with_lock
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* After target_rank answers a lock request that was piggybacked on the first
 * pending operation, update that operation according to the ack flags.
 *
 * The function only acts when the head of target->pending_op_list carries a
 * LOCK_SHARED or LOCK_EXCLUSIVE packet flag.
 *
 *  - GRANTED or QUEUED_DATA_QUEUED:
 *      - An op with no internal request is freed, and its user request, if
 *        any, is completed.
 *      - Otherwise the op moves to the dt, write, or read list according to
 *        its kind, and its user request is either completed now or wired to
 *        complete with the internal request.
 *  - QUEUED_DATA_DISCARDED or DISCARDED: the op must be re-transmitted.
 *      - Its internal request is destroyed and its packet flags are erased.
 *      - It becomes target->next_op_to_issue.
 *      - A FLUSH or UNLOCK it carried is restored as the target's pending
 *        sync flag.
 *
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int adjust_op_piggybacked_with_lock(MPID_Win * win_ptr,
                                                  int target_rank, MPIDI_CH3_Pkt_flags_t flags)
{
    MPIDI_RMA_Target_t *target = NULL;
    MPIDI_RMA_Op_t *op = NULL;
    MPIDI_CH3_Pkt_flags_t op_flags = MPIDI_CH3_PKT_FLAG_NONE;
    int mpi_errno = MPI_SUCCESS;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    MPIU_Assert(target != NULL);

    op = target->pending_op_list;
    if (op != NULL)
        MPIDI_CH3_PKT_RMA_GET_FLAGS(op->pkt, op_flags, mpi_errno);

    /* op_flags stays NONE when op == NULL, so op is non-NULL below. */
    if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
        if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED ||
            flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED) {

            if (!op->request) {
                if (op->ureq) {
                    /* Complete user request and release the ch3 ref */
                    MPID_Request_set_completed(op->ureq);
                    MPID_Request_release(op->ureq);
                }

                MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list),
                                             &(target->pending_op_list_tail), op);
            }
            else {
                MPIDI_CH3I_RMA_Ops_unlink(&(target->pending_op_list),
                                          &(target->pending_op_list_tail), op);
                if (op->is_dt) {
                    MPIDI_CH3I_RMA_Ops_append(&(target->dt_op_list),
                                              &(target->dt_op_list_tail), op);
                }
                else if (op->pkt.type == MPIDI_CH3_PKT_PUT ||
                         op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED ||
                         op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
                         op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
                    MPIDI_CH3I_RMA_Ops_append(&(target->write_op_list),
                                              &(target->write_op_list_tail), op);
                }
                else {
                    MPIDI_CH3I_RMA_Ops_append(&(target->read_op_list),
                                              &(target->read_op_list_tail), op);
                }

                if (op->ureq) {
                    if (MPID_Request_is_complete(op->request)) {
                        /* Complete user request, let cleanup function to release
                         * ch3 ref */
                        MPID_Request_set_completed(op->ureq);
                    }
                    else {
                        /* Increase ref for completion handler */
                        MPIU_Object_add_ref(op->ureq);
                        op->request->dev.request_handle = op->ureq->handle;
                        if (op->request->dev.OnDataAvail == NULL) {
                            op->request->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReqOpsComplete;
                        }
                        op->request->dev.OnFinal = MPIDI_CH3_ReqHandler_ReqOpsComplete;
                    }
                }
            }
        }
        else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED ||
                 flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
            /* We need to re-transmit this operation, so we destroy
             * the internal request and erase all flags in current
             * operation. */
            if (op->request) {
                MPIDI_CH3_Request_destroy(op->request);
                op->request = NULL;
                win_ptr->active_req_cnt--;
            }
            MPIDI_CH3_PKT_RMA_ERASE_FLAGS(op->pkt, mpi_errno);

            target->next_op_to_issue = op;

            /* op_flags holds packet flags, so both tests must use packet-flag
             * constants (not MPIDI_RMA_SYNC_* sync-state values). */
            if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
                target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
            else if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)
                target->sync.sync_flag = MPIDI_RMA_SYNC_UNLOCK;
        }
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


678 679 680 681
#undef FUNCNAME
#define FUNCNAME acquire_local_lock
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
682
static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
683 684 685 686 687
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_ACQUIRE_LOCAL_LOCK);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_ACQUIRE_LOCAL_LOCK);

Xin Zhao's avatar
Xin Zhao committed
688 689
    MPIR_T_PVAR_TIMER_START(RMA, rma_winlock_getlocallock);

690
    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 1) {
691
        mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
692 693 694
                                    MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
        if (mpi_errno)
            MPIU_ERR_POP(mpi_errno);
695 696 697 698 699
    }
    else {
        /* Queue the lock information. */
        MPIDI_CH3_Pkt_t pkt;
        MPIDI_CH3_Pkt_lock_t *lock_pkt = &pkt.lock;
700
        MPIDI_RMA_Lock_entry_t *new_ptr = NULL;
701
        MPIDI_VC_t *my_vc;
702 703

        MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
704 705 706 707 708 709 710
        lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
        if (lock_type == MPI_LOCK_SHARED)
            lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
        else {
            MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
            lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
        }
711

712 713
        new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, &pkt);
        if (new_ptr == NULL) {
714
            mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
715 716 717
                                        MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
718
            goto fn_exit;
719
        }
720
        MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
721 722
        MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &my_vc);
        new_ptr->vc = my_vc;
723 724

        new_ptr->all_data_recved = 1;
725 726
    }

727
  fn_exit:
Xin Zhao's avatar
Xin Zhao committed
728
    MPIR_T_PVAR_TIMER_END(RMA, rma_winlock_getlocallock);
729
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_ACQUIRE_LOCAL_LOCK);
730 731
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
732
  fn_fail:
733 734 735 736 737
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}


738 739 740 741 742 743 744 745 746 747
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Handle_flush_ack
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Process a FLUSH_ACK received from target_rank.
 *
 * The function does two things:
 *  - decrements the target's outstanding ack count, which must not go
 *    negative (asserted);
 *  - clears put_acc_issued now that the flush has completed.
 *
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int MPIDI_CH3I_RMA_Handle_flush_ack(MPID_Win * win_ptr, int target_rank)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_RMA_Target_t *t = NULL;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    /* A flush ack is only expected from a target that has a target entry;
     * fail loudly rather than dereference NULL below. */
    MPIU_Assert(t != NULL);

    t->sync.outstanding_acks--;
    MPIU_Assert(t->sync.outstanding_acks >= 0);

    t->put_acc_issued = 0;      /* reset PUT_ACC_FLAG after FLUSH is completed */

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


763
#undef FUNCNAME
764
#define FUNCNAME do_accumulate_op
765 766
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
767 768
static inline int do_accumulate_op(void *source_buf, void *target_buf,
                                   int acc_count, MPI_Datatype acc_dtp, MPI_Op acc_op)
769 770
{
    int mpi_errno = MPI_SUCCESS;
771 772 773 774 775
    MPI_User_function *uop;
    MPIDI_STATE_DECL(MPID_STATE_DO_ACCUMULATE_OP);

    MPIDI_FUNC_ENTER(MPID_STATE_DO_ACCUMULATE_OP);

776
    if (acc_op == MPI_REPLACE) {
777
        /* simply copy the data */
778
        mpi_errno = MPIR_Localcopy(source_buf, acc_count, acc_dtp, target_buf, acc_count, acc_dtp);
779
        if (mpi_errno) {
780 781
            MPIU_ERR_POP(mpi_errno);
        }
782 783 784
        goto fn_exit;
    }

785
    if (HANDLE_GET_KIND(acc_op) == HANDLE_KIND_BUILTIN) {
786
        /* get the function by indexing into the op table */
787
        uop = MPIR_OP_HDL_TO_FN(acc_op);
788
    }
789 790 791 792 793
    else {
        /* --BEGIN ERROR HANDLING-- */
        mpi_errno =
            MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OP,
                                 "**opnotpredefined", "**opnotpredefined %d", acc_op);
794
        return mpi_errno;
795
        /* --END ERROR HANDLING-- */
796 797
    }

798 799
    if (MPIR_DATATYPE_IS_PREDEFINED(acc_dtp)) {
        (*uop) (source_buf, target_buf, &acc_count, &acc_dtp);
800
    }
801 802
    else {
        /* derived datatype */
803 804 805
        MPID_Segment *segp;
        DLOOP_VECTOR *dloop_vec;
        MPI_Aint first, last;
806
        int vec_len, i, count;
807
        MPI_Aint type_extent, type_size;
808
        MPI_Datatype type;
809
        MPID_Datatype *dtp;
810 811
        MPI_Aint curr_len;
        void *curr_loc;
812 813

        segp = MPID_Segment_alloc();
814 815 816 817 818 819
        /* --BEGIN ERROR HANDLING-- */
        if (!segp) {
            mpi_errno =
                MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
                                     MPI_ERR_OTHER, "**nomem", 0);
            MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
820
            return mpi_errno;
821
        }
822 823
        /* --END ERROR HANDLING-- */
        MPID_Segment_init(NULL, acc_count, acc_dtp, segp, 0);
824
        first = 0;
825
        last = SEGMENT_IGNORE_LAST;
826

827 828
        MPID_Datatype_get_ptr(acc_dtp, dtp);
        vec_len = dtp->max_contig_blocks * acc_count + 1;
829 830 831
        /* +1 needed because Rob says so */
        dloop_vec = (DLOOP_VECTOR *)
            MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
832 833 834 835 836 837
        /* --BEGIN ERROR HANDLING-- */
        if (!dloop_vec) {
            mpi_errno =
                MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
                                     MPI_ERR_OTHER, "**nomem", 0);
            MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
838
            return mpi_errno;
839
        }
840
        /* --END ERROR HANDLING-- */
841 842 843 844

        MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);

        type = dtp->eltype;
845 846
        MPIU_Assert(type != MPI_DATATYPE_NULL);

847
        MPID_Datatype_get_size_macro(type, type_size);
848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875
        MPID_Datatype_get_extent_macro(type, type_extent);

        i = 0;
        curr_loc = dloop_vec[0].DLOOP_VECTOR_BUF;
        curr_len = dloop_vec[0].DLOOP_VECTOR_LEN;
        while (i != vec_len) {
            if (curr_len < type_size) {
                MPIU_Assert(i != vec_len);
                i++;
                curr_len += dloop_vec[i].DLOOP_VECTOR_LEN;
                continue;
            }

            MPIU_Assign_trunc(count, curr_len / type_size, int);
            (*uop) ((char *) source_buf + MPIU_PtrToAint(curr_loc),
                    (char *) target_buf + MPIU_PtrToAint(curr_loc), &count, &type);

            if (curr_len % type_size == 0) {
                i++;
                if (i != vec_len) {
                    curr_loc = dloop_vec[i].DLOOP_VECTOR_BUF;
                    curr_len = dloop_vec[i].DLOOP_VECTOR_LEN;
                }
            }
            else {
                curr_loc = (void *) ((char *) curr_loc + type_extent * count);
                curr_len -= type_size * count;
            }
876 877
        }

878 879
        MPID_Segment_free(segp);
        MPIU_Free(dloop_vec);
880 881
    }

882
  fn_exit:
883
    MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
884 885

    return mpi_errno;
886
  fn_fail:
887 888 889
    goto fn_exit;
}

890

891 892 893 894 895
static inline int check_piggyback_lock(MPID_Win * win_ptr, MPIDI_VC_t * vc,
                                       MPIDI_CH3_Pkt_t * pkt,
                                       MPIDI_msg_sz_t * buflen,
                                       int *acquire_lock_fail, MPID_Request ** reqp)
{
896 897 898 899 900
    int lock_type;
    MPIDI_CH3_Pkt_flags_t flags;
    int mpi_errno = MPI_SUCCESS;

    (*acquire_lock_fail) = 0;
901
    (*reqp) = NULL;
902 903

    MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
904
    if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED || flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
905 906 907 908 909 910 911

        if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED)
            lock_type = MPI_LOCK_SHARED;
        else {
            MPIU_Assert(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
            lock_type = MPI_LOCK_EXCLUSIVE;
        }
912 913 914

        if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 0) {
            /* cannot acquire the lock, queue up this operation. */
915
            mpi_errno = enqueue_lock_origin(win_ptr, vc, pkt, buflen, reqp);
Xin Zhao's avatar