Commit 355e6828 authored by James Dinan's avatar James Dinan
Browse files

[svn-r10365] MPI-3 RMA Get accumulate implementation

Reviewer: goodell
parent db211602
......@@ -1791,6 +1791,8 @@ int MPIDI_CH3_PktHandler_FOP( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_FOPResp( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Get_AccumResp( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Get( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_GetResp( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
......@@ -1891,6 +1893,9 @@ int MPIDI_CH3_ReqHandler_PutAccumRespComplete( MPIDI_VC_t *, MPID_Request *,
int MPIDI_CH3_ReqHandler_AccumRespDerivedDTComplete( MPIDI_VC_t *,
MPID_Request *,
int * );
int MPIDI_CH3_ReqHandler_GetAccumRespComplete( MPIDI_VC_t *vc,
MPID_Request *rreq,
int *complete );
int MPIDI_CH3_ReqHandler_SinglePutAccumComplete( MPIDI_VC_t *, MPID_Request *,
int * );
int MPIDI_CH3_ReqHandler_GetRespDerivedDTComplete( MPIDI_VC_t *,
......
......@@ -87,6 +87,8 @@ typedef enum MPIDI_CH3_Pkt_type
MPIDI_CH3_PKT_CAS_RESP,
MPIDI_CH3_PKT_FOP,
MPIDI_CH3_PKT_FOP_RESP,
MPIDI_CH3_PKT_GET_ACCUM,
MPIDI_CH3_PKT_GET_ACCUM_RESP,
MPIDI_CH3_PKT_FLOW_CNTL_UPDATE, /* FIXME: Unused */
MPIDI_CH3_PKT_CLOSE,
MPIDI_CH3_PKT_END_CH3
......@@ -221,6 +223,7 @@ MPIDI_CH3_Pkt_get_resp_t;
typedef struct MPIDI_CH3_Pkt_accum
{
MPIDI_CH3_Pkt_type_t type;
MPI_Request request_handle; /* For get_accumulate response */
void *addr;
int count;
MPI_Datatype datatype;
......@@ -236,6 +239,13 @@ typedef struct MPIDI_CH3_Pkt_accum
}
MPIDI_CH3_Pkt_accum_t;
typedef struct MPIDI_CH3_Pkt_get_accum_resp
{
MPIDI_CH3_Pkt_type_t type;
MPI_Request request_handle;
}
MPIDI_CH3_Pkt_get_accum_resp_t;
typedef struct MPIDI_CH3_Pkt_accum_immed
{
MPIDI_CH3_Pkt_type_t type;
......@@ -404,6 +414,7 @@ typedef union MPIDI_CH3_Pkt
MPIDI_CH3_Pkt_cas_resp_t cas_resp;
MPIDI_CH3_Pkt_fop_t fop;
MPIDI_CH3_Pkt_fop_resp_t fop_resp;
MPIDI_CH3_Pkt_get_accum_resp_t get_accum_resp;
# if defined(MPIDI_CH3_PKT_DECL)
MPIDI_CH3_PKT_DECL
# endif
......
......@@ -297,6 +297,7 @@ typedef struct MPIDI_Request {
MPI_Win source_win_handle;
int single_op_opt; /* to indicate a lock-put-unlock optimization case */
struct MPIDI_Win_lock_queue *lock_queue_entry; /* for single lock-put-unlock optimization */
MPI_Request resp_request_handle; /* Handle for get_accumulate response */
MPIDI_REQUEST_SEQNUM
......
......@@ -598,6 +598,10 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
MPIDI_CH3_PktHandler_FOP;
pktArray[MPIDI_CH3_PKT_FOP_RESP] =
MPIDI_CH3_PktHandler_FOPResp;
pktArray[MPIDI_CH3_PKT_GET_ACCUM] =
MPIDI_CH3_PktHandler_Accumulate;
pktArray[MPIDI_CH3_PKT_GET_ACCUM_RESP] =
MPIDI_CH3_PktHandler_Get_AccumResp;
/* End of default RMA operations */
fn_fail:
......
......@@ -83,10 +83,65 @@ int MPIDI_CH3_ReqHandler_PutAccumRespComplete( MPIDI_VC_t *vc,
{
int mpi_errno = MPI_SUCCESS;
MPID_Win *win_ptr;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTACCUMRESPCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTACCUMRESPCOMPLETE);
/* Perform get in get-accumulate */
if (rreq->dev.resp_request_handle != MPI_REQUEST_NULL) {
int predefined, type_size;
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_get_accum_resp_t *get_accum_resp_pkt = &upkt.get_accum_resp;
MPID_Request *resp_req;
MPID_IOV iov[MPID_IOV_LIMIT];
MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP);
get_accum_resp_pkt->request_handle = rreq->dev.resp_request_handle;
MPID_Datatype_get_size_macro(rreq->dev.datatype, type_size);
MPIDI_CH3I_DATATYPE_IS_PREDEFINED(rreq->dev.datatype, predefined);
/* Copy data into a temporary buffer */
resp_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(resp_req, 1);
MPIU_CHKPMEM_MALLOC(resp_req->dev.user_buf, void *, rreq->dev.user_count * type_size,
mpi_errno, "GACC resp. buffer");
if (predefined) {
MPIU_Memcpy(resp_req->dev.user_buf, rreq->dev.real_user_buf,
rreq->dev.user_count * type_size);
} else {
MPID_Segment *seg = MPID_Segment_alloc();
MPI_Aint last = type_size * rreq->dev.user_count;
MPIU_ERR_CHKANDJUMP1(seg == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment");
MPID_Segment_init(rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype, seg, 0);
MPID_Segment_pack(seg, 0, &last, resp_req->dev.user_buf);
MPID_Segment_free(seg);
}
resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_GetAccumRespComplete;
resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetAccumRespComplete;
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)resp_req->dev.user_buf;
iov[1].MPID_IOV_LEN = type_size*rreq->dev.user_count;
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, 2);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
/* Mark get portion as handled */
rreq->dev.resp_request_handle = MPI_REQUEST_NULL;
}
if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RESP) {
/* accumulate data from tmp_buf into user_buf */
mpi_errno = do_accumulate_op(rreq);
......@@ -130,9 +185,16 @@ int MPIDI_CH3_ReqHandler_PutAccumRespComplete( MPIDI_VC_t *vc,
/* mark data transfer as complete and decrement CC */
MPIDI_CH3U_Request_complete(rreq);
*complete = TRUE;
fn_fail:
fn_exit:
MPIU_CHKPMEM_COMMIT();
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTACCUMRESPCOMPLETE);
return MPI_SUCCESS;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
MPIU_CHKPMEM_REAP();
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
......@@ -256,6 +318,32 @@ int MPIDI_CH3_ReqHandler_AccumRespDerivedDTComplete( MPIDI_VC_t *vc ATTRIBUTE((u
return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_ReqHandler_GetAccumRespComplete
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_ReqHandler_GetAccumRespComplete( MPIDI_VC_t *vc,
MPID_Request *rreq,
int *complete )
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GETACCUMRESPCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GETACCUMRESPCOMPLETE);
MPIU_Free(rreq->dev.user_buf);
MPIDI_CH3U_Request_complete(rreq);
*complete = TRUE;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GETACCUMRESPCOMPLETE);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_ReqHandler_GetRespDerivedDTComplete
#undef FCNAME
......
......@@ -87,6 +87,7 @@ MPID_Request * MPID_Request_create(void)
req->dev.dtype_info = NULL;
req->dev.dataloop = NULL;
req->dev.iov_offset = 0;
req->dev.resp_request_handle = MPI_REQUEST_NULL;
#ifdef MPIDI_CH3_REQUEST_INIT
MPIDI_CH3_REQUEST_INIT(req);
#endif
......
......@@ -22,13 +22,15 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
MPI_Datatype result_datatype, int target_rank, MPI_Aint target_disp,
int target_count, MPI_Datatype target_datatype, MPI_Op op, MPID_Win *win_ptr)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_msg_sz_t data_sz;
int rank, origin_predefined, result_predefined, target_predefined;
int dt_contig ATTRIBUTE((unused));
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPID_Datatype *dtp;
int mpi_errno = MPI_SUCCESS;
MPIDI_msg_sz_t data_sz;
int rank, origin_predefined, result_predefined, target_predefined;
int dt_contig ATTRIBUTE((unused));
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPID_Datatype *dtp;
MPIDI_RMA_ops *new_ptr;
MPIU_CHKLMEM_DECL(2);
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_GET_ACCUMULATE);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_GET_ACCUMULATE);
......@@ -152,17 +154,62 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
}
}
else {
/* TODO: Inter-process get_accumulate isn't implemented yet */
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**notimpl");
/* Append the operation to the window's RMA ops queue */
MPIU_INSTR_DURATION_START(rmaqueue_alloc);
MPIU_CHKPMEM_MALLOC(new_ptr, MPIDI_RMA_ops *, sizeof(MPIDI_RMA_ops),
mpi_errno, "RMA operation entry");
MPIU_INSTR_DURATION_END(rmaqueue_alloc);
if (win_ptr->rma_ops_list_tail)
win_ptr->rma_ops_list_tail->next = new_ptr;
else
win_ptr->rma_ops_list_head = new_ptr;
win_ptr->rma_ops_list_tail = new_ptr;
/* TODO: Can we use the MPIDI_RMA_ACC_CONTIG optimization? */
MPIU_INSTR_DURATION_START(rmaqueue_set);
new_ptr->next = NULL;
new_ptr->type = MPIDI_RMA_GET_ACCUMULATE;
/* Cast away const'ness for origin_address as MPIDI_RMA_ops
* contain both PUT and GET like ops */
new_ptr->origin_addr = (void *) origin_addr;
new_ptr->origin_count = origin_count;
new_ptr->origin_datatype = origin_datatype;
new_ptr->result_addr = result_addr;
new_ptr->result_count = result_count;
new_ptr->result_datatype = result_datatype;
new_ptr->target_rank = target_rank;
new_ptr->target_disp = target_disp;
new_ptr->target_count = target_count;
new_ptr->target_datatype = target_datatype;
new_ptr->op = op;
MPIU_INSTR_DURATION_END(rmaqueue_set);
/* if source or target datatypes are derived, increment their
reference counts */
if (!origin_predefined) {
MPID_Datatype_get_ptr(origin_datatype, dtp);
MPID_Datatype_add_ref(dtp);
}
if (!result_predefined) {
MPID_Datatype_get_ptr(result_datatype, dtp);
MPID_Datatype_add_ref(dtp);
}
if (!target_predefined) {
MPID_Datatype_get_ptr(target_datatype, dtp);
MPID_Datatype_add_ref(dtp);
}
}
fn_exit:
MPIU_CHKPMEM_COMMIT();
MPIU_CHKLMEM_FREEALL();
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_GET_ACCUMULATE);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
MPIU_CHKPMEM_REAP();
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......
......@@ -37,6 +37,7 @@ MPIU_INSTR_DURATION_DECL(rmapkt_acc_immed);
MPIU_INSTR_DURATION_DECL(rmapkt_acc_immed_op);
MPIU_INSTR_DURATION_DECL(rmapkt_cas);
MPIU_INSTR_DURATION_DECL(rmapkt_fop);
MPIU_INSTR_DURATION_DECL(rmapkt_get_accum);
MPIU_INSTR_DURATION_EXTERN_DECL(rmaqueue_alloc);
MPIU_INSTR_DURATION_EXTERN_DECL(rmaqueue_set);
void MPIDI_CH3_RMA_InitInstr(void);
......@@ -72,6 +73,7 @@ void MPIDI_CH3_RMA_InitInstr(void)
MPIU_INSTR_DURATION_INIT(rmapkt_acc_immed_op,0,"RMA:PKTHANDLER for Accum immed operation");
MPIU_INSTR_DURATION_INIT(rmapkt_cas,0,"RMA:PKTHANDLER for Compare-and-swap");
MPIU_INSTR_DURATION_INIT(rmapkt_fop,0,"RMA:PKTHANDLER for Fetch-and-op");
MPIU_INSTR_DURATION_INIT(rmapkt_get_accum,0,"RMA:PKTHANDLER for Get-Accumulate");
}
/* These are used to use a common routine to complete lists of RMA
......@@ -272,6 +274,7 @@ int MPIDI_Win_fence(int assert, MPID_Win *win_ptr)
{
case (MPIDI_RMA_PUT):
case (MPIDI_RMA_ACCUMULATE):
case (MPIDI_RMA_GET_ACCUMULATE):
mpi_errno = MPIDI_CH3I_Send_rma_msg(curr_ptr, win_ptr,
source_win_handle, target_win_handle,
&curr_ptr->dtype_info,
......@@ -496,6 +499,7 @@ static int MPIDI_CH3I_Send_rma_msg(MPIDI_RMA_ops *rma_op, MPID_Win *win_ptr,
MPIDI_VC_t * vc;
MPID_Comm *comm_ptr;
MPID_Datatype *target_dtp=NULL, *origin_dtp=NULL;
MPID_Request *resp_req=NULL;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_RMA_MSG);
MPIDI_STATE_DECL(MPID_STATE_MEMCPY);
......@@ -519,6 +523,46 @@ static int MPIDI_CH3I_Send_rma_msg(MPIDI_RMA_ops *rma_op, MPID_Win *win_ptr,
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) put_pkt;
iov[0].MPID_IOV_LEN = sizeof(*put_pkt);
}
else if (rma_op->type == MPIDI_RMA_GET_ACCUMULATE)
{
/* Create a request for the GACC response. Store the response buf, count, and
datatype in it, and pass the request's handle in the GACC packet. When the
response comes from the target, it will contain the request handle. */
resp_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(resp_req, 2);
resp_req->dev.user_buf = rma_op->result_addr;
resp_req->dev.user_count = rma_op->result_count;
resp_req->dev.datatype = rma_op->result_datatype;
resp_req->dev.target_win_handle = target_win_handle;
resp_req->dev.source_win_handle = source_win_handle;
MPIDI_CH3I_DATATYPE_IS_PREDEFINED(resp_req->dev.datatype, predefined);
if (!predefined) {
MPID_Datatype *result_dtp = NULL;
MPID_Datatype_get_ptr(resp_req->dev.datatype, result_dtp);
resp_req->dev.datatype_ptr = result_dtp;
/* this will cause the datatype to be freed when the
request is freed. */
}
/* Note: Get_accumulate uses the same packet type as accumulate */
MPIDI_Pkt_init(accum_pkt, MPIDI_CH3_PKT_GET_ACCUM);
accum_pkt->addr = (char *) win_ptr->base_addrs[rma_op->target_rank] +
win_ptr->disp_units[rma_op->target_rank] * rma_op->target_disp;
accum_pkt->count = rma_op->target_count;
accum_pkt->datatype = rma_op->target_datatype;
accum_pkt->dataloop_size = 0;
accum_pkt->op = rma_op->op;
accum_pkt->target_win_handle = target_win_handle;
accum_pkt->source_win_handle = source_win_handle;
accum_pkt->request_handle = resp_req->handle;
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) accum_pkt;
iov[0].MPID_IOV_LEN = sizeof(*accum_pkt);
}
else
{
MPIDI_Pkt_init(accum_pkt, MPIDI_CH3_PKT_ACCUMULATE);
......@@ -691,13 +735,48 @@ static int MPIDI_CH3I_Send_rma_msg(MPIDI_RMA_ops *rma_op, MPID_Win *win_ptr,
if (origin_dt_derived)
MPID_Datatype_release(origin_dtp);
MPID_Datatype_release(target_dtp);
}
}
/* This operation can generate two requests; one for inbound and one for
outbound data. */
if (resp_req != NULL) {
if (*request != NULL) {
/* If we have both inbound and outbound requests (i.e. GACC
operation), we need to ensure that the source buffer is
available and that the response data has been received before
informing the origin that this operation is complete. Because
the update needs to be done atomically at the target, they will
not send back data until it has been received. Therefore,
completion of the response request implies that the send request
has completed.
Therefore: refs on the response request are set to two: one is
held by the progress engine and the other by the RMA op
completion code. Refs on the outbound request are set to one;
it will be completed by the progress engine.
*/
MPIU_Object_set_ref(*request, 1);
*request = resp_req;
} else {
*request = resp_req;
}
/* For error checking */
resp_req = NULL;
}
fn_exit:
MPIU_CHKPMEM_COMMIT();
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_RMA_MSG);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (resp_req) {
MPIU_Object_set_ref(resp_req, 0);
MPIDI_CH3_Request_destroy(resp_req);
}
if (*request)
{
MPIU_CHKPMEM_REAP();
......@@ -1479,6 +1558,7 @@ int MPIDI_Win_complete(MPID_Win *win_ptr)
{
case (MPIDI_RMA_PUT):
case (MPIDI_RMA_ACCUMULATE):
case (MPIDI_RMA_GET_ACCUMULATE):
mpi_errno = MPIDI_CH3I_Send_rma_msg(curr_ptr, win_ptr,
source_win_handle, target_win_handle,
&curr_ptr->dtype_info,
......@@ -1842,7 +1922,8 @@ int MPIDI_Win_unlock(int dest, MPID_Win *win_ptr)
/* TODO: MPI-3: Add lock->cas->unlock optimization */
if ( rma_op->next->next == NULL &&
rma_op->next->type != MPIDI_RMA_COMPARE_AND_SWAP &&
rma_op->next->type != MPIDI_RMA_FETCH_AND_OP ) {
rma_op->next->type != MPIDI_RMA_FETCH_AND_OP &&
rma_op->next->type != MPIDI_RMA_GET_ACCUMULATE ) {
/* Single put, get, or accumulate between the lock and unlock. If it
* is of small size and predefined datatype at the target, we
* do an optimization where the lock and the RMA operation are
......@@ -2087,7 +2168,8 @@ static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *win_ptr,
if (win_ptr->rma_ops_list_tail->type == MPIDI_RMA_GET ||
win_ptr->rma_ops_list_tail->type == MPIDI_RMA_COMPARE_AND_SWAP ||
win_ptr->rma_ops_list_tail->type == MPIDI_RMA_FETCH_AND_OP) {
win_ptr->rma_ops_list_tail->type == MPIDI_RMA_FETCH_AND_OP ||
win_ptr->rma_ops_list_tail->type == MPIDI_RMA_GET_ACCUMULATE) {
/* last operation sends a response message. no need to wait
for an additional rma done pkt */
*wait_for_rma_done_pkt = 0;
......@@ -2164,6 +2246,7 @@ static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *win_ptr,
{
case (MPIDI_RMA_PUT): /* same as accumulate */
case (MPIDI_RMA_ACCUMULATE):
case (MPIDI_RMA_GET_ACCUMULATE):
win_ptr->pt_rma_puts_accs[curr_ptr->target_rank]++;
mpi_errno = MPIDI_CH3I_Send_rma_msg(curr_ptr, win_ptr,
source_win_handle, target_win_handle,
......@@ -2953,6 +3036,12 @@ int MPIDI_CH3_PktHandler_Accumulate( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
req->dev.target_win_handle = accum_pkt->target_win_handle;
req->dev.source_win_handle = accum_pkt->source_win_handle;
if (accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM) {
req->dev.resp_request_handle = accum_pkt->request_handle;
} else {
req->dev.resp_request_handle = MPI_REQUEST_NULL;
}
MPIDI_CH3I_DATATYPE_IS_PREDEFINED(accum_pkt->datatype, predefined);
if (predefined)
{
......@@ -2969,6 +3058,8 @@ int MPIDI_CH3_PktHandler_Accumulate( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
accum_pkt->count * MPIR_MAX(extent,true_extent));
}
/* FIXME: This seems unnecessary - predefined datatypes should always
* have true_lb of 0, right? */
/* adjust for potential negative lower bound in datatype */
tmp_buf = (void *)((char*)tmp_buf - true_lb);
......@@ -3457,6 +3548,63 @@ int MPIDI_CH3_PktHandler_FOPResp( MPIDI_VC_t *vc ATTRIBUTE((unused)),
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Get_AccumResp
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_Get_AccumResp( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp )
{
MPIDI_CH3_Pkt_get_accum_resp_t *get_accum_resp_pkt = &pkt->get_accum_resp;
MPID_Request *req;
int complete;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
int mpi_errno = MPI_SUCCESS;
int type_size;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET_ACCUM_RESP);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET_ACCUM_RESP);
MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received Get-Accumulate response pkt");
MPIU_INSTR_DURATION_START(rmapkt_get_accum);
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *)pkt + sizeof(MPIDI_CH3_Pkt_t);
MPID_Request_get_ptr(get_accum_resp_pkt->request_handle, req);
MPID_Datatype_get_size_macro(req->dev.datatype, type_size);
req->dev.recv_data_sz = type_size * req->dev.user_count;
/* FIXME: It is likely that this cannot happen (never perform
a get with a 0-sized item). In that case, change this
to an MPIU_Assert (and do the same for accumulate and put) */
if (req->dev.recv_data_sz == 0) {
MPIDI_CH3U_Request_complete( req );
*buflen = sizeof(MPIDI_CH3_Pkt_t);
*rreqp = NULL;
}
else {
*rreqp = req;
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_ACCUM_RESP");
if (complete) {
MPIDI_CH3U_Request_complete(req);
*rreqp = NULL;
}
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
}
fn_exit:
MPIU_INSTR_DURATION_END(rmapkt_get_accum);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_GET_ACCUM_RESP);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment