Commit 5717e183, authored by Norio Yamaguchi and committed by Pavan Balaji
Browse files

Implement iprobe and improbe in netmod-tofu

parent 4c69be3c
......@@ -15,6 +15,7 @@ lib_lib@MPILIBNAME@_la_SOURCES += \
src/mpid/ch3/channels/nemesis/netmod/tofu/tofu_vc.c \
src/mpid/ch3/channels/nemesis/netmod/tofu/tofu_poll.c \
src/mpid/ch3/channels/nemesis/netmod/tofu/tofu_send.c \
src/mpid/ch3/channels/nemesis/netmod/tofu/tofu_probe.c \
$(EOA)
noinst_HEADERS += \
......
......@@ -98,6 +98,12 @@ int MPID_nem_tofu_kvs_put_binary(int from, const char *postfix, const uint8_t *
int MPID_nem_tofu_kvs_get_binary(int from, const char *postfix, char *buf, int length);
void MPID_nem_tofu_anysource_posted(MPID_Request *req);
int MPID_nem_tofu_anysource_matched(MPID_Request *req);
/* Blocking-probe hook of the tofu netmod. Its definition only traces the call and returns
 * MPI_SUCCESS; it does not inspect the network or fill in *status. */
int MPID_nem_tofu_probe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset,
MPI_Status *status);
/* Non-blocking probe: sets *flag to 1 and fills *status when LLC_probe() reports a matching
 * message, otherwise sets *flag to 0. The any-source wrapper passes vc == NULL together with
 * source == MPI_ANY_SOURCE. */
int MPID_nem_tofu_iprobe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset,
int *flag, MPI_Status *status);
/* Non-blocking matched probe: when LLC_mprobe() finds a message, receives it into a newly
 * created MPID_REQUEST_MPROBE request returned through *message and sets *flag to 1;
 * otherwise sets *flag to 0 and *message to NULL. */
int MPID_nem_tofu_improbe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset,
int *flag, MPID_Request **message, MPI_Status *status);
/*
* temporary llctofu api
......
......@@ -159,6 +159,8 @@ MPID_nem_tofu_init (MPIDI_PG_t *pg_p, int pg_rank,
MPIDI_CH3I_Register_anysource_notification(MPID_nem_tofu_anysource_posted,
MPID_nem_tofu_anysource_matched);
MPIDI_Anysource_improbe_fn = MPID_nem_tofu_anysource_improbe;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TOFU_INIT);
return mpi_errno;
......@@ -214,7 +216,7 @@ MPID_nem_tofu_connect_to_root (const char *business_card, MPIDI_VC_t *new_vc)
#define FCNAME MPIU_QUOTE(FUNCNAME)
/*
 * Any-source iprobe hook for the tofu netmod.
 *
 * Forwards to MPID_nem_tofu_iprobe() with source fixed to MPI_ANY_SOURCE. No
 * virtual connection is associated with a wildcard source, so NULL is passed
 * as the vc argument.
 *
 * Returns whatever MPID_nem_tofu_iprobe() returns; *flag and *status are
 * filled in by that function.
 */
int MPID_nem_tofu_anysource_iprobe(int tag, MPID_Comm *comm, int context_offset, int *flag, MPI_Status *status)
{
    /* The former stub "return MPI_SUCCESS;" made this call unreachable. */
    return MPID_nem_tofu_iprobe(NULL, MPI_ANY_SOURCE, tag, comm, context_offset, flag, status);
}
......@@ -224,5 +226,5 @@ int MPID_nem_tofu_anysource_iprobe(int tag, MPID_Comm *comm, int context_offset,
#define FCNAME MPIU_QUOTE(FUNCNAME)
/*
 * Any-source improbe hook for the tofu netmod.
 *
 * Forwards to MPID_nem_tofu_improbe() with source fixed to MPI_ANY_SOURCE. No
 * virtual connection is associated with a wildcard source, so NULL is passed
 * as the vc argument.
 *
 * Returns whatever MPID_nem_tofu_improbe() returns; *flag, *message and
 * *status are filled in by that function.
 */
int MPID_nem_tofu_anysource_improbe(int tag, MPID_Comm *comm, int context_offset, int *flag, MPID_Request **message, MPI_Status *status)
{
    /* The former stub "return MPI_SUCCESS;" made this call unreachable. */
    return MPID_nem_tofu_improbe(NULL, MPI_ANY_SOURCE, tag, comm, context_offset, flag, message, status);
}
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/* vim: set ts=8 sts=4 sw=4 noexpandtab : */
/*
*
*/
#include "mpid_nem_impl.h"
#include "tofu_impl.h"
/* Debug tracing for the probe routines in this file.
 * Uncomment the define below to make dprintf() expand to printf(); otherwise
 * every dprintf(...) call expands to nothing.
 * NOTE(review): the macro name is the same as POSIX dprintf(3); confirm no code
 * in this file needs the POSIX function. */
//#define MPID_NEM_TOFU_DEBUG_PROBE
#ifdef MPID_NEM_TOFU_DEBUG_PROBE
#define dprintf printf
#else
#define dprintf(...)
#endif
#undef FUNCNAME
#define FUNCNAME MPID_nem_tofu_probe
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Blocking-probe hook of the tofu netmod.
 *
 * This is a placeholder: it traces the call (when debug tracing is enabled)
 * and reports success without touching *status. According to the original
 * author's note it is never invoked, because 'vc->comm_ops->probe()' is not
 * used.
 *
 * Always returns MPI_SUCCESS.
 */
int MPID_nem_tofu_probe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset,
                        MPI_Status *status)
{
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TOFU_PROBE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TOFU_PROBE);

    dprintf("tofu_probe,source=%d,tag=%d\n", source, tag);

    /* Nothing to do here: no probing is performed by this hook. */

    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TOFU_PROBE);
    return MPI_SUCCESS;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tofu_iprobe
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Non-blocking probe for a message matching (source, tag, context) via LLC_probe().
 *
 * vc             - virtual connection of the sender; only read when source is a
 *                  concrete rank (NULL is passed by the any-source wrapper)
 * source         - MPI rank to match, or MPI_ANY_SOURCE
 * tag            - MPI tag to match, or MPI_ANY_TAG
 * comm           - communicator whose recvcontext_id is matched
 * context_offset - added to comm->recvcontext_id to form the context id to match
 * flag           - out: 1 when a matching message was found, 0 otherwise
 * status         - out: MPI_SOURCE, MPI_TAG, MPI_ERROR and count of the match;
 *                  MPI_STATUS_IGNORE is accepted and left untouched
 *
 * Returns MPI_SUCCESS, or the error code returned by MPID_Progress_poke() when
 * no message was found and making progress failed.
 */
int MPID_nem_tofu_iprobe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset,
                         int *flag, MPI_Status *status)
{
    int mpi_errno = MPI_SUCCESS, llc_errno;
    int rank;
    int32_t tag_bits;
    MPIR_Context_id_t context_id;
    LLC_tag_t _tag;
    LLC_probe_t probe;
    LLC_match_mask_t mask;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TOFU_IPROBE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TOFU_IPROBE);

    dprintf("tofu_iprobe,source=%d,tag=%d\n", source, tag);

    /* Start with an exact match on both rank and tag; wildcards clear bits below. */
    mask.rank = ~0;
    mask.tag = ~0;

    if (source == MPI_ANY_SOURCE) {
        rank = LLC_ANY_SOURCE;
        mask.rank = 0;
    } else {
        MPIU_Assert(vc);
        rank = VC_FIELD(vc, remote_endpoint_addr);
    }

    /* LLC tag layout (in memory order): the MPI tag as int32_t, then the context
     * id, then zero padding. The bytes are written with memcpy()/memset() rather
     * than through casted pointers so that no object is accessed through an
     * incompatible lvalue type. */
    if (tag == MPI_ANY_TAG) {
        tag_bits = LLC_ANY_TAG;
        /* Ignore the MPI-tag bytes when matching; the context id is still compared. */
        memset(&mask.tag, 0, sizeof(int32_t));
    } else {
        tag_bits = tag;
    }
    memcpy(&_tag, &tag_bits, sizeof(tag_bits));

    context_id = comm->recvcontext_id + context_offset;
    memcpy((uint8_t *)&_tag + sizeof(int32_t), &context_id, sizeof(context_id));

    memset((uint8_t *)&_tag + sizeof(int32_t) + sizeof(MPIR_Context_id_t),
           0, sizeof(LLC_tag_t) - sizeof(int32_t) - sizeof(MPIR_Context_id_t));

    llc_errno = LLC_probe(LLC_COMM_MPICH, rank, _tag, &mask, &probe);
    if (llc_errno == LLC_SUCCESS) {
        *flag = 1;

        /* The caller may pass MPI_STATUS_IGNORE, which must never be dereferenced. */
        if (status != MPI_STATUS_IGNORE) {
            status->MPI_ERROR = MPI_SUCCESS;
            if (source != MPI_ANY_SOURCE) {
                status->MPI_SOURCE = source;
            } else {
                /* probe.rank is an LLC rank; translate it to a rank in comm. */
                int found = 0;
                found = convert_rank_llc2mpi(comm, probe.rank, &status->MPI_SOURCE);
                MPIU_Assert(found);
            }
            /* The MPI tag is carried in the low 32 bits of the LLC tag value. */
            status->MPI_TAG = probe.tag & 0xffffffff;
            MPIR_STATUS_SET_COUNT(*status, probe.len);
        }
    } else {
        *flag = 0;
        /* Drive the progress engine (does LLC_poll) so a later probe can succeed,
         * and report a progress failure instead of dropping it. */
        mpi_errno = MPID_Progress_poke();
        if (mpi_errno) {
            goto fn_fail;
        }
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TOFU_IPROBE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_tofu_improbe
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * Non-blocking matched probe via LLC_mprobe().
 *
 * When a matching message exists, a request of kind MPID_REQUEST_MPROBE is
 * created, the message payload is received into req->dev.tmpbuf, and the
 * function polls until that request is complete before handing it back.
 *
 * vc             - virtual connection of the sender; only read when source is a
 *                  concrete rank (NULL is passed by the any-source wrapper)
 * source         - MPI rank to match, or MPI_ANY_SOURCE
 * tag            - MPI tag to match, or MPI_ANY_TAG
 * comm           - communicator whose recvcontext_id is matched; a reference is
 *                  added for the returned request
 * context_offset - added to comm->recvcontext_id to form the context id to match
 * flag           - out: 1 when a message was matched, 0 otherwise
 * message        - out: the request holding the matched message, or NULL
 * status         - out: MPI_SOURCE, MPI_TAG and count of the match;
 *                  MPI_STATUS_IGNORE is accepted and left untouched
 *
 * Returns MPI_SUCCESS, or an MPI error code if the request cannot be allocated
 * or polling / progress fails.
 */
int MPID_nem_tofu_improbe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset,
                          int *flag, MPID_Request **message, MPI_Status *status)
{
    int mpi_errno = MPI_SUCCESS;
    int rank;
    int32_t tag_bits;
    MPIR_Context_id_t context_id;
    LLC_tag_t _tag;
    LLC_probe_t probe;
    LLC_match_mask_t mask;
    LLC_cmd_t *msg = NULL;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TOFU_IMPROBE);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TOFU_IMPROBE);

    dprintf("tofu_improbe,source=%d,tag=%d\n", source, tag);

    /* Start with an exact match on both rank and tag; wildcards clear bits below. */
    mask.rank = ~0;
    mask.tag = ~0;

    if (source == MPI_ANY_SOURCE) {
        rank = LLC_ANY_SOURCE;
        mask.rank = 0;
    } else {
        MPIU_Assert(vc);
        rank = VC_FIELD(vc, remote_endpoint_addr);
    }

    /* LLC tag layout (in memory order): the MPI tag as int32_t, then the context
     * id, then zero padding. The bytes are written with memcpy()/memset() rather
     * than through casted pointers so that no object is accessed through an
     * incompatible lvalue type. */
    if (tag == MPI_ANY_TAG) {
        tag_bits = LLC_ANY_TAG;
        /* Ignore the MPI-tag bytes when matching; the context id is still compared. */
        memset(&mask.tag, 0, sizeof(int32_t));
    } else {
        tag_bits = tag;
    }
    memcpy(&_tag, &tag_bits, sizeof(tag_bits));

    context_id = comm->recvcontext_id + context_offset;
    memcpy((uint8_t *)&_tag + sizeof(int32_t), &context_id, sizeof(context_id));

    memset((uint8_t *)&_tag + sizeof(int32_t) + sizeof(MPIR_Context_id_t),
           0, sizeof(LLC_tag_t) - sizeof(int32_t) - sizeof(MPIR_Context_id_t));

    msg = LLC_mprobe(LLC_COMM_MPICH, rank, _tag, &mask, &probe);
    if (msg) {
        MPID_Request *req;
        LLC_cmd_t *cmd;
        struct llctofu_cmd_area *area;

        *flag = 1;

        req = MPID_Request_create();
        if (req == NULL) {
            MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
        }
        MPIU_Object_set_ref(req, 2);
        req->kind = MPID_REQUEST_MPROBE;
        req->comm = comm;
        MPIR_Comm_add_ref(comm);
        req->ch.vc = vc;
        MPIDI_Request_set_msg_type(req, MPIDI_REQUEST_EAGER_MSG);
        req->dev.recv_pending_count = 1;

        req->status.MPI_ERROR = MPI_SUCCESS;
        if (source != MPI_ANY_SOURCE) {
            req->status.MPI_SOURCE = source;
        } else {
            /* probe.rank is an LLC rank; translate it to a rank in comm. */
            int found = 0;
            found = convert_rank_llc2mpi(comm, probe.rank, &req->status.MPI_SOURCE);
            MPIU_Assert(found);
        }
        /* The MPI tag is carried in the low 32 bits of the LLC tag value. */
        req->status.MPI_TAG = probe.tag & 0xffffffff;
        req->dev.recv_data_sz = probe.len;
        MPIR_STATUS_SET_COUNT(req->status, req->dev.recv_data_sz);

        req->dev.tmpbuf = MPIU_Malloc(req->dev.recv_data_sz);
        /* An allocation of zero bytes may legitimately yield NULL, so only a
         * non-empty message requires a non-NULL buffer. */
        MPIU_Assert(req->dev.recv_data_sz == 0 || req->dev.tmpbuf);

        /* Receive the matched message into req->dev.tmpbuf. */
        cmd = LLC_cmd_alloc2(1, 1, 1);
        MPIU_Assert(cmd);
        cmd[0].opcode = 0;      /* not used */
        cmd[0].comm = LLC_COMM_MPICH;
        cmd[0].req_id = cmd;
        cmd[0].rank = msg->rank;
        /* cmd[0].tag is not used and is left as allocated. */

        cmd[0].iov_local[0].addr = (uint64_t)(uintptr_t)req->dev.tmpbuf;
        cmd[0].iov_local[0].length = req->dev.recv_data_sz;
        cmd[0].niov_local = 1;

        cmd[0].iov_remote[0].addr = 0;
        cmd[0].iov_remote[0].length = req->dev.recv_data_sz;
        cmd[0].niov_remote = 1;

        /* The poll routine finds the request to complete through cbarg. */
        area = (struct llctofu_cmd_area *)cmd[0].usr_area;
        area->cbarg = req;
        if (source == MPI_ANY_SOURCE) {
            area->raddr = MPI_ANY_SOURCE;
        } else {
            area->raddr = VC_FIELD(vc, remote_endpoint_addr);
        }

        LLC_recv_msg(cmd, msg);

        /* Wait until the reception of data is completed. The request itself is
         * marked complete inside the poll routine. MPIU_ERR_POP jumps to fn_fail
         * whenever it is executed, so it must be reached only on a real error. */
        do {
            mpi_errno = MPID_nem_tofu_poll(0);
            if (mpi_errno) {
                MPIU_ERR_POP(mpi_errno);
            }
        } while (!MPID_Request_is_complete(req));

        *message = req;

        /* The caller may pass MPI_STATUS_IGNORE, which must never be dereferenced.
         * status->MPI_ERROR is intentionally not written here (left as an open
         * question by the original author). */
        if (status != MPI_STATUS_IGNORE) {
            status->MPI_SOURCE = req->status.MPI_SOURCE;
            status->MPI_TAG = req->status.MPI_TAG;
            MPIR_STATUS_SET_COUNT(*status, req->dev.recv_data_sz);
        }
    } else {
        *flag = 0;
        *message = NULL;

        /* Drive the progress engine (does LLC_poll) so a later probe can succeed,
         * and report a progress failure instead of dropping it. */
        mpi_errno = MPID_Progress_poke();
        if (mpi_errno) {
            MPIU_ERR_POP(mpi_errno);
        }
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TOFU_IMPROBE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
......@@ -759,62 +759,57 @@ int llctofu_poll(int in_blocking_poll,
lcmd = events[0].side.initiator.req_id;
MPID_Request *req = ((struct llctofu_cmd_area*)lcmd->usr_area)->cbarg;
/* Unpack non-contiguous dt */
int is_contig;
MPID_Datatype_is_contig(req->dev.datatype, &is_contig);
if (!is_contig) {
dprintf("llctofu_poll,unpack noncontiguous data to user buffer\n");
/* see MPIDI_CH3U_Request_unpack_uebuf (in /src/mpid/ch3/src/ch3u_request.c) */
/* or MPIDI_CH3U_Receive_data_found (in src/mpid/ch3/src/ch3u_handle_recv_pkt.c) */
MPIDI_msg_sz_t unpack_sz = req->dev.recv_data_sz;
MPID_Segment seg;
MPI_Aint last;
/* user_buf etc. are set in MPID_irecv --> MPIDI_CH3U_Recvq_FDU_or_AEP */
MPID_Segment_init(req->dev.user_buf, req->dev.user_count, req->dev.datatype, &seg,
0);
last = unpack_sz;
MPID_Segment_unpack(&seg, 0, &last, REQ_FIELD(req, pack_buf));
if (last != unpack_sz) {
/* --BEGIN ERROR HANDLING-- */
/* received data was not entirely consumed by unpack()
* because too few bytes remained to fill the next basic
* datatype */
MPIR_STATUS_SET_COUNT(req->status, last);
req->status.MPI_ERROR =
MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
MPI_ERR_TYPE, "**llctofu_poll", 0);
/* --END ERROR HANDLING-- */
if (req->kind != MPID_REQUEST_MPROBE) {
/* Unpack non-contiguous dt */
int is_contig;
MPID_Datatype_is_contig(req->dev.datatype, &is_contig);
if (!is_contig) {
dprintf("llctofu_poll,unpack noncontiguous data to user buffer\n");
/* see MPIDI_CH3U_Request_unpack_uebuf (in /src/mpid/ch3/src/ch3u_request.c) */
/* or MPIDI_CH3U_Receive_data_found (in src/mpid/ch3/src/ch3u_handle_recv_pkt.c) */
MPIDI_msg_sz_t unpack_sz = req->dev.recv_data_sz;
MPID_Segment seg;
MPI_Aint last;
/* user_buf etc. are set in MPID_irecv --> MPIDI_CH3U_Recvq_FDU_or_AEP */
MPID_Segment_init(req->dev.user_buf, req->dev.user_count, req->dev.datatype, &seg,
0);
last = unpack_sz;
MPID_Segment_unpack(&seg, 0, &last, REQ_FIELD(req, pack_buf));
if (last != unpack_sz) {
/* --BEGIN ERROR HANDLING-- */
/* received data was not entirely consumed by unpack()
* because too few bytes remained to fill the next basic
* datatype */
MPIR_STATUS_SET_COUNT(req->status, last);
req->status.MPI_ERROR =
MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__,
MPI_ERR_TYPE, "**llctofu_poll", 0);
/* --END ERROR HANDLING-- */
}
dprintf("llctofu_poll,ref_count=%d,pack_buf=%p\n", req->ref_count,
REQ_FIELD(req, pack_buf));
MPIU_Free(REQ_FIELD(req, pack_buf));
}
dprintf("llctofu_poll,ref_count=%d,pack_buf=%p\n", req->ref_count,
REQ_FIELD(req, pack_buf));
MPIU_Free(REQ_FIELD(req, pack_buf));
}
req->status.MPI_TAG = events[0].side.initiator.tag & 0xffffffff;;
if (req->dev.match.parts.rank != MPI_ANY_SOURCE) {
req->status.MPI_SOURCE = req->dev.match.parts.rank;
} else {
/* 'events[0].side.initiator.rank' is LLC rank.
* Convert it to a rank number in the communicator. */
int found = 0;
found = convert_rank_llc2mpi(req->comm, events[0].side.initiator.rank, &req->status.MPI_SOURCE);
req->status.MPI_TAG = events[0].side.initiator.tag & 0xffffffff;;
if (req->dev.match.parts.rank != MPI_ANY_SOURCE) {
req->status.MPI_SOURCE = req->dev.match.parts.rank;
} else {
/* 'events[0].side.initiator.rank' is LLC rank.
* Convert it to a rank number in the communicator. */
int found = 0;
found = convert_rank_llc2mpi(req->comm, events[0].side.initiator.rank, &req->status.MPI_SOURCE);
MPIU_Assert(found);
}
MPIR_STATUS_SET_COUNT(req->status, events[0].side.initiator.length);
/* Dequeue request from posted queue.
It's posted in MPID_Irecv --> MPIDI_CH3U_Recvq_FDU_or_AEP */
int found = MPIDI_CH3U_Recvq_DP(req);
MPIU_Assert(found);
}
MPIR_STATUS_SET_COUNT(req->status, events[0].side.initiator.length);
/* Dequeue request from posted queue.
It's posted in MPID_Irecv --> MPIDI_CH3U_Recvq_FDU_or_AEP */
int found = MPIDI_CH3U_Recvq_DP(req);
MPIU_Assert(found);
MPIR_STATUS_SET_COUNT(req->status, events[0].side.initiator.length);
/* Dequeue request from posted queue.
It's posted in MPID_Irecv --> MPIDI_CH3U_Recvq_FDU_or_AEP */
int found = MPIDI_CH3U_Recvq_DP(req);
MPIU_Assert(found);
/* Mark completion on rreq */
MPIDI_CH3U_Request_complete(req);
......
......@@ -38,9 +38,9 @@ static MPIDI_Comm_ops_t comm_ops = {
.cancel_send = NULL,
.cancel_recv = NULL,
.probe = NULL,
.iprobe = NULL,
.improbe = NULL
.probe = MPID_nem_tofu_probe,
.iprobe = MPID_nem_tofu_iprobe,
.improbe = MPID_nem_tofu_improbe
};
#undef FUNCNAME
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment