my-rpc.c 4.39 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include "my-rpc.h"

/* my-rpc:
 * This is an example RPC operation.  It includes a small bulk transfer, 
 * driven by the server, that moves data from the client to the server.  The
 * server writes the data to a local file in /tmp.
 */

/* The rpc handler is defined as a single ULT in Argobots.  It uses
 * underlying asynchronous operations for the HG transfer, open, write, and
 * close.
 */

static void my_rpc_ult(hg_handle_t handle)
{
    hg_return_t hret;
    my_rpc_out_t out;
    my_rpc_in_t in;
    hg_size_t size;
    void *buffer;
    hg_bulk_t bulk_handle;
Philip Carns's avatar
Philip Carns committed
29
    const struct hg_info *hgi;
30
#if 0
31
    int ret;
32 33 34 35 36
    int fd;
    char filename[256];
#endif
    margo_instance_id mid;

37 38
    hret = margo_get_input(handle, &in);
    assert(hret == HG_SUCCESS);
39 40 41 42 43 44 45 46 47

    printf("Got RPC request with input_val: %d\n", in.input_val);
    out.ret = 0;

    /* set up target buffer for bulk transfer */
    size = 512;
    buffer = calloc(1, 512);
    assert(buffer);

48 49
    /* get handle info and margo instance */
    hgi = margo_get_info(handle);
50
    assert(hgi);
51
    mid = margo_hg_info_get_instance(hgi);
52
    assert(mid != MARGO_INSTANCE_NULL);
53

54 55 56 57
    /* register local target buffer for bulk access */
    hret = margo_bulk_create(mid, 1, &buffer,
        &size, HG_BULK_WRITE_ONLY, &bulk_handle);
    assert(hret == HG_SUCCESS);
58 59

    /* do bulk transfer from client to server */
60
    hret = margo_bulk_transfer(mid, HG_BULK_PULL,
61
        hgi->addr, in.bulk_handle, 0,
62 63
        bulk_handle, 0, size, HG_OP_ID_IGNORE);
    assert(hret == HG_SUCCESS);
64 65 66

    /* write to a file; would be done with abt-io if we enabled it */
#if 0
Philip Carns's avatar
Philip Carns committed
67
    sprintf(filename, "/tmp/margo-%d.txt", in.input_val);
68 69 70 71 72 73 74 75 76
    fd = abt_io_open(aid, filename, O_WRONLY|O_CREAT, S_IWUSR|S_IRUSR);
    assert(fd > -1);

    ret = abt_io_pwrite(aid, fd, buffer, 512, 0);
    assert(ret == 512);

    abt_io_close(aid, fd);
#endif

77 78
    margo_free_input(handle, &in);

79
    hret = margo_respond(mid, handle, &out);
80 81
    assert(hret == HG_SUCCESS);

82
    margo_bulk_free(bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
83
    margo_destroy(mid, handle);
84 85 86 87 88 89 90 91 92
    free(buffer);

    return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_ult)

static void my_rpc_shutdown_ult(hg_handle_t handle)
{
    hg_return_t hret;
Philip Carns's avatar
Philip Carns committed
93
    const struct hg_info *hgi;
94 95 96 97
    margo_instance_id mid;

    printf("Got RPC request to shutdown\n");

98 99
    /* get handle info and margo instance */
    hgi = margo_get_info(handle);
100
    assert(hgi);
101
    mid = margo_hg_info_get_instance(hgi);
102
    assert(mid != MARGO_INSTANCE_NULL);
103 104 105 106

    hret = margo_respond(mid, handle, NULL);
    assert(hret == HG_SUCCESS);

Shane Snyder's avatar
Shane Snyder committed
107
    margo_destroy(mid, handle);
108 109 110 111 112 113 114 115 116 117

    /* NOTE: we assume that the server daemon is using
     * margo_wait_for_finalize() to suspend until this RPC executes, so there
     * is no need to send any extra signal to notify it.
     */
    margo_finalize(mid);

    return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
Philip Carns's avatar
Philip Carns committed
118 119 120 121 122 123 124 125 126

static void my_rpc_hang_ult(hg_handle_t handle)
{
    hg_return_t hret;
    my_rpc_out_t out;
    my_rpc_in_t in;
    hg_size_t size;
    void *buffer;
    hg_bulk_t bulk_handle;
Philip Carns's avatar
Philip Carns committed
127
    const struct hg_info *hgi;
Philip Carns's avatar
Philip Carns committed
128 129
    margo_instance_id mid;

130 131
    hret = margo_get_input(handle, &in);
    assert(hret == HG_SUCCESS);
Philip Carns's avatar
Philip Carns committed
132 133 134 135

    printf("Got RPC request with input_val: %d, deliberately hanging.\n", in.input_val);
    out.ret = 0;

136 137
    /* get handle info and margo instance */
    hgi = margo_get_info(handle);
Philip Carns's avatar
Philip Carns committed
138
    assert(hgi);
139
    mid = margo_hg_info_get_instance(hgi);
140
    assert(mid != MARGO_INSTANCE_NULL);
Philip Carns's avatar
Philip Carns committed
141 142 143 144 145 146 147 148 149 150

    /* sleep for an hour (to allow client to test timeout capability) */
    margo_thread_sleep(mid, 1000*60*60);

    /* set up target buffer for bulk transfer */
    size = 512;
    buffer = calloc(1, 512);
    assert(buffer);

    /* register local target buffer for bulk access */
151
    hret = margo_bulk_create(mid, 1, &buffer,
Philip Carns's avatar
Philip Carns committed
152
        &size, HG_BULK_WRITE_ONLY, &bulk_handle);
153
    assert(hret == HG_SUCCESS);
Philip Carns's avatar
Philip Carns committed
154 155

    /* do bulk transfer from client to server */
156
    hret = margo_bulk_transfer(mid, HG_BULK_PULL,
Philip Carns's avatar
Philip Carns committed
157
        hgi->addr, in.bulk_handle, 0,
158 159
        bulk_handle, 0, size, HG_OP_ID_IGNORE);
    assert(hret == HG_SUCCESS);
Philip Carns's avatar
Philip Carns committed
160

161
    margo_free_input(handle, &in);
Philip Carns's avatar
Philip Carns committed
162 163 164 165

    hret = margo_respond(mid, handle, &out);
    assert(hret == HG_SUCCESS);

166
    margo_bulk_free(bulk_handle);
Shane Snyder's avatar
Shane Snyder committed
167
    margo_destroy(mid, handle);
Philip Carns's avatar
Philip Carns committed
168 169 170 171 172
    free(buffer);

    return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_hang_ult)