delegator-service.c 2.96 KB
Newer Older
Philip Carns's avatar
Philip Carns committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <pthread.h>
#include "data-xfer-proto.h"
#include "delegator-proto.h"
#include "delegator-service.h"

static hg_id_t g_data_xfer_read_id = -1;

static void delegator_read_ult(hg_handle_t handle)
{
    hg_return_t hret;
    delegator_read_out_t out;
    delegator_read_in_t in;
    data_xfer_read_in_t in_relay;
    data_xfer_read_out_t out_relay;
    const struct hg_info *hgi;
    margo_instance_id mid;
24 25
    hg_addr_t data_xfer_svc_addr;

Philip Carns's avatar
Philip Carns committed
26
    hg_handle_t handle_relay;
27 28
    char client_addr_string[64];
    hg_size_t client_addr_string_sz = 64;
Philip Carns's avatar
Philip Carns committed
29 30 31 32 33 34
#if 0
    ABT_thread my_ult;
    ABT_xstream my_xstream; 
    pthread_t my_tid;
#endif

35 36 37
    hret = margo_get_input(handle, &in);
    assert(hret == HG_SUCCESS);
    hgi = margo_get_info(handle);
Philip Carns's avatar
Philip Carns committed
38
    assert(hgi);
39 40
    mid = margo_hg_info_get_instance(hgi);
    assert(mid != MARGO_INSTANCE_NULL);
Philip Carns's avatar
Philip Carns committed
41 42 43 44 45 46 47 48 49 50 51

#if 0
    ABT_xstream_self(&my_xstream);
    ABT_thread_self(&my_ult);
    my_tid = pthread_self();
    printf("svc1: do_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n", 
        hgi->target_id, my_ult, my_xstream, my_tid);
#endif

    out.ret = 0;

52 53
    hret = margo_addr_lookup(mid, in.data_xfer_svc_addr, &data_xfer_svc_addr);
    assert(hret == HG_SUCCESS);
Philip Carns's avatar
Philip Carns committed
54 55
    
    /* relay to microservice */
56
    hret = margo_create(mid, data_xfer_svc_addr, g_data_xfer_read_id, &handle_relay);
Philip Carns's avatar
Philip Carns committed
57 58 59
    assert(hret == HG_SUCCESS);
    /* pass through bulk handle */
    in_relay.bulk_handle = in.bulk_handle; 
60
    /* get addr of client to relay to data_xfer service */
61
    hret = margo_addr_to_string(mid, client_addr_string, &client_addr_string_sz, hgi->addr);
62
    assert(hret == HG_SUCCESS);
63
    in_relay.client_addr = client_addr_string;
64 65
    hret = margo_forward(mid, handle_relay, &in_relay);
    assert(hret == HG_SUCCESS);
Philip Carns's avatar
Philip Carns committed
66

67
    hret = margo_get_output(handle_relay, &out_relay);
Philip Carns's avatar
Philip Carns committed
68 69
    assert(hret == HG_SUCCESS);

70 71
    margo_free_input(handle, &in);
    margo_free_output(handle_relay, &out);
Philip Carns's avatar
Philip Carns committed
72

73
    hret = margo_respond(mid, handle, &out);
Philip Carns's avatar
Philip Carns committed
74 75
    assert(hret == HG_SUCCESS);

76 77 78
    margo_addr_free(mid, data_xfer_svc_addr);
    margo_destroy(handle);
    margo_destroy(handle_relay);
Philip Carns's avatar
Philip Carns committed
79 80 81 82 83

    return;
}
DEFINE_MARGO_RPC_HANDLER(delegator_read_ult)

84
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
Philip Carns's avatar
Philip Carns committed
85 86
{
    /* register client-side of function to relay */
87 88
    /* NOTE: this RPC may already be registered if this process has already registered a
     * data-xfer service
Philip Carns's avatar
Philip Carns committed
89
     */
90 91
	g_data_xfer_read_id = MARGO_REGISTER(mid, "data_xfer_read",
        data_xfer_read_in_t, data_xfer_read_out_t, NULL);
Philip Carns's avatar
Philip Carns committed
92 93

    /* register RPC handler */
94 95
    MARGO_REGISTER_MPLEX(mid, "delegator_read", 
        delegator_read_in_t, delegator_read_out_t, 
96
        delegator_read_ult, mplex_id, pool);
Philip Carns's avatar
Philip Carns committed
97 98 99 100 101 102 103 104 105

    return(0);
}

void delegator_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
    /* TODO: undo what was done in delegator_register() */
    return;
}