client.c 4.14 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
Philip Carns's avatar
Philip Carns committed
11 12
#include <abt-snoozer.h>
#include <margo.h>
13 14 15 16 17 18 19 20 21 22

#include "my-rpc.h"

/* This is an example client program that issues 4 concurrent RPCs, each of
 * which includes a bulk transfer driven by the server.
 *
 * Each client operation executes as an independent ULT in Argobots.
 * The HG forward call is executed using asynchronous operations.
 */

23 24 25 26 27 28
/* Per-ULT input block: main() fills one of these for each ULT it creates
 * and passes its address as the void* argument of run_my_rpc().
 */
struct run_my_rpc_args
{
    /* index assigned by main(); sent as the RPC's input_val and printed in
     * the ULT's log messages
     */
    int val;
    /* margo instance used by the ULT for address lookup, handle creation,
     * and forwarding the RPC
     */
    margo_instance_id mid;
};

29 30 31 32 33 34
/* ULT body; main() passes a pointer to a struct run_my_rpc_args as _arg. */
static void run_my_rpc(void *_arg);

/* RPC id returned by my_rpc_register() in main(); read by each ULT when it
 * creates its handle.
 */
static hg_id_t my_rpc_id;

int main(int argc, char **argv) 
{
35
    struct run_my_rpc_args args[4];
36 37 38 39 40
    ABT_thread threads[4];
    int i;
    int ret;
    ABT_xstream xstream;
    ABT_pool pool;
41
    margo_instance_id mid;
42 43 44 45 46 47 48 49
    
    ret = ABT_init(argc, argv);
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_init()\n");
        return(-1);
    }

Philip Carns's avatar
Philip Carns committed
50 51
    /* set primary ES to idle without polling */
    ret = ABT_snoozer_xstream_self_set();
Philip Carns's avatar
Philip Carns committed
52
    if(ret != 0)
53
    {
Philip Carns's avatar
Philip Carns committed
54
        fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
55 56 57
        return(-1);
    }

Philip Carns's avatar
Philip Carns committed
58 59
    /* retrieve current pool to use for ULT creation */
    ret = ABT_xstream_self(&xstream);
Philip Carns's avatar
Philip Carns committed
60
    if(ret != 0)
61
    {
Philip Carns's avatar
Philip Carns committed
62
        fprintf(stderr, "Error: ABT_xstream_self()\n");
63 64
        return(-1);
    }
Philip Carns's avatar
Philip Carns committed
65
    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
Philip Carns's avatar
Philip Carns committed
66
    if(ret != 0)
67
    {
Philip Carns's avatar
Philip Carns committed
68
        fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
69 70 71 72 73 74
        return(-1);
    }

    /* initialize
     *   note: address here is really just being used to identify transport 
     */
75
    mid = margo_init(NA_FALSE, "tcp://localhost:1234");
76 77

    /* register RPC */
78
    my_rpc_id = my_rpc_register(mid);
79 80 81

    for(i=0; i<4; i++)
    {
82 83 84
        args[i].val = i;
        args[i].mid = mid;
        /* Each fiber gets a pointer to an element of the array to use
85 86
         * as input for the run_my_rpc() function.
         */
87
        ret = ABT_thread_create(pool, run_my_rpc, &args[i],
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
            ABT_THREAD_ATTR_NULL, &threads[i]);
        if(ret != 0)
        {
            fprintf(stderr, "Error: ABT_thread_create()\n");
            return(-1);
        }

    }

    /* yield to one of the threads */
    ABT_thread_yield_to(threads[0]);

    for(i=0; i<4; i++)
    {
        ret = ABT_thread_join(threads[i]);
        if(ret != 0)
        {
            fprintf(stderr, "Error: ABT_thread_join()\n");
            return(-1);
        }
        ret = ABT_thread_free(&threads[i]);
        if(ret != 0)
        {
            fprintf(stderr, "Error: ABT_thread_join()\n");
            return(-1);
        }
    }

116
    margo_finalize(mid);
117 118 119 120 121 122 123
    ABT_finalize();

    return(0);
}

static void run_my_rpc(void *_arg)
{
124
    struct run_my_rpc_args *arg = _arg;
125 126 127 128 129 130 131 132 133
    na_addr_t svr_addr = NA_ADDR_NULL;
    hg_handle_t handle;
    my_rpc_in_t in;
    my_rpc_out_t out;
    int ret;
    hg_size_t size;
    void* buffer;
    struct hg_info *hgi;

134
    printf("ULT [%d] running.\n", arg->val);
135 136 137 138 139 140 141 142

    /* allocate buffer for bulk transfer */
    size = 512;
    buffer = calloc(1, 512);
    assert(buffer);
    sprintf((char*)buffer, "Hello world!\n");

    /* find addr for server */
143
    ret = margo_addr_lookup(arg->mid, "tcp://localhost:1234", &svr_addr);
144 145 146
    assert(ret == 0);

    /* create handle */
147
    ret = margo_create_handle(arg->mid, svr_addr, my_rpc_id, &handle);
148 149 150 151 152 153 154 155 156 157 158 159
    assert(ret == 0);

    /* register buffer for rdma/bulk access by server */
    hgi = HG_Get_info(handle);
    assert(hgi);
    ret = HG_Bulk_create(hgi->hg_bulk_class, 1, &buffer, &size, 
        HG_BULK_READ_ONLY, &in.bulk_handle);
    assert(ret == 0);

    /* Send rpc. Note that we are also transmitting the bulk handle in the
     * input struct.  It was set above. 
     */ 
160 161
    in.input_val = arg->val;
    margo_forward(arg->mid, handle, &in);
162 163 164 165 166 167 168 169 170 171 172 173 174

    /* decode response */
    ret = HG_Get_output(handle, &out);
    assert(ret == 0);

    printf("Got response ret: %d\n", out.ret);

    /* clean up resources consumed by this rpc */
    HG_Bulk_free(in.bulk_handle);
    HG_Free_output(handle, &out);
    HG_Destroy(handle);
    free(buffer);

175
    printf("ULT [%d] done.\n", arg->val);
176 177 178
    return;
}