Commit b46eb963 authored by Darius Buntinas's avatar Darius Buntinas
Browse files

[svn-r7390] adding anysource probe support for mx and newmad

parent 5e0e3c84
...@@ -21,6 +21,8 @@ typedef int (* MPID_nem_net_module_connect_to_root_t)(const char *business_card, ...@@ -21,6 +21,8 @@ typedef int (* MPID_nem_net_module_connect_to_root_t)(const char *business_card,
typedef int (* MPID_nem_net_module_vc_init_t)(MPIDI_VC_t *vc); typedef int (* MPID_nem_net_module_vc_init_t)(MPIDI_VC_t *vc);
typedef int (* MPID_nem_net_module_vc_destroy_t)(MPIDI_VC_t *vc); typedef int (* MPID_nem_net_module_vc_destroy_t)(MPIDI_VC_t *vc);
typedef int (* MPID_nem_net_module_vc_terminate_t)(MPIDI_VC_t *vc); typedef int (* MPID_nem_net_module_vc_terminate_t)(MPIDI_VC_t *vc);
typedef int (* MPID_nem_net_module_anysource_iprobe_t)(int tag, MPID_Comm *comm, int context_offset, int *flag,
MPI_Status *status);
typedef void (* MPID_nem_net_module_vc_dbg_print_sendq_t)(FILE *stream, MPIDI_VC_t *vc); typedef void (* MPID_nem_net_module_vc_dbg_print_sendq_t)(FILE *stream, MPIDI_VC_t *vc);
...@@ -39,6 +41,7 @@ typedef struct MPID_nem_netmod_funcs ...@@ -39,6 +41,7 @@ typedef struct MPID_nem_netmod_funcs
MPID_nem_net_module_vc_init_t vc_init; MPID_nem_net_module_vc_init_t vc_init;
MPID_nem_net_module_vc_destroy_t vc_destroy; MPID_nem_net_module_vc_destroy_t vc_destroy;
MPID_nem_net_module_vc_terminate_t vc_terminate; MPID_nem_net_module_vc_terminate_t vc_terminate;
MPID_nem_net_module_anysource_iprobe_t anysource_iprobe;
} MPID_nem_netmod_funcs_t; } MPID_nem_netmod_funcs_t;
extern MPID_nem_net_module_vc_dbg_print_sendq_t MPID_nem_net_module_vc_dbg_print_sendq; extern MPID_nem_net_module_vc_dbg_print_sendq_t MPID_nem_net_module_vc_dbg_print_sendq;
......
...@@ -21,7 +21,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_elan_funcs = { ...@@ -21,7 +21,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_elan_funcs = {
MPID_nem_elan_connect_to_root, MPID_nem_elan_connect_to_root,
MPID_nem_elan_vc_init, MPID_nem_elan_vc_init,
MPID_nem_elan_vc_destroy, MPID_nem_elan_vc_destroy,
MPID_nem_elan_vc_terminate MPID_nem_elan_vc_terminate,
NULL /* anysource iprobe */
}; };
......
...@@ -15,7 +15,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_gm_funcs = { ...@@ -15,7 +15,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_gm_funcs = {
MPID_nem_gm_connect_to_root, MPID_nem_gm_connect_to_root,
MPID_nem_gm_vc_init, MPID_nem_gm_vc_init,
MPID_nem_gm_vc_destroy, MPID_nem_gm_vc_destroy,
MPID_nem_gm_vc_terminate MPID_nem_gm_vc_terminate,
NULL /* anysource iprobe */
}; };
......
...@@ -47,6 +47,8 @@ int MPID_nem_mx_cancel_recv(MPIDI_VC_t *vc, MPID_Request *rreq); ...@@ -47,6 +47,8 @@ int MPID_nem_mx_cancel_recv(MPIDI_VC_t *vc, MPID_Request *rreq);
int MPID_nem_mx_probe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset, MPI_Status *status); int MPID_nem_mx_probe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset, MPI_Status *status);
int MPID_nem_mx_iprobe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset, int *flag, MPI_Status *status); int MPID_nem_mx_iprobe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, int context_offset, int *flag, MPI_Status *status);
int MPID_nem_mx_anysource_iprobe(int tag, MPID_Comm *comm, int context_offset, int *flag, MPI_Status *status);
/* Callback routine for unex msgs in MX */ /* Callback routine for unex msgs in MX */
mx_unexp_handler_action_t MPID_nem_mx_get_adi_msg(void *context,mx_endpoint_addr_t source, mx_unexp_handler_action_t MPID_nem_mx_get_adi_msg(void *context,mx_endpoint_addr_t source,
uint64_t match_info,uint32_t length,void *data); uint64_t match_info,uint32_t length,void *data);
......
...@@ -15,7 +15,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_mx_funcs = { ...@@ -15,7 +15,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_mx_funcs = {
MPID_nem_mx_connect_to_root, MPID_nem_mx_connect_to_root,
MPID_nem_mx_vc_init, MPID_nem_mx_vc_init,
MPID_nem_mx_vc_destroy, MPID_nem_mx_vc_destroy,
MPID_nem_mx_vc_terminate MPID_nem_mx_vc_terminate,
MPID_nem_mx_anysource_iprobe
}; };
static MPIDI_Comm_ops_t comm_ops = { static MPIDI_Comm_ops_t comm_ops = {
...@@ -255,6 +256,8 @@ MPID_nem_mx_vc_init (MPIDI_VC_t *vc) ...@@ -255,6 +256,8 @@ MPID_nem_mx_vc_init (MPIDI_VC_t *vc)
mpi_errno = MPID_nem_mx_get_from_bc (business_card, &VC_FIELD(vc, remote_endpoint_id), &VC_FIELD(vc, remote_nic_id)); mpi_errno = MPID_nem_mx_get_from_bc (business_card, &VC_FIELD(vc, remote_endpoint_id), &VC_FIELD(vc, remote_nic_id));
if (mpi_errno) MPIU_ERR_POP (mpi_errno); if (mpi_errno) MPIU_ERR_POP (mpi_errno);
MPIU_Free(business_card);
ret = mx_connect(MPID_nem_mx_local_endpoint,VC_FIELD(vc, remote_nic_id),VC_FIELD(vc, remote_endpoint_id), ret = mx_connect(MPID_nem_mx_local_endpoint,VC_FIELD(vc, remote_nic_id),VC_FIELD(vc, remote_endpoint_id),
MPID_NEM_MX_FILTER,MX_INFINITE,&(VC_FIELD(vc, remote_endpoint_addr))); MPID_NEM_MX_FILTER,MX_INFINITE,&(VC_FIELD(vc, remote_endpoint_addr)));
MPIU_ERR_CHKANDJUMP1 (ret != MX_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**mx_connect", "**mx_connect %s", mx_strerror (ret)); MPIU_ERR_CHKANDJUMP1 (ret != MX_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**mx_connect", "**mx_connect %s", mx_strerror (ret));
......
...@@ -98,3 +98,11 @@ int MPID_nem_mx_iprobe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, in ...@@ -98,3 +98,11 @@ int MPID_nem_mx_iprobe(MPIDI_VC_t *vc, int source, int tag, MPID_Comm *comm, in
#undef FUNCNAME
#define FUNCNAME MPID_nem_mx_anysource_iprobe
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_nem_mx_anysource_iprobe(int tag, MPID_Comm *comm, int context_offset, int *flag, MPI_Status *status)
{
return MPID_nem_mx_iprobe(NULL, MPI_ANY_SOURCE, tag, comm, context_offset, flag, status);
}
...@@ -17,7 +17,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_newmad_funcs = { ...@@ -17,7 +17,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_newmad_funcs = {
MPID_nem_newmad_connect_to_root, MPID_nem_newmad_connect_to_root,
MPID_nem_newmad_vc_init, MPID_nem_newmad_vc_init,
MPID_nem_newmad_vc_destroy, MPID_nem_newmad_vc_destroy,
MPID_nem_newmad_vc_terminate MPID_nem_newmad_vc_terminate,
MPID_nem_newmad_anysource_iprobe
}; };
static MPIDI_Comm_ops_t comm_ops = { static MPIDI_Comm_ops_t comm_ops = {
......
...@@ -63,5 +63,6 @@ MPID_nem_netmod_funcs_t MPIDI_nem_none_funcs = { ...@@ -63,5 +63,6 @@ MPID_nem_netmod_funcs_t MPIDI_nem_none_funcs = {
nm_connect_to_root, nm_connect_to_root,
nm_vc_init, nm_vc_init,
nm_vc_destroy, nm_vc_destroy,
nm_vc_terminate nm_vc_terminate,
NULL /* anysource iprobe */
}; };
...@@ -18,7 +18,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_psm_funcs = { ...@@ -18,7 +18,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_psm_funcs = {
MPID_nem_psm_connect_to_root, MPID_nem_psm_connect_to_root,
MPID_nem_psm_vc_init, MPID_nem_psm_vc_init,
MPID_nem_psm_vc_destroy, MPID_nem_psm_vc_destroy,
MPID_nem_psm_vc_terminate MPID_nem_psm_vc_terminate,
NULL /* anysource iprobe */
}; };
#define MPIDI_CH3I_ENDPOINT_KEY "endpoint_id" #define MPIDI_CH3I_ENDPOINT_KEY "endpoint_id"
......
...@@ -31,7 +31,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_tcp_funcs = { ...@@ -31,7 +31,8 @@ MPID_nem_netmod_funcs_t MPIDI_nem_tcp_funcs = {
MPID_nem_tcp_connect_to_root, MPID_nem_tcp_connect_to_root,
MPID_nem_tcp_vc_init, MPID_nem_tcp_vc_init,
MPID_nem_tcp_vc_destroy, MPID_nem_tcp_vc_destroy,
MPID_nem_tcp_vc_terminate MPID_nem_tcp_vc_terminate,
NULL /* anysource iprobe */
}; };
/* in case there are no packet types defined (e.g., they're ifdef'ed out) make sure the array is not zero length */ /* in case there are no packet types defined (e.g., they're ifdef'ed out) make sure the array is not zero length */
......
...@@ -41,6 +41,9 @@ int MPID_nem_choose_netmod(void) ...@@ -41,6 +41,9 @@ int MPID_nem_choose_netmod(void)
{ {
MPID_nem_netmod_func = MPID_nem_netmod_funcs[i]; MPID_nem_netmod_func = MPID_nem_netmod_funcs[i];
MPID_nem_netmod_id = i; MPID_nem_netmod_id = i;
#ifdef ENABLE_COMM_OVERRIDES
MPIDI_Anysource_iprobe_fn = MPID_nem_netmod_func->anysource_iprobe;
#endif
goto fn_exit; goto fn_exit;
} }
} }
......
...@@ -684,6 +684,9 @@ typedef struct MPIDI_Comm_ops ...@@ -684,6 +684,9 @@ typedef struct MPIDI_Comm_ops
int *flag, MPI_Status *status); int *flag, MPI_Status *status);
} MPIDI_Comm_ops_t; } MPIDI_Comm_ops_t;
extern int (*MPIDI_Anysource_iprobe_fn)(int tag, MPID_Comm * comm, int context_offset, int *flag,
MPI_Status * status);
#endif #endif
typedef struct MPIDI_VC typedef struct MPIDI_VC
......
...@@ -6,6 +6,9 @@ ...@@ -6,6 +6,9 @@
#include "mpidimpl.h" #include "mpidimpl.h"
int (*MPIDI_Anysource_iprobe_fn)(int tag, MPID_Comm * comm, int context_offset, int *flag,
MPI_Status * status) = NULL;
#undef FUNCNAME #undef FUNCNAME
#define FUNCNAME MPID_Iprobe #define FUNCNAME MPID_Iprobe
#undef FCNAME #undef FCNAME
...@@ -29,6 +32,52 @@ int MPID_Iprobe(int source, int tag, MPID_Comm *comm, int context_offset, ...@@ -29,6 +32,52 @@ int MPID_Iprobe(int source, int tag, MPID_Comm *comm, int context_offset,
goto fn_exit; goto fn_exit;
} }
#ifdef ENABLE_COMM_OVERRIDES
if (MPIDI_Anysource_iprobe_fn) {
if (source == MPI_ANY_SOURCE) {
/* if it's anysource, check shm, then check the network.
If still not found, call progress, and check again. */
/* check shm*/
MPIU_THREAD_CS_ENTER(MSGQUEUE,);
found = MPIDI_CH3U_Recvq_FU(source, tag, context, status);
MPIU_THREAD_CS_EXIT(MSGQUEUE,);
if (!found) {
/* not found, check network */
mpi_errno = MPIDI_Anysource_iprobe_fn(tag, comm, context_offset, &found, status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (!found) {
/* still not found, make some progress*/
mpi_errno = MPIDI_CH3_Progress_poke();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* check shm again */
MPIU_THREAD_CS_ENTER(MSGQUEUE,);
found = MPIDI_CH3U_Recvq_FU(source, tag, context, status);
MPIU_THREAD_CS_EXIT(MSGQUEUE,);
if (!found) {
/* check network again */
mpi_errno = MPIDI_Anysource_iprobe_fn(tag, comm, context_offset, &found, status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
}
}
*flag = found;
goto fn_exit;
} else {
/* it's not anysource, check if the netmod has overridden it */
MPIDI_VC_t * vc;
MPIDI_Comm_get_vc_set_active(comm, source, &vc);
if (vc->comm_ops && vc->comm_ops->probe) {
mpi_errno = vc->comm_ops->iprobe(vc, source, tag, comm, context_offset, &found, status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
*flag = found;
goto fn_exit;
}
/* fall-through to shm case */
}
}
#endif
/* FIXME: The routine CH3U_Recvq_FU is used only by the probe functions; /* FIXME: The routine CH3U_Recvq_FU is used only by the probe functions;
it should atomically return the flag and status rather than create it should atomically return the flag and status rather than create
a request. Note that in some cases it will be possible to a request. Note that in some cases it will be possible to
...@@ -55,4 +104,6 @@ int MPID_Iprobe(int source, int tag, MPID_Comm *comm, int context_offset, ...@@ -55,4 +104,6 @@ int MPID_Iprobe(int source, int tag, MPID_Comm *comm, int context_offset,
fn_exit: fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_IPROBE); MPIDI_FUNC_EXIT(MPID_STATE_MPID_IPROBE);
return mpi_errno; return mpi_errno;
fn_fail:
goto fn_exit;
} }
...@@ -26,6 +26,43 @@ int MPID_Probe(int source, int tag, MPID_Comm * comm, int context_offset, ...@@ -26,6 +26,43 @@ int MPID_Probe(int source, int tag, MPID_Comm * comm, int context_offset,
goto fn_exit; goto fn_exit;
} }
#ifdef ENABLE_COMM_OVERRIDES
if (MPIDI_Anysource_iprobe_fn) {
if (source == MPI_ANY_SOURCE) {
/* if it's anysource, loop while checking the shm recv
queue and iprobing the netmod, then do a progress
test to make some progress. */
do {
int found;
MPIU_THREAD_CS_ENTER(MSGQUEUE,);
found = MPIDI_CH3U_Recvq_FU(source, tag, context, status);
MPIU_THREAD_CS_EXIT(MSGQUEUE,);
if (found) break;
mpi_errno = MPIDI_Anysource_iprobe_fn(tag, comm, context_offset, &found, status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (found) break;
mpi_errno = MPIDI_CH3_Progress_test();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
} while (1);
goto fn_exit;
} else {
/* it's not anysource, see if this is for the netmod */
MPIDI_VC_t * vc;
MPIDI_Comm_get_vc_set_active(comm, source, &vc);
if (vc->comm_ops && vc->comm_ops->probe) {
/* netmod has overridden probe */
mpi_errno = vc->comm_ops->probe(vc, source, tag, comm, context_offset, status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
goto fn_exit;
}
/* fall-through to shm case */
}
}
#endif
MPIDI_CH3_Progress_start(&progress_state); MPIDI_CH3_Progress_start(&progress_state);
do do
{ {
...@@ -44,4 +81,6 @@ int MPID_Probe(int source, int tag, MPID_Comm * comm, int context_offset, ...@@ -44,4 +81,6 @@ int MPID_Probe(int source, int tag, MPID_Comm * comm, int context_offset,
fn_exit: fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_PROBE); MPIDI_FUNC_EXIT(MPID_STATE_MPID_PROBE);
return mpi_errno; return mpi_errno;
fn_fail:
goto fn_exit;
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment