Commit 994a9644 authored by James Dinan
Browse files

[svn-r10175] Implementation of MPI-3 RMA Compare-and-swap.

This patch adds support for the new MPI-3 compare-and-swap routine.  The
implementation embeds the data into the packet header to reduce the number of
messages.

Reviewer: buntinas
parent bb4ecbbf
......@@ -3843,6 +3843,7 @@ extern MPIR_Op_check_dtype_fn *MPIR_Op_check_dtype_table[];
#endif /* MPIR_MAX */
int MPIR_Type_is_rma_atomic(MPI_Datatype type);
int MPIR_Compare_equal(const void *a, const void *b, MPI_Datatype type);
int MPIR_Allgather_impl(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype,
......
......@@ -28,12 +28,10 @@ int MPIR_Type_is_rma_atomic(MPI_Datatype type)
MPIR_OP_TYPE_GROUP(C_INTEGER)
MPIR_OP_TYPE_GROUP(FORTRAN_INTEGER)
MPIR_OP_TYPE_GROUP(LOGICAL)
case MPI_BYTE:
/* extra types that are not required to be supported by the MPI
MPIR_OP_TYPE_GROUP(BYTE)
MPIR_OP_TYPE_GROUP(C_INTEGER_EXTRA)
MPIR_OP_TYPE_GROUP(FORTRAN_INTEGER_EXTRA)
MPIR_OP_TYPE_GROUP(LOGICAL_EXTRA)
*/
return TRUE;
break;
#undef MPIR_OP_TYPE_MACRO
......@@ -43,3 +41,37 @@ int MPIR_Type_is_rma_atomic(MPI_Datatype type)
}
}
/* Returns TRUE if (*a == *b) when both operands are interpreted using the
 * given datatype, and FALSE otherwise.
 *
 *   a, b - pointers to the two values to compare; neither is written to
 *   type - MPI datatype that selects the C type used for the comparison
 *
 * Currently, this is only defined for RMA atomic types.  A datatype that
 * does not match any case generated below compares as FALSE.
 */
#undef FUNCNAME
#define FUNCNAME MPIR_Compare_equal
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Compare_equal(const void *a, const void *b, MPI_Datatype type)
{
    switch (type) {
        /* Each MPIR_OP_TYPE_GROUP below is expected to expand into one
         * MPIR_OP_TYPE_MACRO(mpi_type, c_type) use per supported type
         * (NOTE(review): group definitions are not visible here).  The
         * operands are read through const-qualified pointers so the
         * const-ness of a and b is preserved. */
#undef MPIR_OP_TYPE_MACRO
#define MPIR_OP_TYPE_MACRO(mpi_type_, c_type_)                      \
        case mpi_type_:                                             \
            if (*(const c_type_ *)a == *(const c_type_ *)b)         \
                return TRUE;                                        \
            break;
        MPIR_OP_TYPE_GROUP(C_INTEGER)
        MPIR_OP_TYPE_GROUP(FORTRAN_INTEGER)
        MPIR_OP_TYPE_GROUP(LOGICAL)
        MPIR_OP_TYPE_GROUP(BYTE)
        MPIR_OP_TYPE_GROUP(C_INTEGER_EXTRA)
        MPIR_OP_TYPE_GROUP(FORTRAN_INTEGER_EXTRA)
        MPIR_OP_TYPE_GROUP(LOGICAL_EXTRA)
#undef MPIR_OP_TYPE_MACRO
        default:
            /* Not a type this routine knows how to compare */
            return FALSE;
            break;
    }

    /* Reached when a known type matched but the values differed */
    return FALSE;
}
......@@ -1794,6 +1794,10 @@ int MPIDI_CH3_PktHandler_Accumulate( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Accumulate_Immed( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_CAS( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_CASResp( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Get( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_GetResp( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
......
......@@ -16,6 +16,69 @@
/* This is the number of ints that can be carried within an RMA packet */
#define MPIDI_RMA_IMMED_INTS 1
/* One member for every C type that may appear as a compare-and-swap
   operand.  The union's size is what reserves room in the packet header
   for the immediate (in-header) operand data. */
typedef union {
    /* Basic C integer types and the MPI address/offset integers */
    char cas_char;
    short cas_short;
    int cas_int;
    long cas_long;
    MPI_Aint cas_aint;
    MPI_Offset cas_offset;

    /* Fixed-width signed integers, when configure detected them */
#ifdef HAVE_INT8_T
    int8_t cas_int8_t;
#endif
#ifdef HAVE_INT16_T
    int16_t cas_int16_t;
#endif
#ifdef HAVE_INT32_T
    int32_t cas_int32_t;
#endif
#ifdef HAVE_INT64_T
    int64_t cas_int64_t;
#endif

    /* Fixed-width unsigned integers, when configure detected them */
#ifdef HAVE_UINT8_T
    uint8_t cas_uint8_t;
#endif
#ifdef HAVE_UINT16_T
    uint16_t cas_uint16_t;
#endif
#ifdef HAVE_UINT32_T
    uint32_t cas_uint32_t;
#endif
#ifdef HAVE_UINT64_T
    uint64_t cas_uint64_t;
#endif

#ifdef HAVE_LONG_LONG_INT
    long long cas_long_long;
#endif

    /* Fortran integer and C / C++ boolean representations */
#ifdef HAVE_FORTRAN_BINDING
    MPI_Fint cas_fint;
#endif
#ifdef HAVE__BOOL
    _Bool cas__bool;
#endif
#ifdef HAVE_CXX_BINDING
    MPIR_CXX_BOOL_CTYPE cas_cxx_bool;
#endif

    /* Sized Fortran INTEGER*n types, each present only if a matching C
       type macro is defined */
#ifdef MPIR_INTEGER1_CTYPE
    MPIR_INTEGER1_CTYPE cas_integer1;
#endif
#ifdef MPIR_INTEGER2_CTYPE
    MPIR_INTEGER2_CTYPE cas_integer2;
#endif
#ifdef MPIR_INTEGER4_CTYPE
    MPIR_INTEGER4_CTYPE cas_integer4;
#endif
#ifdef MPIR_INTEGER8_CTYPE
    MPIR_INTEGER8_CTYPE cas_integer8;
#endif
#ifdef MPIR_INTEGER16_CTYPE
    MPIR_INTEGER16_CTYPE cas_integer16;
#endif
} MPIDI_CH3_CAS_Immed_u;
/*
* MPIDI_CH3_Pkt_type_t
*
......@@ -50,6 +113,8 @@ typedef enum MPIDI_CH3_Pkt_type
/* RMA Packets end here */
MPIDI_CH3_PKT_ACCUM_IMMED, /* optimization for short accumulate */
/* FIXME: Add PUT, GET_IMMED packet types */
MPIDI_CH3_PKT_CAS,
MPIDI_CH3_PKT_CAS_RESP,
MPIDI_CH3_PKT_FLOW_CNTL_UPDATE, /* FIXME: Unused */
MPIDI_CH3_PKT_CLOSE,
MPIDI_CH3_PKT_END_CH3
......@@ -219,6 +284,33 @@ typedef struct MPIDI_CH3_Pkt_accum_immed
}
MPIDI_CH3_Pkt_accum_immed_t;
/* Packet header for a compare-and-swap request (MPIDI_CH3_PKT_CAS).
 * Both operands are carried inside the header itself (origin_data and
 * compare_data), so the request needs no separate data message. */
typedef struct MPIDI_CH3_Pkt_cas
{
MPIDI_CH3_Pkt_type_t type;
/* Address of the location at the target that is compared and possibly
 * replaced; computed by the origin when it builds the packet */
void *addr;
/* Datatype of the single element being operated on */
MPI_Datatype datatype;
/* Handle of the origin-side request that waits for the response; the
 * target copies it unchanged into the response packet */
MPI_Request request_handle;
/* FIXME: do we need both handles? */
MPI_Win target_win_handle; /* Used in the last RMA operation in each
* epoch for decrementing rma op counter in
* active target rma and for unlocking window
* in passive target rma. Otherwise set to NULL*/
MPI_Win source_win_handle; /* Used in the last RMA operation in an
* epoch in the case of passive target rma
* with shared locks. Otherwise set to NULL*/
/* Replacement value, written to addr when the comparison succeeds */
MPIDI_CH3_CAS_Immed_u origin_data;
/* Value that the current contents of addr are compared against */
MPIDI_CH3_CAS_Immed_u compare_data;
}
MPIDI_CH3_Pkt_cas_t;
/* Packet header for the reply to a compare-and-swap request
 * (MPIDI_CH3_PKT_CAS_RESP).  The result value travels inside the header. */
typedef struct MPIDI_CH3_Pkt_cas_resp
{
MPIDI_CH3_Pkt_type_t type;
/* Copied from the request packet so the origin can find the request
 * that is waiting for this reply */
MPI_Request request_handle;
/* Contents of the target location as read before any replacement */
MPIDI_CH3_CAS_Immed_u data;
}
MPIDI_CH3_Pkt_cas_resp_t;
typedef struct MPIDI_CH3_Pkt_lock
{
MPIDI_CH3_Pkt_type_t type;
......@@ -310,6 +402,8 @@ typedef union MPIDI_CH3_Pkt
MPIDI_CH3_Pkt_lock_get_unlock_t lock_get_unlock;
MPIDI_CH3_Pkt_lock_accum_unlock_t lock_accum_unlock;
MPIDI_CH3_Pkt_close_t close;
MPIDI_CH3_Pkt_cas_t cas;
MPIDI_CH3_Pkt_cas_resp_t cas_resp;
# if defined(MPIDI_CH3_PKT_DECL)
MPIDI_CH3_PKT_DECL
# endif
......
......@@ -12,7 +12,8 @@ typedef enum MPIDI_RMA_Op_type_e {
MPIDI_RMA_ACCUMULATE = 25,
MPIDI_RMA_LOCK = 26,
MPIDI_RMA_ACC_CONTIG = 27,
MPIDI_RMA_GET_ACCUMULATE = 28
MPIDI_RMA_GET_ACCUMULATE = 28,
MPIDI_RMA_COMPARE_AND_SWAP = 29
} MPIDI_RMA_Op_type_t;
/* Special case RMA operations */
......@@ -70,6 +71,12 @@ typedef struct MPIDI_RMA_ops {
struct MPID_Request *request;
MPIDI_RMA_dtype_info dtype_info;
void *dataloop;
void *result_addr;
int result_count;
MPI_Datatype result_datatype;
void *compare_addr;
int compare_count;
MPI_Datatype compare_datatype;
} MPIDI_RMA_ops;
typedef struct MPIDI_PT_single_op {
......
......@@ -588,6 +588,10 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
MPIDI_CH3_PktHandler_LockGetUnlock;
pktArray[MPIDI_CH3_PKT_ACCUM_IMMED] =
MPIDI_CH3_PktHandler_Accumulate_Immed;
pktArray[MPIDI_CH3_PKT_CAS] =
MPIDI_CH3_PktHandler_CAS;
pktArray[MPIDI_CH3_PKT_CAS_RESP] =
MPIDI_CH3_PktHandler_CASResp;
/* End of default RMA operations */
fn_fail:
......
......@@ -166,3 +166,90 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_Compare_and_swap
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Device-level compare-and-swap on a single element of an RMA window.
 *
 *   origin_addr  - replacement value
 *   compare_addr - value the target location is compared against
 *   result_addr  - receives the target location's previous contents
 *   datatype     - predefined datatype of the single element
 *   target_rank  - rank owning the target location (MPI_PROC_NULL: no-op)
 *   target_disp  - displacement of the target location, in units of the
 *                  target window's displacement unit
 *   win_ptr      - window object
 *
 * When the target is the calling process the operation is carried out
 * immediately on local memory.  Otherwise an entry describing it is appended
 * to the window's list of pending RMA operations; nothing is sent from here.
 *
 * Returns MPI_SUCCESS, or an MPI error code if the queue entry could not be
 * allocated.
 */
int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
                           void *result_addr, MPI_Datatype datatype, int target_rank,
                           MPI_Aint target_disp, MPID_Win *win_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    int rank;
    MPIDI_RMA_ops *new_ptr;
    MPIU_CHKPMEM_DECL(1);
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_COMPARE_AND_SWAP);

    MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_COMPARE_AND_SWAP);

    if (target_rank == MPI_PROC_NULL) {
        goto fn_exit;
    }

    rank = win_ptr->myrank;

    /* The datatype must be predefined, and one of: C integer, Fortran integer,
     * Logical, Multi-language types, or Byte.  This is checked above the ADI,
     * so there's no need to check it again here. */

    if (target_rank == rank) {
        /* Local target: operate directly on our own window memory.  The
         * displacement is expressed in displacement units, so it has to be
         * scaled before being added to the window base.
         * NOTE(review): relies on MPID_Win having a disp_unit field for the
         * local window, as used by the other RMA operations -- the struct
         * definition is not visible here; confirm. */
        void *dest_addr = (char *) win_ptr->base + win_ptr->disp_unit * target_disp;
        int len;

        MPID_Datatype_get_size_macro(datatype, len);
        MPIU_Assert(len <= sizeof(MPIDI_CH3_CAS_Immed_u));

        /* Hand back the old value first, then replace it only on a match */
        MPIU_Memcpy(result_addr, dest_addr, len);

        if (MPIR_Compare_equal(compare_addr, dest_addr, datatype)) {
            MPIU_Memcpy(dest_addr, origin_addr, len);
        }

        goto fn_exit;
    }
    else {
        /* Append this operation to the RMA ops queue */
        MPIU_INSTR_DURATION_START(rmaqueue_alloc);
        MPIU_CHKPMEM_MALLOC(new_ptr, MPIDI_RMA_ops *, sizeof(MPIDI_RMA_ops),
                            mpi_errno, "RMA operation entry");
        MPIU_INSTR_DURATION_END(rmaqueue_alloc);

        /* Link the new entry at the tail (or make it the head of an
         * empty list) */
        if (win_ptr->rma_ops_list_tail)
            win_ptr->rma_ops_list_tail->next = new_ptr;
        else
            win_ptr->rma_ops_list_head = new_ptr;
        win_ptr->rma_ops_list_tail = new_ptr;

        MPIU_INSTR_DURATION_START(rmaqueue_set);
        new_ptr->next = NULL;
        new_ptr->type = MPIDI_RMA_COMPARE_AND_SWAP;

        /* Every buffer holds exactly one element of the same datatype */
        new_ptr->origin_addr = (void *) origin_addr;
        new_ptr->origin_count = 1;
        new_ptr->origin_datatype = datatype;

        new_ptr->target_rank = target_rank;
        new_ptr->target_disp = target_disp;   /* stored unscaled */
        new_ptr->target_count = 1;
        new_ptr->target_datatype = datatype;

        new_ptr->result_addr = result_addr;
        new_ptr->result_count = 1;
        new_ptr->result_datatype = datatype;

        new_ptr->compare_addr = (void *) compare_addr;
        new_ptr->compare_count = 1;
        new_ptr->compare_datatype = datatype;
        MPIU_INSTR_DURATION_END(rmaqueue_set);
    }

 fn_exit:
    MPIU_CHKPMEM_COMMIT();
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_COMPARE_AND_SWAP);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
 fn_fail:
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
......@@ -40,18 +40,6 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
}
#undef FUNCNAME
#define FUNCNAME MPIDI_Compare_and_swap
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Placeholder entry point for compare-and-swap: the body consists solely of
 * the MPIDI_FUNC_NOTIMPL macro and none of the arguments are used directly.
 * NOTE(review): the macro presumably reports "not implemented" and returns
 * an error code -- its definition is not visible here; confirm. */
int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
void *result_addr, MPI_Datatype datatype, int target_rank,
MPI_Aint target_disp, MPID_Win *win)
{
MPIDI_FUNC_NOTIMPL(COMPARE_AND_SWAP)
}
#undef FUNCNAME
#define FUNCNAME MPIDI_Rput
#undef FCNAME
......
......@@ -35,6 +35,7 @@ MPIU_INSTR_DURATION_DECL(rmapkt_acc);
MPIU_INSTR_DURATION_DECL(rmapkt_acc_predef);
MPIU_INSTR_DURATION_DECL(rmapkt_acc_immed);
MPIU_INSTR_DURATION_DECL(rmapkt_acc_immed_op);
MPIU_INSTR_DURATION_DECL(rmapkt_cas);
MPIU_INSTR_DURATION_EXTERN_DECL(rmaqueue_alloc);
MPIU_INSTR_DURATION_EXTERN_DECL(rmaqueue_set);
void MPIDI_CH3_RMA_InitInstr(void);
......@@ -68,6 +69,7 @@ void MPIDI_CH3_RMA_InitInstr(void)
MPIU_INSTR_DURATION_INIT(rmapkt_acc_predef,0,"RMA:PKTHANDLER for Accumulate: predef dtype");
MPIU_INSTR_DURATION_INIT(rmapkt_acc_immed,0,"RMA:PKTHANDLER for Accum immed");
MPIU_INSTR_DURATION_INIT(rmapkt_acc_immed_op,0,"RMA:PKTHANDLER for Accum immed operation");
MPIU_INSTR_DURATION_INIT(rmapkt_cas,0,"RMA:PKTHANDLER for Compare-and-swap");
}
/* These are used to use a common routine to complete lists of RMA
......@@ -102,6 +104,8 @@ static int MPIDI_CH3I_Recv_rma_msg(MPIDI_RMA_ops * rma_op, MPID_Win * win_ptr,
void ** dataloop, MPID_Request ** request);
static int MPIDI_CH3I_Send_contig_acc_msg(MPIDI_RMA_ops *, MPID_Win *,
MPI_Win, MPI_Win, MPID_Request ** );
static int MPIDI_CH3I_Send_immed_rmw_msg(MPIDI_RMA_ops *, MPID_Win *,
MPI_Win, MPI_Win, MPID_Request ** );
static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *, int *);
static int MPIDI_CH3I_Send_lock_put_or_acc(MPID_Win *);
static int MPIDI_CH3I_Send_lock_get(MPID_Win *);
......@@ -285,6 +289,13 @@ int MPIDI_Win_fence(int assert, MPID_Win *win_ptr)
&curr_ptr->dataloop, &curr_ptr->request);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
break;
case (MPIDI_RMA_COMPARE_AND_SWAP):
mpi_errno = MPIDI_CH3I_Send_immed_rmw_msg(curr_ptr, win_ptr,
source_win_handle, target_win_handle,
&curr_ptr->request );
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
break;
default:
MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**winInvalidOp");
}
......@@ -810,6 +821,97 @@ static int MPIDI_CH3I_Send_contig_acc_msg(MPIDI_RMA_ops *rma_op,
}
/*
 * Initiate an immediate read-modify-write (RMW) operation, i.e. one whose
 * operands fit inside the packet header.  Only compare-and-swap is handled
 * at present; any other operation type is reported as an error.
 *
 *   rma_op            - queued operation to send
 *   win_ptr           - window the operation belongs to
 *   source_win_handle - origin-side window handle to place in the packet
 *   target_win_handle - target-side window handle to place in the packet
 *   request           - out: request that will receive the response, or
 *                       NULL on failure
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Send_immed_rmw_msg
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Send_immed_rmw_msg(MPIDI_RMA_ops *rma_op,
                                         MPID_Win *win_ptr,
                                         MPI_Win source_win_handle,
                                         MPI_Win target_win_handle,
                                         MPID_Request **request)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *req = NULL, *resp_req = NULL;
    MPIDI_VC_t *vc;
    MPID_Comm *comm_ptr;
    int len;
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_IMMED_RMW_MSG);

    MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_IMMED_RMW_MSG);

    *request = NULL;

    /* Create a request for the RMW response.  Store the result buf, count,
       and datatype in it, and pass the request's handle in the RMW packet.
       When the response comes from the target, it will contain the request
       handle. */
    resp_req = MPID_Request_create();
    MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    *request = resp_req;

    /* Set refs on the request to 2: one for the response message, and one for
       the partial completion handler */
    MPIU_Object_set_ref(resp_req, 2);

    resp_req->dev.user_buf = rma_op->result_addr;
    resp_req->dev.user_count = rma_op->result_count;
    resp_req->dev.datatype = rma_op->result_datatype;
    resp_req->dev.target_win_handle = MPI_WIN_NULL;
    resp_req->dev.source_win_handle = source_win_handle;

    /* REQUIRE: All datatype arguments must be of the same, builtin
       type and counts must be 1. */
    MPID_Datatype_get_size_macro(rma_op->origin_datatype, len);

    if (rma_op->type == MPIDI_RMA_COMPARE_AND_SWAP) {
        MPIDI_CH3_Pkt_t upkt;
        MPIDI_CH3_Pkt_cas_t *cas_pkt = &upkt.cas;

        /* Both operands must fit in the header's immediate-data slots */
        MPIU_Assert(len <= sizeof(MPIDI_CH3_CAS_Immed_u));

        MPIDI_Pkt_init(cas_pkt, MPIDI_CH3_PKT_CAS);

        /* The displacement is given in units of the target's displacement
         * unit, so scale it before adding it to the target's window base.
         * NOTE(review): relies on MPID_Win carrying a per-rank disp_units[]
         * array alongside base_addrs[], as used by the other RMA send paths
         * -- the struct definition is not visible here; confirm. */
        cas_pkt->addr = (char *) win_ptr->base_addrs[rma_op->target_rank] +
            win_ptr->disp_units[rma_op->target_rank] * rma_op->target_disp;
        cas_pkt->datatype = rma_op->target_datatype;
        cas_pkt->target_win_handle = target_win_handle;
        cas_pkt->source_win_handle = source_win_handle;
        cas_pkt->request_handle = resp_req->handle;

        MPIU_Memcpy( (void *) &cas_pkt->origin_data, rma_op->origin_addr, len );
        MPIU_Memcpy( (void *) &cas_pkt->compare_data, rma_op->compare_addr, len );

        comm_ptr = win_ptr->comm_ptr;
        MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);

        MPIU_THREAD_CS_ENTER(CH3COMM,vc);
        mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_pkt, sizeof(*cas_pkt), &req);
        MPIU_THREAD_CS_EXIT(CH3COMM,vc);
        MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

        /* Completion is tracked through resp_req (returned in *request),
         * not through the send request, so drop the send request here if
         * one was handed back; otherwise it would never be released. */
        if (req != NULL) {
            MPID_Request_release(req);
        }
    }
    /* TODO: Fetch-and-op implementation will go here */
    else {
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

 fn_exit:
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_IMMED_RMW_MSG);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
 fn_fail:
    /* Tear down the response request so the caller never sees a request
     * for an operation that was not sent */
    if (*request)
    {
        MPIU_Object_set_ref(*request, 0);
        MPIDI_CH3_Request_destroy(*request);
    }
    *request = NULL;
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Recv_rma_msg
......@@ -1329,6 +1431,13 @@ int MPIDI_Win_complete(MPID_Win *win_ptr)
&curr_ptr->dataloop, &curr_ptr->request);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
break;
case (MPIDI_RMA_COMPARE_AND_SWAP):
mpi_errno = MPIDI_CH3I_Send_immed_rmw_msg(curr_ptr, win_ptr,
source_win_handle, target_win_handle,
&curr_ptr->request );
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
break;
default:
MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**winInvalidOp");
}
......@@ -1662,7 +1771,8 @@ int MPIDI_Win_unlock(int dest, MPID_Win *win_ptr)
MPIDI_Comm_get_vc_set_active(comm_ptr, dest, &vc);
if (rma_op->next->next == NULL) {
/* TODO: MPI-3: Add lock->cas->unlock optimization */
if (rma_op->next->next == NULL && rma_op->next->type != MPIDI_RMA_COMPARE_AND_SWAP) {
/* Single put, get, or accumulate between the lock and unlock. If it
* is of small size and predefined datatype at the target, we
* do an optimization where the lock and the RMA operation are
......@@ -2002,6 +2112,14 @@ static int MPIDI_CH3I_Do_passive_target_rma(MPID_Win *win_ptr,
&curr_ptr->dataloop, &curr_ptr->request);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
break;
case (MPIDI_RMA_COMPARE_AND_SWAP):
win_ptr->pt_rma_puts_accs[curr_ptr->target_rank]++;
mpi_errno = MPIDI_CH3I_Send_immed_rmw_msg(curr_ptr, win_ptr,
source_win_handle, target_win_handle,
&curr_ptr->request );
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
break;
default:
MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**winInvalidOp");
}
......@@ -2990,6 +3108,138 @@ int MPIDI_CH3_PktHandler_Accumulate_Immed( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_CAS
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Target-side handler for an incoming compare-and-swap packet.
 *
 *   vc     - virtual connection the packet arrived on; the response is
 *            sent back over it
 *   pkt    - received packet, interpreted as a CAS packet
 *   buflen - out: set to sizeof(MPIDI_CH3_Pkt_t), the bytes consumed here
 *   rreqp  - out: always set to NULL
 *
 * The handler copies the current contents of the target location into a
 * CAS response packet and sends it, then replaces the target location with
 * the origin value if it equals the compare value, and finally performs the
 * window counter / lock bookkeeping.  The order matters: the old value is
 * captured before any replacement takes place.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
int MPIDI_CH3_PktHandler_CAS( MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp )
{
int mpi_errno = MPI_SUCCESS;
/* Local storage for the response packet that is built and sent below */
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_cas_resp_t *cas_resp_pkt = &upkt.cas_resp;
MPIDI_CH3_Pkt_cas_t *cas_pkt = &pkt->cas;
MPID_Win *win_ptr;
MPID_Request *req;
int len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_CAS);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_CAS);
MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"received CAS pkt");
MPIU_INSTR_DURATION_START(rmapkt_cas);
/* return the number of bytes processed in this function */
/* data_len == 0 (all within packet) */
*buflen = sizeof(MPIDI_CH3_Pkt_t);
*rreqp = NULL;
/* Echo the origin's request handle so it can match the response */
MPIDI_Pkt_init(cas_resp_pkt, MPIDI_CH3_PKT_CAS_RESP);
cas_resp_pkt->request_handle = cas_pkt->request_handle;
/* Copy old value into the response packet */
MPID_Datatype_get_size_macro(cas_pkt->datatype, len);
MPIU_Assert(len <= sizeof(MPIDI_CH3_CAS_Immed_u));
MPIU_Memcpy( (void *)&cas_resp_pkt->data, cas_pkt->addr, len );
/* Send the response packet */
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_resp_pkt, sizeof(*cas_resp_pkt), &req);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
/* The send request is not needed past this point; release it if one
   was handed back */
if (req != NULL) {
MPID_Request_release(req);
}
/* Compare and replace if equal */
if (MPIR_Compare_equal(&cas_pkt->compare_data, cas_pkt->addr, cas_pkt->datatype)) {
MPIU_Memcpy(cas_pkt->addr, &cas_pkt->origin_data, len);
}
/* There are additional steps to take if this is a passive
target RMA or the last operation from the source */
MPID_Win_get_ptr(cas_pkt->target_win_handle, win_ptr);
/* if passive target RMA, increment counter */
if (win_ptr->current_lock_type != MPID_LOCK_NONE)
win_ptr->my_pt_rma_puts_accs++;
/* Send RMA done packet? FIXME: Can the cas_resp handler handle this? */
if (cas_pkt->source_win_handle != MPI_WIN_NULL) {
/* Last RMA operation from source. If active
target RMA, decrement window counter. If
passive target RMA, release lock on window and
grant next lock in the lock queue if there is
any. If it's a shared lock or a lock-put-unlock
type of optimization, we also need to send an
ack to the source. */
if (win_ptr->current_lock_type == MPID_LOCK_NONE) {
/* FIXME: MT: this has to be done atomically */
win_ptr->my_counter -= 1;
MPIDI_CH3_Progress_signal_completion();
}
else {
/* The second operand is a disabled single-op check; it is the constant
   expression 0 == 1, so only the shared-lock test can be true */
if ((win_ptr->current_lock_type == MPI_LOCK_SHARED) ||
(/*rreq->dev.single_op_opt*/ 0 == 1)) {
mpi_errno = MPIDI_CH3I_Send_pt_rma_done_pkt(vc, cas_pkt->source_win_handle);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
}
/* The result is not checked here; it is what the function returns
   once control reaches fn_exit */
mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
/* Without the following signal_completion call, we
sometimes hang */
MPIDI_CH3_Progress_signal_completion();
}
}
fn_exit:
MPIU_INSTR_DURATION_END(rmapkt_cas);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_CAS);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_CASResp
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)