Commit ec920e5f authored by Valentin Petrov's avatar Valentin Petrov Committed by Charles J Archer
Browse files

OFI: Add support for large tags using immediate data and OFI tag layouts



This patch modifies the OFI netmod to support large tag layouts while preserving the old
tag layout.  OFI defines a 64-bit tag, but also provides for a 64-bit tag plus immediate data.
In some OFI providers, we may want to select different tag layouts.  This patch currently
does not query for the proper tag layout or attempt to choose the optimal layout;
it only provides macro/templatized support for different tag formats.  Additional selection
criteria will be added in subsequent patches.

  * Tag layout is moved to a separate file.
    Added init_sendtag_M2, init_recvtag_M2 (M2 stands for MODE #2, i.e. the mode
    that uses fi_tsenddata and does not pack source into tag).

  * Created a template file for ofi_tagged.c
Moved do_isend into a template file that is included twice into ofi_tagged.c, thus providing the two
versions, do_isend and do_isend_2, corresponding to the two API sets.

  * All send functions are available in two versions.
Added a macro that declares a function for both API sets. The first set keeps the names inherited from
the previous netmod version. The functions of the second API set have the "_2" suffix.

  * Recv_posted, anysource_posted, recv_callback, ofi_probe  are templatized.

  * ofi_tag_to_vc renamed ofi_wc_to_vc
    Note, for the API_SET_2 the pgid is stored in the imm data while
    psource and port will be packed the same way as in API_SET_1.

  * Adds api_set member in gl_data struct.  Initialize routines based on api_set

  * Added RCD (RtsCtsData) protocol identifiers

  * Added support for OFI MEM_TAG_FORMAT

  * PGID placement modified
