data-xfer-service.c 2.55 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <pthread.h>
#include <stdlib.h>
#include "data-xfer-proto.h"
#include "data-xfer-service.h"

/* Server-side buffer used as the local end of every bulk transfer.  It is
 * allocated (zero-filled) in data_xfer_service_register() and released in
 * data_xfer_deregister(). */
static void *g_buffer;

/* Size of g_buffer in bytes (8 MiB); also the length of each bulk transfer. */
static hg_size_t g_buffer_size = 8 * 1024 * 1024;

/* Bulk handle created over g_buffer in data_xfer_service_register(). */
static hg_bulk_t g_buffer_bulk_handle;

/*
 * Handler for the "data_xfer_read" RPC.
 *
 * Decodes the request, pushes g_buffer_size bytes of the global server
 * buffer into the bulk region named by the request, and then responds with
 * out.ret set to 0.  Every Margo call is checked with assert().
 *
 * handle: RPC handle for this request; it is destroyed before returning.
 *
 * NOTE(review): the transfer length is always g_buffer_size, so this assumes
 * the client's bulk region is at least that large -- confirm against callers.
 */
static void data_xfer_read_ult(hg_handle_t handle)
{
    hg_return_t hret;
    data_xfer_read_out_t out;
    data_xfer_read_in_t in;
    const struct hg_info *hgi;
    margo_instance_id mid;
    hg_addr_t client_addr;

    hret = margo_get_input(handle, &in);
    assert(hret == HG_SUCCESS);
    hgi = margo_get_info(handle);
    assert(hgi);
    mid = margo_hg_info_get_instance(hgi);
    assert(mid != MARGO_INSTANCE_NULL);

    out.ret = 0;

    /* If the request carries no explicit client address, the bulk region is
     * reached through the address recorded in the handle info; otherwise
     * resolve the address supplied in the request. */
    if(!in.client_addr)
        client_addr = hgi->addr;
    else
    {
        hret = margo_addr_lookup(mid, in.client_addr, &client_addr);
        assert(hret == HG_SUCCESS);
    }

    /* do bulk transfer from server to client: HG_BULK_PUSH sends the local
     * buffer (g_buffer_bulk_handle) into the remote region (in.bulk_handle) */
    hret = margo_bulk_transfer(mid, HG_BULK_PUSH,
        client_addr, in.bulk_handle, 0,
        g_buffer_bulk_handle, 0, g_buffer_size);
    assert(hret == HG_SUCCESS);

    /* only release an address obtained from margo_addr_lookup() above;
     * hgi->addr was not allocated here */
    if(in.client_addr)
        margo_addr_free(mid, client_addr);

    margo_free_input(handle, &in);

    hret = margo_respond(handle, &out);
    assert(hret == HG_SUCCESS);

    margo_destroy(handle);

    return;
}
DEFINE_MARGO_RPC_HANDLER(data_xfer_read_ult)

Philip Carns's avatar
Philip Carns committed
75
/*
 * Set up the data-xfer service on a Margo instance.
 *
 * Allocates the zero-filled global buffer, creates a read-only bulk handle
 * over it, and registers the "data_xfer_read" RPC handler.
 *
 * mid:         Margo instance to register with.
 * pool:        pool passed to MARGO_REGISTER_PROVIDER for the handler.
 * provider_id: provider id passed to MARGO_REGISTER_PROVIDER.
 *
 * Returns 0.  Allocation and bulk-creation failures are checked with
 * assert() rather than reported through the return value.
 *
 * NOTE(review): the buffer and bulk handle are file-scope globals, so a
 * second call without an intervening data_xfer_deregister() would overwrite
 * (and leak) the first allocation -- confirm this is only called once.
 */
int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
    hg_return_t hret;

    /* set up global buffer that serves as the source of bulk transfers */
    g_buffer = calloc(1, g_buffer_size);
    assert(g_buffer);

    /* register the local buffer for bulk access; it is only ever read
     * locally (pushed out by the RPC handler), hence HG_BULK_READ_ONLY */
    hret = margo_bulk_create(mid, 1, &g_buffer,
        &g_buffer_size, HG_BULK_READ_ONLY, &g_buffer_bulk_handle);
    assert(hret == HG_SUCCESS);

    /* register RPC handler */
    MARGO_REGISTER_PROVIDER(mid, "data_xfer_read",
        data_xfer_read_in_t, data_xfer_read_out_t,
        data_xfer_read_ult, provider_id, pool);

    return(0);
}

Philip Carns's avatar
Philip Carns committed
96
/*
 * Release the resources created by data_xfer_service_register(): the bulk
 * handle over the global buffer and the buffer itself.
 *
 * The globals are reset after release so that a repeated call, or a late
 * use of the stale values, does not touch freed memory.
 *
 * mid, pool, provider_id: currently unused.
 */
void data_xfer_deregister(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
    (void)mid;
    (void)pool;
    (void)provider_id;

    if(g_buffer_bulk_handle != HG_BULK_NULL)
    {
        margo_bulk_free(g_buffer_bulk_handle);
        g_buffer_bulk_handle = HG_BULK_NULL;
    }

    free(g_buffer);
    g_buffer = NULL;

    /* TODO: also deregister the "data_xfer_read" RPC that was registered in
     * data_xfer_service_register() */
    return;
}