/*
 * (C) 2015 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include "my-rpc.h"

/* my-rpc:
 * This is an example RPC operation.  It includes a small bulk transfer,
 * driven by the server, that moves data from the client to the server.  The
 * step that would write the received data to a local file in /tmp is
 * currently compiled out (#if 0) in the handler below.
 */

/* The rpc handler is defined as a single ULT in Argobots.  It uses
 * underlying asynchronous operations for the HG transfer, open, write, and
 * close.
 */

21 22
extern ABT_eventual* shutdown_eventual;

23 24 25 26 27 28 29 30 31 32 33 34 35 36
static void my_rpc_ult(void *_arg)
{
    hg_handle_t *handle = _arg;

    hg_return_t hret;
    my_rpc_out_t out;
    my_rpc_in_t in;
    int ret;
    hg_size_t size;
    void *buffer;
    hg_bulk_t bulk_handle;
    struct hg_info *hgi;
    int fd;
    char filename[256];
37
    margo_instance_id mid;
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52

    ret = HG_Get_input(*handle, &in);
    assert(ret == HG_SUCCESS);

    printf("Got RPC request with input_val: %d\n", in.input_val);
    out.ret = 0;

    /* set up target buffer for bulk transfer */
    size = 512;
    buffer = calloc(1, 512);
    assert(buffer);

    /* register local target buffer for bulk access */
    hgi = HG_Get_info(*handle);
    assert(hgi);
53
    ret = HG_Bulk_create(hgi->hg_class, 1, &buffer,
54 55 56
        &size, HG_BULK_WRITE_ONLY, &bulk_handle);
    assert(ret == 0);

57 58
    mid = margo_hg_class_to_instance(hgi->hg_class);

59
    /* do bulk transfer from client to server */
60
    ret = margo_bulk_transfer(mid, hgi->context, HG_BULK_PULL, 
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
        hgi->addr, in.bulk_handle, 0,
        bulk_handle, 0, size);
    assert(ret == 0);

#if 0

    /* write to a file */
    sprintf(filename, "/tmp/hg-fiber-%d.txt", in.input_val);
    fd = fbr_eio_open(fctx, filename, O_WRONLY|O_CREAT, S_IWUSR|S_IRUSR, 0);
    assert(fd > -1);

    ret = fbr_eio_write(fctx, fd, buffer, 512, 0, 0);
    assert(ret == 512);

    fbr_eio_close(fctx, fd, 0);
#endif

    hret = HG_Respond(*handle, NULL, NULL, &out);
    assert(hret == HG_SUCCESS);

    HG_Bulk_free(bulk_handle);
    HG_Destroy(*handle);
    free(buffer);
    free(handle);

    return;
}
88
DEFINE_MARGO_RPC_HANDLER(my_rpc_ult)
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115

static void my_rpc_shutdown_ult(void *_arg)
{
    hg_handle_t *handle = _arg;

    hg_return_t hret;
    struct hg_info *hgi;
    margo_instance_id mid;

    printf("Got RPC request to shutdown\n");

    hgi = HG_Get_info(*handle);
    assert(hgi);
    mid = margo_hg_class_to_instance(hgi->hg_class);

    hret = HG_Respond(*handle, NULL, NULL, NULL);
    assert(hret == HG_SUCCESS);
    
    HG_Destroy(*handle);

    margo_finalize(mid);

    ABT_eventual_set(*shutdown_eventual, NULL, 0);

    return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)