margo.c 6.68 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11

/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <abt.h>
12
#include <abt-snoozer.h>
13
14
15

#include "margo.h"

16
17
struct margo_instance
{
18
    /* provided by caller */
19
20
    hg_context_t *hg_context;
    hg_class_t *hg_class;
21
22
23
    ABT_pool handler_pool;
    ABT_pool progress_pool;

24
    /* internal to margo for this particular instance */
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    ABT_thread hg_progress_tid;
    int hg_progress_shutdown_flag;
    int table_index;
};

struct margo_handler_mapping
{
    hg_class_t *class;
    margo_instance_id mid;
};

#define MAX_HANDLER_MAPPING 8
static int handler_mapping_table_size = 0;
static struct margo_handler_mapping handler_mapping_table[MAX_HANDLER_MAPPING] = {0};
39

40
static void hg_progress_fn(void* foo);
41
42
43
44
45
46
47
48
49


struct handler_entry
{
    void* fn;
    hg_handle_t handle;
    struct handler_entry *next; 
};

50
51
margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
    hg_context_t *hg_context, hg_class_t *hg_class)
52
53
{
    int ret;
54
55
56
    struct margo_instance *mid;

    if(handler_mapping_table_size >= MAX_HANDLER_MAPPING)
57
        return(MARGO_INSTANCE_NULL);
58
59
60

    mid = malloc(sizeof(*mid));
    if(!mid)
61
        return(MARGO_INSTANCE_NULL);
62
    memset(mid, 0, sizeof(*mid));
63

64
65
    mid->progress_pool = progress_pool;
    mid->handler_pool = handler_pool;
66
67
    mid->hg_class = hg_class;
    mid->hg_context = hg_context;
68

69
    ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid, 
70
        ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
71
72
73
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_thread_create()\n");
74
        free(mid);
75
        return(MARGO_INSTANCE_NULL);
76
77
    }

78
79
80
81
82
83
    handler_mapping_table[handler_mapping_table_size].mid = mid;
    handler_mapping_table[handler_mapping_table_size].class = mid->hg_class;
    mid->table_index = handler_mapping_table_size;
    handler_mapping_table_size++;

    return mid;
84
85
}

86
void margo_finalize(margo_instance_id mid)
87
{
88
89
    int i;

90
    /* tell progress thread to wrap things up */
91
    mid->hg_progress_shutdown_flag = 1;
92
93

    /* wait for it to shutdown cleanly */
94
95
    ABT_thread_join(mid->hg_progress_tid);
    ABT_thread_free(&mid->hg_progress_tid);
96

97
98
99
100
101
    for(i=mid->table_index; i<(handler_mapping_table_size-1); i++)
    {
        handler_mapping_table[i] = handler_mapping_table[i+1];
    }
    handler_mapping_table_size--;
102
103
104
105
106

    return;
}

/* dedicated thread function to drive Mercury progress */
107
static void hg_progress_fn(void* foo)
108
109
110
{
    int ret;
    unsigned int actual_count;
111
    struct margo_instance *mid = (struct margo_instance *)foo;
112
    size_t size;
113

114
    while(!mid->hg_progress_shutdown_flag)
115
116
    {
        do {
117
118
            ret = HG_Trigger(mid->hg_class, mid->hg_context, 0, 1, &actual_count);
        } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
119

120
        if(!mid->hg_progress_shutdown_flag)
121
122
123
124
125
126
127
128
129
130
131
132
        {
            ABT_pool_get_total_size(mid->progress_pool, &size);
            if(size > 1)
            {
                HG_Progress(mid->hg_class, mid->hg_context, 0);
                ABT_thread_yield();
            }
            else
            {
                HG_Progress(mid->hg_class, mid->hg_context, 100);
            }
        }
133
134
    }

135
    return;
136
137
}

138
ABT_pool* margo_get_handler_pool(margo_instance_id mid)
139
{
140
    return(&mid->handler_pool);
141
142
143
144
145
146
147
148
149
150
151
152
153
154
}

static hg_return_t margo_forward_cb(const struct hg_cb_info *info)
{
    hg_return_t hret = info->ret;

    ABT_eventual *eventual = info->arg;
    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

hg_return_t margo_forward(
155
    margo_instance_id mid,
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
    hg_handle_t handle,
    void *in_struct)
{
    hg_return_t hret = HG_TIMEOUT;
    ABT_eventual eventual;
    int ret;
    hg_return_t* waited_hret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Forward(handle, margo_forward_cb, &eventual, in_struct);
    if(hret == 0)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

182

183
184
185
186
187
188
189
190
191
192
193
static hg_return_t margo_bulk_transfer_cb(const struct hg_bulk_cb_info *hg_bulk_cb_info)
{
    hg_return_t hret = hg_bulk_cb_info->ret;
    ABT_eventual *eventual = hg_bulk_cb_info->arg;

    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

Philip Carns's avatar
Philip Carns committed
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
struct lookup_cb_evt
{
    na_return_t nret;
    na_addr_t addr;
};

static na_return_t margo_na_addr_lookup_cb(const struct na_cb_info *callback_info)
{
    struct lookup_cb_evt evt;
    evt.nret = callback_info->ret;
    evt.addr = callback_info->info.lookup.addr;

    ABT_eventual *eventual = callback_info->arg;

    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &evt, sizeof(evt));
    
    return(NA_SUCCESS);
}


215
na_return_t margo_na_addr_lookup(
216
    margo_instance_id mid,
217
218
    na_class_t   *na_class,
    na_context_t *context,
Philip Carns's avatar
Philip Carns committed
219
220
    const char   *name,
    na_addr_t    *addr)
221
222
{
    na_return_t nret;
Philip Carns's avatar
Philip Carns committed
223
    struct lookup_cb_evt *evt;
224
225
226
    ABT_eventual eventual;
    int ret;

Philip Carns's avatar
Philip Carns committed
227
    ret = ABT_eventual_create(sizeof(*evt), &eventual);
228
229
230
231
232
233
234
235
236
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    nret = NA_Addr_lookup(na_class, context, margo_na_addr_lookup_cb,
        &eventual, name, NA_OP_ID_IGNORE);
    if(nret == 0)
    {
Philip Carns's avatar
Philip Carns committed
237
238
239
        ABT_eventual_wait(eventual, (void**)&evt);
        *addr = evt->addr;
        nret = evt->nret;
240
241
242
243
244
245
246
    }

    ABT_eventual_free(&eventual);

    return(nret);
}

247
hg_return_t margo_bulk_transfer(
248
    margo_instance_id mid,
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
    hg_bulk_context_t *context,
    hg_bulk_op_t op,
    na_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size)
{
    hg_return_t hret = HG_TIMEOUT;
    hg_return_t *waited_hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Bulk_transfer(context, margo_bulk_transfer_cb, &eventual, op, 
        origin_addr, origin_handle, origin_offset, local_handle, local_offset,
        size, HG_OP_ID_IGNORE);
    if(hret == 0)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

283
margo_instance_id margo_hg_class_to_instance(hg_class_t *cl)
284
285
286
287
288
{
    int i;

    for(i=0; i<handler_mapping_table_size; i++)
    {
289
        if(handler_mapping_table[i].class == cl)
290
291
292
293
            return(handler_mapping_table[i].mid);
    }
    return(NULL);
}