Commit 4739df59 authored by Xin Zhao's avatar Xin Zhao
Browse files

Perf-optimize: support piggybacking LOCK on large RMA operations.

Originally we only allowed a LOCK request to be piggybacked
on small RMA operations (those whose data fits entirely in the
packet header). This adds communication overhead for larger
operations, since the origin side needs to wait for the LOCK
ACK before it can transmit data to the target.

In this patch we add support for piggybacking LOCK on
RMA operations of arbitrary size. Note that (1) this
only works with basic datatypes; (2) if the LOCK cannot
be satisfied, we temporarily buffer the operation on
the target side.

No reviewer.
parent c73451c0
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#if !defined(MPID_RMA_LOCKQUEUE_H_INCLUDED)
#define MPID_RMA_LOCKQUEUE_H_INCLUDED
#include "mpl_utlist.h"
#include "mpid_rma_types.h"
MPIR_T_PVAR_DOUBLE_TIMER_DECL_EXTERN(RMA, rma_lockqueue_alloc);
MPIR_T_PVAR_DOUBLE_TIMER_DECL_EXTERN(RMA, rma_winlock_getlocallock);
/* MPIDI_CH3I_Win_lock_entry_alloc(): allocate a new lock queue entry and
 * initialize it from a request packet.
 *
 *   win_ptr    - window the entry is created for (not referenced by this
 *                routine)
 *   pkt        - request packet; copied by value into the new entry
 *   lock_entry - output: the new entry on success, NULL if no entry could
 *                be obtained
 *
 * Returns MPI_SUCCESS on success; on allocation failure returns the error
 * code that MPIU_ERR_SETANDJUMP1 stores in mpi_errno.
 *
 * The new entry is not linked into any queue here. It starts with
 * next == NULL, no data buffer, and all_data_recved == 0. */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Win_lock_entry_alloc
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int MPIDI_CH3I_Win_lock_entry_alloc(MPID_Win * win_ptr,
                                                  MPIDI_CH3_Pkt_t *pkt,
                                                  MPIDI_Win_lock_queue **lock_entry)
{
    MPIDI_Win_lock_queue *new_ptr = NULL;
    int mpi_errno = MPI_SUCCESS;

    /* Make the documented "NULL on failure" contract hold even when the
     * caller did not pre-initialize its pointer. */
    (*lock_entry) = NULL;

    /* FIXME: we should use a lock entry queue to manage all this. */

    /* allocate lock queue entry */
    MPIR_T_PVAR_TIMER_START(RMA, rma_lockqueue_alloc);
    new_ptr = (MPIDI_Win_lock_queue *) MPIU_Malloc(sizeof(MPIDI_Win_lock_queue));
    MPIR_T_PVAR_TIMER_END(RMA, rma_lockqueue_alloc);
    if (!new_ptr) {
        MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                             "MPIDI_Win_lock_queue");
    }

    /* fresh entry: unlinked, carries a copy of the packet, nothing buffered */
    new_ptr->next = NULL;
    new_ptr->pkt = (*pkt);
    new_ptr->data = NULL;
    new_ptr->all_data_recved = 0;

    (*lock_entry) = new_ptr;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#endif /* MPID_RMA_LOCKQUEUE_H_INCLUDED */
......@@ -132,6 +132,8 @@ extern MPIDI_RMA_Win_list_t *MPIDI_RMA_Win_list, *MPIDI_RMA_Win_list_tail;
/* One queued lock request on the target side. Entries are created by
 * MPIDI_CH3I_Win_lock_entry_alloc() and chained through 'next' onto a
 * window's lock_queue / lock_queue_tail list with MPL_LL_APPEND. */
typedef struct MPIDI_Win_lock_queue {
struct MPIDI_Win_lock_queue *next; /* next entry in the list; NULL on a fresh entry */
MPIDI_CH3_Pkt_t pkt; /* all information for this request packet (stored as a by-value copy) */
void *data; /* for queued PUTs / ACCs / GACCs, data is copied here; NULL until a buffer is allocated */
int all_data_recved; /* indicate if all data has been received: 0 on a fresh entry, set to 1 once nothing more is expected */
} MPIDI_Win_lock_queue;
typedef MPIDI_RMA_Op_t *MPIDI_RMA_Ops_list_t;
......
......@@ -1918,6 +1918,8 @@ int MPIDI_CH3_ReqHandler_GaccumDerivedDTRecvComplete( MPIDI_VC_t *,
int * );
int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete( MPIDI_VC_t *,
MPID_Request *, int * );
/* Receive-completion handler for an RMA operation that piggybacked a LOCK
 * request and had to be queued on the target. enqueue_lock_origin() installs
 * it as both OnDataAvail and OnFinal on the request that receives the
 * operation's data, and also calls it directly when the data is already
 * complete. The int * argument is a completion flag that the caller tests
 * after the call. */
int MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete( MPIDI_VC_t *,
MPID_Request *, int * );
/* Send Handlers */
int MPIDI_CH3_ReqHandler_SendReloadIOV( MPIDI_VC_t *vc, MPID_Request *sreq,
int *complete );
......
......@@ -215,6 +215,79 @@ MPIDI_CH3_PKT_DEFS
} \
}
/* Extract the target element count of an RMA operation packet.
 *
 *   pkt_   - packet to inspect; dispatch is on its 'type' field
 *   count_ - output lvalue: the 'count' field of PUT, GET, ACCUMULATE and
 *            GET_ACCUM packets, or 1 for CAS and FOP packets
 *   err_   - output lvalue: reset to MPI_SUCCESS on entry
 *
 * Any other packet type is reported through MPIU_ERR_SETANDJUMP1 with the
 * "**invalidpkt" message. */
#define MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT(pkt_, count_, err_) \
    { \
        (err_) = MPI_SUCCESS; \
        switch ((pkt_).type) { \
        case (MPIDI_CH3_PKT_FOP): \
        case (MPIDI_CH3_PKT_CAS): \
            /* these two are always treated as one element */ \
            (count_) = 1; \
            break; \
        case (MPIDI_CH3_PKT_GET_ACCUM): \
            (count_) = (pkt_).get_accum.count; \
            break; \
        case (MPIDI_CH3_PKT_ACCUMULATE): \
            (count_) = (pkt_).accum.count; \
            break; \
        case (MPIDI_CH3_PKT_GET): \
            (count_) = (pkt_).get.count; \
            break; \
        case (MPIDI_CH3_PKT_PUT): \
            (count_) = (pkt_).put.count; \
            break; \
        default: \
            MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
        } \
    }
/* Read the length of the payload stored inline in an RMA operation packet
 * header.
 *
 *   pkt_       - packet to inspect; dispatch is on its 'type' field
 *   immed_len_ - output lvalue: the 'immed_len' field of PUT, ACCUMULATE,
 *                GET_ACCUM and FOP packets; 0 for CAS packets
 *   err_       - output lvalue: reset to MPI_SUCCESS on entry
 *
 * CAS is not handled yet (see the FIXME below). It reports a length of 0 so
 * that the output is always defined when err_ is MPI_SUCCESS, instead of
 * keeping whatever value the caller's variable happened to hold.
 *
 * Any other packet type (including GET) is reported through
 * MPIU_ERR_SETANDJUMP1 with the "**invalidpkt" message. */
#define MPIDI_CH3_PKT_RMA_GET_IMMED_LEN(pkt_, immed_len_, err_) \
    { \
        err_ = MPI_SUCCESS; \
        switch((pkt_).type) { \
        case (MPIDI_CH3_PKT_PUT): \
            immed_len_ = (pkt_).put.immed_len; \
            break; \
        case (MPIDI_CH3_PKT_ACCUMULATE): \
            immed_len_ = (pkt_).accum.immed_len; \
            break; \
        case (MPIDI_CH3_PKT_GET_ACCUM): \
            immed_len_ = (pkt_).get_accum.immed_len; \
            break; \
        case (MPIDI_CH3_PKT_FOP): \
            immed_len_ = (pkt_).fop.immed_len; \
            break; \
        case (MPIDI_CH3_PKT_CAS): \
            /* FIXME: we should deal with CAS here */ \
            immed_len_ = 0; \
            break; \
        default: \
            MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
        } \
    }
/* Obtain a pointer to the payload stored inline in an RMA operation packet
 * header.
 *
 *   pkt_        - packet to inspect; dispatch is on its 'type' field
 *   immed_data_ - output lvalue: the 'data' member of PUT, ACCUMULATE,
 *                 GET_ACCUM and FOP packets; NULL for CAS packets
 *   err_        - output lvalue: reset to MPI_SUCCESS on entry
 *
 * CAS is not handled yet (see the FIXME below). It reports NULL so that the
 * output is always defined when err_ is MPI_SUCCESS, instead of keeping
 * whatever value the caller's variable happened to hold.
 *
 * Any other packet type (including GET) is reported through
 * MPIU_ERR_SETANDJUMP1 with the "**invalidpkt" message. */
#define MPIDI_CH3_PKT_RMA_GET_IMMED_DATA_PTR(pkt_, immed_data_, err_) \
    { \
        err_ = MPI_SUCCESS; \
        switch((pkt_).type) { \
        case (MPIDI_CH3_PKT_PUT): \
            immed_data_ = (pkt_).put.data; \
            break; \
        case (MPIDI_CH3_PKT_ACCUMULATE): \
            immed_data_ = (pkt_).accum.data; \
            break; \
        case (MPIDI_CH3_PKT_GET_ACCUM): \
            immed_data_ = (pkt_).get_accum.data; \
            break; \
        case (MPIDI_CH3_PKT_FOP): \
            immed_data_ = (pkt_).fop.data; \
            break; \
        case (MPIDI_CH3_PKT_CAS): \
            /* FIXME: we should deal with CAS here */ \
            immed_data_ = NULL; \
            break; \
        default: \
            MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
        } \
    }
#define MPIDI_CH3_PKT_RMA_GET_LOCK_TYPE(pkt_, lock_type_, err_) \
{ \
err_ = MPI_SUCCESS; \
......@@ -302,54 +375,27 @@ MPIDI_CH3_PKT_DEFS
} \
}
/* Clear flag bits in the header of an RMA operation packet.
 *
 *   pkt_  - packet to modify; dispatch is on its 'type' field
 *   flag_ - bit(s) to clear from the packet's 'flags' field
 *   err_  - output lvalue: reset to MPI_SUCCESS on entry
 *
 * PUT, GET, ACCUMULATE, GET_ACCUM, CAS and FOP packets are supported. Any
 * other packet type is reported through MPIU_ERR_SETANDJUMP1 with the
 * "**invalidpkt" message. */
#define MPIDI_CH3_PKT_RMA_UNSET_FLAG(pkt_, flag_, err_) \
    { \
        (err_) = MPI_SUCCESS; \
        switch ((pkt_).type) { \
        case (MPIDI_CH3_PKT_FOP): \
            (pkt_).fop.flags &= ~(flag_); \
            break; \
        case (MPIDI_CH3_PKT_CAS): \
            (pkt_).cas.flags &= ~(flag_); \
            break; \
        case (MPIDI_CH3_PKT_GET_ACCUM): \
            (pkt_).get_accum.flags &= ~(flag_); \
            break; \
        case (MPIDI_CH3_PKT_ACCUMULATE): \
            (pkt_).accum.flags &= ~(flag_); \
            break; \
        case (MPIDI_CH3_PKT_GET): \
            (pkt_).get.flags &= ~(flag_); \
            break; \
        case (MPIDI_CH3_PKT_PUT): \
            (pkt_).put.flags &= ~(flag_); \
            break; \
        default: \
            MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
        } \
    }
