GitLab maintenance scheduled for Today, 2020-08-11, from 17:00 to 18:00 CT - Services will be unavailable during this time.

my-rpc.c 4.06 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include "my-rpc.h"

/* my-rpc:
 * This is an example RPC operation.  It includes a small bulk transfer, 
 * driven by the server, that moves data from the client to the server.  The
 * server writes the data to a local file in /tmp.
 */

/* The rpc handler is defined as a single ULT in Argobots.  It uses
 * underlying asynchronous operations for the HG transfer, open, write, and
 * close.
 */

static void my_rpc_ult(hg_handle_t handle)
{
    hg_return_t hret;
    my_rpc_out_t out;
    my_rpc_in_t in;
    int ret;
    hg_size_t size;
    void *buffer;
    hg_bulk_t bulk_handle;
Philip Carns's avatar
Philip Carns committed
30
    const struct hg_info *hgi;
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
#if 0
    int fd;
    char filename[256];
#endif
    margo_instance_id mid;

    ret = HG_Get_input(handle, &in);
    assert(ret == HG_SUCCESS);

    printf("Got RPC request with input_val: %d\n", in.input_val);
    out.ret = 0;

    /* set up target buffer for bulk transfer */
    size = 512;
    buffer = calloc(1, 512);
    assert(buffer);

    /* register local target buffer for bulk access */
    hgi = HG_Get_info(handle);
    assert(hgi);
    ret = HG_Bulk_create(hgi->hg_class, 1, &buffer,
        &size, HG_BULK_WRITE_ONLY, &bulk_handle);
    assert(ret == 0);

    mid = margo_hg_class_to_instance(hgi->hg_class);

    /* do bulk transfer from client to server */
    ret = margo_bulk_transfer(mid, HG_BULK_PULL,
        hgi->addr, in.bulk_handle, 0,
        bulk_handle, 0, size);
    assert(ret == 0);

63 64
    HG_Free_input(handle, &in);

65 66
    /* write to a file; would be done with abt-io if we enabled it */
#if 0
Philip Carns's avatar
Philip Carns committed
67
    sprintf(filename, "/tmp/margo-%d.txt", in.input_val);
68 69 70 71 72 73 74 75 76
    fd = abt_io_open(aid, filename, O_WRONLY|O_CREAT, S_IWUSR|S_IRUSR);
    assert(fd > -1);

    ret = abt_io_pwrite(aid, fd, buffer, 512, 0);
    assert(ret == 512);

    abt_io_close(aid, fd);
#endif

77
    hret = margo_respond(mid, handle, &out);
78 79 80 81 82 83 84 85 86 87 88 89 90
    assert(hret == HG_SUCCESS);

    HG_Bulk_free(bulk_handle);
    HG_Destroy(handle);
    free(buffer);

    return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_ult)

static void my_rpc_shutdown_ult(hg_handle_t handle)
{
    hg_return_t hret;
Philip Carns's avatar
Philip Carns committed
91
    const struct hg_info *hgi;
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
    margo_instance_id mid;

    printf("Got RPC request to shutdown\n");

    hgi = HG_Get_info(handle);
    assert(hgi);
    mid = margo_hg_class_to_instance(hgi->hg_class);

    hret = margo_respond(mid, handle, NULL);
    assert(hret == HG_SUCCESS);

    HG_Destroy(handle);

    /* NOTE: we assume that the server daemon is using
     * margo_wait_for_finalize() to suspend until this RPC executes, so there
     * is no need to send any extra signal to notify it.
     */
    margo_finalize(mid);

    return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
Philip Carns's avatar
Philip Carns committed
114 115 116 117 118 119 120 121 122 123

static void my_rpc_hang_ult(hg_handle_t handle)
{
    hg_return_t hret;
    my_rpc_out_t out;
    my_rpc_in_t in;
    int ret;
    hg_size_t size;
    void *buffer;
    hg_bulk_t bulk_handle;
Philip Carns's avatar
Philip Carns committed
124
    const struct hg_info *hgi;
Philip Carns's avatar
Philip Carns committed
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
    margo_instance_id mid;

    ret = HG_Get_input(handle, &in);
    assert(ret == HG_SUCCESS);

    printf("Got RPC request with input_val: %d, deliberately hanging.\n", in.input_val);
    out.ret = 0;

    hgi = HG_Get_info(handle);
    assert(hgi);
    mid = margo_hg_class_to_instance(hgi->hg_class);

    /* sleep for an hour (to allow client to test timeout capability) */
    margo_thread_sleep(mid, 1000*60*60);

    /* set up target buffer for bulk transfer */
    size = 512;
    buffer = calloc(1, 512);
    assert(buffer);

    /* register local target buffer for bulk access */
    ret = HG_Bulk_create(hgi->hg_class, 1, &buffer,
        &size, HG_BULK_WRITE_ONLY, &bulk_handle);
    assert(ret == 0);

    /* do bulk transfer from client to server */
    ret = margo_bulk_transfer(mid, HG_BULK_PULL,
        hgi->addr, in.bulk_handle, 0,
        bulk_handle, 0, size);
    assert(ret == 0);

    HG_Free_input(handle, &in);

    hret = margo_respond(mid, handle, &out);
    assert(hret == HG_SUCCESS);

    HG_Bulk_free(bulk_handle);
    HG_Destroy(handle);
    free(buffer);

    return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_hang_ult)