data-xfer-service.c 2.53 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <pthread.h>
#include "data-xfer-proto.h"
#include "data-xfer-service.h"

static hg_size_t g_buffer_size = 8*1024*1024;
static void *g_buffer;
static hg_bulk_t g_buffer_bulk_handle;

static void data_xfer_read_ult(hg_handle_t handle)
{
    hg_return_t hret;
    data_xfer_read_out_t out;
    data_xfer_read_in_t in;
    const struct hg_info *hgi;
    margo_instance_id mid;
Philip Carns's avatar
Philip Carns committed
23
    hg_addr_t client_addr;
24
25
26
27
28
29
#if 0
    ABT_thread my_ult;
    ABT_xstream my_xstream; 
    pthread_t my_tid;
#endif

30
31
32
    hret = margo_get_input(handle, &in);
    assert(hret == HG_SUCCESS);
    hgi = margo_get_info(handle);
33
    assert(hgi);
34
35
    mid = margo_hg_info_get_instance(hgi);
    assert(mid != MARGO_INSTANCE_NULL);
36
37
38
39
40
41
42
43
44
45
46

#if 0
    ABT_xstream_self(&my_xstream);
    ABT_thread_self(&my_ult);
    my_tid = pthread_self();
    printf("svc1: do_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n", 
        hgi->target_id, my_ult, my_xstream, my_tid);
#endif

    out.ret = 0;

Philip Carns's avatar
Philip Carns committed
47
48
    if(!in.client_addr)
        client_addr = hgi->addr;
49
50
    else
    {
Philip Carns's avatar
Philip Carns committed
51
        hret = margo_addr_lookup(mid, in.client_addr, &client_addr);
52
53
54
        assert(hret == HG_SUCCESS);
    }

55
    /* do bulk transfer from client to server */
56
    hret = margo_bulk_transfer(mid, HG_BULK_PUSH,
Philip Carns's avatar
Philip Carns committed
57
        client_addr, in.bulk_handle, 0,
58
        g_buffer_bulk_handle, 0, g_buffer_size);
59
    assert(hret == HG_SUCCESS);
60

Philip Carns's avatar
Philip Carns committed
61
    if(in.client_addr)
62
        margo_addr_free(mid, client_addr);
Philip Carns's avatar
Philip Carns committed
63

64
    margo_free_input(handle, &in);
65

66
    hret = margo_respond(handle, &out);
67
68
    assert(hret == HG_SUCCESS);

69
    margo_destroy(handle);
70
71
72
73
74

    return;
}
DEFINE_MARGO_RPC_HANDLER(data_xfer_read_ult)

Philip Carns's avatar
Philip Carns committed
75
int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
76
77
78
79
80
81
82
83
{
    hg_return_t hret;

    /* set up global target buffer for bulk transfer */
    g_buffer = calloc(1, g_buffer_size);
    assert(g_buffer);

    /* register local target buffer for bulk access */
84
    hret = margo_bulk_create(mid, 1, &g_buffer,
85
86
87
88
        &g_buffer_size, HG_BULK_READ_ONLY, &g_buffer_bulk_handle);
    assert(hret == HG_SUCCESS);

    /* register RPC handler */
89
90
    MARGO_REGISTER_MPLEX(mid, "data_xfer_read", 
        data_xfer_read_in_t, data_xfer_read_out_t, 
91
        data_xfer_read_ult, mplex_id, pool);
92
93
94
95
96
97

    return(0);
}

void data_xfer_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
98
    margo_bulk_free(g_buffer_bulk_handle);
99
100
101
102
103
    free(g_buffer);

    /* TODO: undo what was done in data_xfer_register() */
    return;
}