mpidrma.h 39.5 KB
Newer Older
1
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 3 4 5 6
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

7 8
#if !defined(MPID_RMA_H_INCLUDED)
#define MPID_RMA_H_INCLUDED
9

10 11 12
#include "mpid_rma_types.h"
#include "mpid_rma_oplist.h"
#include "mpid_rma_shm.h"
13
#include "mpid_rma_issue.h"
14
#include "mpid_rma_lockqueue.h"
15

16
#undef FUNCNAME
17
#define FUNCNAME send_lock_msg
18 19
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
20
static inline int send_lock_msg(int dest, int lock_type, MPID_Win * win_ptr)
21
{
22 23 24 25 26 27 28
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_t *lock_pkt = &upkt.lock;
    MPID_Request *req = NULL;
    MPIDI_VC_t *vc;
    MPIDI_STATE_DECL(MPID_STATE_SEND_LOCK_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_LOCK_MSG);
29

30
    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc);
31

32
    MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
33
    lock_pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle;
34
    lock_pkt->source_win_handle = win_ptr->handle;
35
    lock_pkt->request_handle = MPI_REQUEST_NULL;
36 37 38 39 40 41 42
    lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (lock_type == MPI_LOCK_SHARED)
        lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
    else {
        MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
        lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
    }
43

44 45 46 47
    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_pkt, sizeof(*lock_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
48

49 50 51 52
    /* release the request returned by iStartMsg */
    if (req != NULL) {
        MPID_Request_release(req);
    }
53

54
  fn_exit:
55
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_LOCK_MSG);
56
    return mpi_errno;
57
    /* --BEGIN ERROR HANDLING-- */
58
  fn_fail:
59
    goto fn_exit;
60
    /* --END ERROR HANDLING-- */
61 62
}

63
#undef FUNCNAME
64
#define FUNCNAME send_unlock_msg
65 66
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
67
static inline int send_unlock_msg(int dest, MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags)
68 69
{
    int mpi_errno = MPI_SUCCESS;
70 71 72 73 74 75
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_unlock_t *unlock_pkt = &upkt.unlock;
    MPID_Request *req = NULL;
    MPIDI_VC_t *vc;
    MPIDI_STATE_DECL(MPID_STATE_SEND_UNLOCK_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_UNLOCK_MSG);
76

77
    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc);
78

79 80
    /* Send a lock packet over to the target. wait for the lock_granted
     * reply. Then do all the RMA ops. */
81

82
    MPIDI_Pkt_init(unlock_pkt, MPIDI_CH3_PKT_UNLOCK);
83
    unlock_pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle;
84
    unlock_pkt->source_win_handle = win_ptr->handle;
85
    unlock_pkt->flags = flags;
86

87 88 89 90
    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, unlock_pkt, sizeof(*unlock_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
91

92 93 94
    /* Release the request returned by iStartMsg */
    if (req != NULL) {
        MPID_Request_release(req);
95
    }
96

97
  fn_exit:
98
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_UNLOCK_MSG);
99 100
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
101
  fn_fail:
102 103 104 105
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

106

107
#undef FUNCNAME
108
#define FUNCNAME MPIDI_CH3I_Send_lock_ack_pkt
109 110
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
111 112
static inline int MPIDI_CH3I_Send_lock_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                               MPIDI_CH3_Pkt_flags_t flags,
113 114
                                               MPI_Win source_win_handle,
                                               MPI_Request request_handle)
115
{
116
    MPIDI_CH3_Pkt_t upkt;
117
    MPIDI_CH3_Pkt_lock_ack_t *lock_ack_pkt = &upkt.lock_ack;
118 119
    MPID_Request *req = NULL;
    int mpi_errno;
120
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
121

122
    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
123

124 125
    MPIU_Assert(!(source_win_handle != MPI_WIN_NULL && request_handle != MPI_REQUEST_NULL));

126 127 128
    /* send lock ack packet */
    MPIDI_Pkt_init(lock_ack_pkt, MPIDI_CH3_PKT_LOCK_ACK);
    lock_ack_pkt->source_win_handle = source_win_handle;
129
    lock_ack_pkt->request_handle = request_handle;
130
    lock_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
131
    lock_ack_pkt->flags = flags;
132 133

    MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
134 135
                     (MPIU_DBG_FDEST, "sending lock ack pkt on vc=%p, source_win_handle=%#08x",
                      vc, lock_ack_pkt->source_win_handle));
136 137

    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
138
    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_ack_pkt, sizeof(*lock_ack_pkt), &req);
139 140 141
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    if (mpi_errno) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
142 143
    }

144 145
    if (req != NULL) {
        MPID_Request_release(req);
146
    }
147

148
  fn_fail:
149
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
150 151

    return mpi_errno;
152 153
}

154 155 156 157 158 159
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Send_lock_op_ack_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int MPIDI_CH3I_Send_lock_op_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                                  MPIDI_CH3_Pkt_flags_t flags,
160 161
                                                  MPI_Win source_win_handle,
                                                  MPI_Request request_handle)
162 163 164 165 166 167 168 169 170
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_op_ack_t *lock_op_ack_pkt = &upkt.lock_op_ack;
    MPID_Request *req = NULL;
    int mpi_errno;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

171 172
    MPIU_Assert(!(source_win_handle != MPI_WIN_NULL && request_handle != MPI_REQUEST_NULL));

173 174 175
    /* send lock ack packet */
    MPIDI_Pkt_init(lock_op_ack_pkt, MPIDI_CH3_PKT_LOCK_OP_ACK);
    lock_op_ack_pkt->source_win_handle = source_win_handle;
176
    lock_op_ack_pkt->request_handle = request_handle;
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
    lock_op_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
    lock_op_ack_pkt->flags = flags;

    MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
                     (MPIU_DBG_FDEST, "sending lock op ack pkt on vc=%p, source_win_handle=%#08x",
                      vc, lock_op_ack_pkt->source_win_handle));

    MPIU_THREAD_CS_ENTER(CH3COMM, vc);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_op_ack_pkt, sizeof(*lock_op_ack_pkt), &req);
    MPIU_THREAD_CS_EXIT(CH3COMM, vc);
    if (mpi_errno) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    if (req != NULL) {
        MPID_Request_release(req);
    }

  fn_fail:
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);
    return mpi_errno;
}

200 201

#undef FUNCNAME
Xin Zhao's avatar
Xin Zhao committed
202
#define FUNCNAME MPIDI_CH3I_Send_flush_ack_pkt
203 204
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
205 206
static inline int MPIDI_CH3I_Send_flush_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr,
                                                MPI_Win source_win_handle)
207
{
208
    MPIDI_CH3_Pkt_t upkt;
Xin Zhao's avatar
Xin Zhao committed
209
    MPIDI_CH3_Pkt_flush_ack_t *flush_ack_pkt = &upkt.flush_ack;
210
    MPID_Request *req;
211
    int mpi_errno = MPI_SUCCESS;
Xin Zhao's avatar
Xin Zhao committed
212
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_FLUSH_ACK_PKT);
213

Xin Zhao's avatar
Xin Zhao committed
214
    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_FLUSH_ACK_PKT);
215

Xin Zhao's avatar
Xin Zhao committed
216 217 218
    MPIDI_Pkt_init(flush_ack_pkt, MPIDI_CH3_PKT_FLUSH_ACK);
    flush_ack_pkt->source_win_handle = source_win_handle;
    flush_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
219

220
    /* Because this is in a packet handler, it is already within a critical section */
221
    /* MPIU_THREAD_CS_ENTER(CH3COMM,vc); */
Xin Zhao's avatar
Xin Zhao committed
222
    mpi_errno = MPIDI_CH3_iStartMsg(vc, flush_ack_pkt, sizeof(*flush_ack_pkt), &req);
223 224
    /* MPIU_THREAD_CS_EXIT(CH3COMM,vc); */
    if (mpi_errno != MPI_SUCCESS) {
225
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
226 227
    }

228
    if (req != NULL) {
229
        MPID_Request_release(req);
230 231
    }

232
  fn_fail:
Xin Zhao's avatar
Xin Zhao committed
233
    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_FLUSH_ACK_PKT);
234 235
    return mpi_errno;
}
236

237

238 239 240 241 242 243 244 245
#undef FUNCNAME
#define FUNCNAME send_decr_at_cnt_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Start sending an MPIDI_CH3_PKT_DECR_AT_COUNTER packet to rank 'dst'.
 *
 * dst     - rank in win_ptr->comm_ptr that receives the packet
 * win_ptr - window whose remote handle for 'dst' is placed in the packet
 *
 * Returns MPI_SUCCESS, or an error code when the message cannot be started. */
static inline int send_decr_at_cnt_msg(int dst, MPID_Win * win_ptr)
{
    MPIDI_CH3_Pkt_t pkt_storage;
    MPIDI_CH3_Pkt_decr_at_counter_t *decr_pkt = &pkt_storage.decr_at_cnt;
    MPIDI_VC_t *target_vc;
    MPID_Request *sreq = NULL;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_DECR_AT_CNT_MSG);

    /* The packet only carries the target-side window handle. */
    MPIDI_Pkt_init(decr_pkt, MPIDI_CH3_PKT_DECR_AT_COUNTER);
    decr_pkt->target_win_handle = win_ptr->basic_info_table[dst].win_handle;

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dst, &target_vc);

    MPIU_THREAD_CS_ENTER(CH3COMM, target_vc);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, decr_pkt, sizeof(*decr_pkt), &sreq);
    MPIU_THREAD_CS_EXIT(CH3COMM, target_vc);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    /* iStartMsg may hand back a request object; we do not track it. */
    if (sreq != NULL) {
        MPID_Request_release(sreq);
    }

  fn_exit:
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

277

278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
#undef FUNCNAME
#define FUNCNAME send_flush_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Start sending an MPIDI_CH3_PKT_FLUSH packet for win_ptr to rank 'dest'.
 *
 * dest    - rank in win_ptr->comm_ptr that receives the flush request
 * win_ptr - window the flush request refers to
 *
 * Returns MPI_SUCCESS, or an error code when the message cannot be started. */
static inline int send_flush_msg(int dest, MPID_Win * win_ptr)
{
    MPIDI_CH3_Pkt_t pkt_storage;
    MPIDI_CH3_Pkt_flush_t *flush_pkt = &pkt_storage.flush;
    MPIDI_VC_t *target_vc;
    MPID_Request *sreq = NULL;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_SEND_FLUSH_MSG);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_FLUSH_MSG);

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &target_vc);

    /* The flush request carries both the origin and target window handles. */
    MPIDI_Pkt_init(flush_pkt, MPIDI_CH3_PKT_FLUSH);
    flush_pkt->source_win_handle = win_ptr->handle;
    flush_pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle;

    MPIU_THREAD_CS_ENTER(CH3COMM, target_vc);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, flush_pkt, sizeof(*flush_pkt), &sreq);
    MPIU_THREAD_CS_EXIT(CH3COMM, target_vc);
    MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");

    /* iStartMsg may hand back a request object; we do not track it. */
    if (sreq != NULL) {
        MPID_Request_release(sreq);
    }

  fn_exit:
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_FLUSH_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}

317 318

/* enqueue an unsatisfied origin in passive target at target side. */
319 320 321
static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
                                      MPIDI_CH3_Pkt_t * pkt,
                                      MPIDI_msg_sz_t * buflen, MPID_Request ** reqp)
322
{
323
    MPIDI_RMA_Lock_entry_t *new_ptr = NULL;
324 325
    MPIDI_CH3_Pkt_flags_t flag;
    MPI_Win source_win_handle;
326
    MPI_Request request_handle;
327
    int lock_discarded = 0, data_discarded = 0;
328 329
    int mpi_errno = MPI_SUCCESS;

330
    (*reqp) = NULL;
331

332
    new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, pkt);
333 334
    if (new_ptr != NULL) {
        MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
335
        new_ptr->vc = vc;
336 337 338
    }
    else {
        lock_discarded = 1;
339
    }
340

341
    if (pkt->type == MPIDI_CH3_PKT_LOCK ||
342 343
        pkt->type == MPIDI_CH3_PKT_PUT_IMMED ||
        pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED ||
344
        pkt->type == MPIDI_CH3_PKT_GET ||
345
        pkt->type == MPIDI_CH3_PKT_GET_ACCUM_IMMED ||
346
        pkt->type == MPIDI_CH3_PKT_FOP_IMMED || pkt->type == MPIDI_CH3_PKT_CAS_IMMED) {
347

348 349
        /* return bytes of data processed in this pkt handler */
        (*buflen) = sizeof(MPIDI_CH3_Pkt_t);
350 351 352 353 354

        if (new_ptr != NULL)
            new_ptr->all_data_recved = 1;

        goto issue_ack;
355 356 357
    }
    else {
        MPI_Aint type_size = 0;
358
        MPI_Aint type_extent;
359
        MPIDI_msg_sz_t recv_data_sz = 0;
360
        MPIDI_msg_sz_t buf_size;
361 362 363 364 365 366 367
        MPID_Request *req = NULL;
        MPI_Datatype target_dtp;
        int target_count;
        int complete = 0;
        MPIDI_msg_sz_t data_len;
        char *data_buf = NULL;

368
        /* This is PUT, ACC, GACC, FOP */
369 370 371 372

        MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno);

373
        MPID_Datatype_get_extent_macro(target_dtp, type_extent);
374
        MPID_Datatype_get_size_macro(target_dtp, type_size);
375

376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
        if (pkt->type == MPIDI_CH3_PKT_PUT) {
            recv_data_sz = type_size * target_count;
            buf_size = type_extent * target_count;
        }
        else {
            MPI_Aint stream_offset, stream_elem_count;
            MPI_Aint total_len, rest_len;

            MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET((*pkt), stream_offset, mpi_errno);
            stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
            total_len = type_size * target_count;
            rest_len = total_len - stream_offset;
            recv_data_sz = MPIR_MIN(rest_len, type_size * stream_elem_count);
            buf_size = type_extent * (recv_data_sz / type_size);
        }
391

392
        if (new_ptr != NULL) {
393 394
            if (win_ptr->current_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
                new_ptr->data = MPIU_Malloc(buf_size);
395 396 397 398
            }

            if (new_ptr->data == NULL) {
                /* Note that there are two possible reasons to make new_ptr->data to be NULL:
399 400
                 * (1) win_ptr->current_lock_data_bytes + buf_size >= MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES;
                 * (2) MPIU_Malloc(buf_size) failed.
401 402 403 404 405 406
                 * In such cases, we cannot allocate memory for lock data, so we give up
                 * buffering lock data, however, we still buffer lock request.
                 */
                MPIDI_CH3_Pkt_t new_pkt;
                MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock;
                MPI_Win target_win_handle;
407
                MPIDI_CH3_Pkt_flags_t flags;
408 409

                MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno);
410
                MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
411

412 413 414 415 416 417 418 419 420
                if (pkt->type == MPIDI_CH3_PKT_PUT || pkt->type == MPIDI_CH3_PKT_ACCUMULATE) {
                    MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
                    request_handle = MPI_REQUEST_NULL;
                }
                else {
                    source_win_handle = MPI_WIN_NULL;
                    MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
                }

421 422 423
                MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
                lock_pkt->target_win_handle = target_win_handle;
                lock_pkt->source_win_handle = source_win_handle;
424
                lock_pkt->request_handle = request_handle;
425
                lock_pkt->flags = flags;
426 427 428 429 430 431 432 433

                /* replace original pkt with lock pkt */
                new_ptr->pkt = new_pkt;
                new_ptr->all_data_recved = 1;

                data_discarded = 1;
            }
            else {
434
                win_ptr->current_lock_data_bytes += buf_size;
435
                new_ptr->buf_size = buf_size;
436
            }
437 438 439 440 441 442 443
        }

        /* create request to receive upcoming requests */
        req = MPID_Request_create();
        MPIU_Object_set_ref(req, 1);

        /* fill in area in req that will be used in Receive_data_found() */
444
        if (lock_discarded || data_discarded) {
445
            req->dev.drop_data = TRUE;
446 447 448 449 450 451
            req->dev.user_buf = NULL;
            req->dev.user_count = target_count;
            req->dev.datatype = target_dtp;
            req->dev.recv_data_sz = recv_data_sz;
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
            req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
452
            req->dev.lock_queue_entry = new_ptr;
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469

            data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
            data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
            MPIU_Assert(req->dev.recv_data_sz > 0);
        }
        else {
            req->dev.user_buf = new_ptr->data;
            req->dev.user_count = target_count;
            req->dev.datatype = target_dtp;
            req->dev.recv_data_sz = recv_data_sz;
            req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
            req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
            req->dev.lock_queue_entry = new_ptr;

            data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
            data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
            MPIU_Assert(req->dev.recv_data_sz > 0);
470 471 472
        }

        mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
473 474
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
475 476 477 478 479 480

        /* return bytes of data processed in this pkt handler */
        (*buflen) = sizeof(MPIDI_CH3_Pkt_t) + data_len;

        if (complete) {
            mpi_errno = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(vc, req, &complete);
481 482
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
483
            if (complete) {
484
                goto issue_ack;
485 486 487 488 489 490
            }
        }

        (*reqp) = req;
    }

491
  issue_ack:
492
    if (pkt->type == MPIDI_CH3_PKT_LOCK) {
493 494 495 496
        if (lock_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
        else
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
497

498 499 500
        MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);

501 502 503 504
        mpi_errno =
            MPIDI_CH3I_Send_lock_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
505 506
    }
    else {
507 508 509 510 511 512
        if (lock_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
        else if (data_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED;
        else
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
513

514 515 516 517 518 519 520 521 522 523
        if (pkt->type == MPIDI_CH3_PKT_PUT || pkt->type == MPIDI_CH3_PKT_PUT_IMMED ||
            pkt->type == MPIDI_CH3_PKT_ACCUMULATE || pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
            MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
            request_handle = MPI_REQUEST_NULL;
        }
        else {
            source_win_handle = MPI_WIN_NULL;
            MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
        }

524 525 526 527
        mpi_errno =
            MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
528 529
    }

530
  fn_exit:
531
    return mpi_errno;
532
  fn_fail:
533 534 535 536
    goto fn_exit;
}


537
/* Process a lock acknowledgement (or a locally granted/discarded lock) for
 * 'target_rank' on the origin side.
 *
 * win_ptr     - window the acknowledgement belongs to
 * target_rank - rank the lock request was addressed to
 * flags       - acknowledgement flags; LOCK_GRANTED and LOCK_DISCARDED are
 *               the ones examined here
 *
 * For window-wide locking (LOCK_ALL_ISSUED, or LOCK_ALL_CALLED when the
 * target is this process or a same-node process of a shared-memory window)
 * a grant decrements win_ptr->outstanding_locks and a discard re-sends a
 * shared lock request.  Otherwise the per-target access state is updated.
 *
 * Returns MPI_SUCCESS or an error code. */
static inline int handle_lock_ack(MPID_Win * win_ptr, int target_rank, MPIDI_CH3_Pkt_flags_t flags)
{
    int mpi_errno = MPI_SUCCESS;
    int window_wide = 0;
    MPIDI_RMA_Target_t *target = NULL;

    MPIU_Assert(win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED);

    /* Decide whether this acknowledgement is accounted at window level. */
    if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED) {
        window_wide = 1;
    }
    else if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
        MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;

        MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc);
        window_wide = (win_ptr->comm_ptr->rank == target_rank ||
                       (win_ptr->shm_allocated == TRUE &&
                        orig_vc->node_id == target_vc->node_id));
    }

    if (window_wide) {
        if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
            win_ptr->outstanding_locks--;
            MPIU_Assert(win_ptr->outstanding_locks >= 0);
        }
        else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
            /* The request was dropped: re-send the lock request message. */
            mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
        }
        goto fn_exit;
    }

    /* Per-target bookkeeping. */
    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    MPIU_Assert(target != NULL);

    if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED)
        target->access_state = MPIDI_RMA_LOCK_GRANTED;

    if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED)
        target->access_state = MPIDI_RMA_LOCK_CALLED;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}

596 597 598
/* Update the first pending operation of 'target_rank' after the
 * acknowledgement for a lock request that was piggybacked on it arrives.
 *
 * win_ptr     - window the operation belongs to
 * target_rank - rank whose pending operation list is examined
 * flags       - flags from the acknowledgement packet
 *
 * Nothing happens unless the head of the pending list carries a
 * LOCK_SHARED or LOCK_EXCLUSIVE flag.  When the lock was granted, or lock
 * and data were queued, the operation is either freed (no outstanding
 * internal requests) or moved to the dt/write/read list.  When the data or
 * the lock was discarded, the operation's internal requests are destroyed,
 * its packet flags erased, and it is set up to be issued again.
 *
 * Returns MPI_SUCCESS or an error code. */
static inline int adjust_op_piggybacked_with_lock(MPID_Win * win_ptr,
                                                  int target_rank, MPIDI_CH3_Pkt_flags_t flags)
{
    MPIDI_RMA_Target_t *target = NULL;
    MPIDI_RMA_Op_t *op = NULL;
    MPIDI_CH3_Pkt_flags_t op_flags = MPIDI_CH3_PKT_FLAG_NONE;
    int i;
    int mpi_errno = MPI_SUCCESS;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    MPIU_Assert(target != NULL);

    /* With an empty pending list op_flags stays NONE and nothing is done. */
    op = target->pending_op_list;
    if (op != NULL) {
        MPIDI_CH3_PKT_RMA_GET_FLAGS(op->pkt, op_flags, mpi_errno);
    }

    if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
        if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED ||
            flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED) {

            if (op->ureq != NULL) {
                mpi_errno = set_user_req_after_issuing_op(op);
                if (mpi_errno != MPI_SUCCESS)
                    MPIU_ERR_POP(mpi_errno);
            }

            if (op->reqs_size == 0) {
                MPIU_Assert(op->reqs == NULL);
                MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list),
                                             &(target->pending_op_list_tail), op);
            }
            else {
                /* Internal requests are still outstanding: move the
                 * operation to the list matching its kind. */
                MPIDI_CH3I_RMA_Ops_unlink(&(target->pending_op_list),
                                          &(target->pending_op_list_tail), op);
                if (op->is_dt) {
                    MPIDI_CH3I_RMA_Ops_append(&(target->dt_op_list),
                                              &(target->dt_op_list_tail), op);
                }
                else if (op->pkt.type == MPIDI_CH3_PKT_PUT ||
                         op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED ||
                         op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
                         op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
                    MPIDI_CH3I_RMA_Ops_append(&(target->write_op_list),
                                              &(target->write_op_list_tail), op);
                }
                else {
                    MPIDI_CH3I_RMA_Ops_append(&(target->read_op_list),
                                              &(target->read_op_list_tail), op);
                }
            }
        }
        else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED ||
                 flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
            /* We need to re-transmit this operation, so we destroy
             * the internal request and erase all flags in current
             * operation. */
            if (op->reqs_size > 0) {
                MPIU_Assert(op->reqs != NULL);
                for (i = 0; i < op->reqs_size; i++) {
                    if (op->reqs[i] != NULL) {
                        MPIDI_CH3_Request_destroy(op->reqs[i]);
                        op->reqs[i] = NULL;
                        win_ptr->active_req_cnt--;
                    }
                }
                /* free req array in this op */
                MPIU_Free(op->reqs);
                op->reqs = NULL;
                op->reqs_size = 0;
            }
            MPIDI_CH3_PKT_RMA_ERASE_FLAGS(op->pkt, mpi_errno);

            target->next_op_to_issue = op;

            op->issued_stream_count = 0;

            /* Restore the synchronization that the erased flags requested.
             * op_flags holds packet flags, so it must be tested against the
             * packet-flag constants rather than the sync-state values. */
            if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
                target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
            else if (op_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)
                target->sync.sync_flag = MPIDI_RMA_SYNC_UNLOCK;
        }
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


689 690 691 692
#undef FUNCNAME
#define FUNCNAME acquire_local_lock
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
693
static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
694 695 696 697 698
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_ACQUIRE_LOCAL_LOCK);
    MPIDI_RMA_FUNC_ENTER(MPID_STATE_ACQUIRE_LOCAL_LOCK);

Xin Zhao's avatar
Xin Zhao committed
699 700
    MPIR_T_PVAR_TIMER_START(RMA, rma_winlock_getlocallock);

