/* delegator-service.c: RPC service that relays read requests to a data-xfer service. */
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <pthread.h>
#include "data-xfer-proto.h"
#include "delegator-proto.h"
#include "delegator-service.h"

static hg_addr_t g_data_xfer_svc_addr = HG_ADDR_NULL;
static hg_id_t g_data_xfer_read_id = -1;

/*
 * RPC handler body for "delegator_read".
 *
 * Decodes the incoming request, builds a data_xfer_read request that carries
 * the caller's bulk handle and the caller's address (as a string), forwards
 * it to the data-xfer service address saved at registration time, and then
 * responds to the original caller.
 *
 * handle: the incoming RPC handle; it is destroyed before returning.
 *
 * Failures of the underlying Mercury/Margo calls are treated as fatal via
 * assert(), matching the rest of this file.
 */
static void delegator_read_ult(hg_handle_t handle)
{
    hg_return_t hret;
    delegator_read_in_t in;
    delegator_read_out_t out;
    data_xfer_read_in_t in_relay;
    data_xfer_read_out_t out_relay;
    const struct hg_info *hgi;
    margo_instance_id mid;
    hg_handle_t handle_relay;
    char relay_addr_string[64];
    hg_size_t relay_addr_string_sz = sizeof(relay_addr_string);

    hret = HG_Get_input(handle, &in);
    assert(hret == HG_SUCCESS);
    hgi = HG_Get_info(handle);
    assert(hgi);

    /* NOTE(review): the reply always reports success; the relayed result in
     * out_relay is not propagated to the caller -- confirm this is intended.
     */
    out.ret = 0;

    mid = margo_hg_class_to_instance(hgi->hg_class);

    /* relay to microservice */
    hret = HG_Create(margo_get_context(mid), g_data_xfer_svc_addr, g_data_xfer_read_id, &handle_relay);
    assert(hret == HG_SUCCESS);

    /* pass through bulk handle */
    in_relay.bulk_handle = in.bulk_handle;

    /* tell the data-xfer service who the original caller is */
    hret = HG_Addr_to_string(margo_get_class(mid), relay_addr_string, &relay_addr_string_sz, hgi->addr);
    assert(hret == HG_SUCCESS);
    in_relay.client_addr = relay_addr_string;

    /* the relayed output is read right after this call, so the forward must
     * have succeeded before we go on
     */
    hret = margo_forward(mid, handle_relay, &in_relay);
    assert(hret == HG_SUCCESS);

    hret = HG_Get_output(handle_relay, &out_relay);
    assert(hret == HG_SUCCESS);

    HG_Free_input(handle, &in);
    /* release the struct that HG_Get_output() filled in (out_relay), not our
     * own reply struct
     */
    HG_Free_output(handle_relay, &out_relay);

    hret = HG_Respond(handle, NULL, NULL, &out);
    assert(hret == HG_SUCCESS);

    HG_Destroy(handle);
    HG_Destroy(handle_relay);

    return;
}
DEFINE_MARGO_RPC_HANDLER(delegator_read_ult)

/* Service registration and teardown entry points. */
/*
 * Registers the delegator service with a Margo instance.
 *
 * mid:                Margo instance to register with.
 * pool:               pool passed through to MARGO_REGISTER for the handler.
 * mplex_id:           multiplex id passed through to MARGO_REGISTER.
 * data_xfer_svc_addr: address that delegator_read requests are relayed to;
 *                     stored in a file-scope variable.
 *
 * Always returns 0; a failed lookup of the RPC name trips an assert().
 */
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id, hg_addr_t data_xfer_svc_addr)
{
    hg_id_t existing_id;
    hg_bool_t already_registered;
    int rc;

    /* remember where requests get relayed to */
    g_data_xfer_svc_addr = data_xfer_svc_addr;

    /* Client side of the relayed RPC.
     *
     * TODO: make this safe; right now if we register again as a client we
     * lose the RPC handler ptr
     */
    rc = HG_Registered_name(margo_get_class(mid), "data_xfer_read", &existing_id, &already_registered);
    assert(rc == HG_SUCCESS);

    if(already_registered)
    {
        /* reuse the id that is already known for this name */
        printf("DBG: NOT registering client side data_xfer_read.\n");
        g_data_xfer_read_id = existing_id;
    }
    else
    {
        printf("DBG: registering client side data_xfer_read.\n");
        g_data_xfer_read_id = MERCURY_REGISTER(margo_get_class(mid), "data_xfer_read",
            data_xfer_read_in_t, data_xfer_read_out_t, NULL);
    }

    /* server side: the handler for delegator_read */
    MARGO_REGISTER(mid, "delegator_read", delegator_read_in_t, delegator_read_out_t, delegator_read_ult_handler, mplex_id, pool);

    return(0);
}

/*
 * Releases the relay target address saved by delegator_service_register().
 *
 * mid:      Margo instance whose Mercury class owns the address.
 * pool:     currently unused.
 * mplex_id: currently unused.
 *
 * Safe to call more than once, or before any registration: the address is
 * freed only if one is held, and the saved value is cleared afterwards so it
 * cannot be freed twice or used after being freed.
 */
void delegator_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
    (void)pool;
    (void)mplex_id;

    if(g_data_xfer_svc_addr != HG_ADDR_NULL)
    {
        HG_Addr_free(margo_get_class(mid), g_data_xfer_svc_addr);
        g_data_xfer_svc_addr = HG_ADDR_NULL;
    }

    /* TODO: undo what was done in delegator_register() */
    return;
}