margo.c 6.78 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11

/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <abt.h>
12
#include <abt-snoozer.h>
13 14 15

#include "margo.h"

16 17
struct margo_instance
{
18
    /* not needed */
19 20
    na_class_t *network_class;
    na_context_t *na_context;
21 22

    /* provided by caller */
23 24
    hg_context_t *hg_context;
    hg_class_t *hg_class;
25 26 27
    ABT_pool handler_pool;
    ABT_pool progress_pool;

28 29 30 31 32 33 34 35 36 37 38 39 40 41
    ABT_thread hg_progress_tid;
    int hg_progress_shutdown_flag;
    int table_index;
};

struct margo_handler_mapping
{
    hg_class_t *class;
    margo_instance_id mid;
};

#define MAX_HANDLER_MAPPING 8
static int handler_mapping_table_size = 0;
static struct margo_handler_mapping handler_mapping_table[MAX_HANDLER_MAPPING] = {0};
42

43
static void hg_progress_fn(void* foo);
44 45 46 47 48 49 50 51 52


struct handler_entry
{
    void* fn;
    hg_handle_t handle;
    struct handler_entry *next; 
};

53
margo_instance_id margo_init(na_bool_t listen, const char* local_addr, ABT_pool progress_pool, ABT_pool handler_pool)
54 55
{
    int ret;
56 57 58 59 60 61 62 63 64
    struct margo_instance *mid;

    if(handler_mapping_table_size >= MAX_HANDLER_MAPPING)
        return(NULL);

    mid = malloc(sizeof(*mid));
    if(!mid)
        return(NULL);
    memset(mid, 0, sizeof(*mid));
65

66 67 68
    mid->progress_pool = progress_pool;
    mid->handler_pool = handler_pool;

69
    /* boilerplate HG initialization steps */
70 71
    mid->network_class = NA_Initialize(local_addr, listen);
    if(!mid->network_class)
72
    {
73 74
        free(mid);
        return(NULL);
75 76
    }

77 78
    mid->na_context = NA_Context_create(mid->network_class);
    if(!mid->na_context)
79
    {
80 81 82
        NA_Finalize(mid->network_class);
        free(mid);
        return(NULL);
83 84
    }

85 86
    mid->hg_class = HG_Init(mid->network_class, mid->na_context, NULL);
    if(!mid->hg_class)
87
    {
88 89 90 91
        NA_Context_destroy(mid->network_class, mid->na_context);
        NA_Finalize(mid->network_class);
        free(mid);
        return(NULL);
92 93
    }

94 95
    mid->hg_context = HG_Context_create(mid->hg_class);
    if(!mid->hg_context)
96
    {
97 98 99 100
        HG_Finalize(mid->hg_class);
        NA_Context_destroy(mid->network_class, mid->na_context);
        NA_Finalize(mid->network_class);
        return(NULL);
101 102
    }

103
    ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid, 
104
        ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
105 106 107 108
    if(ret != 0)
    {
        /* TODO: err handling */
        fprintf(stderr, "Error: ABT_thread_create()\n");
109
        return(NULL);
110 111
    }

112 113 114 115 116 117
    handler_mapping_table[handler_mapping_table_size].mid = mid;
    handler_mapping_table[handler_mapping_table_size].class = mid->hg_class;
    mid->table_index = handler_mapping_table_size;
    handler_mapping_table_size++;

    return mid;
118 119
}

120
void margo_finalize(margo_instance_id mid)
121
{
122 123
    int i;

124
    /* tell progress thread to wrap things up */
125
    mid->hg_progress_shutdown_flag = 1;
126 127

    /* wait for it to shutdown cleanly */
128 129
    ABT_thread_join(mid->hg_progress_tid);
    ABT_thread_free(&mid->hg_progress_tid);
130

131 132 133 134 135 136 137 138 139 140
    HG_Context_destroy(mid->hg_context);
    HG_Finalize(mid->hg_class);
    NA_Context_destroy(mid->network_class, mid->na_context);
    NA_Finalize(mid->network_class);

    for(i=mid->table_index; i<(handler_mapping_table_size-1); i++)
    {
        handler_mapping_table[i] = handler_mapping_table[i+1];
    }
    handler_mapping_table_size--;
141 142 143 144 145

    return;
}

/* dedicated thread function to drive Mercury progress */
146
static void hg_progress_fn(void* foo)
147 148 149
{
    int ret;
    unsigned int actual_count;
150
    struct margo_instance *mid = (struct margo_instance *)foo;
151

152
    while(!mid->hg_progress_shutdown_flag)
153 154
    {
        do {
155 156
            ret = HG_Trigger(mid->hg_class, mid->hg_context, 0, 1, &actual_count);
        } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
157

158 159
        if(!mid->hg_progress_shutdown_flag)
            HG_Progress(mid->hg_class, mid->hg_context, 100);
160 161
    }

162
    return;
163 164
}

165
/****************************/
166
hg_class_t* margo_get_class(margo_instance_id mid)
167
{
168
    return(mid->hg_class);
169 170
}

171 172
hg_return_t margo_create_handle(margo_instance_id mid, na_addr_t addr, 
    hg_id_t id, hg_handle_t *handle)
173
{
174 175 176 177 178
    hg_return_t ret;

    ret = HG_Create(mid->hg_class, mid->hg_context, addr, id, handle);

    return ret;
179 180
}

181
na_return_t margo_addr_lookup(margo_instance_id mid, const char* name, na_addr_t* addr)
182 183 184
{
    na_return_t ret;

185
    ret = NA_Addr_lookup_wait(mid->network_class, name, addr);
186 187 188

    return ret;
}
189
/****************************/
190

191
ABT_pool* margo_get_handler_pool(margo_instance_id mid)
192
{
193
    return(&mid->handler_pool);
194 195 196 197 198 199 200 201 202 203 204 205 206 207
}

static hg_return_t margo_forward_cb(const struct hg_cb_info *info)
{
    hg_return_t hret = info->ret;

    ABT_eventual *eventual = info->arg;
    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

hg_return_t margo_forward(
208
    margo_instance_id mid,
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
    hg_handle_t handle,
    void *in_struct)
{
    hg_return_t hret = HG_TIMEOUT;
    ABT_eventual eventual;
    int ret;
    hg_return_t* waited_hret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Forward(handle, margo_forward_cb, &eventual, in_struct);
    if(hret == 0)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

static hg_return_t margo_bulk_transfer_cb(const struct hg_bulk_cb_info *hg_bulk_cb_info)
{
    hg_return_t hret = hg_bulk_cb_info->ret;
    ABT_eventual *eventual = hg_bulk_cb_info->arg;

    /* propagate return code out through eventual */
    ABT_eventual_set(*eventual, &hret, sizeof(hret));
    
    return(HG_SUCCESS);
}

hg_return_t margo_bulk_transfer(
247
    margo_instance_id mid,
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
    hg_bulk_context_t *context,
    hg_bulk_op_t op,
    na_addr_t origin_addr,
    hg_bulk_t origin_handle,
    size_t origin_offset,
    hg_bulk_t local_handle,
    size_t local_offset,
    size_t size)
{
    hg_return_t hret = HG_TIMEOUT;
    hg_return_t *waited_hret;
    ABT_eventual eventual;
    int ret;

    ret = ABT_eventual_create(sizeof(hret), &eventual);
    if(ret != 0)
    {
        return(HG_NOMEM_ERROR);        
    }

    hret = HG_Bulk_transfer(context, margo_bulk_transfer_cb, &eventual, op, 
        origin_addr, origin_handle, origin_offset, local_handle, local_offset,
        size, HG_OP_ID_IGNORE);
    if(hret == 0)
    {
        ABT_eventual_wait(eventual, (void**)&waited_hret);
        hret = *waited_hret;
    }

    ABT_eventual_free(&eventual);

    return(hret);
}

282 283 284 285 286 287 288 289 290 291 292
margo_instance_id margo_hg_class_to_instance(hg_class_t *class)
{
    int i;

    for(i=0; i<handler_mapping_table_size; i++)
    {
        if(handler_mapping_table[i].class == class)
            return(handler_mapping_table[i].mid);
    }
    return(NULL);
}