delegator-service.c 2.89 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <pthread.h>
#include "data-xfer-proto.h"
#include "delegator-proto.h"
#include "delegator-service.h"

static hg_addr_t g_data_xfer_svc_addr = HG_ADDR_NULL;
static hg_id_t g_data_xfer_read_id = -1;

/*
 * Handler body for the "delegator_read" RPC.
 *
 * Decodes the incoming request, creates a new handle targeting the
 * data-xfer service address saved at registration time, and forwards a
 * request carrying the same bulk handle.  Once the relayed RPC has
 * completed, a response is sent back on the original handle and both
 * handles are released.
 *
 * handle: Mercury handle of the incoming request; destroyed before return.
 *
 * Errors are handled by assert(), matching the rest of this file.
 */
static void delegator_read_ult(hg_handle_t handle)
{
    hg_return_t hret;
    delegator_read_out_t out;
    delegator_read_in_t in;
    data_xfer_read_in_t in_relay;
    data_xfer_read_out_t out_relay;
    const struct hg_info *hgi;
    margo_instance_id mid;
    hg_handle_t handle_relay;
#if 0
    ABT_thread my_ult;
    ABT_xstream my_xstream; 
    pthread_t my_tid;
#endif

    hret = HG_Get_input(handle, &in);
    assert(hret == HG_SUCCESS);
    hgi = HG_Get_info(handle);
    assert(hgi);

#if 0
    /* disabled debug trace of the execution context serving this request */
    ABT_xstream_self(&my_xstream);
    ABT_thread_self(&my_ult);
    my_tid = pthread_self();
    printf("svc1: do_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n", 
        hgi->target_id, my_ult, my_xstream, my_tid);
#endif

    /* NOTE(review): the response always reports 0; nothing decoded into
     * out_relay is propagated to the caller -- confirm this is intended.
     */
    out.ret = 0;

    mid = margo_hg_class_to_instance(hgi->hg_class);
    
    /* relay to microservice */
    hret = HG_Create(margo_get_context(mid), g_data_xfer_svc_addr, g_data_xfer_read_id, &handle_relay);
    assert(hret == HG_SUCCESS);
    /* pass through bulk handle */
    in_relay.bulk_handle = in.bulk_handle; 
    hret = margo_forward(mid, handle_relay, &in_relay);
    assert(hret == HG_SUCCESS);

    hret = HG_Get_output(handle_relay, &out_relay);
    assert(hret == HG_SUCCESS);

    HG_Free_input(handle, &in);
    /* release the struct that HG_Get_output() decoded above; "out" is our
     * own response and must stay intact until HG_Respond() has been called
     */
    HG_Free_output(handle_relay, &out_relay);

    hret = HG_Respond(handle, NULL, NULL, &out);
    assert(hret == HG_SUCCESS);

    HG_Destroy(handle);
    HG_Destroy(handle_relay);

    return;
}
DEFINE_MARGO_RPC_HANDLER(delegator_read_ult)

Philip Carns's avatar
Philip Carns committed
73
/*
 * Registers the delegator service with a margo instance.
 *
 * mid:                margo instance to register with
 * pool:               Argobots pool passed to MARGO_REGISTER for the handler
 * mplex_id:           multiplex id passed to MARGO_REGISTER for the handler
 * data_xfer_svc_addr: address that "delegator_read" requests are relayed to;
 *                     stored in a file-scope variable, so only one relay
 *                     target is remembered at a time
 *
 * Returns 0.  Registration failures are caught by assert().
 */
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id, hg_addr_t data_xfer_svc_addr)
{
    hg_return_t hret;
    hg_id_t id = 0;
    /* start out "not registered" so that a failed lookup in a build without
     * assertions falls through to registering rather than reading garbage
     */
    hg_bool_t flag = 0;

    /* save addr to relay to */
    g_data_xfer_svc_addr = data_xfer_svc_addr;

    /* register client-side of function to relay */
    /* TODO: make this safe; right now if we register again as a client we lose the RPC
     * handler ptr
     */
    hret = HG_Registered_name(margo_get_class(mid), "data_xfer_read", &id, &flag);
    assert(hret == HG_SUCCESS);
    if(!flag)
    {
        /* not known to this class yet: register it with no handler, since
         * this service only issues data_xfer_read requests
         */
        g_data_xfer_read_id = MERCURY_REGISTER(margo_get_class(mid), "data_xfer_read",
            data_xfer_read_in_t, data_xfer_read_out_t, NULL);
    }
    else
    {
        /* already registered: reuse the id reported by the lookup */
        g_data_xfer_read_id = id;
    }

    /* register RPC handler */
    MARGO_REGISTER(mid, "delegator_read", delegator_read_in_t, delegator_read_out_t, delegator_read_ult_handler, mplex_id, pool);

    return(0);
}

/*
 * Releases the relay address saved by delegator_service_register().
 *
 * mid:      margo instance whose Mercury class is used to free the address
 * pool:     currently unused
 * mplex_id: currently unused
 *
 * Calling this more than once, or before any address has been saved, is a
 * no-op for the address.
 */
void delegator_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
    (void)pool;
    (void)mplex_id;

    if(g_data_xfer_svc_addr != HG_ADDR_NULL)
    {
        HG_Addr_free(margo_get_class(mid), g_data_xfer_svc_addr);
        /* forget the freed address so that it cannot be freed twice or
         * used as a relay target afterwards
         */
        g_data_xfer_svc_addr = HG_ADDR_NULL;
    }

    /* TODO: undo what was done in delegator_register() */
    return;
}