Commit de9d0f21 authored by Xin Zhao
Browse files

Rewrite code of piggybacking IMMED data with RMA packets.



Originally we added "immed_data" and "immed_len" areas to RMA packets
in order to piggyback a small amount of data with the packet header and
reduce the number of packets (note that "immed_len" is necessary when
the piggybacked data is not the entire data). However, those areas
potentially increase the packet union size and degrade two-sided
communication. This patch fixes this issue.

In this patch, we remove "immed_data" and "immed_len" from the normal
"MPIDI_CH3_Pkt_XXX_t" operation types (e.g. MPIDI_CH3_Pkt_put_t), and
we introduce a new "MPIDI_CH3_Pkt_XXX_immed_t" packet type for each
operation (e.g. MPIDI_CH3_Pkt_put_immed_t).

"MPIDI_CH3_Pkt_XXX_immed_t" is used when (1) both origin and target
are basic datatypes, AND (2) the data to be sent fits entirely
into the header. By doing this, "MPIDI_CH3_Pkt_XXX_immed_t" needs the
"immed_data" area but can drop the "immed_len" area. Also, since it only
works with a basic target datatype, it can drop the "dataloop_size" area
as well. All operations that fail to satisfy either (1) or (2) will use
the normal "MPIDI_CH3_Pkt_XXX_t" type.

Originally we always piggybacked FOP data into the packet header,
which made the packet size too large. In this patch we split the
FOP operation into IMMED packets and normal packets.

Because CAS only works with 2 basic datatype and non-complex
elements, the amount of data is relatively small, so we always piggyback
the data with the packet header and only use the
"MPIDI_CH3_Pkt_XXX_immed_t" packet type for CAS.
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent 3a017faa
......@@ -326,7 +326,6 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win *win_ptr,
MPIDI_CH3_Pkt_flags_t flags)
{
MPIDI_VC_t *vc = NULL;
size_t len;
MPI_Aint origin_type_size;
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_put_t *put_pkt = &rma_op->pkt.put;
......@@ -341,23 +340,10 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win *win_ptr,
put_pkt->flags |= flags;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, rma_op->origin_count * origin_type_size, size_t);
if (!rma_op->is_dt) {
/* Fill origin data into packet header IMMED area as much as possible */
MPIU_Assign_trunc(put_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES/origin_type_size)*origin_type_size),
int);
if (put_pkt->immed_len > 0) {
void *src = rma_op->origin_addr, *dest = put_pkt->data;
mpi_errno = immed_copy(src, dest, (size_t)put_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
if (len == (size_t)put_pkt->immed_len) {
if (rma_op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED) {
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, put_pkt, sizeof(*put_pkt), &(rma_op->request));
......@@ -368,10 +354,8 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win *win_ptr,
/* We still need to issue from origin buffer. */
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) put_pkt;
iov[0].MPID_IOV_LEN = sizeof(*put_pkt);
if (!rma_op->is_dt) {
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)rma_op->origin_addr + put_pkt->immed_len);
iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size - put_pkt->immed_len;
}
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)rma_op->origin_addr);
iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size;
mpi_errno = issue_from_origin_buffer(rma_op, iov, vc);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -397,7 +381,6 @@ static int issue_acc_op(MPIDI_RMA_Op_t *rma_op, MPID_Win *win_ptr,
MPIDI_CH3_Pkt_flags_t flags)
{
MPIDI_VC_t *vc = NULL;
size_t len;
MPI_Aint origin_type_size;
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_accum_t *accum_pkt = &rma_op->pkt.accum;
......@@ -412,23 +395,10 @@ static int issue_acc_op(MPIDI_RMA_Op_t *rma_op, MPID_Win *win_ptr,
accum_pkt->flags |= flags;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, rma_op->origin_count * origin_type_size, size_t);
if (!rma_op->is_dt) {
/* Fill origin data into packet header IMMED area as much as possible */
MPIU_Assign_trunc(accum_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES/origin_type_size)*origin_type_size),
int);
if (accum_pkt->immed_len > 0) {
void *src = rma_op->origin_addr, *dest = accum_pkt->data;
mpi_errno = immed_copy(src, dest, (size_t)accum_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
if (len == (size_t)accum_pkt->immed_len) {
if (rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, accum_pkt, sizeof(*accum_pkt), &(rma_op->request));
......@@ -439,10 +409,8 @@ static int issue_acc_op(MPIDI_RMA_Op_t *rma_op, MPID_Win *win_ptr,
/* We still need to issue from origin buffer. */
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) accum_pkt;
iov[0].MPID_IOV_LEN = sizeof(*accum_pkt);
if (!rma_op->is_dt) {
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)rma_op->origin_addr + accum_pkt->immed_len);
iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size - accum_pkt->immed_len;
}
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)rma_op->origin_addr);
iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size;
mpi_errno = issue_from_origin_buffer(rma_op, iov, vc);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -466,7 +434,6 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t *rma_op, MPID_Win *win_ptr,
MPIDI_CH3_Pkt_flags_t flags)
{
MPIDI_VC_t *vc = NULL;
size_t len;
MPI_Aint origin_type_size;
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &rma_op->pkt.get_accum;
......@@ -505,30 +472,12 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t *rma_op, MPID_Win *win_ptr,
get_accum_pkt->request_handle = resp_req->handle;
get_accum_pkt->flags |= flags;
if (!rma_op->is_dt) {
/* Only fill IMMED data in response packet when both origin and target
buffers are basic datatype. */
get_accum_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP;
}
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, rma_op->origin_count * origin_type_size, size_t);
if (!rma_op->is_dt) {
/* Fill origin data into packet header IMMED area as much as possible */
MPIU_Assign_trunc(get_accum_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES/origin_type_size)*origin_type_size),
int);
if (get_accum_pkt->immed_len > 0) {
void *src = rma_op->origin_addr, *dest = get_accum_pkt->data;
mpi_errno = immed_copy(src, dest, (size_t)get_accum_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
if (len == (size_t)get_accum_pkt->immed_len) {
if (rma_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, get_accum_pkt, sizeof(*get_accum_pkt), &(rma_op->request));
......@@ -539,10 +488,8 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t *rma_op, MPID_Win *win_ptr,
/* We still need to issue from origin buffer. */
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_pkt);
if (!rma_op->is_dt) {
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)rma_op->origin_addr + get_accum_pkt->immed_len);
iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size - get_accum_pkt->immed_len;
}
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)rma_op->origin_addr);
iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size;
mpi_errno = issue_from_origin_buffer(rma_op, iov, vc);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -636,11 +583,6 @@ static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
get_pkt->request_handle = rma_op->request->handle;
get_pkt->flags |= flags;
if (!rma_op->is_dt) {
/* Only fill IMMED data in response packet when both origin and target
buffers are basic datatype. */
get_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP;
}
comm_ptr = win_ptr->comm_ptr;
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
......@@ -705,7 +647,6 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
MPID_Win * win_ptr, MPIDI_RMA_Target_t *target_ptr,
MPIDI_CH3_Pkt_flags_t flags)
{
MPI_Aint len;
MPIDI_VC_t *vc = NULL;
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_cas_t *cas_pkt = &rma_op->pkt.cas;
......@@ -729,20 +670,12 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
rma_op->request->dev.user_count = rma_op->result_count;
rma_op->request->dev.datatype = rma_op->result_datatype;
/* REQUIRE: All datatype arguments must be of the same, builtin
* type and counts must be 1. */
MPID_Datatype_get_size_macro(rma_op->origin_datatype, len);
MPIU_Assert(len <= sizeof(MPIDI_CH3_CAS_Immed_u));
rma_op->request->dev.target_win_handle = cas_pkt->target_win_handle;
rma_op->request->dev.source_win_handle = cas_pkt->source_win_handle;
cas_pkt->request_handle = rma_op->request->handle;
cas_pkt->flags |= flags;
MPIU_Memcpy((void *) &cas_pkt->origin_data, rma_op->origin_addr, len);
MPIU_Memcpy((void *) &cas_pkt->compare_data, rma_op->compare_addr, len);
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_pkt, sizeof(*cas_pkt), &rmw_req);
......@@ -782,8 +715,8 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_fop_t *fop_pkt = &rma_op->pkt.fop;
MPID_Request *resp_req = NULL;
size_t len;
MPI_Aint origin_type_size;
MPID_IOV iov[MPID_IOV_LIMIT];
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FOP_OP);
......@@ -809,26 +742,27 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
fop_pkt->flags |= flags;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, rma_op->origin_count * origin_type_size, size_t);
/* Fill origin data into packet header IMMED area as much as possible */
MPIU_Assign_trunc(fop_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES/origin_type_size)*origin_type_size),
int);
if (fop_pkt->immed_len > 0) {
void *src = rma_op->origin_addr, *dest = fop_pkt->data;
mpi_errno = immed_copy(src, dest, (size_t)fop_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
if (rma_op->pkt.type == MPIDI_CH3_PKT_FOP_IMMED) {
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_pkt, sizeof(*fop_pkt), &(rma_op->request));
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
/* We still need to issue from origin buffer. */
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) fop_pkt;
iov[0].MPID_IOV_LEN = sizeof(*fop_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)rma_op->origin_addr);
iov[1].MPID_IOV_LEN = origin_type_size;
mpi_errno = issue_from_origin_buffer(rma_op, iov, vc);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
/* This operation can generate two requests; one for inbound and one for
* outbound data. */
......@@ -888,21 +822,25 @@ static inline int issue_rma_op(MPIDI_RMA_Op_t * op_ptr, MPID_Win * win_ptr,
switch (op_ptr->pkt.type) {
case (MPIDI_CH3_PKT_PUT):
case (MPIDI_CH3_PKT_PUT_IMMED):
mpi_errno = issue_put_op(op_ptr, win_ptr, target_ptr, flags);
break;
case (MPIDI_CH3_PKT_ACCUMULATE):
case (MPIDI_CH3_PKT_ACCUMULATE_IMMED):
mpi_errno = issue_acc_op(op_ptr, win_ptr, target_ptr, flags);
break;
case (MPIDI_CH3_PKT_GET_ACCUM):
case (MPIDI_CH3_PKT_GET_ACCUM_IMMED):
mpi_errno = issue_get_acc_op(op_ptr, win_ptr, target_ptr, flags);
break;
case (MPIDI_CH3_PKT_GET):
mpi_errno = issue_get_op(op_ptr, win_ptr, target_ptr, flags);
break;
case (MPIDI_CH3_PKT_CAS):
case (MPIDI_CH3_PKT_CAS_IMMED):
mpi_errno = issue_cas_op(op_ptr, win_ptr, target_ptr, flags);
break;
case (MPIDI_CH3_PKT_FOP):
case (MPIDI_CH3_PKT_FOP_IMMED):
mpi_errno = issue_fop_op(op_ptr, win_ptr, target_ptr, flags);
break;
default:
......
......@@ -1913,6 +1913,8 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete( MPIDI_VC_t *, MPID_Request *,
int * );
int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *, MPID_Request *,
int * );
int MPIDI_CH3_ReqHandler_FOPRecvComplete( MPIDI_VC_t *, MPID_Request *,
int * );
int MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete( MPIDI_VC_t *,
MPID_Request *,
int * );
......
This diff is collapsed.
......@@ -433,7 +433,6 @@ typedef struct MPIDI_Request {
MPI_Op op;
/* For accumulate, since data is first read into a tmp_buf */
void *real_user_buf;
void *final_user_buf;
/* For derived datatypes at target */
struct MPIDI_RMA_dtype_info *dtype_info;
void *dataloop;
......
......@@ -333,9 +333,12 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
}
if (pkt->type == MPIDI_CH3_PKT_LOCK ||
pkt->type == MPIDI_CH3_PKT_PUT_IMMED ||
pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED ||
pkt->type == MPIDI_CH3_PKT_GET ||
pkt->type == MPIDI_CH3_PKT_FOP ||
pkt->type == MPIDI_CH3_PKT_CAS) {
pkt->type == MPIDI_CH3_PKT_GET_ACCUM_IMMED ||
pkt->type == MPIDI_CH3_PKT_FOP_IMMED ||
pkt->type == MPIDI_CH3_PKT_CAS_IMMED) {
/* return bytes of data processed in this pkt handler */
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
......@@ -351,8 +354,6 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPID_Request *req = NULL;
MPI_Datatype target_dtp;
int target_count;
int immed_len = 0;
void *immed_data = NULL;
int complete = 0;
MPIDI_msg_sz_t data_len;
char *data_buf = NULL;
......@@ -365,17 +366,6 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPID_Datatype_get_size_macro(target_dtp, type_size);
recv_data_sz = type_size * target_count;
if (recv_data_sz <= MPIDI_RMA_IMMED_BYTES) {
/* return bytes of data processed in this pkt handler */
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
if (new_ptr != NULL)
new_ptr->all_data_recved = 1;
goto issue_ack;
}
if (new_ptr != NULL) {
if (win_ptr->current_lock_data_bytes + recv_data_sz
< MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
......@@ -430,10 +420,6 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
req->dev.lock_queue_entry = new_ptr;
MPIDI_CH3_PKT_RMA_GET_IMMED_LEN((*pkt), immed_len, mpi_errno);
if (immed_len > 0) {
req->dev.recv_data_sz -= immed_len;
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
MPIU_Assert(req->dev.recv_data_sz > 0);
......@@ -447,15 +433,6 @@ static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_VC_t *vc,
req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
req->dev.lock_queue_entry = new_ptr;
MPIDI_CH3_PKT_RMA_GET_IMMED_LEN((*pkt), immed_len, mpi_errno);
MPIDI_CH3_PKT_RMA_GET_IMMED_DATA_PTR((*pkt), immed_data, mpi_errno);
if (immed_len > 0) {
/* see if we can receive some data from packet header */
MPIU_Memcpy(req->dev.user_buf, immed_data, (size_t)immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + immed_len);
req->dev.recv_data_sz -= immed_len;
}
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
MPIU_Assert(req->dev.recv_data_sz > 0);
......@@ -597,7 +574,9 @@ static inline int adjust_op_piggybacked_with_lock (MPID_Win *win_ptr,
&(target->dt_op_list_tail), op);
}
else if (op->pkt.type == MPIDI_CH3_PKT_PUT ||
op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE) {
op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED ||
op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
MPIDI_CH3I_RMA_Ops_append(&(target->write_op_list),
&(target->write_op_list_tail), op);
}
......@@ -877,7 +856,8 @@ static inline int finish_op_on_target(MPID_Win *win_ptr, MPIDI_VC_t *vc,
MPI_Win source_win_handle) {
int mpi_errno = MPI_SUCCESS;
if (type == MPIDI_CH3_PKT_PUT || type == MPIDI_CH3_PKT_ACCUMULATE) {
if (type == MPIDI_CH3_PKT_PUT || type == MPIDI_CH3_PKT_PUT_IMMED ||
type == MPIDI_CH3_PKT_ACCUMULATE_IMMED || type == MPIDI_CH3_PKT_ACCUMULATE) {
/* This is PUT or ACC */
if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
......
......@@ -573,12 +573,18 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
We could even do lazy initialization (make this part of win_create) */
pktArray[MPIDI_CH3_PKT_PUT] =
MPIDI_CH3_PktHandler_Put;
pktArray[MPIDI_CH3_PKT_PUT_IMMED] =
MPIDI_CH3_PktHandler_Put;
pktArray[MPIDI_CH3_PKT_ACCUMULATE] =
MPIDI_CH3_PktHandler_Accumulate;
pktArray[MPIDI_CH3_PKT_ACCUMULATE_IMMED] =
MPIDI_CH3_PktHandler_Accumulate;
pktArray[MPIDI_CH3_PKT_GET] =
MPIDI_CH3_PktHandler_Get;
pktArray[MPIDI_CH3_PKT_GET_RESP] =
MPIDI_CH3_PktHandler_GetResp;
pktArray[MPIDI_CH3_PKT_GET_RESP_IMMED] =
MPIDI_CH3_PktHandler_GetResp;
pktArray[MPIDI_CH3_PKT_LOCK] =
MPIDI_CH3_PktHandler_Lock;
pktArray[MPIDI_CH3_PKT_LOCK_ACK] =
......@@ -593,18 +599,26 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
MPIDI_CH3_PktHandler_FlushAck;
pktArray[MPIDI_CH3_PKT_DECR_AT_COUNTER] =
MPIDI_CH3_PktHandler_DecrAtCnt;
pktArray[MPIDI_CH3_PKT_CAS] =
pktArray[MPIDI_CH3_PKT_CAS_IMMED] =
MPIDI_CH3_PktHandler_CAS;
pktArray[MPIDI_CH3_PKT_CAS_RESP] =
pktArray[MPIDI_CH3_PKT_CAS_RESP_IMMED] =
MPIDI_CH3_PktHandler_CASResp;
pktArray[MPIDI_CH3_PKT_FOP] =
MPIDI_CH3_PktHandler_FOP;
pktArray[MPIDI_CH3_PKT_FOP_IMMED] =
MPIDI_CH3_PktHandler_FOP;
pktArray[MPIDI_CH3_PKT_FOP_RESP] =
MPIDI_CH3_PktHandler_FOPResp;
pktArray[MPIDI_CH3_PKT_FOP_RESP_IMMED] =
MPIDI_CH3_PktHandler_FOPResp;
pktArray[MPIDI_CH3_PKT_GET_ACCUM] =
MPIDI_CH3_PktHandler_GetAccumulate;
pktArray[MPIDI_CH3_PKT_GET_ACCUM_IMMED] =
MPIDI_CH3_PktHandler_GetAccumulate;
pktArray[MPIDI_CH3_PKT_GET_ACCUM_RESP] =
MPIDI_CH3_PktHandler_Get_AccumResp;
pktArray[MPIDI_CH3_PKT_GET_ACCUM_RESP_IMMED] =
MPIDI_CH3_PktHandler_Get_AccumResp;
/* End of default RMA operations */
/* Fault tolerance */
......
This diff is collapsed.
......@@ -217,7 +217,7 @@ int MPIDI_CH3_ReqHandler_CASSendComplete( MPIDI_VC_t *vc,
because inside finish_op_on_target() we may call this request handler
on the same request again (in release_lock()). Marking this request as
completed will prevent us from processing the same request twice. */
mpi_errno = finish_op_on_target(win_ptr, vc, MPIDI_CH3_PKT_CAS,
mpi_errno = finish_op_on_target(win_ptr, vc, MPIDI_CH3_PKT_CAS_IMMED,
flags, source_win_handle);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
......
......@@ -93,7 +93,6 @@ MPID_Request * MPID_Request_create(void)
req->dev.OnDataAvail = NULL;
req->dev.OnFinal = NULL;
req->dev.user_buf = NULL;
req->dev.final_user_buf = NULL;
req->dev.drop_data = FALSE;
#ifdef MPIDI_CH3_REQUEST_INIT
MPIDI_CH3_REQUEST_INIT(req);
......
......@@ -366,7 +366,9 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *targe
win_ptr->active_req_cnt++;
if (curr_op->pkt.type == MPIDI_CH3_PKT_PUT ||
curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE) {
curr_op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED ||
curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
target->put_acc_issued = 1; /* set PUT_ACC_FLAG when sending
PUT/ACC operation. */
}
......@@ -401,7 +403,9 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *targe
&(target->dt_op_list_tail), curr_op);
}
else if (curr_op->pkt.type == MPIDI_CH3_PKT_PUT ||
curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE) {
curr_op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED ||
curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
MPIDI_CH3I_RMA_Ops_append(&(target->write_op_list),
&(target->write_op_list_tail), curr_op);
}
......
......@@ -121,6 +121,9 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
else {
MPIDI_RMA_Op_t *new_ptr = NULL;
MPIDI_CH3_Pkt_put_t *put_pkt = NULL;
MPI_Aint origin_type_size;
size_t immed_len, len;
int use_immed_pkt = FALSE;
/* queue it up */
mpi_errno = MPIDI_CH3I_Win_get_op(win_ptr, &new_ptr);
......@@ -155,30 +158,48 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
new_ptr->is_dt = 1;
}
MPID_Datatype_get_size_macro(origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, origin_count * origin_type_size, size_t);
/* Judge if we can use IMMED data packet */
if (!new_ptr->is_dt) {
MPIU_Assign_trunc(immed_len,
(MPIDI_RMA_IMMED_BYTES/origin_type_size)*origin_type_size,
size_t);
if (len <= immed_len)
use_immed_pkt = TRUE;
}
/* Judge if this operation is an piggyback candidate */
if (!new_ptr->is_dt) {
size_t len;
MPI_Aint origin_type_size;
MPID_Datatype_get_size_macro(new_ptr->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, new_ptr->origin_count * origin_type_size, size_t);
if (len <= MPIR_MAX(MPIDI_RMA_IMMED_BYTES,
MPIR_CVAR_CH3_RMA_OP_PIGGYBACK_LOCK_DATA_SIZE))
if (len <= MPIR_CVAR_CH3_RMA_OP_PIGGYBACK_LOCK_DATA_SIZE)
new_ptr->piggyback_lock_candidate = 1;
}
/************** Setting packet struct areas in operation ****************/
put_pkt = &(new_ptr->pkt.put);
MPIDI_Pkt_init(put_pkt, MPIDI_CH3_PKT_PUT);
if (use_immed_pkt) {
MPIDI_Pkt_init(put_pkt, MPIDI_CH3_PKT_PUT_IMMED);
}
else {
MPIDI_Pkt_init(put_pkt, MPIDI_CH3_PKT_PUT);
}
put_pkt->addr = (char *) win_ptr->base_addrs[target_rank] +
win_ptr->disp_units[target_rank] * target_disp;
put_pkt->count = target_count;
put_pkt->datatype = target_datatype;
put_pkt->dataloop_size = 0;
put_pkt->info.dataloop_size = 0;
put_pkt->target_win_handle = win_ptr->all_win_handles[target_rank];
put_pkt->source_win_handle = win_ptr->handle;
put_pkt->immed_len = 0;
put_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *)origin_addr, *dest = (void *)(put_pkt->info.data);
mpi_errno = immed_copy(src, dest, len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
......@@ -288,6 +309,9 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
else {
MPIDI_RMA_Op_t *new_ptr = NULL;
MPIDI_CH3_Pkt_get_t *get_pkt = NULL;
MPI_Aint target_type_size;
size_t immed_len, len;
int use_immed_resp_pkt = FALSE;
/* queue it up */
mpi_errno = MPIDI_CH3I_Win_get_op(win_ptr, &new_ptr);
......@@ -322,6 +346,18 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
new_ptr->is_dt = 1;
}
MPID_Datatype_get_size_macro(target_datatype, target_type_size);
MPIU_Assign_trunc(len, target_count * target_type_size, size_t);
/* Judge if we can use IMMED data response packet */
if (!new_ptr->is_dt) {
MPIU_Assign_trunc(immed_len,
(MPIDI_RMA_IMMED_BYTES/target_type_size)*target_type_size,
size_t);
if (len <= immed_len)
use_immed_resp_pkt = TRUE;
}
/* Judge if this operation is an piggyback candidate. */
if (!new_ptr->is_dt) {
new_ptr->piggyback_lock_candidate = 1;
......@@ -335,10 +371,12 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
win_ptr->disp_units[target_rank] * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
get_pkt->dataloop_size = 0;
get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->all_win_handles[target_rank];
get_pkt->source_win_handle = win_ptr->handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt)
get_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP;
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
......@@ -450,6 +488,9 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
else {
MPIDI_RMA_Op_t *new_ptr = NULL;
MPIDI_CH3_Pkt_accum_t *accum_pkt = NULL;
MPI_Aint origin_type_size;
size_t immed_len, len;
int use_immed_pkt = FALSE;
/* queue it up */
mpi_errno = MPIDI_CH3I_Win_get_op(win_ptr, &new_ptr);
......@@ -483,14 +524,21 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
new_ptr->is_dt = 1;
}
MPID_Datatype_get_size_macro(origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, origin_count * origin_type_size, size_t);
/* Judge if we can use IMMED data packet */
if (!new_ptr->is_dt) {
MPIU_Assign_trunc(immed_len,
(MPIDI_RMA_IMMED_BYTES/origin_type_size)*origin_type_size,
size_t);
if (len <= immed_len)
use_immed_pkt = TRUE;
}
/* Judge if this operation is an piggyback candidate. */
if (!new_ptr->is_dt) {
size_t len;
MPI_Aint origin_type_size;
MPID_Datatype_get_size_macro(new_ptr->origin_datatype, origin_type_size);
MPIU_Assign_trunc(len, new_ptr->origin_count * origin_type_size, size_t);
if (len <= MPIR_MAX(MPIDI_RMA_IMMED_BYTES,