701
    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 1) {
702
        mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
703 704 705
                                    MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
        if (mpi_errno)
            MPIU_ERR_POP(mpi_errno);
706 707 708 709 710
    }
    else {
        /* Queue the lock information. */
        MPIDI_CH3_Pkt_t pkt;
        MPIDI_CH3_Pkt_lock_t *lock_pkt = &pkt.lock;
711
        MPIDI_RMA_Lock_entry_t *new_ptr = NULL;
712
        MPIDI_VC_t *my_vc;
713 714

        MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
715 716 717 718 719 720 721
        lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
        if (lock_type == MPI_LOCK_SHARED)
            lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
        else {
            MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
            lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
        }
722

723 724
        new_ptr = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, &pkt);
        if (new_ptr == NULL) {
725
            mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
726 727 728
                                        MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
729
            goto fn_exit;
730
        }
731
        MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
732 733
        MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &my_vc);
        new_ptr->vc = my_vc;
734 735

        new_ptr->all_data_recved = 1;
736 737
    }

738
  fn_exit:
Xin Zhao's avatar
Xin Zhao committed
739
    MPIR_T_PVAR_TIMER_END(RMA, rma_winlock_getlocallock);
740
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_ACQUIRE_LOCAL_LOCK);
741 742
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
743
  fn_fail:
744 745 746 747 748
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}


749 750 751 752 753 754 755 756 757 758
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Handle_flush_ack
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Record the arrival of a flush acknowledgement from 'target_rank'.
 *
 * win_ptr     - window the acknowledgement belongs to
 * target_rank - rank that sent the acknowledgement
 *
 * Decrements the target's outstanding acknowledgement counter and clears
 * its put_acc_issued flag.  Returns MPI_SUCCESS or an error code. */
static inline int MPIDI_CH3I_RMA_Handle_flush_ack(MPID_Win * win_ptr, int target_rank)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_RMA_Target_t *t = NULL;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    /* An acknowledgement must belong to a known target; check it the same
     * way the other users of MPIDI_CH3I_Win_find_target() in this file do. */
    MPIU_Assert(t != NULL);

    t->sync.outstanding_acks--;
    MPIU_Assert(t->sync.outstanding_acks >= 0);

    t->put_acc_issued = 0;      /* reset PUT_ACC_FLAG after FLUSH is completed */

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}


774
#undef FUNCNAME
775
#define FUNCNAME do_accumulate_op
776 777
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
778 779 780
static inline int do_accumulate_op(void *source_buf, int source_count, MPI_Datatype source_dtp,
                                   void *target_buf, int target_count, MPI_Datatype target_dtp,
                                   MPI_Aint stream_offset, MPI_Op acc_op)
781 782
{
    int mpi_errno = MPI_SUCCESS;
783 784
    MPI_User_function *uop = NULL;
    MPI_Aint source_dtp_size, source_dtp_extent;
785 786 787 788
    MPIDI_STATE_DECL(MPID_STATE_DO_ACCUMULATE_OP);

    MPIDI_FUNC_ENTER(MPID_STATE_DO_ACCUMULATE_OP);

789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804
    MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(source_dtp));
    MPID_Datatype_get_size_macro(source_dtp, source_dtp_size);
    MPID_Datatype_get_extent_macro(source_dtp, source_dtp_extent);

    if (acc_op != MPI_REPLACE) {
        if (HANDLE_GET_KIND(acc_op) == HANDLE_KIND_BUILTIN) {
            /* get the function by indexing into the op table */
            uop = MPIR_OP_HDL_TO_FN(acc_op);
        }
        else {
            /* --BEGIN ERROR HANDLING-- */
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                             FCNAME, __LINE__, MPI_ERR_OP,
                                             "**opnotpredefined", "**opnotpredefined %d", acc_op);
            return mpi_errno;
            /* --END ERROR HANDLING-- */
805
        }
806 807 808
    }


809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825
    if (MPIR_DATATYPE_IS_PREDEFINED(target_dtp)) {
        /* apply op if target dtp is predefined dtp */

        MPIU_Assert(source_dtp == target_dtp);

        MPI_Aint real_stream_offset = (stream_offset / source_dtp_size) * source_dtp_extent;
        void *curr_target_buf = (void *) ((char *) target_buf + real_stream_offset);

        if (acc_op == MPI_REPLACE) {
            mpi_errno = MPIR_Localcopy(source_buf, source_count, source_dtp,
                                       curr_target_buf, source_count, source_dtp);
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
        }
        else {
            (*uop) (source_buf, curr_target_buf, &source_count, &source_dtp);
        }
826
    }
827 828
    else {
        /* derived datatype */
829 830 831
        MPID_Segment *segp;
        DLOOP_VECTOR *dloop_vec;
        MPI_Aint first, last;
832
        int vec_len, i, count;
833
        MPI_Aint type_extent, type_size;
834
        MPI_Datatype type;
835
        MPID_Datatype *dtp;
836 837
        MPI_Aint curr_len;
        void *curr_loc;
838
        int accumulated_count;
839 840

        segp = MPID_Segment_alloc();
841 842 843 844 845 846
        /* --BEGIN ERROR HANDLING-- */
        if (!segp) {
            mpi_errno =
                MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
                                     MPI_ERR_OTHER, "**nomem", 0);
            MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
847
            return mpi_errno;
848
        }
849
        /* --END ERROR HANDLING-- */
850 851 852
        MPID_Segment_init(NULL, target_count, target_dtp, segp, 0);
        first = stream_offset;
        last = first + source_count * source_dtp_size;
853

854 855
        MPID_Datatype_get_ptr(target_dtp, dtp);
        vec_len = dtp->max_contig_blocks * target_count + 1;
856 857 858
        /* +1 needed because Rob says so */
        dloop_vec = (DLOOP_VECTOR *)
            MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
859 860 861 862 863 864
        /* --BEGIN ERROR HANDLING-- */
        if (!dloop_vec) {
            mpi_errno =
                MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
                                     MPI_ERR_OTHER, "**nomem", 0);
            MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
865
            return mpi_errno;
866
        }
867
        /* --END ERROR HANDLING-- */
868 869 870

        MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);

871
        type = dtp->basic_type;
872 873
        MPIU_Assert(type != MPI_DATATYPE_NULL);

874 875 876
        MPIU_Assert(type == source_dtp);
        type_size = source_dtp_size;
        type_extent = source_dtp_extent;
877 878 879 880

        i = 0;
        curr_loc = dloop_vec[0].DLOOP_VECTOR_BUF;
        curr_len = dloop_vec[0].DLOOP_VECTOR_LEN;
881
        accumulated_count = 0;
882 883 884 885 886 887 888 889 890
        while (i != vec_len) {
            if (curr_len < type_size) {
                MPIU_Assert(i != vec_len);
                i++;
                curr_len += dloop_vec[i].DLOOP_VECTOR_LEN;
                continue;
            }

            MPIU_Assign_trunc(count, curr_len / type_size, int);
891 892 893 894 895 896 897 898 899 900 901 902 903

            if (acc_op == MPI_REPLACE) {
                mpi_errno = MPIR_Localcopy((char *) source_buf + type_extent * accumulated_count,
                                           count, type,
                                           (char *) target_buf + MPIU_PtrToAint(curr_loc),
                                           count, type);
                if (mpi_errno != MPI_SUCCESS)
                    MPIU_ERR_POP(mpi_errno);
            }
            else {
                (*uop) ((char *) source_buf + type_extent * accumulated_count,
                        (char *) target_buf + MPIU_PtrToAint(curr_loc), &count, &type);
            }
904 905 906 907 908 909 910 911 912 913 914 915

            if (curr_len % type_size == 0) {
                i++;
                if (i != vec_len) {
                    curr_loc = dloop_vec[i].DLOOP_VECTOR_BUF;
                    curr_len = dloop_vec[i].DLOOP_VECTOR_LEN;
                }
            }
            else {
                curr_loc = (void *) ((char *) curr_loc + type_extent * count);
                curr_len -= type_size * count;
            }
916 917

            accumulated_count += count;
918 919
        }

920 921
        MPID_Segment_free(segp);
        MPIU_Free(dloop_vec);
922 923
    }

924
  fn_exit:
925
    MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP);
926 927

    return mpi_errno;
928
  fn_fail:
929 930