Commit 87acbbbe authored by Xin Zhao
Browse files

Bug-fix: add IMMED area in GET/GACC response packets

This patch allows GET/GACC response packets to
piggyback IMMED data, as is already done
for PUT/GACC/FOP/CAS packets.

No reviewer.
parent 4739df59
......@@ -119,7 +119,8 @@ typedef enum {
MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK = 256,
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK = 512,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED = 1024,
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_NO_ACK = 2048
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_NO_ACK = 2048,
MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP = 4096
} MPIDI_CH3_Pkt_flags_t;
typedef struct MPIDI_CH3_Pkt_send {
......@@ -559,6 +560,9 @@ typedef struct MPIDI_CH3_Pkt_get_resp {
/* The following fields are used to decrement ack_counter at origin */
MPI_Win source_win_handle;
int target_rank;
/* The following fields are used to piggyback IMMED data */
size_t immed_len;
char data[MPIDI_RMA_IMMED_BYTES];
} MPIDI_CH3_Pkt_get_resp_t;
typedef struct MPIDI_CH3_Pkt_get_accum_resp {
......@@ -570,6 +574,9 @@ typedef struct MPIDI_CH3_Pkt_get_accum_resp {
/* The following fields are used to decrement ack_counter at origin */
MPI_Win source_win_handle;
int target_rank;
/* The following fields are used to piggyback IMMED data */
size_t immed_len;
char data[MPIDI_RMA_IMMED_BYTES];
} MPIDI_CH3_Pkt_get_accum_resp_t;
typedef struct MPIDI_CH3_Pkt_fop_resp {
......
......@@ -224,6 +224,8 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
MPID_Request *resp_req;
MPID_IOV iov[MPID_IOV_LIMIT];
MPI_Aint true_lb, true_extent;
size_t len;
int iovcnt;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);
......@@ -242,6 +244,7 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK;
get_accum_resp_pkt->immed_len = 0;
MPID_Datatype_get_size_macro(rreq->dev.datatype, type_size);
......@@ -275,14 +278,42 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++;
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
/* length of target data */
MPIU_Assign_trunc(len, rreq->dev.user_count * type_size, size_t);
/* both origin buffer and target buffer are basic datatype,
fill IMMED data area in response packet header. */
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
/* Try to copy target data into packet header. */
MPIU_Assign_trunc(get_accum_resp_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES / type_size) * type_size),
size_t);
if (get_accum_resp_pkt->immed_len > 0) {
void *src = resp_req->dev.user_buf;
void *dest = (void*) get_accum_resp_pkt->data;
/* copy target data into the IMMED area of the packet header */
mpi_errno = immed_copy(src, dest, get_accum_resp_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)resp_req->dev.user_buf;
iov[1].MPID_IOV_LEN = type_size*rreq->dev.user_count;
if (len == get_accum_resp_pkt->immed_len) {
/* All target data fits in the packet header; send only the header. */
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iovcnt = 1;
}
else {
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)resp_req->dev.user_buf + get_accum_resp_pkt->immed_len);
iov[1].MPID_IOV_LEN = rreq->dev.user_count * type_size - get_accum_resp_pkt->immed_len;
iovcnt = 2;
}
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, 2);
mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, iovcnt);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
......@@ -569,6 +600,7 @@ int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete( MPIDI_VC_t *vc,
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK;
get_resp_pkt->immed_len = 0;
sreq->dev.segment_ptr = MPID_Segment_alloc( );
MPIU_ERR_CHKANDJUMP1((sreq->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
......@@ -836,6 +868,8 @@ static inline int perform_get_in_lock_queue(MPID_Win *win_ptr, MPIDI_Win_lock_qu
MPID_Request *sreq = NULL;
MPIDI_VC_t *vc = NULL;
MPI_Aint type_size;
size_t len;
int iovcnt;
MPID_IOV iov[MPID_IOV_LIMIT];
int mpi_errno = MPI_SUCCESS;
......@@ -868,18 +902,47 @@ static inline int perform_get_in_lock_queue(MPID_Win *win_ptr, MPIDI_Win_lock_qu
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK;
get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_resp_pkt->source_win_handle = get_pkt->source_win_handle;
get_resp_pkt->immed_len = 0;
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)get_pkt->addr;
/* length of target data */
MPID_Datatype_get_size_macro(get_pkt->datatype, type_size);
iov[1].MPID_IOV_LEN = get_pkt->count * type_size;
MPIU_Assign_trunc(len, get_pkt->count * type_size, size_t);
/* both origin buffer and target buffer are basic datatype,
fill IMMED data area in response packet header. */
if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
/* Try to copy target data into packet header. */
MPIU_Assign_trunc(get_resp_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES / type_size) * type_size),
size_t);
if (get_resp_pkt->immed_len > 0) {
void *src = get_pkt->addr;
void *dest = (void*) get_resp_pkt->data;
/* copy target data into the IMMED area of the packet header */
mpi_errno = immed_copy(src, dest, get_resp_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
if (len == get_resp_pkt->immed_len) {
/* All target data fits in the packet header; send only the header. */
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
iovcnt = 1;
}
else {
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)get_pkt->addr + get_resp_pkt->immed_len);
iov[1].MPID_IOV_LEN = get_pkt->count * type_size - get_resp_pkt->immed_len;
iovcnt = 2;
}
/* get vc object */
MPIDI_Comm_get_vc(win_ptr->comm_ptr, get_pkt->origin_rank, &vc);
mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, 2);
mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, iovcnt);
if (mpi_errno != MPI_SUCCESS) {
MPID_Request_release(sreq);
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
......@@ -940,6 +1003,8 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_Win_loc
MPID_Request *sreq = NULL;
MPIDI_VC_t *vc = NULL;
MPI_Aint type_size;
size_t len;
int iovcnt;
MPID_IOV iov[MPID_IOV_LIMIT];
int mpi_errno = MPI_SUCCESS;
......@@ -990,17 +1055,46 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_Win_loc
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK;
get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_accum_resp_pkt->source_win_handle = get_accum_pkt->source_win_handle;
get_accum_resp_pkt->immed_len = 0;
/* length of target data */
MPIU_Assign_trunc(len, get_accum_pkt->count * type_size, size_t);
/* both origin buffer and target buffer are basic datatype,
fill IMMED data area in response packet header. */
if (get_accum_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
/* Try to copy target data into packet header. */
MPIU_Assign_trunc(get_accum_resp_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES / type_size) * type_size),
size_t);
if (get_accum_resp_pkt->immed_len > 0) {
void *src = sreq->dev.user_buf;
void *dest = (void*) get_accum_resp_pkt->data;
/* copy target data into the IMMED area of the packet header */
mpi_errno = immed_copy(src, dest, get_accum_resp_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) sreq->dev.user_buf;
iov[1].MPID_IOV_LEN = get_accum_pkt->count * type_size;
if (len == get_accum_resp_pkt->immed_len) {
/* All target data fits in the packet header; send only the header. */
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iovcnt = 1;
}
else {
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)sreq->dev.user_buf + get_accum_resp_pkt->immed_len);
iov[1].MPID_IOV_LEN = get_accum_pkt->count * type_size - get_accum_resp_pkt->immed_len;
iovcnt = 2;
}
/* get vc object */
MPIDI_Comm_get_vc(win_ptr->comm_ptr, get_accum_pkt->origin_rank, &vc);
mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, 2);
mpi_errno = MPIDI_CH3_iSendv(vc, sreq, iov, iovcnt);
if (mpi_errno != MPI_SUCCESS) {
MPID_Request_release(sreq);
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
......
......@@ -358,6 +358,10 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
if (!new_ptr->is_dt) {
new_ptr->piggyback_lock_candidate = 1;
/* Only fill IMMED data in response packet when both origin and target
buffers are basic datatype. */
get_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP;
}
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
......@@ -695,6 +699,10 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
if (!new_ptr->is_dt) {
new_ptr->piggyback_lock_candidate = 1;
/* Only fill IMMED data in response packet when both origin and target
buffers are basic datatype. */
get_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP;
}
}
......@@ -763,6 +771,10 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
if (len <= MPIR_MAX(MPIDI_RMA_IMMED_BYTES,
MPIR_CVAR_CH3_RMA_OP_PIGGYBACK_LOCK_DATA_SIZE))
new_ptr->piggyback_lock_candidate = 1;
/* Only fill IMMED data in response packet when both origin and target
buffers are basic datatype. */
get_accum_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP;
}
}
......@@ -1128,6 +1140,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
get_pkt->source_win_handle = win_ptr->handle;
get_pkt->origin_rank = rank;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
get_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP;
new_ptr->origin_addr = result_addr;
new_ptr->origin_count = 1;
......
......@@ -395,6 +395,8 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
/* basic datatype. send the data. */
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_get_resp_t *get_resp_pkt = &upkt.get_resp;
size_t len;
int iovcnt;
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_RESP);
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetSendComplete;
......@@ -412,16 +414,45 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK;
get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_resp_pkt->source_win_handle = get_pkt->source_win_handle;
get_resp_pkt->immed_len = 0;
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_pkt->addr;
/* length of target data */
MPID_Datatype_get_size_macro(get_pkt->datatype, type_size);
iov[1].MPID_IOV_LEN = get_pkt->count * type_size;
MPIU_Assign_trunc(len, get_pkt->count * type_size, size_t);
/* both origin buffer and target buffer are basic datatype,
fill IMMED data area in response packet header. */
if (get_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
/* Try to copy target data into packet header. */
MPIU_Assign_trunc(get_resp_pkt->immed_len,
MPIR_MIN(len, (MPIDI_RMA_IMMED_BYTES / type_size) * type_size),
size_t);
if (get_resp_pkt->immed_len > 0) {
void *src = get_pkt->addr;
void *dest = (void*) get_resp_pkt->data;
/* copy target data into the IMMED area of the packet header */
mpi_errno = immed_copy(src, dest, get_resp_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
}
if (len == get_resp_pkt->immed_len) {
/* All target data fits in the packet header; send only the header. */
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
iovcnt = 1;
}
else {
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *)get_pkt->addr + get_resp_pkt->immed_len);
iov[1].MPID_IOV_LEN = get_pkt->count * type_size - get_resp_pkt->immed_len;
iovcnt = 2;
}
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, 2);
mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
/* --BEGIN ERROR HANDLING-- */
if (mpi_errno != MPI_SUCCESS) {
......@@ -1230,10 +1261,27 @@ int MPIDI_CH3_PktHandler_Get_AccumResp(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Datatype_get_size_macro(req->dev.datatype, type_size);
req->dev.recv_data_sz = type_size * req->dev.user_count;
if (get_accum_resp_pkt->immed_len > 0) {
/* first copy IMMED data from pkt header to origin buffer */
MPIU_Memcpy(req->dev.user_buf, get_accum_resp_pkt->data, get_accum_resp_pkt->immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + get_accum_resp_pkt->immed_len);
req->dev.recv_data_sz -= get_accum_resp_pkt->immed_len;
if (req->dev.recv_data_sz == 0)
complete = 1;
/* return the number of bytes processed in this function */
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
if(req->dev.recv_data_sz > 0) {
*rreqp = req;
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_ACCUM_RESP");
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
}
if (complete) {
/* Request-based RMA defines final actions for completing user request. */
int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
......@@ -1246,8 +1294,6 @@ int MPIDI_CH3_PktHandler_Get_AccumResp(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
}
*rreqp = NULL;
}
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
fn_exit:
MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_get_accum_resp);
......@@ -1350,10 +1396,28 @@ int MPIDI_CH3_PktHandler_GetResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
MPID_Datatype_get_size_macro(req->dev.datatype, type_size);
req->dev.recv_data_sz = type_size * req->dev.user_count;
if (get_resp_pkt->immed_len > 0) {
/* first copy IMMED data from pkt header to origin buffer */
MPIU_Memcpy(req->dev.user_buf, get_resp_pkt->data, get_resp_pkt->immed_len);
req->dev.user_buf = (void*)((char*)req->dev.user_buf + get_resp_pkt->immed_len);
req->dev.recv_data_sz -= get_resp_pkt->immed_len;
if (req->dev.recv_data_sz == 0)
complete = 1;
/* return the number of bytes processed in this function */
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
if (req->dev.recv_data_sz > 0) {
*rreqp = req;
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv", "**ch3|postrecv %s",
"MPIDI_CH3_PKT_GET_RESP");
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
}
if (complete) {
/* Request-based RMA defines final actions for completing user request. */
int (*reqFn)(MPIDI_VC_t *, MPID_Request *, int *);
......@@ -1367,8 +1431,6 @@ int MPIDI_CH3_PktHandler_GetResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
*rreqp = NULL;
}
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
fn_exit:
MPIR_T_PVAR_TIMER_END(RMA, rma_rmapkt_get_resp);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment