Commit 2de51483 authored by Darius Buntinas's avatar Darius Buntinas
Browse files

[svn-r10217] more coding for portals4

parent cdf6d510
......@@ -8,6 +8,7 @@
**ptleqget:PtlEQGet() failed
**ptlmeappend:PtlMEAppend() failed
**ptlmeunlink:PtlMEUnlink() failed
**ptlmesearch:PtlMESearch() failed
**ptlmdbind:PtlMDBind()failed
**ptlmdrelease:PtlMDRelease() failed
**ptlput:PtlPut() failed
......
......@@ -41,10 +41,8 @@ typedef struct {
ptl_handle_me_t me;
void *chunk_buffer[MPID_NEM_PTL_NUM_CHUNK_BUFFERS];
MPIDI_msg_sz_t bytes_put;
event_handler_fn ack_handler;
event_handler_fn put_handler;
event_handler_fn get_handler;
event_handler_fn reply_handler;
int found; /* used in probes with PtlMESearch() */
event_handler_fn event_handler;
} MPID_nem_ptl_req_area;
/* macro for ptl private in req */
......@@ -60,10 +58,7 @@ typedef struct {
REQ_PTL(req_)->large = FALSE; \
REQ_PTL(req_)->md = PTL_INVALID_HANDLE; \
REQ_PTL(req_)->me = PTL_INVALID_HANDLE; \
REQ_PTL(req_)->ack_handler = NULL; \
REQ_PTL(req_)->put_handler = NULL; \
REQ_PTL(req_)->get_handler = NULL; \
REQ_PTL(req_)->reply_handler = NULL; \
REQ_PTL(req_)->event_handler = NULL; \
} while (0)
#define MPID_nem_ptl_request_create_req(sreq_, errno_, on_fail_) do { \
......@@ -85,38 +80,51 @@ typedef struct {
/* Header bit fields
bit field
---- -------------------
50 single/multiple
49 large/small
48 ssend
24-47 match-bits for large messages (24 bits)
0-23 source
63 single/multiple
62 large/small
61 ssend
32-60 seqnum for large messages (29 bits)
0-31 length
Note: This means we support no more than 2^24 processes.
*/
#define NPTL_SOURCE_OFFSET 0
#define NPTL_MATCH_BITS_OFFSET 24
#define NPTL_SSEND ((ptl_hdr_data_t)1<<48)
#define NPTL_LARGE ((ptl_hdr_data_t)1<<49)
#define NPTL_MULTIPLE ((ptl_hdr_data_t)1<<50)
#define NPTL_SOURCE_MASK ((((ptl_hdr_data_t)1<<24)-1)<<NPTL_SOURCE_OFFSET)
#define NPTL_MATCH_BITS_MASK ((((ptl_hdr_data_t)1<<24)-1)<<NPTL_MATCH_BITS_OFFSET)
#define NPTL_HEADER(flags, match_bits, source) ((flags) | \
((ptl_hdr_data_t)(match_bits))<<NPTL_MATCH_BITS_OFFSET | \
((ptl_hdr_data_t)(source))<<NPTL_SOURCE_OFFSET)
#define NPTL_HEADER_SOURCE(hdr) ((hdr & NPTL_SOURCE_MASK)>>NPTL_SOURCE_OFFSET)
#define NPTL_HEADER_MATCH_BITS(hdr) ((hdr & NPTL_MATCH_BITS_MASK)>>NPTL_MATCH_BITS_OFFSET)
/* This is the maximum number of processes we support */
#define NPTL_MAX_PROCS (NPTL_SOURCE_MASK+1)
/* create a match value */
#define NPTL_TAG_SHIFT 32 /* tag and ctx_id are limited to 32 bits each */
#define NPTL_MATCH(tag, context_id) (((ptl_match_bits)(tag) << NPTL_TAG_SHIFT) | (ptl_match_bits)(context_id))
#define NPTL_MATCH_GET_TAG(match_bits) ((match_bits) >> NPTL_TAG_SHIFT)
#define NPTL_ANY_TAG_MASK NPTL_MATCH(~0, 0)
#define NPTL_LENGTH_OFFSET 0
#define NPTL_SEQNUM_OFFSET 32
#define NPTL_SSEND ((ptl_hdr_data_t)1<<61)
#define NPTL_LARGE ((ptl_hdr_data_t)1<<62)
#define NPTL_MULTIPLE ((ptl_hdr_data_t)1<<63)
#define NPTL_LENGTH_MASK ((((ptl_hdr_data_t)1<<32)-1)<<NPTL_LENGTH_OFFSET)
#define NPTL_SEQNUM_MASK ((((ptl_hdr_data_t)1<<29)-1)<<NPTL_SEQNUM_OFFSET)
#define NPTL_HEADER(flags, large_seqnum, length) ((flags) | \
((ptl_hdr_data_t)(seqnum))<<NPTL_SEQNUM_OFFSET | \
((ptl_hdr_data_t)(length))<<NPTL_LENGTH_OFFSET)
#define NPTL_HEADER_GET_SEQNUM(hdr) ((hdr & NPTL_SEQNUM_MASK)>>NPTL_SEQNUM_OFFSET)
#define NPTL_HEADER_GET_LENGTH(hdr) ((hdr & NPTL_LENGTH_MASK)>>NPTL_LENGTH_OFFSET)
/* The comm_world rank of the sender is stored in the match_bits, but they are
ignored when matching.
bit field
---- -------------------
32-63 Tag
16-31 Context id
0-15 Rank
*/
#define NPTL_MATCH_TAG_OFFSET 32
#define NPTL_MATCH_CTX_OFFSET 16
#define NPTL_MATCH_RANK_MASK (((ptl_match_bits)(1) << 16) - 1)
#define NPTL_MATCH_TAG_MASK ((((ptl_match_bits)(1) << 16) - 1) << NPTL_MATCH_TAG_OFFSET)
#define NPTL_MATCH(tag, ctx) (((ptl_match_bits)(tag) << NPTL_MATCH_TAG_OFFSET) | \
((ptl_match_bits)(ctx) << NPTL_MATCH_CTX_OFFSET) | \
((ptl_match_bits)MPIDI_Process.my_pg_rank))
#define NPTL_MATCH_IGNORE NPTL_MATCH_RANK_MASK
#define NPTL_MATCH_IGNORE_ANY_TAG (NPTL_MATCH_IGNORE | NPTL_MATCH_TAG_MASK)
#define NPTL_MATCH_GET_RANK(match_bits) ((match_bits) & NPTL_MATCH_RANK_MASK)
#define NPTL_MATCH_GET_TAG(match_bits) ((match_bits) >> NPTL_MATCH_TAG_OFFSET)
int MPID_nem_ptl_send_init(void);
int MPID_nem_ptl_send_finalize(void);
......
......@@ -107,7 +107,10 @@ static int ptl_init(MPIDI_PG_t *pg_p, int pg_rank, char **bc_val_p, int *val_max
/* create business card */
mpi_errno = get_business_card(pg_rank, bc_val_p, val_max_sz_p);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDI_CH3I_Register_anysource_notification(MPID_nem_ptl_anysource_posted, MPID_nem_ptl_anysource_matched);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* init other modules */
mpi_errno = MPID_nem_ptl_poll_init();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
......
......@@ -92,7 +92,29 @@ int MPID_nem_ptl_poll(int is_blocking_poll)
if (ret == PTL_EQ_EMPTY)
break;
MPIU_ERR_CHKANDJUMP(ret, mpi_errno, MPI_ERR_OTHER, "**ptleqget");
switch (event.type) {
MPID_Request * const req = e->user_ptr;
case PTL_EVENT_PUT:
case PTL_EVENT_GET:
case PTL_EVENT_ACK:
case PTL_EVENT_REPLY:
case PTL_EVENT_SEARCH:
mpi_errno = REQ_PTL(sreq)->event_handler(e);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
break;
case PTL_EVENT_PUT_OVERFLOW:
MPIU_ERR_INTERNALANDJUMP(mpi_errno, "Overflow event");
break
default:
MPIU_ERR_INTERNALANDJUMP(mpi_errno, "Unexpected event type");
}
}
#if 0 /* used for non-matching message passing */
switch (event.type) {
case PTL_EVENT_PUT:
if (event.ni_fail_type) {
......@@ -134,7 +156,9 @@ int MPID_nem_ptl_poll(int is_blocking_poll)
printf("Got unexpected event %d\n", event.type);
break;
}
};
}
#endif
fn_exit:
/* MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_POLL); */
......
......@@ -4,6 +4,37 @@
* See COPYRIGHT in top-level directory.
*/
#undef FUNCNAME
#define FUNCNAME handle_probe
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int handle_probe(const ptl_event_t *e)
{
int mpi_errno = MPI_SUCCESS;
MPID_Request *const req = e->user_ptr;
MPIDI_STATE_DECL(MPID_STATE_HANDLE_PROBE);
MPIDI_FUNC_ENTER(MPID_STATE_HANDLE_PROBE);
if (e->ni_fail_type == PTL_NI_NO_MATCH) {
REQ_PTL(req)->found = FALSE;
goto fn_exit;
}
REQ_PTL(req)->found = TRUE;
req->status.MPI_SOURCE = NPTL_MATCH_GET_RANK(e->match_bits);
req->status.MPI_TAG = NPTL_MATCH_GET_TAG(e->match_bits);
req->status.count = NPTL_MATCH_GET_LENGTH(e->match_bits);
MPIDI_CH3U_Request_complete(req);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_HANDLE_PROBE);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_probe
......@@ -19,31 +50,58 @@ int MPID_nem_ptl_probe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int
#define FUNCNAME MPID_nem_ptl_iprobe
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_iprobe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset, int *flag, MPI_Status *status)
int MPID_nem_ptl_iprobe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset, int *flag, MPI_Status *status)
{
int mpi_errno = MPI_SUCCESS;
int ret;
ptl_process_t id_any;
MPID_Request_t *req;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_IPROBE);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_IPROBE);
id_any.phys.nid = PTL_NID_ANY;
id_any.phys.pid = PTL_PID_ANY;
/* create a request */
req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP1(!req, mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Request_create");
MPIU_Object_set_ref(sreq, 2); /* 1 ref for progress engine and 1 ref for us */
REQ_PTL(req)->search_handler = handle_probe;
/* create a dummy ME to use for searching the list */
me.start = NULL;
me.length = 0;
me.ct_handle = PTL_CT_NONE;
me.uid = PTL_UID_ANY;
me.options = ( PTL_ME_OP_PUT | PTL_ME_IS_ACCESSIBLE | PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE | PTL_ME_USE_ONCE );
me.options = ( PTL_ME_OP_PUT | PTL_ME_USE_ONCE );
me.min_free = 0;
me.match_bits = NPTL_MATCH(tag, comm->context_id + context_offset);
if (source == MPI_ANY_SOURCE)
me.match_id = PTL_ID_ANY;
me.match_id = id_any;
else
me.match_id = vc_ptl->id;
me.match_bits = NPTL_MATCH(tag, context_id);
if (tag == MPI_ANY_TAG)
me.ignore_bits = NPTL_ANY_TAG_MASK;
me.ignore_bits = NPTL_MATCH_IGNORE_ANY_TAG;
else
me.ignore_bits = 0;
me.min_free = 0;
me.ignore_bits = NPTL_MATCH_IGNORE;
/* submit a search request */
ret = PtlMESearch(MPIDI_nem_ptl_ni, MPIDI_nem_ptl_pt, &me, PTL_SEARCH_ONLY, req);
MPIU_ERR_CHKANDJUMP(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmesearch");
/* wait for search request to complete */
do {
mpierrno = MPID_nem_ptl_poll(FALSE);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
} while (!MPID_Request_is_complete(req));
*flag = REQ_PTL(req)->found;
*status = req->status;
MPID_Request_release(req);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_IPROBE);
......@@ -56,7 +114,7 @@ int MPID_nem_ptl_iprobe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, i
#define FUNCNAME MPID_nem_ptl_improbe
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_improbe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset, int *flag,
int MPID_nem_ptl_improbe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset, int *flag,
MPID_Request **message, MPI_Status *status)
{
int mpi_errno = MPI_SUCCESS;
......@@ -113,21 +171,3 @@ int MPID_nem_ptl_anysource_improbe(int tag, MPID_Comm * comm, int context_offset
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_cancel_recv
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_cancel_recv(MPIDI_VC_t *vc, MPID_Request_t *rreq)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_CANCEL_RECV);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_CANCEL_RECV);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_CANCEL_RECV);
return mpi_errno;
fn_fail:
goto fn_exit;
}
......@@ -18,8 +18,8 @@ static void dequeue_req(const ptl_event_t *e)
found = MPIDI_CH3U_Recvq_DP(rreq);
MPIU_Assert(found);
rreq->status.MPI_SOURCE = NPTL_HEADER_SOURCE(e->hdr_data);
rreq->status.MPI_TAG = NPTL_MATCH_GET_TAG(NPTL_HEADER_MATCH_BITS(e->hdr_data));
rreq->status.MPI_SOURCE = NPTL_MATCH_GET_RANK(e->match_bits);
rreq->status.MPI_TAG = NPTL_MATCH_GET_TAG(e->match_bits));
}
#undef FUNCNAME
......@@ -35,7 +35,9 @@ static int handler_recv_complete(const ptl_event_t *e)
MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_COMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_COMPLETE);
MPIU_Assert(e->type == PTL_EVENT_REPLY || e->type == PTL_EVENT_PUT);
req->dev.recv_data_sz += e->mlength;
req->status.count = req->dev.recv_data_sz;
......@@ -70,7 +72,9 @@ static int handler_recv_dequeue_complete(const ptl_event_t *e)
MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_DEQUEUE_COMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_DEQUEUE_COMPLETE);
MPIU_Assert(e->type == PTL_EVENT_PUT);
dequeue_req(e);
mpi_errno = handler_recv_complete(e);
......@@ -93,6 +97,8 @@ static int handler_recv_unpack_complete(const ptl_event_t *e)
MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_UNPACK_COMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_UNPACK_COMPLETE);
MPIU_Assert(e->type == PTL_EVENT_REPLY || e->type == PTL_EVENT_PUT);
unpack_byte(req->dev.segment_ptr, req->dev.segment_first, e->mlength, REQ_PTL(rreq_)->chunk_buffer[0]);
......@@ -117,6 +123,8 @@ static int handler_recv_dequeue_unpack_complete(const ptl_event_t *e)
MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_COMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_COMPLETE);
MPIU_Assert(e->type == PTL_EVENT_PUT);
dequeue_req(e);
mpi_errno = handler_recv_unpack_complete(e);
......@@ -145,6 +153,8 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_DEQUEUE_LARGE);
MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_DEQUEUE_LARGE);
MPIU_Assert(e->type == PTL_EVENT_PUT);
dequeue_req(e);
......@@ -163,11 +173,10 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
if (dt_contig) {
/* recv buffer is contig */
REQ_PTL(rreq)->event_handler = handler_recv_complete;
ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)rreq->user_buf + PTL_LARGE_THRESHOLD, data_sz - PTL_LARGE_THRESHOLD,
vc_ptl->id, vc_ptl->pt, NPTL_HEADER_MATCH_BITS(e->hdr_data), 0, req);
vc_ptl->id, vc_ptl->pt, NPTL_HEADER_GET_SEQNUM(e->hdr_data), 0, req);
MPIU_ERR_CHKANDJUMP(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget");
REQ_PTL(rreq)->put_handler = NULL;
REQ_PTL(rreq)->reply_handler = handler_recv_complete;
goto fn_exit;
}
......@@ -188,13 +197,10 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
ret = PtlMDBind(MPIDI_nem_ptl_ni, &md, &REQ_PTL(rreq)->md);
MPIU_ERR_CHKANDJUMP(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmdbind");
REQ_PTL(rreq)->put_handler = NULL;
REQ_PTL(rreq)->reply_handler = handler_recv_complete;
REQ_PTL(rreq)->event_handler = handler_recv_complete;
ret = PtlGet(REQ_PTL(rreq)->md, 0, rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->pt,
NPTL_HEADER_MATCH_BITS(e->hdr_data), 0, req);
NPTL_HEADER_GET_SEQNUM(e->hdr_data), 0, req);
MPIU_ERR_CHKANDJUMP(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget");
REQ_PTL(rreq)->put_handler = NULL;
REQ_PTL(rreq)->reply_handler = handler_recv_complete;
goto fn_exit;
}
......@@ -202,12 +208,11 @@ static int handler_recv_dequeue_large(const ptl_event_t *e)
/* FIXME: For now, allocate a single large buffer to hold entire message */
MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size - rreq->dev.segment_first, mpi_errno, "chunk_buffer");
REQ_PTL(rreq)->event_handler = handler_recv_unpack_complete;
ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->pt,
NPTL_HEADER_MATCH_BITS(e->hdr_data), 0, req);
NPTL_HEADER_GET_SEQNUM(e->hdr_data), 0, req);
MPIU_ERR_CHKANDJUMP(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget");
REQ_PTL(rreq)->put_handler = NULL;
REQ_PTL(rreq)->reply_handler = handler_recv_unpack_complete;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_HANDLER_RECV_DEQUEUE_LARGE);
......@@ -227,6 +232,7 @@ static int handler_recv_dequeue_unpack_large(const ptl_event_t *e)
MPIDI_STATE_DECL(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_LARGE);
MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_RECV_DEQUEUE_UNPACK_LARGE);
MPIU_Assert(e->type == PTL_EVENT_PUT);
dequeue_req(e);
......@@ -248,12 +254,11 @@ static int handler_recv_dequeue_unpack_large(const ptl_event_t *e)
MPIU_CHKPMEM_MALLOC(REQ_PTL(rreq)->chunk_buffer[0], void *, rreq->dev.segment_size - rreq->dev.segment_first, mpi_errno, "chunk_buffer");
REQ_PTL(rreq)->event_handler = handler_recv_unpack_complete;
ret = PtlGet(MPIDI_nem_ptl_global_md, (ptl_size_t)REQ_PTL(rreq)->chunk_buffer[0],
rreq->dev.segment_size - rreq->dev.segment_first, vc_ptl->id, vc_ptl->pt,
NPTL_HEADER_MATCH_BITS(e->hdr_data), 0, req);
NPTL_HEADER_GET_SEQNUM(e->hdr_data), 0, req);
MPIU_ERR_CHKANDJUMP(ret, mpi_errno, MPI_ERR_OTHER, "**ptlget");
REQ_PTL(rreq)->put_handler = NULL;
REQ_PTL(rreq)->reply_handler = handler_recv_unpack_complete;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_HANDLER_DEQUEUE_RECV_UNPACK_LARGE);
......@@ -277,19 +282,27 @@ int MPID_nem_ptl_recv_posted(MPIDI_VC_t *vc, MPID_Request_t *rreq)
MPID_Datatype *dt_ptr;
MPI_Aint dt_true_lb;
MPI_Aint last;
ptl_process_t id_any;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RECV_POSTED);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RECV_POSTED);
id_any.phys.nid = PTL_NID_ANY;
id_any.phys.pid = PTL_PID_ANY;
MPID_nem_ptl_init_req(rreq);
me.ct_handle = PTL_CT_NONE;
me.uid = PTL_UID_ANY;
me.options = ( PTL_ME_OP_PUT | PTL_ME_IS_ACCESSIBLE | PTL_ME_EVENT_LINK_DISABLE |
PTL_ME_EVENT_UNLINK_DISABLE | PTL_ME_USE_ONCE );
me.match_id = vc_ptl->id;
if (vc == NULL)
/* MPI_ANY_SOURCE receive */
me.match_id = id_any;
else
me.match_id = vc_ptl->id;
me.match_bits = NPTL_MATCH(rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id);
me.ignore_bits = 0;
me.ignore_bits = NPTL_MATCH_IGNORE;
me.min_free = 0;
MPIDI_Datatype_get_info(rreq->dev.user_count, rreq->dev.datatype, dt_contig, data_sz, dt_ptr, dt_true_lb);
......@@ -299,7 +312,7 @@ int MPID_nem_ptl_recv_posted(MPIDI_VC_t *vc, MPID_Request_t *rreq)
/* small contig message */
me.start = rreq->dev.user_buf;
me.length = data_sz;
REQ_PTL(rreq)->put_handler = handler_recv_dequeue_complete;
REQ_PTL(rreq)->event_handler = handler_recv_dequeue_complete;
} else {
/* small noncontig */
rreq->dev.segment_ptr = MPID_Segment_alloc();
......@@ -317,14 +330,14 @@ int MPID_nem_ptl_recv_posted(MPIDI_VC_t *vc, MPID_Request_t *rreq)
me.start = rreq->dev.iov;
me.length = rreq->dev.iov_count;
me.options |= PTL_IOVEC;
REQ_PTL(rreq)->put_handler = handler_recv_dequeue_complete;
REQ_PTL(rreq)->event_handler = handler_recv_dequeue_complete;
} else {
/* IOV is not long enough to describe entire message: recv into
buffer and unpack later */
MPIU_CHKPMEM_MALLOC(REQ_PTL(req)->chunk_buffer[0], void *, data_sz, mpi_errno, "chunk_buffer");
me.start = REQ_PTL(rreq)->chunk_buffer[0];
me.length = data_sz;
REQ_PTL(rreq)->put_handler = handler_recv_dequeue_unpack_complete;
REQ_PTL(rreq)->event_handler = handler_recv_dequeue_unpack_complete;
}
}
} else {
......@@ -333,7 +346,7 @@ int MPID_nem_ptl_recv_posted(MPIDI_VC_t *vc, MPID_Request_t *rreq)
/* large contig message */
me.start = rreq->dev.user_buf;
me.length = PTL_LARGE_THRESHOLD;
REQ_PTL(req_)->put_handler = handler_recv_dequeue_large;
REQ_PTL(req_)->event_handler = handler_recv_dequeue_large;
} else {
/* large noncontig */
rreq->dev.segment_ptr = MPID_Segment_alloc();
......@@ -352,14 +365,14 @@ int MPID_nem_ptl_recv_posted(MPIDI_VC_t *vc, MPID_Request_t *rreq)
me.start = rreq->dev.iov;
me.length = rreq->dev.iov_count;
me.options |= PTL_IOVEC;
REQ_PTL(rreq)->put_handler = handler_recv_dequeue_large;
REQ_PTL(rreq)->event_handler = handler_recv_dequeue_large;
} else {
/* IOV is not long enough to describe the first chunk: recv into
buffer and unpack later */
MPIU_CHKPMEM_MALLOC(REQ_PTL(req)->chunk_buffer[0], void *, PTL_LARGE_THRESHOLD, mpi_errno, "chunk_buffer");
me.start = REQ_PTL(req)->chunk_buffer[0];
me.length = PTL_LARGE_THRESHOLD;
REQ_PTL(rreq)->put_handler = handler_recv_dequeue_unpack_large;
REQ_PTL(rreq)->event_handler = handler_recv_dequeue_unpack_large;
}
}
......@@ -376,6 +389,73 @@ int MPID_nem_ptl_recv_posted(MPIDI_VC_t *vc, MPID_Request_t *rreq)
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_anysource_posted
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
void MPID_nem_ptl_anysource_posted(MPID_Request *rreq)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_POSTED);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_POSTED);
mpi_errno = MPID_nem_ptl_recv_posted(NULL, rreq);
/* FIXME: This function is void, so we can't return an error. This function
cannot return an error because the queue functions (where the posted_recv
hooks are called) return no error code. */
MPIU_Assertp(mpi_errno == MPI_SUCCESS);
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_POSTED);
}
#undef FUNCNAME
#define FUNCNAME cancel_recv
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static int cancel_recv(MPID_Request *rreq, int *cancelled)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_CANCEL_RECV);
MPIDI_FUNC_ENTER(MPID_STATE_CANCEL_RECV);
MPIU_Assert(0 && "FIXME: Need to implement cancel_recv");
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_CANCEL_RECV);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_anysource_matched
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_ptl_anysource_matched(MPID_Request *rreq)
{
int mpi_errno = MPI_SUCCESS;
int cancelled;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_MATCHED);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_MATCHED);
mpi_errno = cancel_recv(rreq, &cancelled);
/* FIXME: This function is does not return an error because the queue
functions (where the posted_recv hooks are called) return no error
code. */
MPIU_Assertp(mpi_errno == MPI_SUCCESS);
return !cancelled;
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_ANYSOURCE_MATCHED);
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_cancel_recv
#undef FCNAME
......@@ -387,7 +467,7 @@ int MPID_nem_ptl_cancel_recv(MPIDI_VC_t *vc, MPID_Request *rreq)
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_CANCEL_RECV);
***********
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_CANCEL_RECV);
......
......@@ -735,6 +735,8 @@ static int handler_send_complete(const ptl_event_t *e)
MPIDI_FUNC_ENTER(MPID_STATE_HANDLER_SEND_COMPLETE);
MPIU_Assert(e->type == PTL_EVENT_ACK || e->type == PTL_EVENT_PUT || e->type == PTL_EVENT_GET);
if (REQ_PTL(sreq)->me != PTL_INVALID_HANDLE) {
ret = PtlMEUnlink(REQ_PTL(sreq)->me);
MPIU_ERR_CHKANDJUMP(ret, mpi_errno, MPI_ERR_OTHER, "**ptlmeunlink");
......@@ -770,13 +772,14 @@ static int handler_large(const ptl_event_t *e)