
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <time.h>
#include <math.h>

#include "margo.h"
#include "margo-timer.h"
#include "utlist.h"

/* TODO: including core.h for cancel definition, presumably this will be 
 * available in top level later?
 */
#include <mercury_core.h>

#define MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */

struct margo_instance
{
    /* provided by caller */
    hg_context_t *hg_context;
    hg_class_t *hg_class;
    ABT_pool handler_pool;
    ABT_pool progress_pool;

    /* internal to margo for this particular instance */
    ABT_thread hg_progress_tid;
    int hg_progress_shutdown_flag;

    /* control logic for callers waiting on margo to be finalized */
    int finalize_flag;
    int finalize_waiters_in_progress_pool;
    ABT_mutex finalize_mutex;
    ABT_cond finalize_cond;

    int table_index;
};

struct margo_handler_mapping
{
    hg_class_t *class;
    margo_instance_id mid;
};

#define MAX_HANDLER_MAPPING 8
static int handler_mapping_table_size = 0;
static struct margo_handler_mapping handler_mapping_table[MAX_HANDLER_MAPPING] = {0};

static void hg_progress_fn(void* foo);


struct handler_entry
{
    void* fn;
    hg_handle_t handle;
    struct handler_entry *next; 
};

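/* margo_init: creates a margo instance that uses the given Argobots pools
 * and Mercury class/context, launches a progress thread in the progress
 * pool, and registers the instance in the handler mapping table so that
 * RPC handlers can recover it from the hg_class.  Returns
 * MARGO_INSTANCE_NULL on failure.
 */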
margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
    hg_context_t *hg_context, hg_class_t *hg_class)
{
    int ret;
    struct margo_instance *mid;

    if(handler_mapping_table_size >= MAX_HANDLER_MAPPING)
        return(MARGO_INSTANCE_NULL);

    mid = malloc(sizeof(*mid));
    if(!mid)
        return(MARGO_INSTANCE_NULL);
    memset(mid, 0, sizeof(*mid));

    ABT_mutex_create(&mid->finalize_mutex);
    ABT_cond_create(&mid->finalize_cond);

    mid->progress_pool = progress_pool;
    mid->handler_pool = handler_pool;
    mid->hg_class = hg_class;
    mid->hg_context = hg_context;

    ret = margo_timer_instance_init(mid);
    if(ret != 0)
    {
        fprintf(stderr, "Error: margo_timer_instance_init()\n");
        free(mid);
        return(MARGO_INSTANCE_NULL);
    }

    ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid,
        ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_thread_create()\n");
        margo_timer_instance_finalize(mid);
        free(mid);
        return(MARGO_INSTANCE_NULL);
    }

    handler_mapping_table[handler_mapping_table_size].mid = mid;
    handler_mapping_table[handler_mapping_table_size].class = mid->hg_class;
    mid->table_index = handler_mapping_table_size;
    handler_mapping_table_size++;

    return mid;
}

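/* margo_finalize: signals the progress thread to shut down, waits for it
 * to exit, removes this instance from the handler mapping table, and wakes
 * any threads blocked in margo_wait_for_finalize().
 */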
void margo_finalize(margo_instance_id mid)
{
    int i;

    /* tell progress thread to wrap things up */
    mid->hg_progress_shutdown_flag = 1;

    /* wait for it to shut down cleanly */
    ABT_thread_join(mid->hg_progress_tid);
    ABT_thread_free(&mid->hg_progress_tid);

    /* remove this instance from the handler mapping table, shifting later
     * entries down and refreshing their cached table indexes */
    for(i=mid->table_index; i<(handler_mapping_table_size-1); i++)
    {
        handler_mapping_table[i] = handler_mapping_table[i+1];
        handler_mapping_table[i].mid->table_index = i;
    }
    handler_mapping_table_size--;

    ABT_mutex_lock(mid->finalize_mutex);
    mid->finalize_flag = 1;
    ABT_cond_broadcast(mid->finalize_cond);
    ABT_mutex_unlock(mid->finalize_mutex);

    /* TODO: yuck, there is a race here if someone was really waiting for
     * finalize; we can't destroy the data structures out from under them.
     * We could fix this by reference counting so that the last caller
     * (whether a finalize() caller or wait_for_finalize() caller) knows it
     * is safe to turn off the lights on their way out.  For now we just leak 
     * a small amount of memory.
     */
#if 0
    margo_timer_instance_finalize(mid);

    ABT_mutex_free(&mid->finalize_mutex);
    ABT_cond_free(&mid->finalize_cond);
    free(mid);
#endif

    return;
}

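/* margo_wait_for_finalize: blocks the caller until margo_finalize() is
 * invoked on this instance.  Callers running in the progress pool are
 * counted in finalize_waiters_in_progress_pool so that the progress loop
 * knows it may block in HG_Progress() without starving them.
 */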
void margo_wait_for_finalize(margo_instance_id mid)
{
    ABT_xstream xstream;
    ABT_pool pool;
    int ret;
    int in_pool = 0;

    ret = ABT_xstream_self(&xstream);
    if(ret != 0)
        return;
    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
    if(ret != 0)
        return;

    /* Is this waiter in the same pool as the pool running the progress
     * thread?
     */
    if(pool == mid->progress_pool)
        in_pool = 1;

    ABT_mutex_lock(mid->finalize_mutex);

    mid->finalize_waiters_in_progress_pool += in_pool;

    while(!mid->finalize_flag)
        ABT_cond_wait(mid->finalize_cond, mid->finalize_mutex);

    ABT_mutex_unlock(mid->finalize_mutex);

    return;
}

/* dedicated thread function to drive Mercury progress */
static void hg_progress_fn(void* foo)
{
    int ret;
    unsigned int actual_count;
    struct margo_instance *mid = (struct margo_instance *)foo;
    size_t size;
    unsigned int hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB;
    double next_timer_exp;

    while(!mid->hg_progress_shutdown_flag)
    {
        do {
            ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count);
        } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);

        if(!mid->hg_progress_shutdown_flag)
        {
            ABT_mutex_lock(mid->finalize_mutex);

            ABT_pool_get_total_size(mid->progress_pool, &size);
            /* Are there any other threads executing in this pool that are
             * *not* blocked on margo_wait_for_finalize()?  If so, we can't
             * sleep here or else those threads will not get a chance to
             * execute.
             */
            if(size > mid->finalize_waiters_in_progress_pool)
            {
                ABT_mutex_unlock(mid->finalize_mutex);
                HG_Progress(mid->hg_context, 0);
                ABT_thread_yield();
            }
            else
            {
                ABT_mutex_unlock(mid->finalize_mutex);

                /* nothing else is runnable in this pool; reset to the
                 * default timeout before checking for pending timers so a
                 * stale (possibly zero) value does not carry over */
                hg_progress_timeout = MERCURY_PROGRESS_TIMEOUT_UB;

                ret = margo_timer_get_next_expiration(mid, &next_timer_exp);
                if(ret == 0)
                {
                    /* there is a queued timer, don't block long enough
                     * to keep this timer waiting
                     */
                    if(next_timer_exp >= 0.0)
                    {
                        next_timer_exp *= 1000; /* convert to milliseconds */
                        if(next_timer_exp < MERCURY_PROGRESS_TIMEOUT_UB)
                            hg_progress_timeout = (unsigned int)next_timer_exp;
                    }
                    else
                    {
                        hg_progress_timeout = 0;
                    }
                }
                HG_Progress(mid->hg_context, hg_progress_timeout);
            }
        }

        /* check for any expired timers */
        margo_check_timers(mid);
    }

    return;
}

ABT_pool* margo_get_handler_pool(margo_instance_id mid)
{
    return(&mid->handler_pool);
}

hg_context_t* margo_get_context(margo_instance_id mid)
{
    return(mid->hg_context);
}

hg_class_t* margo_get_class(margo_instance_id mid)
{
    return(mid->hg_class);
}


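/* completion callback shared by HG_Forward() and HG_Respond(); wakes the
 * waiting ULT by setting the eventual passed as the callback argument */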
static hg_return_t margo_cb(const struct hg_cb_info *info)
{
    hg_return_t hret = info->ret;

    ABT_eventual *eventual = info->arg;
    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

typedef struct
{
    hg_handle_t handle;
} margo_forward_timeout_cb_dat;

static void margo_forward_timeout_cb(void *arg)
{
    margo_forward_timeout_cb_dat *timeout_cb_dat =
        (margo_forward_timeout_cb_dat *)arg;

    /* cancel the Mercury op if the forward timed out */
    HG_Core_cancel(timeout_cb_dat->handle);
    return;
}

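/* margo_forward_timed: forwards an RPC and suspends the calling ULT until
 * the forward completes; a margo timer cancels the underlying Mercury
 * operation if it has not completed within timeout_ms milliseconds.
 */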
hg_return_t margo_forward_timed(
    margo_instance_id mid,
    hg_handle_t handle,
    void *in_struct,
    double timeout_ms)
{
    int ret;
    hg_return_t hret = HG_TIMEOUT;
    ABT_eventual eventual;
    hg_return_t* waited_hret;
    margo_timer_t forward_timer;
    margo_forward_timeout_cb_dat timeout_cb_dat;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    /* set a timer object to expire when this forward times out */
    timeout_cb_dat.handle = handle;
    margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb,
        &timeout_cb_dat, timeout_ms);

    hret = HG_Forward(handle, margo_cb, &eventual, in_struct);
    if(hret == HG_SUCCESS)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    /* remove timer if it is still in place (i.e., not timed out) */
    if(hret != HG_TIMEOUT)
        margo_timer_destroy(mid, &forward_timer);

    ABT_eventual_free(&eventual);

    return(hret);
}


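/* margo_forward: forwards an RPC and suspends the calling ULT (via an
 * Argobots eventual) until the forward completes.
 */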
hg_return_t margo_forward(
    margo_instance_id mid,
    hg_handle_t handle,
    void *in_struct)
{
    hg_return_t hret = HG_TIMEOUT;
    ABT_eventual eventual;
    int ret;
    hg_return_t* waited_hret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Forward(handle, margo_cb, &eventual, in_struct);
    if(hret == HG_SUCCESS)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

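/* margo_respond: sends an RPC response and suspends the calling ULT until
 * the respond operation completes.
 */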
hg_return_t margo_respond(
    margo_instance_id mid,
    hg_handle_t handle,
    void *out_struct)
{
    hg_return_t hret = HG_TIMEOUT;
    ABT_eventual eventual;
    int ret;
    hg_return_t* waited_hret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);
    }

    hret = HG_Respond(handle, margo_cb, &eventual, out_struct);
    if(hret == HG_SUCCESS)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info)
{
    hg_return_t hret = info->ret;
    ABT_eventual *eventual = info->arg;

    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

struct lookup_cb_evt
{
    hg_return_t nret;
    hg_addr_t addr;
};

static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
{
    struct lookup_cb_evt evt;
    evt.nret = info->ret;
    evt.addr = info->info.lookup.addr;

    ABT_eventual *eventual = info->arg;

    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &evt, sizeof(evt));
    
    return(HG_SUCCESS);
}


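/* margo_addr_lookup: resolves a Mercury address name, suspending the
 * calling ULT until the lookup callback delivers the result.
 */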
hg_return_t margo_addr_lookup(
    margo_instance_id mid,
    hg_context_t *context,
    const char   *name,
    hg_addr_t    *addr)
{
    hg_return_t nret;
    struct lookup_cb_evt *evt;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(*evt), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    nret = HG_Addr_lookup(context, margo_addr_lookup_cb,
        &eventual, name, HG_OP_ID_IGNORE);
    if(nret == HG_SUCCESS)
    {
        ABT_eventual_wait(eventual, (void**)&evt);
        *addr = evt->addr;
        nret = evt->nret;
    }

    ABT_eventual_free(&eventual);

    return(nret);
}

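/* margo_bulk_transfer: performs a bulk data transfer, suspending the
 * calling ULT until the transfer completes.
 */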
hg_return_t margo_bulk_transfer(
    margo_instance_id mid,
    hg_context_t *context,
    hg_bulk_op_t op,
    hg_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size)
{
    hg_return_t hret = HG_TIMEOUT;
    hg_return_t *waited_hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Bulk_transfer(context, margo_bulk_transfer_cb, &eventual, op, 
        origin_addr, origin_handle, origin_offset, local_handle, local_offset,
        size, HG_OP_ID_IGNORE);
    if(hret == HG_SUCCESS)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

typedef struct
{
    margo_instance_id mid;
    ABT_mutex mutex;
    ABT_cond cond;
    int is_asleep;
} margo_thread_sleep_cb_dat;

static void margo_thread_sleep_cb(void *arg)
{
    margo_thread_sleep_cb_dat *sleep_cb_dat =
        (margo_thread_sleep_cb_dat *)arg;

    /* decrement number of waiting threads */
    ABT_mutex_lock(sleep_cb_dat->mid->finalize_mutex);
    sleep_cb_dat->mid->finalize_waiters_in_progress_pool--;
    ABT_mutex_unlock(sleep_cb_dat->mid->finalize_mutex);

    /* wake up the sleeping thread */
    ABT_mutex_lock(sleep_cb_dat->mutex);
    sleep_cb_dat->is_asleep = 0;
    ABT_cond_signal(sleep_cb_dat->cond);
    ABT_mutex_unlock(sleep_cb_dat->mutex);

    return;
}

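/* margo_thread_sleep: suspends the calling ULT for at least timeout_ms
 * milliseconds using a margo timer; the sleeper is counted as a finalize
 * waiter so the progress loop may still block while it sleeps.
 */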
void margo_thread_sleep(
    margo_instance_id mid,
    double timeout_ms)
{
    margo_timer_t sleep_timer;
    margo_thread_sleep_cb_dat sleep_cb_dat;

    /* set data needed for sleep callback */
    sleep_cb_dat.mid = mid;
    ABT_mutex_create(&(sleep_cb_dat.mutex));
    ABT_cond_create(&(sleep_cb_dat.cond));
    sleep_cb_dat.is_asleep = 1;

    /* initialize the sleep timer */
    margo_timer_init(mid, &sleep_timer, margo_thread_sleep_cb,
        &sleep_cb_dat, timeout_ms);

    /* increment number of waiting threads */
    ABT_mutex_lock(mid->finalize_mutex);
    mid->finalize_waiters_in_progress_pool++;
    ABT_mutex_unlock(mid->finalize_mutex);

    /* yield thread for specified timeout */
    ABT_mutex_lock(sleep_cb_dat.mutex);
    while(sleep_cb_dat.is_asleep)
        ABT_cond_wait(sleep_cb_dat.cond, sleep_cb_dat.mutex);
    ABT_mutex_unlock(sleep_cb_dat.mutex);

    /* release the synchronization objects used for this sleep */
    ABT_mutex_free(&(sleep_cb_dat.mutex));
    ABT_cond_free(&(sleep_cb_dat.cond));

    return;
}


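/* margo_hg_class_to_instance: looks up the margo instance associated with
 * an hg_class in the handler mapping table; returns NULL if the class was
 * not registered via margo_init().
 */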
margo_instance_id margo_hg_class_to_instance(hg_class_t *cl)
{
    int i;

    for(i=0; i<handler_mapping_table_size; i++)
    {
        if(handler_mapping_table[i].class == cl)
            return(handler_mapping_table[i].mid);
    }
    return(NULL);
}