/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"
#include "mpi_init.h"
#include "mpiu_thread.h"

static MPI_Comm progress_comm;
static MPIU_Thread_id_t progress_thread_id;
static MPIU_Thread_mutex_t progress_mutex;
static MPIU_Thread_cond_t progress_cond;
static volatile int progress_thread_done = 0;

#undef FUNCNAME
#define FUNCNAME progress_fn
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static void progress_fn(void * data)
{
#if MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED
    int mpi_errno = MPI_SUCCESS;
    MPIU_THREADPRIV_DECL;

    /* Explicitly add CS_ENTER/EXIT since this thread is created from
     * within an internal function and will call NMPI functions
     * directly. */
    MPIU_THREAD_CS_ENTER(ALLFUNC,);

    /* FIXME: We assume that waiting on some request forces progress
     * on all requests. With fine-grained threads, will this still
     * work as expected? We can imagine an approach where a request on
     * a non-conflicting communicator would not touch the remaining
     * requests to avoid locking issues. Once the fine-grained threads
     * code is fully functional, we need to revisit this and, if
     * appropriate, either change what we do in this thread, or delete
     * this comment. */

    MPIU_THREADPRIV_GET;

    MPIR_Nest_incr();
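    /* Park in a zero-byte blocking receive: this call does not return
     * until MPIR_Finalize_async_thread sends the matching message on
     * progress_comm, and while blocked it keeps the progress engine
     * running. */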
    mpi_errno = NMPI_Recv(NULL, 0, MPI_CHAR, 0, 0, progress_comm, MPI_STATUS_IGNORE);
    MPIU_Assert(!mpi_errno);
    MPIR_Nest_decr();

    /* Send a signal to the main thread saying we are done */
    MPIU_Thread_mutex_lock(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    progress_thread_done = 1;

    MPIU_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    MPIU_Thread_cond_signal(&progress_cond, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    MPIU_THREAD_CS_EXIT(ALLFUNC,);

#endif /* MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED */
    return;
}

#undef FUNCNAME
#define FUNCNAME MPIR_Init_async_thread
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
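/* Set up the async progress machinery: dup MPI_COMM_SELF into
 * progress_comm, create the condition variable and mutex used for the
 * shutdown handshake, and spawn the progress thread.  Presumably this
 * runs during MPI initialization when asynchronous progress is
 * requested (e.g., via MPICH's MPICH_ASYNC_PROGRESS environment
 * variable; an assumption, as the caller is not visible in this
 * file). */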
int MPIR_Init_async_thread(void)
{
#if MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED
    int mpi_errno = MPI_SUCCESS;
    MPID_Comm *comm_self_ptr, *progress_comm_ptr;
    int err = 0;
    MPID_MPI_STATE_DECL(MPID_STATE_MPIR_INIT_ASYNC_THREAD);

    MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_INIT_ASYNC_THREAD);


    /* Dup MPI_COMM_SELF for the progress thread: a private
     * communicator guarantees that the zero-byte shutdown message can
     * never match a receive posted by user code. */
    MPID_Comm_get_ptr(MPI_COMM_SELF, comm_self_ptr);
    mpi_errno = MPIR_Comm_dup_impl(comm_self_ptr, &progress_comm_ptr);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
    progress_comm = progress_comm_ptr->handle;

    MPIU_Thread_cond_create(&progress_cond, &err);
    MPIU_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**cond_create", "**cond_create %s", strerror(err));
    
    MPIU_Thread_mutex_create(&progress_mutex, &err);
    MPIU_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s", strerror(err));
    
    MPIU_Thread_create((MPIU_Thread_func_t) progress_fn, NULL, &progress_thread_id, &err);
    /* FIXME: this check reuses the "**mutex_create" error class from
     * the mutex-creation check above; a thread-creation-specific
     * error class would be more accurate. */
    MPIU_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s", strerror(err));
    
    MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_INIT_ASYNC_THREAD);

 fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
#else
    return MPI_SUCCESS;
#endif /* MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED */
}

#undef FUNCNAME
#define FUNCNAME MPIR_Finalize_async_thread
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
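/* Tear down the async progress machinery: wake the progress thread
 * with the zero-byte send that matches its receive, wait for it to
 * confirm completion through progress_cond, and then destroy the
 * synchronization objects. */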
int MPIR_Finalize_async_thread(void)
{
    int mpi_errno = MPI_SUCCESS;
#if MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED
    MPIU_THREADPRIV_DECL;
    MPID_MPI_STATE_DECL(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);

    MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);

    MPIU_THREADPRIV_GET;

    MPIR_Nest_incr();
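    /* Wake the progress thread: this zero-byte send matches the
     * receive that progress_fn is blocked in. */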
    mpi_errno = NMPI_Send(NULL, 0, MPI_CHAR, 0, 0, progress_comm);
    MPIU_Assert(!mpi_errno);
    MPIR_Nest_decr();

    /* XXX DJG why is this unlock/lock necessary?  Should we just YIELD here or later?  */
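    /* One plausible answer (an assumption, not verified here): while
     * this thread holds the ALLFUNC critical section, the progress
     * thread cannot return from its blocking receive and signal the
     * condition variable, so the CS must be released before waiting
     * below or the two threads would deadlock. */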
    MPIU_THREAD_CS_EXIT(ALLFUNC,);

    MPIU_Thread_mutex_lock(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    /* Wait until progress_fn reports completion.  Re-checking the
     * predicate in a loop is the standard guard against spurious
     * wakeups. */
    while (!progress_thread_done) {
        MPIU_Thread_cond_wait(&progress_cond, &progress_mutex, &mpi_errno);
        MPIU_Assert(!mpi_errno);
    }

    MPIU_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    MPIU_THREAD_CS_ENTER(ALLFUNC,);

    MPIU_Thread_cond_destroy(&progress_cond, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    MPIU_Thread_mutex_destroy(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);

#endif /* MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED */
    return mpi_errno;
}