Signed-off-by: Charles J Archer <charles.j.archer@intel.com>
parent eb69611e
......@@ -22,14 +22,21 @@
/* ------------------------------------------------------------------------ */
#undef FCNAME
#define FCNAME DECL_FUNC(ofi_tag_to_vc)
static inline MPIDI_VC_t *ofi_tag_to_vc(uint64_t match_bits)
static inline MPIDI_VC_t *ofi_wc_to_vc(cq_tagged_entry_t * wc)
{
int pgid = 0, port = 0;
MPIDI_VC_t *vc = NULL;
MPIDI_PG_t *pg = NULL;
uint64_t match_bits = wc->tag;
int wc_pgid;
BEGIN_FUNC(FCNAME);
if (NO_PGID == get_pgid(match_bits)) {
if (gl_data.api_set == API_SET_1) {
wc_pgid = get_pgid(match_bits);
} else {
wc_pgid = wc->data;
}
if (NO_PGID == wc_pgid) {
/* -------------------------------------------------------------------- */
/* Dynamic path -- This uses a linear search, but number of cm vc's is */
/* a small number, and they should be ephemeral. This lookup should */
......@@ -53,7 +60,7 @@ static inline MPIDI_VC_t *ofi_tag_to_vc(uint64_t match_bits)
pg = gl_data.pg_p;
while (pg) {
MPIDI_PG_IdToNum(pg, &pgid);
if (get_pgid(match_bits) == pgid) {
if (wc_pgid == pgid) {
break;
}
pg = pg->next;
......@@ -102,13 +109,13 @@ static inline int MPID_nem_ofi_conn_req_callback(cq_tagged_entry_t * wc, MPID_Re
bc[wc->len] = '\0';
MPIU_Assert(gl_data.conn_req == rreq);
FI_RC(fi_trecv(gl_data.endpoint,
gl_data.conn_req->dev.user_buf,
OFI_KVSAPPSTRLEN,
gl_data.mr,
FI_ADDR_UNSPEC,
MPID_CONN_REQ,
~MPID_PROTOCOL_MASK,
(void *) &(REQ_OFI(gl_data.conn_req)->ofi_context)), trecv);
gl_data.conn_req->dev.user_buf,
OFI_KVSAPPSTRLEN,
gl_data.mr,
FI_ADDR_UNSPEC,
MPID_CONN_REQ,
GET_RCD_IGNORE_MASK(),
(void *) &(REQ_OFI(gl_data.conn_req)->ofi_context)), trecv);
addr = MPIU_Malloc(gl_data.bound_addrlen);
MPIU_Assertp(addr);
......@@ -204,7 +211,7 @@ static inline int MPID_nem_ofi_preposted_callback(cq_tagged_entry_t * wc, MPID_R
MPID_Request *new_rreq, *sreq;
BEGIN_FUNC(FCNAME);
vc = ofi_tag_to_vc(wc->tag);
vc = ofi_wc_to_vc(wc);
MPIU_Assert(vc);
VC_READY_CHECK(vc);
......@@ -243,12 +250,13 @@ static inline int MPID_nem_ofi_preposted_callback(cq_tagged_entry_t * wc, MPID_R
rreq->dev.user_count = 0;
FI_RC(fi_trecv(gl_data.endpoint,
&rreq->dev.user_count,
sizeof rreq->dev.user_count,
gl_data.mr,
FI_ADDR_UNSPEC,
MPID_MSG_RTS,
~MPID_PROTOCOL_MASK, &(REQ_OFI(rreq)->ofi_context)), trecv);
&rreq->dev.user_count,
sizeof rreq->dev.user_count,
gl_data.mr,
FI_ADDR_UNSPEC,
MPID_MSG_RTS,
GET_RCD_IGNORE_MASK(),
&(REQ_OFI(rreq)->ofi_context)), trecv);
END_FUNC_RC(FCNAME);
}
......@@ -289,10 +297,17 @@ int MPID_nem_ofi_cm_init(MPIDI_PG_t * pg_p, int pg_rank ATTRIBUTE((unused)))
/* ------------------------------------- */
/* Set up CH3 and netmod data structures */
/* ------------------------------------- */
MPI_RC(MPIDI_CH3I_Register_anysource_notification(MPID_nem_ofi_anysource_posted,
MPID_nem_ofi_anysource_matched));
MPIDI_Anysource_iprobe_fn = MPID_nem_ofi_anysource_iprobe;
MPIDI_Anysource_improbe_fn = MPID_nem_ofi_anysource_improbe;
if (gl_data.api_set == API_SET_1) {
MPI_RC(MPIDI_CH3I_Register_anysource_notification(MPID_nem_ofi_anysource_posted,
MPID_nem_ofi_anysource_matched));
MPIDI_Anysource_iprobe_fn = MPID_nem_ofi_anysource_iprobe;
MPIDI_Anysource_improbe_fn = MPID_nem_ofi_anysource_improbe;
} else {
MPI_RC(MPIDI_CH3I_Register_anysource_notification(MPID_nem_ofi_anysource_posted_2,
MPID_nem_ofi_anysource_matched));
MPIDI_Anysource_iprobe_fn = MPID_nem_ofi_anysource_iprobe_2;
MPIDI_Anysource_improbe_fn = MPID_nem_ofi_anysource_improbe_2;
}
gl_data.pg_p = pg_p;
/* ----------------------------------- */
......@@ -304,13 +319,13 @@ int MPID_nem_ofi_cm_init(MPIDI_PG_t * pg_p, int pg_rank ATTRIBUTE((unused)))
REQ_OFI(persistent_req)->vc = NULL;
REQ_OFI(persistent_req)->event_callback = MPID_nem_ofi_preposted_callback;
FI_RC(fi_trecv(gl_data.endpoint,
&persistent_req->dev.user_count,
sizeof persistent_req->dev.user_count,
gl_data.mr,
FI_ADDR_UNSPEC,
MPID_MSG_RTS,
~MPID_PROTOCOL_MASK,
(void *) &(REQ_OFI(persistent_req)->ofi_context)), trecv);
&persistent_req->dev.user_count,
sizeof persistent_req->dev.user_count,
gl_data.mr,
FI_ADDR_UNSPEC,
MPID_MSG_RTS,
GET_RCD_IGNORE_MASK(),
(void *) &(REQ_OFI(persistent_req)->ofi_context)), trecv);
gl_data.persistent_req = persistent_req;
/* --------------------------------- */
......@@ -323,12 +338,13 @@ int MPID_nem_ofi_cm_init(MPIDI_PG_t * pg_p, int pg_rank ATTRIBUTE((unused)))
REQ_OFI(conn_req)->vc = NULL; /* We don't know the source yet */
REQ_OFI(conn_req)->event_callback = MPID_nem_ofi_conn_req_callback;
FI_RC(fi_trecv(gl_data.endpoint,
conn_req->dev.user_buf,
OFI_KVSAPPSTRLEN,
gl_data.mr,
FI_ADDR_UNSPEC,
MPID_CONN_REQ,
~MPID_PROTOCOL_MASK, (void *) &(REQ_OFI(conn_req)->ofi_context)), trecv);
conn_req->dev.user_buf,
OFI_KVSAPPSTRLEN,
gl_data.mr,
FI_ADDR_UNSPEC,
MPID_CONN_REQ,
GET_RCD_IGNORE_MASK(),
(void *) &(REQ_OFI(conn_req)->ofi_context)), trecv);
gl_data.conn_req = conn_req;
......@@ -536,13 +552,24 @@ int MPID_nem_ofi_connect_to_root(const char *business_card, MPIDI_VC_t * new_vc)
sreq->dev.next = NULL;
REQ_OFI(sreq)->event_callback = MPID_nem_ofi_connect_to_root_callback;
REQ_OFI(sreq)->pack_buffer = my_bc;
conn_req_send_bits = init_sendtag(0, MPIR_Process.comm_world->rank, 0, MPID_CONN_REQ);
FI_RC(fi_tsend(gl_data.endpoint,
REQ_OFI(sreq)->pack_buffer,
my_bc_len,
gl_data.mr,
VC_OFI(new_vc)->direct_addr,
conn_req_send_bits, &(REQ_OFI(sreq)->ofi_context)), tsend);
if (gl_data.api_set == API_SET_1) {
conn_req_send_bits = init_sendtag(0, MPIR_Process.comm_world->rank, 0, MPID_CONN_REQ);
FI_RC(fi_tsend(gl_data.endpoint,
REQ_OFI(sreq)->pack_buffer,
my_bc_len,
gl_data.mr,
VC_OFI(new_vc)->direct_addr,
conn_req_send_bits, &(REQ_OFI(sreq)->ofi_context)), tsend);
} else {
conn_req_send_bits = init_sendtag_2(0, 0, MPID_CONN_REQ);
FI_RC(fi_tsenddata(gl_data.endpoint,
REQ_OFI(sreq)->pack_buffer,
my_bc_len,
gl_data.mr,
MPIR_Process.comm_world->rank,
VC_OFI(new_vc)->direct_addr,
conn_req_send_bits, &(REQ_OFI(sreq)->ofi_context)), tsend);
}
MPID_nem_ofi_poll(MPID_NONBLOCKING_POLL);
VC_OFI(new_vc)->is_cmvc = 1;
VC_OFI(new_vc)->next = gl_data.cm_vcs;
......
......@@ -21,6 +21,7 @@
#include <rdma/fi_cm.h>
#include <netdb.h>
#include "ofi_tag_layout.h"
/* ************************************************************************** */
/* Type Definitions */
/* ************************************************************************** */
......@@ -56,6 +57,7 @@ typedef struct {
MPID_Request *persistent_req; /* Unexpected request queue */
MPID_Request *conn_req; /* Connection request */
MPIDI_Comm_ops_t comm_ops;
int api_set;
} MPID_nem_ofi_global_t;
/* ******************************** */
......@@ -173,37 +175,11 @@ fn_fail: \
#define NO_PGID 0
/* **************************************************************************
* match/ignore bit manipulation
* **************************************************************************
* 0123 4567 01234567 0123 4567 01234567 0123 4567 01234567 01234567 01234567
* | | |
* ^ | context id | source | message tag
* | | | |
* +---- protocol
* ************************************************************************** */
#define MPID_PROTOCOL_MASK (0xF000000000000000ULL)
#define MPID_CONTEXT_MASK (0x0FFFF00000000000ULL)
#define MPID_SOURCE_MASK (0x00000FFFF0000000ULL)
#define MPID_TAG_MASK (0x000000000FFFFFFFULL)
#define MPID_PGID_MASK (0x00000000FFFFFFFFULL)
#define MPID_PSOURCE_MASK (0x0000FFFF00000000ULL)
#define MPID_PORT_NAME_MASK (0x0FFF000000000000ULL)
#define MPID_SYNC_SEND (0x1000000000000000ULL)
#define MPID_SYNC_SEND_ACK (0x2000000000000000ULL)
#define MPID_MSG_RTS (0x3000000000000000ULL)
#define MPID_MSG_CTS (0x4000000000000000ULL)
#define MPID_MSG_DATA (0x5000000000000000ULL)
#define MPID_CONN_REQ (0x6000000000000000ULL)
#define MPID_SOURCE_SHIFT (16)
#define MPID_TAG_SHIFT (28)
#define MPID_PSOURCE_SHIFT (16)
#define MPID_PORT_SHIFT (32)
#define OFI_KVSAPPSTRLEN 1024
#define PEEK_INIT 0
#define PEEK_FOUND 1
#define MEM_TAG_FORMAT (0xFFFF00000000LLU)
/* ******************************** */
/* Request manipulation inlines */
/* ******************************** */
......@@ -224,97 +200,42 @@ static inline int MPID_nem_ofi_create_req(MPID_Request ** request, int refcnt)
return mpi_errno;
}
/* ******************************** */
/* Tag Manipulation inlines */
/* ******************************** */
/*
 * Build the 64-bit match bits for an outgoing tagged message.
 *
 * The context id is placed above a MPID_SOURCE_SHIFT-bit source field, and
 * that pair is placed above a MPID_TAG_SHIFT-bit tag field.  Only the tag is
 * masked (with MPID_TAG_MASK); contextid and source are used as passed.
 * `type` is OR-ed in unchanged; callers pass a protocol identifier such as
 * MPID_CONN_REQ.
 *
 * Returns the assembled match bits.
 */
static inline uint64_t init_sendtag(MPIR_Context_id_t contextid, int source, int tag, uint64_t type)
{
    uint64_t ctx_and_src = ((uint64_t) contextid << MPID_SOURCE_SHIFT) | source;

    return (ctx_and_src << MPID_TAG_SHIFT) | (MPID_TAG_MASK & tag) | type;
}
/*
 * Receive posting: build the match bits and the ignore mask for a receive.
 *
 * mask_bits  out: bits the matcher must ignore.  MPID_SYNC_SEND is always
 *            ignored; MPID_SOURCE_MASK is added for MPI_ANY_SOURCE and
 *            MPID_TAG_MASK for MPI_ANY_TAG.
 * contextid  context id, placed above the source field.
 * source     sender rank, or MPI_ANY_SOURCE (source field left as zero).
 * tag        message tag, or MPI_ANY_TAG (tag field left as zero).
 *
 * Returns the match bits, laid out as in init_sendtag() but without a
 * protocol type.
 */
static inline uint64_t init_recvtag(uint64_t * mask_bits,
                                    MPIR_Context_id_t contextid, int source, int tag)
{
    uint64_t ignore = MPID_SYNC_SEND;
    uint64_t bits = (uint64_t) contextid << MPID_SOURCE_SHIFT;

    if (MPI_ANY_SOURCE != source)
        bits |= source;
    else
        ignore |= MPID_SOURCE_MASK;
    bits <<= MPID_TAG_SHIFT;

    if (MPI_ANY_TAG != tag)
        bits |= (MPID_TAG_MASK & tag);
    else
        ignore |= MPID_TAG_MASK;

    *mask_bits = ignore;
    return bits;
}
/* Return the message tag: the bits of match_bits selected by MPID_TAG_MASK. */
static inline int get_tag(uint64_t match_bits)
{
    const uint64_t tag_field = match_bits & MPID_TAG_MASK;

    return (int) tag_field;
}
/*
 * Return the source rank: the MPID_SOURCE_MASK field, shifted down past the
 * MPID_TAG_SHIFT-bit tag field that sits below it.
 */
