Commit 0542e304 authored by Xin Zhao
Browse files

Rewrite code of passive lock control messages.



1. Piggyback LOCK request with first IMMED operation.

When we see an IMMED operation, we can always piggyback
LOCK request with that operation to reduce one sync
message of single LOCK request. When packet header of
that operation is received on target, we will try to
acquire the lock and perform that operation. The target
either piggybacks LOCK_GRANTED message with the response
packet (if available), or sends a single LOCK_GRANTED
message back to origin.

2. Rewrite code of manage lock queue.

When the lock request cannot be satisfied on target,
we need to buffer that lock request on target. All we
need to do is enqueuing the packet header, which contains
all information we need after lock is granted. When
the current lock is released, the runtime will go
over the lock queue and grant the lock to the next
available request. After the lock is granted, the runtime
just triggers the packet handler a second time.

3. Release lock on target side if piggybacking with UNLOCK.

If there are active-message operations to be issued,
we piggyback a UNLOCK flag with the last operation.
When the target receives it, it will release the current
lock and grant the lock to the next process.
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent 7fbe72dd
......@@ -48,6 +48,7 @@ static inline MPIDI_RMA_Op_t *MPIDI_CH3I_Win_op_alloc(MPID_Win * win_ptr)
e->dataloop = NULL;
e->request = NULL;
e->is_dt = 0;
e->piggyback_lock_candidate = 0;
return e;
}
......
......@@ -81,6 +81,7 @@ typedef struct MPIDI_RMA_Op {
MPIDI_CH3_Pkt_t pkt;
MPIDI_RMA_Pool_type_t pool_type;
int is_dt;
int piggyback_lock_candidate;
} MPIDI_RMA_Op_t;
typedef struct MPIDI_RMA_Target {
......@@ -135,25 +136,9 @@ typedef struct MPIDI_RMA_Win_list {
extern MPIDI_RMA_Win_list_t *MPIDI_RMA_Win_list, *MPIDI_RMA_Win_list_tail;
typedef struct MPIDI_PT_single_op {
MPIDI_CH3_Pkt_type_t type; /* put, get, or accum. */
void *addr;
int count;
MPI_Datatype datatype;
MPI_Op op;
void *data; /* for queued puts and accumulates, data is copied here */
MPI_Request request_handle; /* for gets */
int data_recd; /* to indicate if the data has been received */
MPIDI_CH3_Pkt_flags_t flags;
} MPIDI_PT_single_op;
typedef struct MPIDI_Win_lock_queue {
struct MPIDI_Win_lock_queue *next;
int lock_type;
MPI_Win source_win_handle;
int origin_rank;
struct MPIDI_PT_single_op *pt_single_op; /* to store info for
* lock-put-unlock optimization */
MPIDI_CH3_Pkt_t pkt; /* all information for this request packet */
} MPIDI_Win_lock_queue;
typedef MPIDI_RMA_Op_t *MPIDI_RMA_Ops_list_t;
......
......@@ -120,7 +120,9 @@ typedef enum {
MPIDI_CH3_PKT_FLAG_RMA_NOCHECK = 32,
MPIDI_CH3_PKT_FLAG_RMA_SHARED = 64,
MPIDI_CH3_PKT_FLAG_RMA_EXCLUSIVE = 128,
MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK = 256
MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK = 256,
MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK = 512,
MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED = 1024
} MPIDI_CH3_Pkt_flags_t;
typedef struct MPIDI_CH3_Pkt_send {
......@@ -407,6 +409,8 @@ typedef struct MPIDI_CH3_Pkt_put {
* with shared locks. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
size_t immed_len;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_put_t;
typedef struct MPIDI_CH3_Pkt_get {
......@@ -424,6 +428,8 @@ typedef struct MPIDI_CH3_Pkt_get {
MPI_Win source_win_handle; /* Used in the last RMA operation in an
* epoch in the case of passive target rma
* with shared locks. Otherwise set to NULL*/
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_get_t;
typedef struct MPIDI_CH3_Pkt_get_resp {
......@@ -452,6 +458,8 @@ typedef struct MPIDI_CH3_Pkt_accum {
* with shared locks. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
size_t immed_len;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_accum_t;
typedef struct MPIDI_CH3_Pkt_get_accum {
......@@ -472,6 +480,8 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
* with shared locks. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
size_t immed_len;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_get_accum_t;
typedef struct MPIDI_CH3_Pkt_get_accum_resp {
......@@ -515,6 +525,8 @@ typedef struct MPIDI_CH3_Pkt_cas {
* in passive target rma. Otherwise set to NULL*/
MPIDI_CH3_CAS_Immed_u origin_data;
MPIDI_CH3_CAS_Immed_u compare_data;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_cas_t;
typedef struct MPIDI_CH3_Pkt_cas_resp {
......@@ -541,6 +553,8 @@ typedef struct MPIDI_CH3_Pkt_fop {
* in passive target rma. Otherwise set to NULL*/
char data[MPIDI_RMA_IMMED_BYTES];
int immed_len;
int lock_type; /* used when piggybacking LOCK message. */
int origin_rank; /* used when piggybacking LOCK message. */
} MPIDI_CH3_Pkt_fop_t;
typedef struct MPIDI_CH3_Pkt_fop_resp {
......@@ -596,6 +610,7 @@ typedef struct MPIDI_CH3_Pkt_flush_ack {
MPI_Win source_win_handle;
int target_rank; /* Used in flush_ack response to look up the
* target state at the origin. */
MPIDI_CH3_Pkt_flags_t flags;
} MPIDI_CH3_Pkt_flush_ack_t;
typedef struct MPIDI_CH3_Pkt_decr_at_counter {
......
......@@ -342,6 +342,7 @@ struct MPIDI_Win_target_state {
* (none, shared, exclusive) */ \
volatile int shared_lock_ref_cnt; \
struct MPIDI_Win_lock_queue volatile *lock_queue; /* list of unsatisfied locks */ \
struct MPIDI_Win_lock_queue volatile *lock_queue_tail; /* tail of unstaisfied locks. */ \
\
MPI_Aint *sizes; /* array of sizes of all windows */ \
struct MPIDI_Win_info_args info_args; \
......@@ -474,7 +475,6 @@ typedef struct MPIDI_Request {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
MPIDI_CH3_Pkt_flags_t flags; /* flags that were included in the original RMA packet header */
struct MPIDI_Win_lock_queue *lock_queue_entry; /* for single lock-put-unlock optimization */
MPI_Request resp_request_handle; /* Handle for get_accumulate response */
MPIDI_REQUEST_SEQNUM
......
......@@ -84,6 +84,7 @@ static inline int send_unlock_msg(int dest, MPID_Win * win_ptr)
MPIDI_Pkt_init(unlock_pkt, MPIDI_CH3_PKT_UNLOCK);
unlock_pkt->target_win_handle = win_ptr->all_win_handles[dest];
unlock_pkt->source_win_handle = win_ptr->handle;
/* Reset the local state of the target to unlocked */
win_ptr->targets[dest].remote_lock_state = MPIDI_CH3_WIN_LOCK_NONE;
......@@ -154,6 +155,7 @@ static inline int MPIDI_CH3I_Send_lock_granted_pkt(MPIDI_VC_t * vc, MPID_Win * w
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int MPIDI_CH3I_Send_flush_ack_pkt(MPIDI_VC_t *vc, MPID_Win *win_ptr,
MPIDI_CH3_Pkt_flags_t flags,
MPI_Win source_win_handle)
{
MPIDI_CH3_Pkt_t upkt;
......@@ -167,6 +169,7 @@ static inline int MPIDI_CH3I_Send_flush_ack_pkt(MPIDI_VC_t *vc, MPID_Win *win_pt
MPIDI_Pkt_init(flush_ack_pkt, MPIDI_CH3_PKT_FLUSH_ACK);
flush_ack_pkt->source_win_handle = source_win_handle;
flush_ack_pkt->target_rank = win_ptr->comm_ptr->rank;
flush_ack_pkt->flags = flags;
/* Because this is in a packet handler, it is already within a critical section */
/* MPIU_THREAD_CS_ENTER(CH3COMM,vc); */
......@@ -227,6 +230,56 @@ static inline int send_decr_at_cnt_msg(int dst, MPID_Win * win_ptr)
/* --END ERROR HANDLING-- */
}
/* Buffer an unsatisfied lock request on the target side.  The queued
 * packet header carries everything needed to replay the operation once
 * the window lock is eventually granted.  Returns MPI_SUCCESS or an
 * allocation error. */
static inline int enqueue_lock_origin(MPID_Win *win_ptr, MPIDI_CH3_Pkt_t *pkt)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_Win_lock_queue *entry;

    entry = (MPIDI_Win_lock_queue *) MPIU_Malloc(sizeof(MPIDI_Win_lock_queue));
    if (entry == NULL) {
        MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                             "MPIDI_Win_lock_queue");
    }

    /* Copy the whole packet header; it is the only state we must retain. */
    entry->next = NULL;
    entry->pkt = (*pkt);

    /* Append at the tail so requests are granted in arrival order. */
    MPL_LL_APPEND(win_ptr->lock_queue, win_ptr->lock_queue_tail, entry);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
/* Decrement the origin-side "waiting for lock" counter after a lock has
 * been granted for target_rank.  Window-level counter (outstanding_locks)
 * takes precedence; otherwise the per-target counter is used.
 * Returns MPI_SUCCESS, or the error from the target lookup. */
static inline int set_lock_sync_counter(MPID_Win *win_ptr, int target_rank)
{
    int mpi_errno = MPI_SUCCESS;

    if (win_ptr->outstanding_locks > 0) {
        /* Window-wide counter still pending (e.g. lock_all style). */
        win_ptr->outstanding_locks--;
        MPIU_Assert(win_ptr->outstanding_locks >= 0);
    }
    else {
        /* Per-target lock: locate the target element and clear its counter. */
        MPIDI_RMA_Target_t *target = NULL;

        mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
        MPIU_Assert(target != NULL);

        target->outstanding_lock--;
        MPIU_Assert(target->outstanding_lock == 0);
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME acquire_local_lock
#undef FCNAME
......@@ -237,21 +290,21 @@ static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type)
MPIDI_STATE_DECL(MPID_STATE_ACQUIRE_LOCAL_LOCK);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_ACQUIRE_LOCAL_LOCK);
/* poke the progress engine until the local lock is granted */
if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 0) {
MPID_Progress_state progress_state;
MPID_Progress_start(&progress_state);
while (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 0) {
mpi_errno = MPID_Progress_wait(&progress_state);
/* --BEGIN ERROR HANDLING-- */
if (mpi_errno != MPI_SUCCESS) {
MPID_Progress_end(&progress_state);
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**winnoprogress");
}
/* --END ERROR HANDLING-- */
}
MPID_Progress_end(&progress_state);
if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 1) {
mpi_errno = set_lock_sync_counter(win_ptr, win_ptr->comm_ptr->rank);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
else {
/* Queue the lock information. */
MPIDI_CH3_Pkt_t pkt;
MPIDI_CH3_Pkt_lock_t *lock_pkt = &pkt.lock;
MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
lock_pkt->lock_type = lock_type;
lock_pkt->origin_rank = win_ptr->comm_ptr->rank;
mpi_errno = enqueue_lock_origin(win_ptr, &pkt);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
win_ptr->targets[win_ptr->comm_ptr->rank].remote_lock_state = MPIDI_CH3_WIN_LOCK_GRANTED;
......@@ -409,6 +462,41 @@ static inline int do_accumulate_op(MPID_Request *rreq)
goto fn_exit;
}
/* Inspect an incoming RMA packet for a piggybacked LOCK request and try to
 * satisfy it.  On success the packet's LOCK flag is replaced by
 * LOCK_GRANTED so downstream handlers know to acknowledge; on failure the
 * whole packet is queued for later replay and *acquire_lock_fail is set to
 * 1 so the caller skips performing the operation now. */
static inline int check_piggyback_lock(MPID_Win *win_ptr, MPIDI_CH3_Pkt_t *pkt, int *acquire_lock_fail) {
    int lock_type;
    MPIDI_CH3_Pkt_flags_t flags;
    int mpi_errno = MPI_SUCCESS;

    (*acquire_lock_fail) = 0;

    /* NOTE(review): these accessor macros report errors through mpi_errno
     * as an out-parameter, but it is not checked here before use of
     * flags/lock_type — presumably all packet types reaching this path
     * carry both fields; confirm against the macro definitions. */
    MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno);
    MPIDI_CH3_PKT_RMA_GET_LOCK_TYPE((*pkt), lock_type, mpi_errno);

    if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK) {
        if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 0) {
            /* cannot acquire the lock, queue up this operation. */
            mpi_errno = enqueue_lock_origin(win_ptr, pkt);
            if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
            (*acquire_lock_fail) = 1;
        }
        else {
            /* Lock acquired immediately: rewrite the flags in place so the
             * target-side completion path sends/piggybacks LOCK_GRANTED. */
            /* unset LOCK flag */
            MPIDI_CH3_PKT_RMA_UNSET_FLAG((*pkt), MPIDI_CH3_PKT_FLAG_RMA_LOCK, mpi_errno);
            /* set LOCK_GRANTED flag */
            MPIDI_CH3_PKT_RMA_SET_FLAG((*pkt), MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED, mpi_errno);
        }
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
static inline int wait_progress_engine(void)
{
int mpi_errno = MPI_SUCCESS;
......
......@@ -88,8 +88,17 @@ int MPIDI_CH3_ReqHandler_PutRecvComplete( MPIDI_VC_t *vc,
MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
if (!(rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) &&
!(rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)) {
mpi_errno = MPIDI_CH3I_Send_lock_granted_pkt(vc, win_ptr, rreq->dev.source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) {
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, rreq->dev.source_win_handle);
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, rreq->dev.flags,
rreq->dev.source_win_handle);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
......@@ -100,7 +109,14 @@ int MPIDI_CH3_ReqHandler_PutRecvComplete( MPIDI_VC_t *vc,
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) {
mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, rreq->dev.flags,
rreq->dev.source_win_handle);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
mpi_errno = MPIDI_CH3_Finish_rma_op_target(vc, win_ptr, TRUE, rreq->dev.flags,
rreq->dev.source_win_handle);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
......@@ -147,8 +163,17 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete( MPIDI_VC_t *vc,
MPIU_ERR_POP(mpi_errno);
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
if (!(rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) &&
!(rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)) {
mpi_errno = MPIDI_CH3I_Send_lock_granted_pkt(vc, win_ptr, rreq->dev.source_win_handle);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) {
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, rreq->dev.source_win_handle);
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, rreq->dev.flags,
rreq->dev.source_win_handle);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
......@@ -159,6 +184,13 @@ int MPIDI_CH3_ReqHandler_AccumRecvComplete( MPIDI_VC_t *vc,
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) {
mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr, rreq->dev.flags,
rreq->dev.source_win_handle);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
mpi_errno = MPIDI_CH3_Finish_rma_op_target(vc, win_ptr, TRUE, rreq->dev.flags,
rreq->dev.source_win_handle);
......@@ -205,8 +237,12 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_accum_resp_pkt->source_win_handle = rreq->dev.source_win_handle;
get_accum_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED)
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)
get_accum_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK;
MPID_Datatype_get_size_macro(rreq->dev.datatype, type_size);
......@@ -522,8 +558,12 @@ int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete( MPIDI_VC_t *vc,
get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
get_resp_pkt->source_win_handle = rreq->dev.source_win_handle;
get_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)
get_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_UNLOCK_ACK;
sreq->dev.segment_ptr = MPID_Segment_alloc( );
MPIU_ERR_CHKANDJUMP1((sreq->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
......@@ -832,6 +872,82 @@ static int create_derived_datatype(MPID_Request *req, MPID_Datatype **dtp)
return mpi_errno;
}
/* Replay one queued lock request after the window lock has become free.
 * Two cases: a bare LOCK packet (just grant the lock — locally for a
 * self-lock, via LOCK_GRANTED packet otherwise), or a LOCK+OP packet,
 * which is re-dispatched through the normal packet-handler table with its
 * flags rewritten from LOCK to LOCK_GRANTED. */
static inline int perform_op_in_lock_queue(MPID_Win *win_ptr, MPIDI_Win_lock_queue *lock_entry)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Request *req = NULL;
    MPIDI_msg_sz_t len = sizeof(MPIDI_CH3_Pkt_t);
    MPIDI_VC_t *vc = NULL;
    int origin_rank;

    /* NOTE(review): function-local statics make this non-reentrant and not
     * thread-safe; presumably it only runs inside the progress engine's
     * critical section — confirm. */
    static MPIDI_CH3_PktHandler_Fcn *pktArray[MPIDI_CH3_PKT_END_ALL+1];
    static int needsInit = 1;

    if (lock_entry->pkt.type == MPIDI_CH3_PKT_LOCK) {
        /* single LOCK request */
        MPIDI_CH3_Pkt_lock_t *lock_pkt = &(lock_entry->pkt.lock);

        if (lock_pkt->origin_rank == win_ptr->comm_ptr->rank) {
            /* Self-lock: update local counters directly, no packet needed.
             * NOTE(review): this duplicates the set_lock_sync_counter()
             * logic from the sync header; consider sharing one copy. */
            if (win_ptr->outstanding_locks > 0) {
                win_ptr->outstanding_locks--;
                MPIU_Assert(win_ptr->outstanding_locks >= 0);
            }
            else {
                MPIDI_RMA_Target_t *t = NULL;
                mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr,
                                                       win_ptr->comm_ptr->rank, &t);
                if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
                MPIU_Assert(t != NULL);
                t->outstanding_lock--;
                MPIU_Assert(t->outstanding_lock == 0);
            }
        }
        else {
            /* Remote origin: notify it that the lock is now granted. */
            MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr,
                                         lock_pkt->origin_rank, &vc);
            mpi_errno = MPIDI_CH3I_Send_lock_granted_pkt(vc, win_ptr,
                                                         lock_pkt->source_win_handle);
            if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
        }
    }
    else {
        /* LOCK+OP packet */
        /* get VC */
        MPIDI_CH3_PKT_RMA_GET_ORIGIN_RANK(lock_entry->pkt, origin_rank, mpi_errno);
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, origin_rank, &vc);

        /* unset LOCK flag */
        MPIDI_CH3_PKT_RMA_UNSET_FLAG(lock_entry->pkt, MPIDI_CH3_PKT_FLAG_RMA_LOCK, mpi_errno);
        /* set LOCK_GRANTED flag */
        MPIDI_CH3_PKT_RMA_SET_FLAG(lock_entry->pkt, MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED, mpi_errno);

        /* Lazily build the packet-dispatch table on first use. */
        if (needsInit) {
            mpi_errno = MPIDI_CH3_PktHandler_Init(pktArray, MPIDI_CH3_PKT_END_CH3);
            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
            needsInit = 0;
        }

        /* invalid pkt data will result in unpredictable behavior */
        MPIU_Assert((lock_entry->pkt).type >= MPIDI_CH3_PKT_PUT && (lock_entry->pkt).type <= MPIDI_CH3_PKT_CAS);

        /* trigger packet handler to deal with this op. */
        mpi_errno = pktArray[lock_entry->pkt.type](vc, &(lock_entry->pkt), &len, &req);
        if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);

        /* The queued packet is self-contained: the handler must consume it
         * whole and must not leave a pending request behind. */
        MPIU_Assert(len == sizeof(MPIDI_CH3_Pkt_t));
        MPIU_Assert(req == NULL);
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
static int entered_flag = 0;
static int entered_count = 0;
......@@ -843,10 +959,10 @@ static int entered_count = 0;
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Release_lock(MPID_Win *win_ptr)
{
MPIDI_Win_lock_queue *lock_queue, **lock_queue_ptr;
MPIDI_Win_lock_queue *lock_entry, *lock_entry_next;
int requested_lock, mpi_errno = MPI_SUCCESS, temp_entered_count;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_RELEASE_LOCK);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_RELEASE_LOCK);
if (win_ptr->current_lock_type == MPI_LOCK_SHARED) {
......@@ -860,159 +976,57 @@ int MPIDI_CH3I_Release_lock(MPID_Win *win_ptr)
if (win_ptr->shared_lock_ref_cnt == 0) {
/* This function needs to be reentrant even in the single-threaded case
because when going through the lock queue, the do_simple_get
called in the
lock-get-unlock case may itself cause a request to complete, and
this function
may again get called in the completion action in
ch3u_handle_send_req.c. To
handle this possibility, we use an entered_flag. If the flag is
not 0, we simply
increment the entered_count and return. The loop through the lock
queue is repeated
if the entered_count has changed while we are in the loop.
because when going through the lock queue, pkt_handler() in
perform_op_in_lock_queue() may again call release_lock(). To handle
this possibility, we use an entered_flag.
If the flag is not 0, we simply increment the entered_count and return.
The loop through the lock queue is repeated if the entered_count has
changed while we are in the loop.
*/
if (entered_flag != 0) {
entered_count++;
entered_count++; /* Count how many times we re-enter */
goto fn_exit;
}
else {
entered_flag = 1;
temp_entered_count = entered_count;
}
do {
entered_flag = 1; /* Mark that we are now entering release_lock() */
temp_entered_count = entered_count;
do {
if (temp_entered_count != entered_count) temp_entered_count++;
/* FIXME: MT: The setting of the lock type must be done atomically */
win_ptr->current_lock_type = MPID_LOCK_NONE;
/* If there is a lock queue, try to satisfy as many lock requests as
possible. If the first one is a shared lock, grant it and grant all
other shared locks. If the first one is an exclusive lock, grant
only that one. */
/* FIXME: MT: All queue accesses need to be made atomic */
lock_queue = (MPIDI_Win_lock_queue *) win_ptr->lock_queue;
lock_queue_ptr = (MPIDI_Win_lock_queue **) &(win_ptr->lock_queue);
while (lock_queue) {
/* if it is not a lock-op-unlock type case or if it is a
lock-op-unlock type case but all the data has been received,
try to acquire the lock */
if ((lock_queue->pt_single_op == NULL) ||
(lock_queue->pt_single_op->data_recd == 1)) {
requested_lock = lock_queue->lock_type;
if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, requested_lock)
== 1) {
if (lock_queue->pt_single_op != NULL) {
/* single op. do it here */
MPIDI_PT_single_op * single_op;
single_op = lock_queue->pt_single_op;
if (single_op->type == MPIDI_CH3_PKT_LOCK_PUT_UNLOCK) {
mpi_errno = MPIR_Localcopy(single_op->data,
single_op->count,
single_op->datatype,
single_op->addr,
single_op->count,
single_op->datatype);
}
else if (single_op->type == MPIDI_CH3_PKT_LOCK_ACCUM_UNLOCK) {
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
mpi_errno = do_simple_accumulate(single_op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
else if (single_op->type == MPIDI_CH3_PKT_LOCK_GET_UNLOCK) {
mpi_errno = do_simple_get(win_ptr, lock_queue);
}
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
/* if put or accumulate, send rma done packet and release lock. */
if (single_op->type != MPIDI_CH3_PKT_LOCK_GET_UNLOCK) {
/* NOTE: Only *queued* single_op operations are completed here.
Lock-op-unlock/single_op RMA ops can also be completed as
they arrive within various packet/request handlers via
MPIDI_CH3_Finish_rma_op_target(). That call cannot be used
here, because it would enter this function recursively. */
MPIDI_VC_t *vc;
MPIDI_Comm_get_vc(win_ptr->comm_ptr, lock_queue->origin_rank, &vc);
mpi_errno =
MPIDI_CH3I_Send_flush_ack_pkt(vc, win_ptr,
lock_queue->source_win_handle);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
/* release the lock */
if (win_ptr->current_lock_type == MPI_LOCK_SHARED) {
/* decr ref cnt */
/* FIXME: MT: Must be done atomically */
win_ptr->shared_lock_ref_cnt--;
}
/* If shared lock ref count is 0
(which is also true if the lock is an
exclusive lock), release the lock. */
if (win_ptr->shared_lock_ref_cnt == 0) {
/* FIXME: MT: The setting of the lock type
must be done atomically */
win_ptr->current_lock_type = MPID_LOCK_NONE;
}
/* dequeue entry from lock queue */
MPIU_Free(single_op->data);
MPIU_Free(single_op);
*lock_queue_ptr = lock_queue->next;
MPIU_Free(lock_queue);
lock_queue = *lock_queue_ptr;
}
else {
/* it's a get. The operation is not complete. It
will be completed in ch3u_handle_send_req.c.
Free the single_op structure. If it's an