/* margo.c */

/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>

#include <abt.h>
#include <abt-snoozer.h>

#include "margo.h"

static na_class_t *network_class = NULL;
static na_context_t *na_context = NULL;
static hg_context_t *hg_context = NULL;
static hg_class_t *hg_class = NULL;

21
static ABT_thread hg_progress_tid;
22
static int hg_progress_shutdown_flag = 0;
23
static void hg_progress_fn(void* foo);
24 25

static ABT_pool main_pool;
26 27
static ABT_pool engine_pool;
static ABT_xstream engine_xstream;
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90

/* Node of a singly linked list that pairs an untyped function pointer with a
 * Mercury handle.
 * NOTE(review): nothing in the visible part of this file uses this type --
 * confirm whether it is still needed.
 */
struct handler_entry
{
    void *fn;                   /* function associated with this entry */
    hg_handle_t handle;         /* Mercury handle associated with this entry */
    struct handler_entry *next; /* following entry in the list */
};

/* Brings up the NA and Mercury layers, records the caller's main Argobots
 * pool, and starts a separate execution stream whose ULT drives Mercury
 * progress.
 *
 * listen:     forwarded unchanged to NA_Initialize()
 * local_addr: forwarded unchanged to NA_Initialize()
 *
 * Returns 0 on success and -1 on failure.  On failure every resource that
 * was created up to the point of the error is released again and the
 * corresponding file-scope pointers are reset to NULL.
 *
 * NOTE(review): this calls ABT_xstream_self() and
 * ABT_snoozer_xstream_create(), so the caller presumably must have
 * initialized Argobots (and abt-snoozer) beforehand -- confirm.
 */
int margo_init(na_bool_t listen, const char* local_addr)
{
    int ret;
    ABT_xstream xstream;

    /* boilerplate HG initialization steps */
    network_class = NA_Initialize(local_addr, listen);
    if(!network_class)
    {
        goto err_out;
    }

    na_context = NA_Context_create(network_class);
    if(!na_context)
    {
        goto err_na_class;
    }

    hg_class = HG_Init(network_class, na_context, NULL);
    if(!hg_class)
    {
        goto err_na_context;
    }

    hg_context = HG_Context_create(hg_class);
    if(!hg_context)
    {
        goto err_hg_class;
    }

    /* get the primary pool for the caller, this is where we will run ULTs to
     * handle incoming requests
     */
    ret = ABT_xstream_self(&xstream);
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_xstream_self()\n");
        goto err_hg_context;
    }

    ret = ABT_xstream_get_main_pools(xstream, 1, &main_pool);
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
        goto err_hg_context;
    }

    /* create an ES and ULT to drive Mercury progress */
    ret = ABT_snoozer_xstream_create(1, &engine_pool, &engine_xstream);
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
        goto err_hg_context;
    }

    ret = ABT_thread_create(engine_pool, hg_progress_fn, NULL,
        ABT_THREAD_ATTR_NULL, &hg_progress_tid);
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_thread_create()\n");
        goto err_engine_xstream;
    }

    return 0;

    /* Unwind in reverse order of creation; each label releases one resource
     * and falls through to the labels for everything created before it.
     */
err_engine_xstream:
    /* same join/free sequence margo_finalize() uses for the engine ES */
    ABT_xstream_join(engine_xstream);
    ABT_xstream_free(&engine_xstream);
err_hg_context:
    HG_Context_destroy(hg_context);
    hg_context = NULL;
err_hg_class:
    HG_Finalize(hg_class);
    hg_class = NULL;
err_na_context:
    NA_Context_destroy(network_class, na_context);
    na_context = NULL;
err_na_class:
    NA_Finalize(network_class);
    network_class = NULL;
err_out:
    return(-1);
}

void margo_finalize(void)
{
    /* tell progress thread to wrap things up */
    hg_progress_shutdown_flag = 1;

    /* wait for it to shutdown cleanly */
117 118 119 120
    ABT_thread_join(hg_progress_tid);
    ABT_thread_free(&hg_progress_tid);
    ABT_xstream_join(engine_xstream);
    ABT_xstream_free(&engine_xstream);
121 122 123 124 125 126 127 128 129 130

    HG_Context_destroy(hg_context);
    HG_Finalize(hg_class);
    NA_Context_destroy(network_class, na_context);
    NA_Finalize(network_class);

    return;
}

/* dedicated thread function to drive Mercury progress */
131
static void hg_progress_fn(void* foo)
132 133 134 135 136 137 138 139 140 141 142 143 144 145
{
    int ret;
    unsigned int actual_count;

    while(!hg_progress_shutdown_flag)
    {
        do {
            ret = HG_Trigger(hg_class, hg_context, 0, 1, &actual_count);
        } while((ret == HG_SUCCESS) && actual_count && !hg_progress_shutdown_flag);

        if(!hg_progress_shutdown_flag)
            HG_Progress(hg_class, hg_context, 100);
    }

146
    return;
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
}

/* Accessor for the file-scope Mercury class pointer that margo_init() sets. */
hg_class_t* margo_get_class(void)
{
    hg_class_t *current_class = hg_class;

    return current_class;
}

/* Returns the address of the file-scope main_pool variable, which
 * margo_init() fills in with the calling execution stream's first main pool.
 */
ABT_pool* margo_get_main_pool(void)
{
    ABT_pool *pool_ref = &main_pool;

    return pool_ref;
}

/* Resolves name into *addr by delegating to NA_Addr_lookup_wait() on the
 * network class created in margo_init(); the NA return code is passed back
 * unchanged.
 */
na_return_t margo_addr_lookup(const char* name, na_addr_t* addr)
{
    return NA_Addr_lookup_wait(network_class, name, addr);
}

/* Creates a Mercury handle for RPC id targeting addr, using the class and
 * context set up by margo_init().  The result of HG_Create() is returned
 * unchanged and the new handle is stored through the handle argument.
 */
hg_return_t margo_create_handle(na_addr_t addr, hg_id_t id,
    hg_handle_t *handle)
{
    return HG_Create(hg_class, hg_context, addr, id, handle);
}

/* Completion callback registered by margo_forward().  info->arg carries a
 * pointer to the eventual that margo_forward() is waiting on; the operation's
 * return code (info->ret) is published through it.
 */
static hg_return_t margo_forward_cb(const struct hg_cb_info *info)
{
    ABT_eventual *waiter = info->arg;
    hg_return_t status = info->ret;

    /* hand the completion status to whoever waits on the eventual */
    ABT_eventual_set(*waiter, &status, sizeof(status));

    return HG_SUCCESS;
}

/* Forwards an RPC on handle with input in_struct and waits on an Argobots
 * eventual until margo_forward_cb() reports completion.
 *
 * Returns HG_NOMEM_ERROR if the eventual cannot be created, the error from
 * HG_Forward() if the request could not be issued, and otherwise the
 * completion status delivered by the callback.
 */
hg_return_t margo_forward(
    hg_handle_t handle,
    void *in_struct)
{
    ABT_eventual done;
    hg_return_t status;
    hg_return_t *completion_status;

    if(ABT_eventual_create(sizeof(status), &done) != 0)
    {
        return HG_NOMEM_ERROR;
    }

    status = HG_Forward(handle, margo_forward_cb, &done, in_struct);
    if(status == HG_SUCCESS)
    {
        /* request is in flight: wait for the callback and adopt its status */
        ABT_eventual_wait(done, (void**)&completion_status);
        status = *completion_status;
    }

    ABT_eventual_free(&done);

    return status;
}

/* Completion callback registered by margo_bulk_transfer().  The callback
 * info's arg field carries a pointer to the eventual the caller waits on;
 * the transfer's return code is published through it.
 */
static hg_return_t margo_bulk_transfer_cb(const struct hg_bulk_cb_info *hg_bulk_cb_info)
{
    ABT_eventual *waiter = hg_bulk_cb_info->arg;
    hg_return_t status = hg_bulk_cb_info->ret;

    /* hand the completion status to whoever waits on the eventual */
    ABT_eventual_set(*waiter, &status, sizeof(status));

    return HG_SUCCESS;
}

/* Starts a Mercury bulk transfer and waits on an Argobots eventual until
 * margo_bulk_transfer_cb() reports completion.  All transfer arguments are
 * passed straight through to HG_Bulk_transfer(); no operation id is
 * requested (HG_OP_ID_IGNORE).
 *
 * Returns HG_NOMEM_ERROR if the eventual cannot be created, the error from
 * HG_Bulk_transfer() if the transfer could not be issued, and otherwise the
 * completion status delivered by the callback.
 */
hg_return_t margo_bulk_transfer(
    hg_bulk_context_t *context,
    hg_bulk_op_t op,
    na_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size)
{
    ABT_eventual done;
    hg_return_t status;
    hg_return_t *completion_status;

    if(ABT_eventual_create(sizeof(status), &done) != 0)
    {
        return HG_NOMEM_ERROR;
    }

    status = HG_Bulk_transfer(context, margo_bulk_transfer_cb, &done, op,
        origin_addr, origin_handle, origin_offset, local_handle, local_offset,
        size, HG_OP_ID_IGNORE);
    if(status == HG_SUCCESS)
    {
        /* transfer is in flight: wait for the callback and adopt its status */
        ABT_eventual_wait(done, (void**)&completion_status);
        status = *completion_status;
    }

    ABT_eventual_free(&done);

    return status;
}