static inline int get_source(uint64_t match_bits)
{
    const uint64_t src_field = match_bits & MPID_SOURCE_MASK;

    return (int) (src_field >> MPID_TAG_SHIFT);
}
/*
 * Return the MPID_PSOURCE_MASK field of match_bits.
 * The field is shifted down by MPID_PORT_SHIFT (not MPID_PSOURCE_SHIFT);
 * that is the bit offset at which the mask starts.
 */
static inline int get_psource(uint64_t match_bits)
{
    const uint64_t psrc_field = match_bits & MPID_PSOURCE_MASK;

    return (int) (psrc_field >> MPID_PORT_SHIFT);
}
/* Return the process-group id: the bits of match_bits selected by MPID_PGID_MASK. */
static inline int get_pgid(uint64_t match_bits)
{
    const uint64_t pgid_field = match_bits & MPID_PGID_MASK;

    return (int) pgid_field;
}
static inline int get_port(uint64_t match_bits)
{
return ((int) ((match_bits & MPID_PORT_NAME_MASK) >> MPID_TAG_SHIFT));
}
/* ************************************************************************** */
/* MPICH Comm Override and Netmod functions */
/* ************************************************************************** */
int MPID_nem_ofi_recv_posted(struct MPIDI_VC *vc, struct MPID_Request *req);
int MPID_nem_ofi_send(struct MPIDI_VC *vc, const void *buf, int count,
MPI_Datatype datatype, int dest, int tag, MPID_Comm * comm,
int context_offset, struct MPID_Request **request);
int MPID_nem_ofi_isend(struct MPIDI_VC *vc, const void *buf, int count,
MPI_Datatype datatype, int dest, int tag, MPID_Comm * comm,
int context_offset, struct MPID_Request **request);
int MPID_nem_ofi_ssend(struct MPIDI_VC *vc, const void *buf, int count,
MPI_Datatype datatype, int dest, int tag, MPID_Comm * comm,
int context_offset, struct MPID_Request **request);
int MPID_nem_ofi_issend(struct MPIDI_VC *vc, const void *buf, int count,
MPI_Datatype datatype, int dest, int tag, MPID_Comm * comm,
int context_offset, struct MPID_Request **request);
/*
 * Declare the prototype of a function for both API sets: `_fc_name` keeps
 * the original name and `_fc_name_2` carries the "_2" suffix.  Both share
 * the return type `_ret` and the parameter list given in the variadic part.
 *
 * The expansion deliberately has no trailing semicolon: the invocation
 * supplies it, so no stray empty declaration is left at file scope.
 */
#define DECLARE_TWO_API_SETS(_ret, _fc_name, ...) \
    _ret _fc_name(__VA_ARGS__);                   \
    _ret _fc_name##_2(__VA_ARGS__)
DECLARE_TWO_API_SETS(int, MPID_nem_ofi_recv_posted, struct MPIDI_VC *vc, struct MPID_Request *req);
DECLARE_TWO_API_SETS(int, MPID_nem_ofi_send, struct MPIDI_VC *vc, const void *buf, int count,\
MPI_Datatype datatype, int dest, int tag, MPID_Comm * comm,\
int context_offset, struct MPID_Request **request);
DECLARE_TWO_API_SETS(int, MPID_nem_ofi_isend, struct MPIDI_VC *vc, const void *buf, int count,\
MPI_Datatype datatype, int dest, int tag, MPID_Comm * comm,\
int context_offset, struct MPID_Request **request);
DECLARE_TWO_API_SETS(int, MPID_nem_ofi_ssend, struct MPIDI_VC *vc, const void *buf, int count,\
MPI_Datatype datatype, int dest, int tag, MPID_Comm * comm,
int context_offset, struct MPID_Request **request);
DECLARE_TWO_API_SETS(int, MPID_nem_ofi_issend, struct MPIDI_VC *vc, const void *buf, int count,\
MPI_Datatype datatype, int dest, int tag, MPID_Comm * comm,\
int context_offset, struct MPID_Request **request);
int MPID_nem_ofi_cancel_send(struct MPIDI_VC *vc, struct MPID_Request *sreq);
int MPID_nem_ofi_cancel_recv(struct MPIDI_VC *vc, struct MPID_Request *rreq);
int MPID_nem_ofi_iprobe(struct MPIDI_VC *vc, int source, int tag, MPID_Comm * comm,
int context_offset, int *flag, MPI_Status * status);
int MPID_nem_ofi_improbe(struct MPIDI_VC *vc, int source, int tag, MPID_Comm * comm,
int context_offset, int *flag, MPID_Request ** message,
MPI_Status * status);
int MPID_nem_ofi_anysource_iprobe(int tag, MPID_Comm * comm, int context_offset,
int *flag, MPI_Status * status);
int MPID_nem_ofi_anysource_improbe(int tag, MPID_Comm * comm, int context_offset,
int *flag, MPID_Request ** message, MPI_Status * status);
void MPID_nem_ofi_anysource_posted(MPID_Request * rreq);
DECLARE_TWO_API_SETS(int, MPID_nem_ofi_iprobe, struct MPIDI_VC *vc, int source, int tag, MPID_Comm * comm,
int context_offset, int *flag, MPI_Status * status);
DECLARE_TWO_API_SETS(int, MPID_nem_ofi_improbe,struct MPIDI_VC *vc, int source, int tag, MPID_Comm * comm,
int context_offset, int *flag, MPID_Request ** message,
MPI_Status * status);
DECLARE_TWO_API_SETS(int, MPID_nem_ofi_anysource_iprobe,int tag, MPID_Comm * comm, int context_offset,
int *flag, MPI_Status * status);
DECLARE_TWO_API_SETS(int, MPID_nem_ofi_anysource_improbe,int tag, MPID_Comm * comm, int context_offset,
int *flag, MPID_Request ** message, MPI_Status * status);
DECLARE_TWO_API_SETS(void, MPID_nem_ofi_anysource_posted, MPID_Request * rreq);
int MPID_nem_ofi_anysource_matched(MPID_Request * rreq);
int MPID_nem_ofi_send_data(cq_tagged_entry_t * wc, MPID_Request * sreq);
int MPID_nem_ofi_SendNoncontig(MPIDI_VC_t * vc, MPID_Request * sreq,
......
......@@ -83,6 +83,9 @@ int MPID_nem_ofi_init(MPIDI_PG_t * pg_p, int pg_rank, char **bc_val_p, int *val_
hints->caps = FI_TAGGED; /* Tag matching interface */
hints->caps |= FI_DYNAMIC_MR; /* Global dynamic mem region */
hints->ep_attr->mem_tag_format = MEM_TAG_FORMAT;
MPIU_Assert(pg_p->size < ((1 << MPID_RANK_BITS) - 1));
/* ------------------------------------------------------------------------ */
/* FI_VERSION provides binary backward and forward compatibility support */
/* Specify the version of OFI is coded to, the provider will select struct */
......@@ -124,6 +127,8 @@ int MPID_nem_ofi_init(MPIDI_PG_t * pg_p, int pg_rank, char **bc_val_p, int *val_
&gl_data.fabric, /* Out: Fabric descriptor */
NULL), openfabric); /* Context: fabric events */
gl_data.api_set = API_SET_1;
/* ------------------------------------------------------------------------ */
/* Create the access domain, which is the physical or virtual network or */
/* hardware port/collection of ports. Returns a domain object that can be */
......@@ -236,7 +241,7 @@ int MPID_nem_ofi_init(MPIDI_PG_t * pg_p, int pg_rank, char **bc_val_p, int *val_
/* -------------------------------- */
/* Set the MPI maximum tag value */
/* -------------------------------- */
MPIR_Process.attrs.tag_ub = (1 << MPID_TAG_SHIFT) - 1;
MPIR_Process.attrs.tag_ub = (1 << MPID_TAG_BITS) - 1;
/* --------------------------------- */
/* Wait for all the ranks to publish */
......
......@@ -24,13 +24,18 @@
} else { \
pgid = NO_PGID; \
} \
match_bits = (uint64_t)MPIR_Process.comm_world->rank << \
(MPID_PORT_SHIFT); \
if (0 == pgid) { \
if (gl_data.api_set == API_SET_1){ \
match_bits = ((uint64_t)pgid << MPID_PGID_SHIFT); \
}else{ \
match_bits = 0; \
} \
if (NO_PGID == pgid) { \
match_bits |= (uint64_t)vc->port_name_tag<< \
(MPID_PORT_SHIFT+MPID_PSOURCE_SHIFT); \
(MPID_PORT_SHIFT); \
}else{ \
match_bits |= (uint64_t)MPIR_Process.comm_world->rank << \
(MPID_PSOURCE_SHIFT); \
} \
match_bits |= pgid; \
match_bits |= MPID_MSG_RTS; \
})
......@@ -80,15 +85,26 @@
VC_OFI(vc)->direct_addr, \
match_bits | MPID_MSG_CTS, \
0, /* Exact tag match, no ignore bits */ \
&(REQ_OFI(cts_req)->ofi_context)),trecv); \
FI_RC(fi_tsend(gl_data.endpoint, \
&REQ_OFI(sreq)->pack_buffer_size, \
sizeof(REQ_OFI(sreq)->pack_buffer_size), \
gl_data.mr, \
VC_OFI(vc)->direct_addr, \
match_bits, \
&(REQ_OFI(sreq)->ofi_context)),tsend); \
})
&(REQ_OFI(cts_req)->ofi_context)),trecv); \
if (gl_data.api_set == API_SET_1){ \
FI_RC(fi_tsend(gl_data.endpoint, \
&REQ_OFI(sreq)->pack_buffer_size, \
sizeof(REQ_OFI(sreq)->pack_buffer_size), \
gl_data.mr, \
VC_OFI(vc)->direct_addr, \
match_bits, \
&(REQ_OFI(sreq)->ofi_context)),tsend); \
}else{ \
FI_RC(fi_tsenddata(gl_data.endpoint, \
&REQ_OFI(sreq)->pack_buffer_size, \
sizeof(REQ_OFI(sreq)->pack_buffer_size), \
gl_data.mr, \
pgid, \
VC_OFI(vc)->direct_addr, \
match_bits, \
&(REQ_OFI(sreq)->ofi_context)),tsend); \
} \
})
/* ------------------------------------------------------------------------ */
......@@ -109,13 +125,13 @@ static int MPID_nem_ofi_data_callback(cq_tagged_entry_t * wc, MPID_Request * sre
vc = REQ_OFI(sreq)->vc;
REQ_OFI(sreq)->tag = tag | MPID_MSG_DATA;
FI_RC(fi_tsend(gl_data.endpoint,
REQ_OFI(sreq)->pack_buffer,
REQ_OFI(sreq)->pack_buffer_size,
gl_data.mr,
VC_OFI(vc)->direct_addr,
wc->tag | MPID_MSG_DATA, (void *) &(REQ_OFI(sreq)->ofi_context)), tsend);
}
if (MPID_cc_get(sreq->cc) == 1) {
REQ_OFI(sreq)->pack_buffer,
REQ_OFI(sreq)->pack_buffer_size,
gl_data.mr,
VC_OFI(vc)->direct_addr,
MPID_MSG_DATA, (void *) &(REQ_OFI(sreq)->ofi_context)), tsend);
break;
case MPID_MSG_DATA:
if (REQ_OFI(sreq)->pack_buffer)
MPIU_Free(REQ_OFI(sreq)->pack_buffer);
......
#if (API_SET != API_SET_1) && (API_SET != API_SET_2)
#error Undefined API SET
#endif
/* ------------------------------------------------------------------------ */
/* peek_callback: called when a successful peek is completed.               */
/* Marks the request as PEEK_FOUND and fills rreq->status from the          */
/* completion entry: tag from wc->tag, count from wc->len, and source from  */
/* wc->tag (API set 1) or from wc->data (API set 2).                        */
/* Always returns MPI_SUCCESS.                                              */
/* ------------------------------------------------------------------------ */
#undef FCNAME
#define FCNAME DECL_FUNC(peek_callback)
static int
ADD_SUFFIX(peek_callback)(cq_tagged_entry_t * wc, MPID_Request * rreq)
{
    int mpi_errno = MPI_SUCCESS;
    BEGIN_FUNC(FCNAME);

    REQ_OFI(rreq)->match_state = PEEK_FOUND;

    /* Status fields that are filled the same way for both API sets */
    rreq->status.MPI_ERROR = MPI_SUCCESS;
    rreq->status.MPI_TAG = get_tag(wc->tag);
    MPIR_STATUS_SET_COUNT(rreq->status, wc->len);

    /* The sender's rank lives in a different place for each API set */
#if API_SET == API_SET_1
    rreq->status.MPI_SOURCE = get_source(wc->tag);
#elif API_SET == API_SET_2
    rreq->status.MPI_SOURCE = wc->data;
#endif

    END_FUNC(FCNAME);
    return mpi_errno;
}
#undef FCNAME
#define FCNAME DECL_FUNC(MPID_nem_ofi_iprobe_impl)
/*
 * Shared implementation of the probe entry points: search for a message
 * matching (source, tag, context) using fi_trecvmsg with FI_PEEK.
 *
 * vc, source    passed to OFI_ADDR_INIT to initialize the fi_addr used as
 *               msg.addr (macro defined elsewhere).
 * tag, comm,
 * context_offset  used to build the match/ignore bits; API set 2 does not
 *               encode the source in the tag bits.
 * flag          in:  CLAIM_PEEK additionally requests FI_CLAIM.
 *               out: 1 when a message was found, 0 when fi_trecvmsg
 *               returned -ENOMSG (cleared whether or not rreq_ptr is given).
 * status        out: copied from the request status when a message is found.
 * rreq_ptr      optional.  When non-NULL a request is created and returned
 *               through it; it is destroyed and *rreq_ptr set to NULL when
 *               no message matches.  When NULL a stack request is used.
 *
 * The return statement and the fn_exit/fn_fail labels are expected to come
 * from END_FUNC_RC -- NOTE(review): macro not visible here, confirm.
 */
int ADD_SUFFIX(MPID_nem_ofi_iprobe_impl)(struct MPIDI_VC *vc,
                                         int source,
                                         int tag,
                                         MPID_Comm * comm,
                                         int context_offset,
                                         int *flag, MPI_Status * status, MPID_Request ** rreq_ptr)
{
    int ret, mpi_errno = MPI_SUCCESS;
    fi_addr_t remote_proc = 0;
    uint64_t match_bits, mask_bits;
    MPID_Request rreq_s, *rreq;

    BEGIN_FUNC(FCNAME);
    if (rreq_ptr) {
        MPIDI_Request_create_rreq(rreq, mpi_errno, goto fn_exit);
        *rreq_ptr = rreq;
        rreq->comm = comm;
        rreq->dev.match.parts.rank = source;
        rreq->dev.match.parts.tag = tag;
        rreq->dev.match.parts.context_id = comm->context_id;
        MPIR_Comm_add_ref(comm);
    }
    else {
        /* Plain probe: a temporary request on the stack is sufficient */
        rreq = &rreq_s;
        rreq->dev.OnDataAvail = NULL;
    }
    REQ_OFI(rreq)->event_callback = ADD_SUFFIX(peek_callback);
    REQ_OFI(rreq)->match_state = PEEK_INIT;
    OFI_ADDR_INIT(source, vc, remote_proc);
#if API_SET == API_SET_1
    match_bits = init_recvtag(&mask_bits, comm->context_id + context_offset, source, tag);
#elif API_SET == API_SET_2
    match_bits = init_recvtag_2(&mask_bits, comm->context_id + context_offset, tag);
#endif

    /* ------------------------------------------------------------------------- */
    /* fi_recvmsg with FI_PEEK:                                                  */
    /* Initiate a search for a match in the hardware or software queue.          */
    /* The search can complete immediately with -ENOMSG.                         */
    /* If successful, libfabric will enqueue a context entry into the completion */
    /* queue to make the search nonblocking.  This code will poll until the      */
    /* entry is enqueued.                                                        */
    /* ------------------------------------------------------------------------- */
    msg_tagged_t msg;
    uint64_t msgflags = FI_PEEK;
    msg.msg_iov = NULL;
    msg.desc = NULL;
    msg.iov_count = 0;
    msg.addr = remote_proc;
    msg.tag = match_bits;
    msg.ignore = mask_bits;
    msg.context = (void *) &(REQ_OFI(rreq)->ofi_context);
    msg.data = 0;
    if (*flag == CLAIM_PEEK)
        msgflags |= FI_CLAIM;
    ret = fi_trecvmsg(gl_data.endpoint, &msg, msgflags);
    if (ret == -ENOMSG) {
        if (rreq_ptr) {
            MPIDI_CH3_Request_destroy(rreq);
            *rreq_ptr = NULL;
        }
        /* Always report "not found", even when the caller left *flag set */
        *flag = 0;
        MPID_nem_ofi_poll(MPID_NONBLOCKING_POLL);
        goto fn_exit;
    }
    MPIU_ERR_CHKANDJUMP4((ret < 0), mpi_errno, MPI_ERR_OTHER,
                         "**ofi_peek", "**ofi_peek %s %d %s %s",
                         __SHORT_FILE__, __LINE__, FCNAME, fi_strerror(-ret));

    /* Wait for peek_callback to move the request out of PEEK_INIT */
    while (PEEK_INIT == REQ_OFI(rreq)->match_state)
        MPID_nem_ofi_poll(MPID_BLOCKING_POLL);
    *status = rreq->status;
    *flag = 1;
    END_FUNC_RC(FCNAME);
}
#undef FCNAME
#define FCNAME DECL_FUNC(MPID_nem_ofi_iprobe)
int ADD_SUFFIX(MPID_nem_ofi_iprobe)(struct MPIDI_VC *vc,
int source,
int tag,
MPID_Comm * comm, int context_offset, int *flag, MPI_Status * status)
{
int rc;
BEGIN_FUNC(FCNAME);
*flag = 0;
rc = ADD_SUFFIX(MPID_nem_ofi_iprobe_impl)(vc, source,
tag, comm, context_offset, flag, status, NULL);
END_FUNC(FCNAME);