#define MPIDI_CH3_PKT_RMA_SET_FLAG(pkt_, flag_, err_) \
#define MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE(pkt_, win_hdl_, err_) \
{ \
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
(pkt_).put.flags |= (flag_); \
win_hdl_ = (pkt_).put.target_win_handle; \
break; \
case (MPIDI_CH3_PKT_GET): \
(pkt_).get.flags |= (flag_); \
win_hdl_ = (pkt_).get.target_win_handle; \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
(pkt_).accum.flags |= (flag_); \
win_hdl_ = (pkt_).accum.target_win_handle; \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
(pkt_).get_accum.flags |= (flag_); \
win_hdl_ = (pkt_).get_accum.target_win_handle; \
break; \
case (MPIDI_CH3_PKT_CAS): \
(pkt_).cas.flags |= (flag_); \
win_hdl_ = (pkt_).cas.target_win_handle; \
break; \
case (MPIDI_CH3_PKT_FOP): \
(pkt_).fop.flags |= (flag_); \
win_hdl_ = (pkt_).fop.target_win_handle; \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
......
......@@ -440,6 +440,7 @@ typedef struct MPIDI_Request {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
MPIDI_CH3_Pkt_flags_t flags; /* flags that were included in the original RMA packet header */
struct MPIDI_Win_lock_queue *lock_queue_entry;
MPI_Request resp_request_handle; /* Handle for get_accumulate response */
MPIDI_REQUEST_SEQNUM
......
......@@ -11,9 +11,7 @@
#include "mpid_rma_oplist.h"
#include "mpid_rma_shm.h"
#include "mpid_rma_issue.h"
MPIR_T_PVAR_DOUBLE_TIMER_DECL_EXTERN(RMA, rma_lockqueue_alloc);
MPIR_T_PVAR_DOUBLE_TIMER_DECL_EXTERN(RMA, rma_winlock_getlocallock);
#include "mpid_rma_lockqueue.h"
#undef FUNCNAME
#define FUNCNAME send_lock_msg
......@@ -161,7 +159,9 @@ static inline int MPIDI_CH3I_Send_flush_ack_pkt(MPIDI_VC_t *vc, MPID_Win *win_pt
MPIDI_Pkt_init(flush_ack_pkt, MPIDI_CH3_PKT_FLUSH_ACK);
flush_ack_pkt->source_win_handle = source_win_handle;
flush_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
flush_ack_pkt->flags = flags;
flush_ack_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
flush_ack_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
/* Because this is in a packet handler, it is already within a critical section */
/* MPIU_THREAD_CS_ENTER(CH3COMM,vc); */
......@@ -225,23 +225,109 @@ static inline int send_decr_at_cnt_msg(int dst, MPID_Win * win_ptr)
/* enqueue an unsatisfied origin in passive target at target side. */
static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_CH3_Pkt_t *pkt)
static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen,
MPID_Request **reqp)
{
MPIDI_Win_lock_queue *new_ptr = NULL;
int mpi_errno = MPI_SUCCESS;
MPIR_T_PVAR_TIMER_START(RMA, rma_lockqueue_alloc);
new_ptr = (MPIDI_Win_lock_queue *) MPIU_Malloc(sizeof(MPIDI_Win_lock_queue));
MPIR_T_PVAR_TIMER_END(RMA, rma_lockqueue_alloc);
if (!new_ptr) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
"MPIDI_Win_lock_queue");
}
(*reqp) = NULL;
new_ptr->next = NULL;
new_ptr->pkt = (*pkt);
mpi_errno = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, pkt, &new_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIU_Assert(new_ptr != NULL);
MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
if (pkt->type == MPIDI_CH3_PKT_LOCK ||
pkt->type == MPIDI_CH3_PKT_GET ||
pkt->type == MPIDI_CH3_PKT_FOP ||
pkt->type == MPIDI_CH3_PKT_CAS) {
new_ptr->all_data_recved = 1;
/* return bytes of data processed in this pkt handler */
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
goto fn_exit;
}
else {
MPI_Aint type_size = 0;
MPIDI_msg_sz_t recv_data_sz = 0;
MPID_Request *req = NULL;
MPI_Datatype target_dtp;
int target_count;
size_t immed_len = 0;
void *immed_data = NULL;
int complete = 0;
MPIDI_msg_sz_t data_len;
char *data_buf = NULL;
/* This is PUT, ACC, GACC */
MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno);
MPID_Datatype_get_size_macro(target_dtp, type_size);
recv_data_sz = type_size * target_count;
if (recv_data_sz <= MPIDI_RMA_IMMED_BYTES) {
/* all data fits in packet header */
new_ptr->all_data_recved = 1;
/* return bytes of data processed in this pkt handler */
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
goto fn_exit;
}
/* allocate tmp buffer to receive data. */
new_ptr->data = MPIU_Malloc(recv_data_sz);
if (new_ptr->data == NULL) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
recv_data_sz);
}
/* create request to receive upcoming requests */
req = MPID_Request_create();
MPIU_Object_set_ref(req, 1);
/* fill in area in req that will be used in Receive_data_found() */
req->dev.user_buf = new_ptr->data;
req->dev.user_count = target_count;
req->dev.datatype = target_dtp;
req->dev.recv_data_sz = recv_data_sz;
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
req->dev.lock_queue_entry = new_ptr;
MPIDI_CH3_PKT_RMA_GET_IMMED_LEN((*pkt), immed_len, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_IMMED_DATA_PTR((*pkt), immed_data, mpi_errno);
if (immed_len > 0) {
/* see if we can receive some data from packet header */
MPIU_Memcpy(req->dev.user_buf, immed_data, immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + immed_len);
req->dev.recv_data_sz -= immed_len;
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* return bytes of data processed in this pkt handler */
(*buflen) = sizeof(MPIDI_CH3_Pkt_t) + data_len;
if (complete) {
mpi_errno = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(vc, req, &complete);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (complete) {
goto fn_exit;
}
}
(*reqp) = req;
}
fn_exit:
return mpi_errno;
fn_fail:
......@@ -294,13 +380,18 @@ static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
/* Queue the lock information. */
MPIDI_CH3_Pkt_t pkt;
MPIDI_CH3_Pkt_lock_t *lock_pkt = &pkt.lock;
MPIDI_Win_lock_queue *new_ptr = NULL;
MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
lock_pkt->lock_type = lock_type;
lock_pkt->origin_rank = win_ptr->comm_ptr->rank;
mpi_errno = enqueue_lock_origin(win_ptr, &pkt);
mpi_errno = MPIDI_CH3I_Win_lock_entry_alloc(win_ptr, &pkt, &new_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIU_Assert(new_ptr != NULL);
MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, new_ptr);
new_ptr->all_data_recved = 1;
}
fn_exit:
......@@ -443,31 +534,29 @@ static inline int do_accumulate_op(void *source_buf, void *target_buf,
}
static inline int check_piggyback_lock(MPID_Win *win_ptr, MPIDI_CH3_Pkt_t *pkt, int *acquire_lock_fail) {
static inline int check_piggyback_lock(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen,
int *acquire_lock_fail,
MPID_Request **reqp) {
int lock_type;
MPIDI_CH3_Pkt_flags_t flags;
int mpi_errno = MPI_SUCCESS;
(*acquire_lock_fail) = 0;
(*reqp) = NULL;
MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_LOCK_TYPE((*pkt), lock_type, mpi_errno);
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK) {
if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 0) {
/* cannot acquire the lock, queue up this operation. */
mpi_errno = enqueue_lock_origin(win_ptr, pkt);
mpi_errno = enqueue_lock_origin(win_ptr, vc, pkt, buflen, reqp);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
(*acquire_lock_fail) = 1;
}
else {
/* unset LOCK flag */
MPIDI_CH3_PKT_RMA_UNSET_FLAG((*pkt), MPIDI_CH3_PKT_FLAG_RMA_LOCK, mpi_errno);
/* set LOCK_GRANTED flag */
MPIDI_CH3_PKT_RMA_SET_FLAG((*pkt), MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED, mpi_errno);
}
}
fn_exit:
......@@ -484,7 +573,7 @@ static inline int finish_op_on_target(MPID_Win *win_ptr, MPIDI_VC_t *vc,
if (type == MPIDI_CH3_PKT_PUT || type == MPIDI_CH3_PKT_ACCUMULATE) {
/* This is PUT or ACC */
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK) {
if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) &&
!(flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)) {
mpi_errno = MPIDI_CH3I_Send_lock_granted_pkt(vc, win_ptr, source_win_handle);
......
This diff is collapsed.
......@@ -82,6 +82,7 @@ MPID_Request * MPID_Request_create(void)
request for RMA operations */
req->dev.target_win_handle = MPI_WIN_NULL;
req->dev.source_win_handle = MPI_WIN_NULL;
req->dev.lock_queue_entry = NULL;
req->dev.dtype_info = NULL;
req->dev.dataloop = NULL;
req->dev.iov_offset = 0;
......
......@@ -32,6 +32,25 @@ cvars:
starts to poke progress engine when number of posted
operations reaches that value.
- name : MPIR_CVAR_CH3_RMA_OP_PIGGYBACK_LOCK_DATA_SIZE
category : CH3
type : int
default : 65536
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Specify the threshold of data size of a RMA operation
which can be piggybacked with a LOCK message. It is
always a positive value and should not be smaller
than MPIDI_RMA_IMMED_BYTES.
If user sets it as a small value, for middle and large
data size, we will lose performance because of always
waiting for round-trip of LOCK synchronization; if
user sets it as a large value, we need to consume
more memory on target side to buffer this lock request
when lock is not satisfied.
=== END_MPI_T_CVAR_INFO_BLOCK ===
*/
......@@ -174,12 +193,11 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
/* copy data from origin buffer to immed area in packet header */
mpi_errno = immed_copy(src, dest, put_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* If all data is in pkt header, mark this op as a candidate
for piggybacking LOCK. */
if (put_pkt->immed_len == len)
new_ptr->piggyback_lock_candidate = 1;
}
if (len <= MPIR_MAX(MPIDI_RMA_IMMED_BYTES,
MPIR_CVAR_CH3_RMA_OP_PIGGYBACK_LOCK_DATA_SIZE))
new_ptr->piggyback_lock_candidate = 1;
}
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
......@@ -521,12 +539,11 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
/* copy data from origin buffer to immed area in packet header */
mpi_errno = immed_copy(src, dest, accum_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* If all data is in pkt header, mark this op as
a candidate for piggybacking LOCK. */
if (accum_pkt->immed_len == len)
new_ptr->piggyback_lock_candidate = 1;
}
if (len <= MPIR_MAX(MPIDI_RMA_IMMED_BYTES,
MPIR_CVAR_CH3_RMA_OP_PIGGYBACK_LOCK_DATA_SIZE))
new_ptr->piggyback_lock_candidate = 1;
}
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
......@@ -741,12 +758,11 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
/* copy data from origin buffer to immed area in packet header */
mpi_errno = immed_copy(src, dest, get_accum_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* If all data is in pkt header, mark this op as a candidate
for piggybacking LOCK. */
if (get_accum_pkt->immed_len == len)
new_ptr->piggyback_lock_candidate = 1;
}
if (len <= MPIR_MAX(MPIDI_RMA_IMMED_BYTES,
MPIR_CVAR_CH3_RMA_OP_PIGGYBACK_LOCK_DATA_SIZE))
new_ptr->piggyback_lock_candidate = 1;
}
}
......
......@@ -205,18 +205,19 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_Assert(put_pkt->target_win_handle != MPI_WIN_NULL);
MPID_Win_get_ptr(put_pkt->target_win_handle, win_ptr);
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
mpi_errno = check_piggyback_lock(win_ptr, pkt, &acquire_lock_fail);
mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen,
&acquire_lock_fail, &req);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (acquire_lock_fail) {
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
(*rreqp) = NULL;
(*rreqp) = req;
goto fn_exit;
}
/* get start location of data and length of data */
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
req = MPID_Request_create();
MPIU_Object_set_ref(req, 1);
......@@ -368,23 +369,24 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_Assert(get_pkt->target_win_handle != MPI_WIN_NULL);
MPID_Win_get_ptr(get_pkt->target_win_handle, win_ptr);
mpi_errno = check_piggyback_lock(win_ptr, pkt, &acquire_lock_fail);
mpi_errno = check_piggyback_lock(win_ptr, vc, pkt,
buflen, &acquire_lock_fail, &req);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (acquire_lock_fail) {
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
(*rreqp) = NULL;
(*rreqp) = req;
goto fn_exit;
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
req = MPID_Request_create();
req->dev.target_win_handle = get_pkt->target_win_handle;
req->dev.source_win_handle = get_pkt->source_win_handle;
req->dev.flags = get_pkt->flags;
/* get start location of data and length of data */
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
/* here we increment the Active Target counter to guarantee the GET-like
operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++;
......@@ -402,7 +404,7 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
get_resp_pkt->request_handle = get_pkt->request_handle;
get_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED)
if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
......@@ -524,18 +526,15 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_Assert(accum_pkt->target_win_handle != MPI_WIN_NULL);
MPID_Win_get_ptr(accum_pkt->target_win_handle, win_ptr);
mpi_errno = check_piggyback_lock(win_ptr, pkt, &acquire_lock_fail);
mpi_errno = check_piggyback_lock(win_ptr, vc, pkt, buflen,
&acquire_lock_fail, &req);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (acquire_lock_fail) {
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
(*rreqp) = NULL;
(*rreqp) = req;
goto fn_exit;
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
req = MPID_Request_create();
MPIU_Object_set_ref(req, 1);
*rreqp = req;
......@@ -550,6 +549,10 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.resp_request_handle = MPI_REQUEST_NULL;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
/* get start location of data and length of data */
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);