/*
 * (C) 2015 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>

#include "my-rpc.h"

/* This is an example client program that issues 4 concurrent RPCs, each of
 * which includes a bulk transfer driven by the server.
 *
 * Each client operation executes as an independent ULT (user-level thread)
 * in Argobots.  The HG forward call is executed using asynchronous
 * operations.
 */

struct run_my_rpc_args
{
    int val;
    margo_instance_id mid;
27 28 29 30
    na_class_t *network_class;
    na_context_t *na_context;
    hg_context_t *hg_context;
    hg_class_t *hg_class;
31 32
};

/* ULT body that issues one "my_rpc" request; defined below main(). */
static void run_my_rpc(void *_arg);

/* RPC ids assigned by MERCURY_REGISTER() in main() */
static hg_id_t my_rpc_id;          /* "my_rpc" */
static hg_id_t my_rpc_shutdown_id; /* "my_shutdown_rpc" */

int main(int argc, char **argv) 
{
40
    struct run_my_rpc_args args[4];
41 42 43 44 45
    ABT_thread threads[4];
    int i;
    int ret;
    ABT_xstream xstream;
    ABT_pool pool;
46
    margo_instance_id mid;
47 48
    ABT_xstream progress_xstream;
    ABT_pool progress_pool;
49 50 51 52
    na_class_t *network_class;
    na_context_t *na_context;
    hg_context_t *hg_context;
    hg_class_t *hg_class;
53 54
    na_addr_t svr_addr = NA_ADDR_NULL;
    hg_handle_t handle;
55 56
        
    /* boilerplate HG initialization steps */
57
    /***************************************/
58 59 60 61 62 63 64 65 66 67 68 69 70
    network_class = NA_Initialize("tcp://localhost:1234", NA_FALSE);
    if(!network_class)
    {
        fprintf(stderr, "Error: NA_Initialize()\n");
        return(-1);
    }
    na_context = NA_Context_create(network_class);
    if(!na_context)
    {
        fprintf(stderr, "Error: NA_Context_create()\n");
        NA_Finalize(network_class);
        return(-1);
    }
71
    hg_class = HG_Init(network_class, na_context);
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
    if(!hg_class)
    {
        fprintf(stderr, "Error: HG_Init()\n");
        NA_Context_destroy(network_class, na_context);
        NA_Finalize(network_class);
        return(-1);
    }
    hg_context = HG_Context_create(hg_class);
    if(!hg_context)
    {
        fprintf(stderr, "Error: HG_Context_create()\n");
        HG_Finalize(hg_class);
        NA_Context_destroy(network_class, na_context);
        NA_Finalize(network_class);
        return(-1);
    }

    /* set up argobots */
90
    /***************************************/
91 92 93 94 95 96 97
    ret = ABT_init(argc, argv);
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_init()\n");
        return(-1);
    }

Philip Carns's avatar
Philip Carns committed
98 99
    /* set primary ES to idle without polling */
    ret = ABT_snoozer_xstream_self_set();
Philip Carns's avatar
Philip Carns committed
100
    if(ret != 0)
101
    {
Philip Carns's avatar
Philip Carns committed
102
        fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
103 104 105
        return(-1);
    }

Philip Carns's avatar
Philip Carns committed
106 107
    /* retrieve current pool to use for ULT creation */
    ret = ABT_xstream_self(&xstream);
Philip Carns's avatar
Philip Carns committed
108
    if(ret != 0)
109
    {
Philip Carns's avatar
Philip Carns committed
110
        fprintf(stderr, "Error: ABT_xstream_self()\n");
111 112
        return(-1);
    }
Philip Carns's avatar
Philip Carns committed
113
    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
Philip Carns's avatar
Philip Carns committed
114
    if(ret != 0)
115
    {
Philip Carns's avatar
Philip Carns committed
116
        fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
117 118 119
        return(-1);
    }

120 121 122 123 124 125 126 127
    /* create a dedicated ES drive Mercury progress */
    ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream);
    if(ret != 0)
    {
        fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
        return(-1);
    }

128
    /* actually start margo */
129 130 131 132
    /* provide argobots pools for driving communication progress and
     * executing rpc handlers as well as class and context for Mercury
     * communication.  The rpc handler pool is null in this example program
     * because this is a pure client that will not be servicing rpc requests.
133
     */
134
    /***************************************/
