async.c 5.03 KB
Newer Older
1
2
3
4
5
6
7
8
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 *  (C) 2001 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "mpiimpl.h"
#include "mpi_init.h"
9
#include "mpiu_thread.h"
10

11
12
#ifndef MPICH_MPI_FROM_PMPI

13
static MPID_Comm *progress_comm_ptr;
14
15
16
17
18
static MPIU_Thread_id_t progress_thread_id;
static MPIU_Thread_mutex_t progress_mutex;
static MPIU_Thread_cond_t progress_cond;
static volatile int progress_thread_done = 0;

19
20
#define WAKE_TAG 100

21
#undef FUNCNAME
22
#define FUNCNAME progress_fn
23
24
25
26
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static void progress_fn(void * data)
{
27
#if MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED
Pavan Balaji's avatar
Pavan Balaji committed
28
    int mpi_errno = MPI_SUCCESS;
29
30
31
    MPID_Request *request_ptr = NULL;
    MPI_Request request;
    MPI_Status status;
32
33
34
35

    /* Explicitly add CS_ENTER/EXIT since this thread is created from
     * within an internal function and will call NMPI functions
     * directly. */
36
    MPIU_THREAD_CS_ENTER(ALLFUNC,);
37
38
39
40
41
42
43
44
45
46

    /* FIXME: We assume that waiting on some request forces progress
     * on all requests. With fine-grained threads, will this still
     * work as expected? We can imagine an approach where a request on
     * a non-conflicting communicator would not touch the remaining
     * requests to avoid locking issues. Once the fine-grained threads
     * code is fully functional, we need to revisit this and, if
     * appropriate, either change what we do in this thread, or delete
     * this comment. */

47
48
49
50
51
    mpi_errno = MPID_Irecv(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
                           MPID_CONTEXT_INTRA_PT2PT, &request_ptr);
    MPIU_Assert(!mpi_errno);
    request = request_ptr->handle;
    mpi_errno = MPIR_Wait_impl(&request, &status);
52
53
54
55
56
57
58
59
60
61
62
63
64
65
    MPIU_Assert(!mpi_errno);

    /* Send a signal to the main thread saying we are done */
    MPIU_Thread_mutex_lock(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    progress_thread_done = 1;

    MPIU_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    MPIU_Thread_cond_signal(&progress_cond, &mpi_errno);
    MPIU_Assert(!mpi_errno);

66
    MPIU_THREAD_CS_EXIT(ALLFUNC,);
67

68
#endif /* MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED */
69
70
71
72
73
74
75
76
77
    return;
}

#undef FUNCNAME
#define FUNCNAME MPIR_Init_async_thread
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Init_async_thread(void)
{
78
#if MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED
79
    int mpi_errno = MPI_SUCCESS;
80
    MPID_Comm *comm_self_ptr;
81
    int err = 0;
82
    MPID_MPI_STATE_DECL(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
83
84
85
86
87

    MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_INIT_ASYNC_THREAD);


    /* Dup comm world for the progress thread */
88
89
90
91
92
93
94
    MPID_Comm_get_ptr(MPI_COMM_SELF, comm_self_ptr);
    mpi_errno = MPIR_Comm_dup_impl(comm_self_ptr, &progress_comm_ptr);
    if (mpi_errno) MPIU_ERR_POP(mpi_errno);

    MPIU_Thread_cond_create(&progress_cond, &err);
    MPIU_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**cond_create", "**cond_create %s", strerror(err));
    
95
    MPIU_Thread_mutex_create(&progress_mutex, &err);
96
97
    MPIU_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s", strerror(err));
    
98
    MPIU_Thread_create((MPIU_Thread_func_t) progress_fn, NULL, &progress_thread_id, &err);
99
100
    MPIU_ERR_CHKANDJUMP1(err, mpi_errno, MPI_ERR_OTHER, "**mutex_create", "**mutex_create %s", strerror(err));
    
101
    MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
102

103
 fn_exit:
104
    return mpi_errno;
105
106
107
108
109
 fn_fail:
    goto fn_exit;
#else
    return MPI_SUCCESS;
#endif /* MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED */
110
111
112
113
114
115
116
117
118
}

#undef FUNCNAME
#define FUNCNAME MPIR_Finalize_async_thread
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Finalize_async_thread(void)
{
    int mpi_errno = MPI_SUCCESS;
119
#if MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED
120
121
122
    MPID_Request *request_ptr = NULL;
    MPI_Request request;
    MPI_Status status;
123
    MPID_MPI_STATE_DECL(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
124
125
126

    MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);

127
128
129
130
131
    mpi_errno = MPID_Isend(NULL, 0, MPI_CHAR, 0, WAKE_TAG, progress_comm_ptr,
                           MPID_CONTEXT_INTRA_PT2PT, &request_ptr);
    MPIU_Assert(!mpi_errno);
    request = request_ptr->handle;
    mpi_errno = MPIR_Wait_impl(&request, &status);
132
133
    MPIU_Assert(!mpi_errno);

134
135
    /* XXX DJG why is this unlock/lock necessary?  Should we just YIELD here or later?  */
    MPIU_THREAD_CS_EXIT(ALLFUNC,);
136
137
138
139
140
141
142
143
144
145
146
147

    MPIU_Thread_mutex_lock(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    while (!progress_thread_done) {
        MPIU_Thread_cond_wait(&progress_cond, &progress_mutex, &mpi_errno);
        MPIU_Assert(!mpi_errno);
    }

    MPIU_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

148
    MPIU_THREAD_CS_ENTER(ALLFUNC,);
149
150
151
152
153
154
155
156
157

    MPIU_Thread_cond_destroy(&progress_cond, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    MPIU_Thread_mutex_destroy(&progress_mutex, &mpi_errno);
    MPIU_Assert(!mpi_errno);

    MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);

158
#endif /* MPICH_THREAD_LEVEL >= MPI_THREAD_SERIALIZED */
159
160
    return mpi_errno;
}
161
162

#endif