Commit 9574b9ab authored by James Dinan

[svn-r10249] Updated fetch_and_op implementation

Updated the fetch_and_op implementation to have two data transfer paths:
one where the data can be embedded in the packet header and one where it
is sent separately.  With this change, the header size is back to 40 bytes.

Reviewer: buntinas
parent 4237b489
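
In outline, the origin-side logic added here makes the following choice (a
condensed paraphrase of the diff below, with locking and error handling
elided; it is not the literal committed code):

    if (len <= sizeof(fop_pkt->origin_data) || rma_op->op == MPIX_NO_OP) {
        /* Path 1: the operand fits in the fixed immediate area, so it is
           embedded directly in the packet header. */
        if (rma_op->op != MPIX_NO_OP)
            MPIU_Memcpy(fop_pkt->origin_data, rma_op->origin_addr, len);
        mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_pkt, sizeof(*fop_pkt), &rmw_req);
    }
    else {
        /* Path 2: the operand is too large for the header, so the header
           and the payload travel together as a two-element IOV. */
        MPID_IOV iov[2];
        iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) fop_pkt;
        iov[0].MPID_IOV_LEN = sizeof(*fop_pkt);
        iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) rma_op->origin_addr;
        iov[1].MPID_IOV_LEN = len;  /* count == 1, predefined datatype */
        mpi_errno = MPIDI_CH3_iSendv(vc, rmw_req, iov, 2);
    }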
......@@ -1895,6 +1895,7 @@ int MPIDI_CH3_ReqHandler_SinglePutAccumComplete( MPIDI_VC_t *, MPID_Request *,
int * );
int MPIDI_CH3_ReqHandler_GetRespDerivedDTComplete( MPIDI_VC_t *,
MPID_Request *, int * );
int MPIDI_CH3_ReqHandler_FOPComplete( MPIDI_VC_t *, MPID_Request *, int * );
/* Send Handlers */
int MPIDI_CH3_ReqHandler_SendReloadIOV( MPIDI_VC_t *vc, MPID_Request *sreq,
......
......@@ -34,9 +34,13 @@ typedef union {
#undef MPIR_OP_TYPE_MACRO
} MPIDI_CH3_CAS_Immed_u;
- /* Union over all types (all builtin types) that are allowed in a Fetch-and-op
-    operation. This is used to allocate enough space in the packet header for
-    immediate data. */
/* Union over all types (all predefined types) that are allowed in a
Fetch-and-op operation. This can be too large for the packet header, so we
limit the immediate space in the header to FOP_IMMED_INTS. */
#define MPIDI_RMA_FOP_IMMED_INTS 2
#define MPIDI_RMA_FOP_RESP_IMMED_INTS 8
typedef union {
#define MPIR_OP_TYPE_MACRO(mpi_type_, c_type_, type_name_) c_type_ fop##type_name_;
MPIR_OP_TYPE_GROUP_ALL_BASIC
......@@ -82,7 +86,6 @@ typedef enum MPIDI_CH3_Pkt_type
MPIDI_CH3_PKT_CAS_UNLOCK,
MPIDI_CH3_PKT_CAS_RESP,
MPIDI_CH3_PKT_FOP,
- MPIDI_CH3_PKT_FOP_UNLOCK,
MPIDI_CH3_PKT_FOP_RESP,
MPIDI_CH3_PKT_FLOW_CNTL_UPDATE, /* FIXME: Unused */
MPIDI_CH3_PKT_CLOSE,
......@@ -291,11 +294,10 @@ typedef struct MPIDI_CH3_Pkt_fop
* epoch for decrementing rma op counter in
* active target rma and for unlocking window
* in passive target rma. Otherwise set to NULL*/
- /* source_win_handle is omitted here to reduce
-  * the packet size. If this is the last FETCH_OP
-  * packet, the type will be set to FETCH_OP_UNLOCK */
- MPIDI_CH3_FOP_Immed_u origin_data;
MPI_Win source_win_handle; /* Used in the last RMA operation in an
* epoch in the case of passive target rma
* with shared locks. Otherwise set to NULL*/
int origin_data[MPIDI_RMA_FOP_IMMED_INTS];
}
MPIDI_CH3_Pkt_fop_t;
......@@ -303,7 +305,7 @@ typedef struct MPIDI_CH3_Pkt_fop_resp
{
MPIDI_CH3_Pkt_type_t type;
MPI_Request request_handle;
- MPIDI_CH3_FOP_Immed_u data;
int data[MPIDI_RMA_FOP_RESP_IMMED_INTS];
}
MPIDI_CH3_Pkt_fop_resp_t;
......
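
Why 2 and 8?  A back-of-the-envelope check (field layout inferred from the
diff; assumes 4-byte ints and int-sized handles and enums, as on typical
ILP32/LP64 ABIs): the CH3 packet header is a union over all packet types, so
the largest member sets its size.  The response packet carries only a type
and a request handle besides the immediate area, so 8 ints still lands on
the 40-byte bound cited in the commit message; the request packet also
carries an address, datatype, op, two window handles, and a request handle,
which is why its immediate area is capped at 2 ints.  A self-contained
sketch of the response-side arithmetic:

    /* Illustrative only -- not the real MPICH declarations. */
    #include <stdio.h>

    #define MPIDI_RMA_FOP_RESP_IMMED_INTS 8

    typedef struct {
        int type;                                /* packet type       */
        int request_handle;                      /* origin's request  */
        int data[MPIDI_RMA_FOP_RESP_IMMED_INTS]; /* 32-byte immediate */
    } fop_resp_sketch_t;

    int main(void)
    {
        /* Prints 40 on common ABIs: 4 + 4 + 32, with no padding needed. */
        printf("sizeof(fop_resp_sketch_t) = %zu\n", sizeof(fop_resp_sketch_t));
        return 0;
    }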
......@@ -596,8 +596,6 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
MPIDI_CH3_PktHandler_CASResp;
pktArray[MPIDI_CH3_PKT_FOP] =
MPIDI_CH3_PktHandler_FOP;
- pktArray[MPIDI_CH3_PKT_FOP_UNLOCK] =
- MPIDI_CH3_PktHandler_FOP;
pktArray[MPIDI_CH3_PKT_FOP_RESP] =
MPIDI_CH3_PktHandler_FOPResp;
/* End of default RMA operations */
......
......@@ -414,6 +414,130 @@ int MPIDI_CH3_ReqHandler_SinglePutAccumComplete( MPIDI_VC_t *vc,
return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_ReqHandler_FOPComplete
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_ReqHandler_FOPComplete( MPIDI_VC_t *vc,
MPID_Request *rreq, int *complete )
{
int mpi_errno = MPI_SUCCESS;
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
MPID_Request *resp_req = NULL; /* NULL so the fn_fail path can test it safely */
MPID_Win *win_ptr;
MPI_User_function *uop;
int len, one;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPCOMPLETE);
MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"performing FOP operation");
MPID_Datatype_get_size_macro(rreq->dev.datatype, len);
MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
fop_resp_pkt->request_handle = rreq->dev.request_handle;
/* Copy original data into the send buffer. If data will fit in the
header, use that. Otherwise allocate a temporary buffer. */
if (len <= sizeof(fop_resp_pkt->data)) {
MPIU_Memcpy( fop_resp_pkt->data, rreq->dev.real_user_buf, len );
}
else {
resp_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(resp_req, 1);
MPIDI_CH3U_SRBuf_alloc(resp_req, len);
MPIU_ERR_CHKANDJUMP(resp_req->dev.tmpbuf_sz < len, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Memcpy( resp_req->dev.tmpbuf, rreq->dev.real_user_buf, len );
}
/* Apply the op */
uop = MPIR_OP_HDL_TO_FN(rreq->dev.op);
one = 1;
(*uop)(rreq->dev.user_buf, rreq->dev.real_user_buf, &one, &rreq->dev.datatype);
/* Send back the original data. We do this here to ensure that the
operation is remote complete before responding to the origin. */
if (len <= sizeof(fop_resp_pkt->data)) {
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (resp_req != NULL) {
MPID_Request_release(resp_req);
}
}
else {
MPID_IOV iov[MPID_IOV_LIMIT];
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) fop_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*fop_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)resp_req->dev.tmpbuf;
iov[1].MPID_IOV_LEN = len;
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, 2);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
/* Free temporary buffer allocated in PktHandler_FOP */
if (len > sizeof(int) * MPIDI_RMA_FOP_IMMED_INTS) {
MPIU_Free(rreq->dev.user_buf);
}
/* There are additional steps to take if this is a passive
target RMA or the last operation from the source */
MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);
/* if passive target RMA, increment counter */
if (win_ptr->current_lock_type != MPID_LOCK_NONE)
win_ptr->my_pt_rma_puts_accs++;
if (rreq->dev.source_win_handle != MPI_WIN_NULL) {
/* Last RMA operation from source. If active
target RMA, decrement window counter. If
passive target RMA, release lock on window and
grant next lock in the lock queue if there is
any. If it's a shared lock or a lock-put-unlock
type of optimization, we also need to send an
ack to the source. */
if (win_ptr->current_lock_type == MPID_LOCK_NONE) {
/* FIXME: MT: this has to be done atomically */
win_ptr->my_counter -= 1;
MPIDI_CH3_Progress_signal_completion();
}
else {
mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
/* Without the following signal_completion call, we
sometimes hang */
MPIDI_CH3_Progress_signal_completion();
}
}
*complete = 1;
fn_exit:
MPID_Request_release(rreq);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPCOMPLETE);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (resp_req != NULL) {
MPIU_Object_set_ref(resp_req, 0);
MPIDI_CH3_Request_destroy(resp_req);
}
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_ReqHandler_UnpackUEBufComplete
#undef FCNAME
......
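
One detail worth noting in the handler above: the response is sent only
after the user-defined op has been applied, so completion of the response at
the origin implies the target's memory is already updated.  Schematically
(saved, target_addr, dt, and send_response are illustrative names, not MPICH
symbols):

    MPIU_Memcpy(saved, target_addr, len);        /* 1. fetch the old value   */
    (*uop)(origin_data, target_addr, &one, &dt); /* 2. target op= origin     */
    send_response(saved, len);                   /* 3. reply only afterwards */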
......@@ -837,7 +837,7 @@ static int MPIDI_CH3I_Send_immed_rmw_msg(MPIDI_RMA_ops *rma_op,
MPID_Request **request)
{
int mpi_errno = MPI_SUCCESS;
- MPID_Request *req, *resp_req = NULL;
MPID_Request *rmw_req = NULL, *resp_req = NULL;
MPIDI_VC_t *vc;
MPID_Comm *comm_ptr;
int len;
......@@ -861,12 +861,13 @@ static int MPIDI_CH3I_Send_immed_rmw_msg(MPIDI_RMA_ops *rma_op,
resp_req->dev.user_buf = rma_op->result_addr;
resp_req->dev.user_count = rma_op->result_count;
resp_req->dev.datatype = rma_op->result_datatype;
- resp_req->dev.target_win_handle = MPI_WIN_NULL;
resp_req->dev.target_win_handle = target_win_handle;
resp_req->dev.source_win_handle = source_win_handle;
/* REQUIRE: All datatype arguments must be of the same builtin
type and counts must be 1. */
MPID_Datatype_get_size_macro(rma_op->origin_datatype, len);
comm_ptr = win_ptr->comm_ptr;
if (rma_op->type == MPIDI_RMA_COMPARE_AND_SWAP) {
MPIDI_CH3_Pkt_t upkt;
......@@ -882,7 +883,8 @@ static int MPIDI_CH3I_Send_immed_rmw_msg(MPIDI_RMA_ops *rma_op,
MPIDI_Pkt_init(cas_pkt, MPIDI_CH3_PKT_CAS);
}
- cas_pkt->addr = (char *) win_ptr->base_addrs[rma_op->target_rank] + win_ptr->disp_units[rma_op->target_rank] * rma_op->target_disp;
cas_pkt->addr = (char *) win_ptr->base_addrs[rma_op->target_rank] +
win_ptr->disp_units[rma_op->target_rank] * rma_op->target_disp;
cas_pkt->datatype = rma_op->target_datatype;
cas_pkt->target_win_handle = target_win_handle;
cas_pkt->request_handle = resp_req->handle;
......@@ -890,12 +892,15 @@ static int MPIDI_CH3I_Send_immed_rmw_msg(MPIDI_RMA_ops *rma_op,
MPIU_Memcpy( (void *) &cas_pkt->origin_data, rma_op->origin_addr, len );
MPIU_Memcpy( (void *) &cas_pkt->compare_data, rma_op->compare_addr, len );
- comm_ptr = win_ptr->comm_ptr;
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
- mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_pkt, sizeof(*cas_pkt), &req);
mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_pkt, sizeof(*cas_pkt), &rmw_req);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (rmw_req != NULL) {
MPID_Request_release(rmw_req);
}
}
else if (rma_op->type == MPIDI_RMA_FETCH_AND_OP) {
......@@ -904,32 +909,53 @@ static int MPIDI_CH3I_Send_immed_rmw_msg(MPIDI_RMA_ops *rma_op,
- MPIU_Assert(len <= sizeof(MPIDI_CH3_FOP_Immed_u));
- /* If this is the last operation, it also unlocks the window
-    at the target. */
- if (source_win_handle != MPI_WIN_NULL) {
- MPIDI_Pkt_init(fop_pkt, MPIDI_CH3_PKT_FOP_UNLOCK);
- } else {
- MPIDI_Pkt_init(fop_pkt, MPIDI_CH3_PKT_FOP);
- }
MPIDI_Pkt_init(fop_pkt, MPIDI_CH3_PKT_FOP);
- fop_pkt->addr = (char *) win_ptr->base_addrs[rma_op->target_rank] + win_ptr->disp_units[rma_op->target_rank] * rma_op->target_disp;
fop_pkt->addr = (char *) win_ptr->base_addrs[rma_op->target_rank] +
win_ptr->disp_units[rma_op->target_rank] * rma_op->target_disp;
fop_pkt->datatype = rma_op->target_datatype;
fop_pkt->target_win_handle = target_win_handle;
fop_pkt->source_win_handle = source_win_handle;
fop_pkt->request_handle = resp_req->handle;
fop_pkt->op = rma_op->op;
- if (rma_op->op != MPIX_NO_OP) {
- MPIU_Memcpy( (void *) &fop_pkt->origin_data, rma_op->origin_addr, len );
if (len <= sizeof(fop_pkt->origin_data) || rma_op->op == MPIX_NO_OP) {
/* Embed FOP data in the packet header */
if (rma_op->op != MPIX_NO_OP) {
MPIU_Memcpy( fop_pkt->origin_data, rma_op->origin_addr, len );
}
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_pkt, sizeof(*fop_pkt), &rmw_req);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (rmw_req != NULL) {
MPID_Request_release(rmw_req);
}
}
else {
/* Data is too big to copy into the FOP header, use an IOV to send it */
MPID_IOV iov[MPID_IOV_LIMIT];
- comm_ptr = win_ptr->comm_ptr;
- MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
- MPIU_THREAD_CS_ENTER(CH3COMM,vc);
- mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_pkt, sizeof(*fop_pkt), &req);
- MPIU_THREAD_CS_EXIT(CH3COMM,vc);
- MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
- }
rmw_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(rmw_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(rmw_req, 1);
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)fop_pkt;
iov[0].MPID_IOV_LEN = sizeof(*fop_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)rma_op->origin_addr;
iov[1].MPID_IOV_LEN = len; /* count == 1 */
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iSendv(vc, rmw_req, iov, 2);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
}
else {
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
......@@ -939,12 +965,15 @@ fn_exit:
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
- if (*request)
- {
if (*request) {
MPIU_Object_set_ref(*request, 0);
MPIDI_CH3_Request_destroy(*request);
}
*request = NULL;
if (rmw_req) {
MPIU_Object_set_ref(rmw_req, 0);
MPIDI_CH3_Request_destroy(rmw_req);
}
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -3288,13 +3317,10 @@ int MPIDI_CH3_PktHandler_FOP( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp )
{
int mpi_errno = MPI_SUCCESS;
- MPIDI_CH3_Pkt_t upkt;
- MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
MPIDI_CH3_Pkt_fop_t *fop_pkt = &pkt->fop;
- MPID_Win *win_ptr;
MPID_Request *req;
- MPI_User_function *uop;
- int len, one;
int len, data_complete = 0;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOP);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOP);
......@@ -3302,73 +3328,71 @@ int MPIDI_CH3_PktHandler_FOP( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received FOP pkt");
MPIU_INSTR_DURATION_START(rmapkt_fop);
- /* return the number of bytes processed in this function */
- /* data_len == 0 (all within packet) */
- *buflen = sizeof(MPIDI_CH3_Pkt_t);
- *rreqp = NULL;
req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(req, 2);
*rreqp = NULL;
- MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
- fop_resp_pkt->request_handle = fop_pkt->request_handle;
req->dev.user_buf = NULL; /* will be set later */
req->dev.user_count = 1;
req->dev.datatype = fop_pkt->datatype;
req->dev.op = fop_pkt->op;
req->dev.real_user_buf = fop_pkt->addr;
req->dev.target_win_handle = fop_pkt->target_win_handle;
req->dev.source_win_handle = fop_pkt->source_win_handle;
req->dev.request_handle = fop_pkt->request_handle;
- /* Copy old value into the response packet */
- MPID_Datatype_get_size_macro(fop_pkt->datatype, len);
MPID_Datatype_get_size_macro(req->dev.datatype, len);
- MPIU_Assert(len <= sizeof(MPIDI_CH3_FOP_Immed_u));
- MPIU_Memcpy( (void *)&fop_resp_pkt->data, fop_pkt->addr, len );
- /* Send the response packet */
- MPIU_THREAD_CS_ENTER(CH3COMM,vc);
- mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &req);
- MPIU_THREAD_CS_EXIT(CH3COMM,vc);
- MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
- if (req != NULL) {
- MPID_Request_release(req);
/* Set up the user buffer and receive data if needed */
if (len <= sizeof(fop_pkt->origin_data)) {
req->dev.user_buf = fop_pkt->origin_data;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
data_complete = 1;
}
else {
/* Data won't fit in the header, allocate temp space and receive it */
MPIDI_msg_sz_t data_len;
void *data_buf;
- /* Apply the op */
- uop = MPIR_OP_HDL_TO_FN(fop_pkt->op);
- one = 1;
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t);
req->dev.recv_data_sz = len; /* count == 1 for FOP */
- (*uop)((void *) &fop_pkt->origin_data, fop_pkt->addr, &one, &fop_pkt->datatype);
MPIU_CHKPMEM_MALLOC(req->dev.user_buf, void *, len, mpi_errno, "**nomemreq");
- /* There are additional steps to take if this is a passive
-    target RMA or the last operation from the source */
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &data_complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
- MPID_Win_get_ptr(fop_pkt->target_win_handle, win_ptr);
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_FOPComplete;
if (! data_complete) {
*rreqp = req;
}
- /* if passive target RMA, increment counter */
- if (win_ptr->current_lock_type != MPID_LOCK_NONE)
- win_ptr->my_pt_rma_puts_accs++;
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
}
- if (fop_pkt->type == MPIDI_CH3_PKT_FOP_UNLOCK) {
- /* Last RMA operation from source. If active
-    target RMA, decrement window counter. If
-    passive target RMA, release lock on window and
-    grant next lock in the lock queue if there is
-    any. If it's a shared lock or a lock-put-unlock
-    type of optimization, we also need to send an
-    ack to the source. */
- if (win_ptr->current_lock_type == MPID_LOCK_NONE) {
- /* FIXME: MT: this has to be done atomically */
- win_ptr->my_counter -= 1;
- MPIDI_CH3_Progress_signal_completion();
- }
- else {
- mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
- /* Without the following signal_completion call, we
-    sometimes hang */
- MPIDI_CH3_Progress_signal_completion();
- }
if (data_complete) {
int fop_complete = 0;
mpi_errno = MPIDI_CH3_ReqHandler_FOPComplete(vc, req, &fop_complete);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
*rreqp = NULL;
}
- fn_exit:
fn_exit:
MPIU_CHKPMEM_COMMIT();
MPIU_INSTR_DURATION_END(rmapkt_fop);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOP);
return mpi_errno;
- fn_fail:
/* --BEGIN ERROR HANDLING-- */
fn_fail:
MPIU_CHKPMEM_REAP();
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -3383,7 +3407,7 @@ int MPIDI_CH3_PktHandler_FOPResp( MPIDI_VC_t *vc ATTRIBUTE((unused)),
int mpi_errno = MPI_SUCCESS;
MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &pkt->fop_resp;
MPID_Request *req;
- int len;
int len, complete = 0;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOPRESP);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOPRESP);
......@@ -3393,17 +3417,43 @@ int MPIDI_CH3_PktHandler_FOPResp( MPIDI_VC_t *vc ATTRIBUTE((unused)),
MPID_Request_get_ptr(fop_resp_pkt->request_handle, req);
MPID_Datatype_get_size_macro(req->dev.datatype, len);
- MPIU_Memcpy( req->dev.user_buf, (void *)&fop_resp_pkt->data, len );
if (len <= sizeof(fop_resp_pkt->data)) {
MPIU_Memcpy( req->dev.user_buf, (void *)fop_resp_pkt->data, len );
*buflen = sizeof(MPIDI_CH3_Pkt_t);
complete = 1;
}
else {
/* Data was too big to embed in the header */
MPIDI_msg_sz_t data_len;
void *data_buf;
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t);
req->dev.recv_data_sz = len; /* count == 1 for FOP */
*rreqp = req;
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf,
&data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER,
"**ch3|postrecv", "**ch3|postrecv %s",
"MPIDI_CH3_PKT_GET_RESP");
- MPIDI_CH3U_Request_complete( req );
- *buflen = sizeof(MPIDI_CH3_Pkt_t);
- *rreqp = NULL;
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
}
if (complete) {
MPIDI_CH3U_Request_complete( req );
*rreqp = NULL;
}
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOPRESP);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......
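
For context, a hedged sketch of the user-level call this protocol serves
(MPICH used the MPIX_ prefix for MPI-3 RMA functions at this point, as
MPIX_NO_OP above suggests; the exact spelling at this revision is an
assumption):

    /* Hypothetical caller; window creation and setup elided.  An 8-byte
       MPI_LONG fits the 2-int immediate area and takes the header-embedded
       path; a 16-byte MPI_LONG_DOUBLE (on x86-64) would take the
       separate-data path. */
    long origin = 1, result;
    MPI_Win_lock(MPI_LOCK_SHARED, target_rank, 0, win);
    MPIX_Fetch_and_op(&origin, &result, MPI_LONG, target_rank,
                      /* target_disp = */ 0, MPI_SUM, win);
    MPI_Win_unlock(target_rank, win);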