135
    mid = margo_init(progress_pool, ABT_POOL_NULL, hg_context, hg_class);
136 137

    /* register RPC */
138
    my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t, 
139 140 141
        NULL);
    my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void, 
        NULL);
142 143 144

    for(i=0; i<4; i++)
    {
145 146
        args[i].val = i;
        args[i].mid = mid;
147 148 149 150 151
        args[i].hg_class = hg_class;
        args[i].hg_context = hg_context;
        args[i].na_context = na_context;
        args[i].network_class = network_class;

152
        /* Each fiber gets a pointer to an element of the array to use
153 154
         * as input for the run_my_rpc() function.
         */
155
        ret = ABT_thread_create(pool, run_my_rpc, &args[i],
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
            ABT_THREAD_ATTR_NULL, &threads[i]);
        if(ret != 0)
        {
            fprintf(stderr, "Error: ABT_thread_create()\n");
            return(-1);
        }

    }

    /* yield to one of the threads */
    ABT_thread_yield_to(threads[0]);

    for(i=0; i<4; i++)
    {
        ret = ABT_thread_join(threads[i]);
        if(ret != 0)
        {
            fprintf(stderr, "Error: ABT_thread_join()\n");
            return(-1);
        }
        ret = ABT_thread_free(&threads[i]);
        if(ret != 0)
        {
            fprintf(stderr, "Error: ABT_thread_join()\n");
            return(-1);
        }
    }

184 185 186 187 188 189 190 191 192 193 194
    /* send one rpc to server to shut it down */
    /* find addr for server */
    ret = margo_na_addr_lookup(mid, network_class, na_context, "tcp://localhost:1234", &svr_addr);
    assert(ret == 0);

    /* create handle */
    ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle);
    assert(ret == 0);

    margo_forward(mid, handle, NULL);

195
    /* shut down everything */
196
    margo_finalize(mid);
197 198 199 200
    
    ABT_xstream_join(progress_xstream);
    ABT_xstream_free(&progress_xstream);

201 202
    ABT_finalize();

203 204 205 206 207
    HG_Context_destroy(hg_context);
    HG_Finalize(hg_class);
    NA_Context_destroy(network_class, na_context);
    NA_Finalize(network_class);

208 209 210 211 212
    return(0);
}

static void run_my_rpc(void *_arg)
{
213
    struct run_my_rpc_args *arg = _arg;
214 215 216 217 218 219 220 221 222
    na_addr_t svr_addr = NA_ADDR_NULL;
    hg_handle_t handle;
    my_rpc_in_t in;
    my_rpc_out_t out;
    int ret;
    hg_size_t size;
    void* buffer;
    struct hg_info *hgi;

223
    printf("ULT [%d] running.\n", arg->val);
224 225 226 227 228 229 230 231

    /* allocate buffer for bulk transfer */
    size = 512;
    buffer = calloc(1, 512);
    assert(buffer);
    sprintf((char*)buffer, "Hello world!\n");

    /* find addr for server */
Philip Carns's avatar
Philip Carns committed
232
    ret = margo_na_addr_lookup(arg->mid, arg->network_class, arg->na_context, "tcp://localhost:1234", &svr_addr);
233 234 235
    assert(ret == 0);

    /* create handle */
236
    ret = HG_Create(arg->hg_context, svr_addr, my_rpc_id, &handle);
237 238 239 240 241
    assert(ret == 0);

    /* register buffer for rdma/bulk access by server */
    hgi = HG_Get_info(handle);
    assert(hgi);
242
    ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size, 
243 244 245 246 247 248
        HG_BULK_READ_ONLY, &in.bulk_handle);
    assert(ret == 0);

    /* Send rpc. Note that we are also transmitting the bulk handle in the
     * input struct.  It was set above. 
     */ 
249 250
    in.input_val = arg->val;
    margo_forward(arg->mid, handle, &in);
251 252 253 254 255 256 257 258 259 260 261 262 263

    /* decode response */
    ret = HG_Get_output(handle, &out);
    assert(ret == 0);

    printf("Got response ret: %d\n", out.ret);

    /* clean up resources consumed by this rpc */
    HG_Bulk_free(in.bulk_handle);
    HG_Free_output(handle, &out);
    HG_Destroy(handle);
    free(buffer);

264
    printf("ULT [%d] done.\n", arg->val);
265 266 267
    return;
}