margo.c 6.63 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11

/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <abt.h>
12
#include <abt-snoozer.h>
13 14 15

#include "margo.h"

16 17
struct margo_instance
{
18
    /* provided by caller */
19 20
    hg_context_t *hg_context;
    hg_class_t *hg_class;
21 22 23
    ABT_pool handler_pool;
    ABT_pool progress_pool;

24
    /* internal to margo for this particular instance */
25 26 27 28 29 30 31 32 33 34 35 36 37 38
    ABT_thread hg_progress_tid;
    int hg_progress_shutdown_flag;
    int table_index;
};

struct margo_handler_mapping
{
    hg_class_t *class;
    margo_instance_id mid;
};

#define MAX_HANDLER_MAPPING 8
static int handler_mapping_table_size = 0;
static struct margo_handler_mapping handler_mapping_table[MAX_HANDLER_MAPPING] = {0};
39

40
static void hg_progress_fn(void* foo);
41 42 43 44 45 46 47 48 49


struct handler_entry
{
    void* fn;
    hg_handle_t handle;
    struct handler_entry *next; 
};

50 51
margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
    hg_context_t *hg_context, hg_class_t *hg_class)
52 53
{
    int ret;
54 55 56
    struct margo_instance *mid;

    if(handler_mapping_table_size >= MAX_HANDLER_MAPPING)
57
        return(MARGO_INSTANCE_NULL);
58 59 60

    mid = malloc(sizeof(*mid));
    if(!mid)
61
        return(MARGO_INSTANCE_NULL);
62
    memset(mid, 0, sizeof(*mid));
63

64 65
    mid->progress_pool = progress_pool;
    mid->handler_pool = handler_pool;
66 67
    mid->hg_class = hg_class;
    mid->hg_context = hg_context;
68

69
    ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid, 
70
        ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
71 72 73
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_thread_create()\n");
74
        free(mid);
75
        return(MARGO_INSTANCE_NULL);
76 77
    }

78 79 80 81 82 83
    handler_mapping_table[handler_mapping_table_size].mid = mid;
    handler_mapping_table[handler_mapping_table_size].class = mid->hg_class;
    mid->table_index = handler_mapping_table_size;
    handler_mapping_table_size++;

    return mid;
84 85
}

86
void margo_finalize(margo_instance_id mid)
87
{
88 89
    int i;

90
    /* tell progress thread to wrap things up */
91
    mid->hg_progress_shutdown_flag = 1;
92 93

    /* wait for it to shutdown cleanly */
94 95
    ABT_thread_join(mid->hg_progress_tid);
    ABT_thread_free(&mid->hg_progress_tid);
96

97 98 99 100 101
    for(i=mid->table_index; i<(handler_mapping_table_size-1); i++)
    {
        handler_mapping_table[i] = handler_mapping_table[i+1];
    }
    handler_mapping_table_size--;
102 103 104 105 106

    return;
}

/* dedicated thread function to drive Mercury progress */
107
static void hg_progress_fn(void* foo)
108 109 110
{
    int ret;
    unsigned int actual_count;
111
    struct margo_instance *mid = (struct margo_instance *)foo;
112
    size_t size;
113

114
    while(!mid->hg_progress_shutdown_flag)
115 116
    {
        do {
117
            ret = HG_Trigger(mid->hg_context, 0, 1, &actual_count);
118
        } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
119

120
        if(!mid->hg_progress_shutdown_flag)
121
        {
122
            ABT_pool_get_total_size(mid->progress_pool, &size);
Philip Carns's avatar
Philip Carns committed
123
            if(size > 0)
124
            {
125
                HG_Progress(mid->hg_context, 0);
126 127 128 129
                ABT_thread_yield();
            }
            else
            {
130
                HG_Progress(mid->hg_context, 100);
131 132
            }
        }
133 134
    }

135
    return;
136 137
}

138
ABT_pool* margo_get_handler_pool(margo_instance_id mid)
139
{
140
    return(&mid->handler_pool);
141 142 143 144 145 146 147 148 149 150 151 152 153 154
}

static hg_return_t margo_forward_cb(const struct hg_cb_info *info)
{
    hg_return_t hret = info->ret;

    ABT_eventual *eventual = info->arg;
    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

hg_return_t margo_forward(
155
    margo_instance_id mid,
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
    hg_handle_t handle,
    void *in_struct)
{
    hg_return_t hret = HG_TIMEOUT;
    ABT_eventual eventual;
    int ret;
    hg_return_t* waited_hret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Forward(handle, margo_forward_cb, &eventual, in_struct);
    if(hret == 0)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

182

183 184 185 186 187 188 189 190 191 192 193
static hg_return_t margo_bulk_transfer_cb(const struct hg_bulk_cb_info *hg_bulk_cb_info)
{
    hg_return_t hret = hg_bulk_cb_info->ret;
    ABT_eventual *eventual = hg_bulk_cb_info->arg;

    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

Philip Carns's avatar
Philip Carns committed
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
struct lookup_cb_evt
{
    na_return_t nret;
    na_addr_t addr;
};

static na_return_t margo_na_addr_lookup_cb(const struct na_cb_info *callback_info)
{
    struct lookup_cb_evt evt;
    evt.nret = callback_info->ret;
    evt.addr = callback_info->info.lookup.addr;

    ABT_eventual *eventual = callback_info->arg;

    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &evt, sizeof(evt));
    
    return(NA_SUCCESS);
}


215
na_return_t margo_na_addr_lookup(
216
    margo_instance_id mid,
217 218
    na_class_t   *na_class,
    na_context_t *context,
Philip Carns's avatar
Philip Carns committed
219 220
    const char   *name,
    na_addr_t    *addr)
221 222
{
    na_return_t nret;
Philip Carns's avatar
Philip Carns committed
223
    struct lookup_cb_evt *evt;
224 225 226
    ABT_eventual eventual;
    int ret;

Philip Carns's avatar
Philip Carns committed
227
    ret = ABT_eventual_create(sizeof(*evt), &eventual);
228 229 230 231 232 233 234 235 236
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    nret = NA_Addr_lookup(na_class, context, margo_na_addr_lookup_cb,
        &eventual, name, NA_OP_ID_IGNORE);
    if(nret == 0)
    {
Philip Carns's avatar
Philip Carns committed
237 238 239
        ABT_eventual_wait(eventual, (void**)&evt);
        *addr = evt->addr;
        nret = evt->nret;
240 241 242 243 244 245 246
    }

    ABT_eventual_free(&eventual);

    return(nret);
}

247
hg_return_t margo_bulk_transfer(
248
    margo_instance_id mid,
249
    hg_context_t *context,
250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
    hg_bulk_op_t op,
    na_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size)
{
    hg_return_t hret = HG_TIMEOUT;
    hg_return_t *waited_hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Bulk_transfer(context, margo_bulk_transfer_cb, &eventual, op, 
        origin_addr, origin_handle, origin_offset, local_handle, local_offset,
        size, HG_OP_ID_IGNORE);
    if(hret == 0)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

283
margo_instance_id margo_hg_class_to_instance(hg_class_t *cl)
284 285 286 287 288
{
    int i;

    for(i=0; i<handler_mapping_table_size; i++)
    {
289
        if(handler_mapping_table[i].class == cl)
290 291 292 293
            return(handler_mapping_table[i].mid);
    }
    return(NULL);
}