Commit 4c69be3c authored by Norio Yamaguchi's avatar Norio Yamaguchi Committed by Pavan Balaji
Browse files

Implement tag matching for ANY_SOURCE and ANY_TAG

parent 430e9f4b
......@@ -96,6 +96,8 @@ int MPID_nem_tofu_recv_posted(struct MPIDI_VC *vc, struct MPID_Request *req);
int MPID_nem_tofu_kvs_put_binary(int from, const char *postfix, const uint8_t * buf,
int length);
int MPID_nem_tofu_kvs_get_binary(int from, const char *postfix, char *buf, int length);
void MPID_nem_tofu_anysource_posted(MPID_Request *req);
int MPID_nem_tofu_anysource_matched(MPID_Request *req);
/*
* temporary llctofu api
......@@ -112,6 +114,7 @@ extern int llctofu_unbind(void *endpt);
extern int llctofu_poll(int in_blocking_poll,
llctofu_send_f sfnc, llctofu_recv_f rfnc);
extern int convert_rank_llc2mpi(MPID_Comm *comm, int llc_rank, int *mpi_rank);
typedef struct MPID_nem_tofu_netmod_hdr {
int initiator_pg_rank;
#ifndef notdef_hsiz_hack
......
......@@ -155,6 +155,10 @@ MPID_nem_tofu_init (MPIDI_PG_t *pg_p, int pg_rank,
pmi_errno = PMI_Barrier();
MPIU_ERR_CHKANDJUMP(pmi_errno != PMI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**PMI_Barrier");
mpi_errno =
MPIDI_CH3I_Register_anysource_notification(MPID_nem_tofu_anysource_posted,
MPID_nem_tofu_anysource_matched);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TOFU_INIT);
return mpi_errno;
......
......@@ -255,8 +255,10 @@ int MPID_nem_tofu_recv_posted(struct MPIDI_VC *vc, struct MPID_Request *req)
/* Save data size for llctofu_poll */
req->dev.recv_data_sz = data_sz;
#if 0 /* FIXME : vc is NULL when rank is MPI_ANY_SOURCE */
dprintf("tofu_recv_posted,%d<-%d,vc=%p,req=%p,user_buf=%p,data_sz=%ld,datatype=%08x,dt_contig=%d\n",
MPIDI_Process.my_pg_rank, vc->pg_rank, vc, req, req->dev.user_buf, req->dev.recv_data_sz, req->dev.datatype, dt_contig);
#endif
void *write_to_buf;
if (dt_contig) {
......@@ -273,18 +275,33 @@ int MPID_nem_tofu_recv_posted(struct MPIDI_VC *vc, struct MPID_Request *req)
LLC_comm_rank(LLC_COMM_MPICH, &LLC_my_rank);
dprintf("tofu_isend,LLC_my_rank=%d\n", LLC_my_rank);
#if 0 /* FIXME : vc is NULL when rank is MPI_ANY_SOURCE */
dprintf("tofu_recv_posted,remote_endpoint_addr=%ld\n", VC_FIELD(vc, remote_endpoint_addr));
#endif
LLC_cmd_t *cmd = LLC_cmd_alloc2(1, 1, 1);
cmd[0].opcode = LLC_OPCODE_RECV;
cmd[0].comm = LLC_COMM_MPICH;
cmd[0].rank = VC_FIELD(vc, remote_endpoint_addr);
cmd[0].req_id = cmd;
if (req->dev.match.parts.rank == MPI_ANY_SOURCE) {
cmd[0].rank = LLC_ANY_SOURCE;
cmd[0].mask.rank = 0;
} else {
cmd[0].rank = VC_FIELD(vc, remote_endpoint_addr);
}
/* req->comm is set in MPID_irecv --> MPIDI_CH3U_Recvq_FDU_or_AEP */
*(int32_t*)((uint8_t*)&cmd[0].tag) = req->dev.match.parts.tag;
*(MPIR_Context_id_t*)((uint8_t*)&cmd[0].tag + sizeof(int32_t)) =
if (req->dev.match.parts.tag == MPI_ANY_TAG) {
*(int32_t*)((uint8_t*)&cmd[0].tag) = LLC_ANY_TAG;
*(int32_t*)((uint8_t*)&cmd[0].mask.tag) = 0;
}
else {
*(int32_t*)((uint8_t*)&cmd[0].tag) = req->dev.match.parts.tag;
}
*(MPIR_Context_id_t*)((uint8_t*)&cmd[0].tag + sizeof(int32_t)) =
req->dev.match.parts.context_id;
MPIU_Assert(sizeof(LLC_tag_t) >= sizeof(int32_t) + sizeof(MPIR_Context_id_t));
memset((uint8_t*)&cmd[0].tag + sizeof(int32_t) + sizeof(MPIR_Context_id_t),
......@@ -307,7 +324,13 @@ int MPID_nem_tofu_recv_posted(struct MPIDI_VC *vc, struct MPID_Request *req)
cmd[0].niov_remote = 1;
((struct llctofu_cmd_area *)cmd[0].usr_area)->cbarg = req;
((struct llctofu_cmd_area *)cmd[0].usr_area)->raddr = VC_FIELD(vc, remote_endpoint_addr);
if (req->dev.match.parts.rank == MPI_ANY_SOURCE) {
((struct llctofu_cmd_area *)cmd[0].usr_area)->raddr = MPI_ANY_SOURCE; /* FIXME : should 0 ? */
} else {
((struct llctofu_cmd_area *)cmd[0].usr_area)->raddr = VC_FIELD(vc, remote_endpoint_addr);
}
REQ_FIELD(req, cmds) = cmd;
llc_errno = LLC_post(cmd, 1);
MPIU_ERR_CHKANDJUMP(llc_errno != LLC_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**LLC_post");
......@@ -318,3 +341,40 @@ int MPID_nem_tofu_recv_posted(struct MPIDI_VC *vc, struct MPID_Request *req)
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tofu_anysource_posted
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
void MPID_nem_tofu_anysource_posted(MPID_Request *req)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TOFU_AYSOURCE_POSTED);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TOFU_AYSOURCE_POSTED);
mpi_errno = MPID_nem_tofu_recv_posted(NULL, req);
MPIU_Assert(mpi_errno == MPI_SUCCESS);
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TOFU_AYSOURCE_POSTED);
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tofu_anysource_matched
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_nem_tofu_anysource_matched(MPID_Request *req)
{
int matched = FALSE;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TOFU_ANYSOURCE_MATCHED);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TOFU_ANYSOURCE_MATCHED);
/* FIXME : How to call a cancel_recv function */
/* If LLC_postedq is still having this request, delete it.
Ohterwise, return TURE */
matched = LLC_req_approve_recv((LLC_cmd_t *)REQ_FIELD(req, cmds));
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TOFU_ANYSOURCE_MATCHED);
return matched;
}
......@@ -630,6 +630,25 @@ ssize_t llctofu_writev(void *endpt, uint64_t raddr,
return nw;
}
int convert_rank_llc2mpi(MPID_Comm *comm, int llc_rank, int *mpi_rank)
{
int size, rank;
int found = 0;
MPIDI_VC_t *vc;
size = MPIR_Comm_size(comm);
for (rank = 0; rank < size; rank++) {
MPIDI_Comm_get_vc(comm, rank, &vc);
if (llc_rank == VC_FIELD(vc, remote_endpoint_addr)) {
*mpi_rank = rank; // rank number in the req->comm
found = 1;
break;
}
}
return found;
}
int llctofu_poll(int in_blocking_poll,
llctofu_send_f sfnc, llctofu_recv_f rfnc)
{
......@@ -773,8 +792,23 @@ int llctofu_poll(int in_blocking_poll,
MPIU_Free(REQ_FIELD(req, pack_buf));
}
req->status.MPI_TAG = req->dev.match.parts.tag;
req->status.MPI_SOURCE = req->dev.match.parts.rank;
req->status.MPI_TAG = events[0].side.initiator.tag & 0xffffffff;;
if (req->dev.match.parts.rank != MPI_ANY_SOURCE) {
req->status.MPI_SOURCE = req->dev.match.parts.rank;
} else {
/* 'events[0].side.initiator.rank' is LLC rank.
* Convert it to a rank number in the communicator. */
int found = 0;
found = convert_rank_llc2mpi(req->comm, events[0].side.initiator.rank, &req->status.MPI_SOURCE);
MPIU_Assert(found);
}
MPIR_STATUS_SET_COUNT(req->status, events[0].side.initiator.length);
/* Dequeue request from posted queue.
It's posted in MPID_Irecv --> MPIDI_CH3U_Recvq_FDU_or_AEP */
int found = MPIDI_CH3U_Recvq_DP(req);
MPIU_Assert(found);
MPIR_STATUS_SET_COUNT(req->status, events[0].side.initiator.length);
/* Dequeue request from posted queue.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment