margo.c 7.46 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11

/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <abt.h>
12
#include <abt-snoozer.h>
13 14 15

#include "margo.h"

16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
struct margo_instance
{
    na_class_t *network_class;
    na_context_t *na_context;
    hg_context_t *hg_context;
    hg_class_t *hg_class;
    ABT_thread hg_progress_tid;
    int hg_progress_shutdown_flag;
    ABT_pool main_pool;
    ABT_pool engine_pool;
    ABT_xstream engine_xstream;
    int table_index;
};

struct margo_handler_mapping
{
    hg_class_t *class;
    margo_instance_id mid;
};

#define MAX_HANDLER_MAPPING 8
static int handler_mapping_table_size = 0;
static struct margo_handler_mapping handler_mapping_table[MAX_HANDLER_MAPPING] = {0};
39

40
static void hg_progress_fn(void* foo);
41 42 43 44 45 46 47 48 49


struct handler_entry
{
    void* fn;
    hg_handle_t handle;
    struct handler_entry *next; 
};

50
margo_instance_id margo_init(na_bool_t listen, const char* local_addr)
51 52 53
{
    int ret;
    ABT_xstream xstream;
54 55 56 57 58 59 60 61 62
    struct margo_instance *mid;

    if(handler_mapping_table_size >= MAX_HANDLER_MAPPING)
        return(NULL);

    mid = malloc(sizeof(*mid));
    if(!mid)
        return(NULL);
    memset(mid, 0, sizeof(*mid));
63 64

    /* boilerplate HG initialization steps */
65 66
    mid->network_class = NA_Initialize(local_addr, listen);
    if(!mid->network_class)
67
    {
68 69
        free(mid);
        return(NULL);
70 71
    }

72 73
    mid->na_context = NA_Context_create(mid->network_class);
    if(!mid->na_context)
74
    {
75 76 77
        NA_Finalize(mid->network_class);
        free(mid);
        return(NULL);
78 79
    }

80 81
    mid->hg_class = HG_Init(mid->network_class, mid->na_context, NULL);
    if(!mid->hg_class)
82
    {
83 84 85 86
        NA_Context_destroy(mid->network_class, mid->na_context);
        NA_Finalize(mid->network_class);
        free(mid);
        return(NULL);
87 88
    }

89 90
    mid->hg_context = HG_Context_create(mid->hg_class);
    if(!mid->hg_context)
91
    {
92 93 94 95
        HG_Finalize(mid->hg_class);
        NA_Context_destroy(mid->network_class, mid->na_context);
        NA_Finalize(mid->network_class);
        return(NULL);
96 97 98 99 100 101 102 103 104 105
    }

    /* get the primary pool for the caller, this is where we will run ULTs to
     * handle incoming requests
     */    
    ret = ABT_xstream_self(&xstream);
    if(ret != 0)
    {
        /* TODO: err handling */
        fprintf(stderr, "Error: ABT_xstream_self()\n");
106
        return(NULL);
107 108
    }

109
    ret = ABT_xstream_get_main_pools(xstream, 1, &mid->main_pool);
110 111 112 113
    if(ret != 0)
    {
        /* TODO: err handling */
        fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
114
        return(NULL);
115 116
    }

117
    /* create an ES and ULT to drive Mercury progress */
118
    ret = ABT_snoozer_xstream_create(1, &mid->engine_pool, &mid->engine_xstream);
119 120
    if(ret != 0)
    {
121 122
        /* TODO: err handling */
        fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
123
        return(NULL);
124
    }
125 126
    ret = ABT_thread_create(mid->engine_pool, hg_progress_fn, mid, 
        ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
127 128 129 130
    if(ret != 0)
    {
        /* TODO: err handling */
        fprintf(stderr, "Error: ABT_thread_create()\n");
131
        return(NULL);
132 133
    }

134 135 136 137 138 139
    handler_mapping_table[handler_mapping_table_size].mid = mid;
    handler_mapping_table[handler_mapping_table_size].class = mid->hg_class;
    mid->table_index = handler_mapping_table_size;
    handler_mapping_table_size++;

    return mid;
140 141
}

142
void margo_finalize(margo_instance_id mid)
143
{
144 145
    int i;

146
    /* tell progress thread to wrap things up */
147
    mid->hg_progress_shutdown_flag = 1;
148 149

    /* wait for it to shutdown cleanly */
150 151 152 153
    ABT_thread_join(mid->hg_progress_tid);
    ABT_thread_free(&mid->hg_progress_tid);
    ABT_xstream_join(mid->engine_xstream);
    ABT_xstream_free(&mid->engine_xstream);
154

155 156 157 158 159 160 161 162 163 164
    HG_Context_destroy(mid->hg_context);
    HG_Finalize(mid->hg_class);
    NA_Context_destroy(mid->network_class, mid->na_context);
    NA_Finalize(mid->network_class);

    for(i=mid->table_index; i<(handler_mapping_table_size-1); i++)
    {
        handler_mapping_table[i] = handler_mapping_table[i+1];
    }
    handler_mapping_table_size--;
165 166 167 168 169

    return;
}

/* dedicated thread function to drive Mercury progress */
170
static void hg_progress_fn(void* foo)
171 172 173
{
    int ret;
    unsigned int actual_count;
174
    struct margo_instance *mid = (struct margo_instance *)foo;
175

176
    while(!mid->hg_progress_shutdown_flag)
177 178
    {
        do {
179 180
            ret = HG_Trigger(mid->hg_class, mid->hg_context, 0, 1, &actual_count);
        } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
181

182 183
        if(!mid->hg_progress_shutdown_flag)
            HG_Progress(mid->hg_class, mid->hg_context, 100);
184 185
    }

186
    return;
187 188
}

189
hg_class_t* margo_get_class(margo_instance_id mid)
190
{
191
    return(mid->hg_class);
192 193
}

194
ABT_pool* margo_get_main_pool(margo_instance_id mid)
195
{
196
    return(&mid->main_pool);
197 198
}

199
na_return_t margo_addr_lookup(margo_instance_id mid, const char* name, na_addr_t* addr)
200 201 202
{
    na_return_t ret;

203
    ret = NA_Addr_lookup_wait(mid->network_class, name, addr);
204 205 206 207

    return ret;
}

208 209
hg_return_t margo_create_handle(margo_instance_id mid, na_addr_t addr, 
    hg_id_t id, hg_handle_t *handle)
210 211 212
{
    hg_return_t ret;

213
    ret = HG_Create(mid->hg_class, mid->hg_context, addr, id, handle);
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229

    return ret;
}

static hg_return_t margo_forward_cb(const struct hg_cb_info *info)
{
    hg_return_t hret = info->ret;

    ABT_eventual *eventual = info->arg;
    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

hg_return_t margo_forward(
230
    margo_instance_id mid,
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
    hg_handle_t handle,
    void *in_struct)
{
    hg_return_t hret = HG_TIMEOUT;
    ABT_eventual eventual;
    int ret;
    hg_return_t* waited_hret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Forward(handle, margo_forward_cb, &eventual, in_struct);
    if(hret == 0)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

static hg_return_t margo_bulk_transfer_cb(const struct hg_bulk_cb_info *hg_bulk_cb_info)
{
    hg_return_t hret = hg_bulk_cb_info->ret;
    ABT_eventual *eventual = hg_bulk_cb_info->arg;

    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

hg_return_t margo_bulk_transfer(
269
    margo_instance_id mid,
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303
    hg_bulk_context_t *context,
    hg_bulk_op_t op,
    na_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size)
{
    hg_return_t hret = HG_TIMEOUT;
    hg_return_t *waited_hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Bulk_transfer(context, margo_bulk_transfer_cb, &eventual, op, 
        origin_addr, origin_handle, origin_offset, local_handle, local_offset,
        size, HG_OP_ID_IGNORE);
    if(hret == 0)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

304 305 306 307 308 309 310 311 312 313 314
margo_instance_id margo_hg_class_to_instance(hg_class_t *class)
{
    int i;

    for(i=0; i<handler_mapping_table_size; i++)
    {
        if(handler_mapping_table[i].class == class)
            return(handler_mapping_table[i].mid);
    }
    return(NULL);
}