composed-client-lib.c 2.94 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>

#include "composed-client-lib.h"
#include "delegator-proto.h"
#include "data-xfer-proto.h"

/* RPC ids used by this client.  Each one starts out as -1, is assigned by
 * the matching *_register_client() function in this file, and is then passed
 * to HG_Create() by the corresponding read function.
 *
 * NOTE: just making these global for test example, would be cleaner if there
 * were an instance pointer for the client.
 */
static hg_id_t delegator_read_id = -1;
static hg_id_t data_xfer_read_id = -1;

/*
 * Registers the "delegator_read" RPC with the Mercury class that belongs to
 * the given margo instance and stores the resulting id in delegator_read_id
 * for later use by composed_read().
 *
 * mid: margo instance whose Mercury class receives the registration
 *
 * Always returns 0.
 */
int composed_register_client(margo_instance_id mid)
{
    delegator_read_id = MERCURY_REGISTER(
        margo_get_class(mid),
        "delegator_read",
        delegator_read_in_t,
        delegator_read_out_t,
        NULL);

    return 0;
}

/*
 * Registers the "data_xfer_read" RPC with the Mercury class that belongs to
 * the given margo instance and stores the resulting id in data_xfer_read_id
 * for later use by data_xfer_read().
 *
 * mid: margo instance whose Mercury class receives the registration
 *
 * Always returns 0.
 */
int data_xfer_register_client(margo_instance_id mid)
{
    data_xfer_read_id = MERCURY_REGISTER(
        margo_get_class(mid),
        "data_xfer_read",
        data_xfer_read_in_t,
        data_xfer_read_out_t,
        NULL);

    return 0;
}

40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
/*
 * Sends one "delegator_read" RPC to svr_addr and waits for its response.
 *
 * mid:       margo instance supplying the Mercury context and used to forward
 *            the request
 * svr_addr:  address of the server the RPC is sent to
 * buffer:    caller-owned memory exposed to the server through a bulk handle
 * buffer_sz: size of buffer, as handed to HG_Bulk_create()
 *
 * The id in delegator_read_id must have been set by
 * composed_register_client() beforehand.  Nothing is returned to the caller:
 * every step that can fail is checked with assert(), and the decoded output
 * is released without being inspected.
 */
void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_size_t buffer_sz)
{
    hg_handle_t handle;
    /* Zero the input so that nothing other than the bulk handle set below is
     * handed to margo_forward() with indeterminate contents.
     */
    delegator_read_in_t in = {0};
    delegator_read_out_t out;
    int ret;
    const struct hg_info *hgi;

    /* create handle */
    ret = HG_Create(margo_get_context(mid), svr_addr, delegator_read_id, &handle);
    assert(ret == 0);

    /* register buffer for rdma/bulk access by server */
    hgi = HG_Get_info(handle);
    assert(hgi);
    ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &buffer_sz, 
        HG_BULK_WRITE_ONLY, &in.bulk_handle);
    assert(ret == 0);

    /* Disabled: mplex_id is not defined anywhere in this function. */
#if 0
    HG_Set_target_id(handle, mplex_id);
#endif

    /* Send rpc. Note that we are also transmitting the bulk handle in the
     * input struct.  It was set above.  The result is checked like every
     * other call here; decoding the output of a failed forward is pointless.
     */ 
    ret = margo_forward(mid, handle, &in);
    assert(ret == 0);

    /* decode response */
    ret = HG_Get_output(handle, &out);
    assert(ret == 0);

    /* clean up resources consumed by this rpc */
    HG_Bulk_free(in.bulk_handle);
    HG_Free_output(handle, &out);
    HG_Destroy(handle);
}

80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
/*
 * Sends one "data_xfer_read" RPC to svr_addr and waits for its response.
 *
 * mid:       margo instance supplying the Mercury context and used to forward
 *            the request
 * svr_addr:  address of the server the RPC is sent to
 * buffer:    caller-owned memory exposed to the server through a bulk handle
 * buffer_sz: size of buffer, as handed to HG_Bulk_create()
 *
 * The id in data_xfer_read_id must have been set by
 * data_xfer_register_client() beforehand.  Nothing is returned to the caller:
 * every step that can fail is checked with assert(), and the decoded output
 * is released without being inspected.
 */
void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_size_t buffer_sz)
{
    hg_handle_t handle;
    /* Zero the input so that nothing other than the bulk handle set below is
     * handed to margo_forward() with indeterminate contents.
     */
    data_xfer_read_in_t in = {0};
    data_xfer_read_out_t out;
    int ret;
    const struct hg_info *hgi;

    /* create handle */
    ret = HG_Create(margo_get_context(mid), svr_addr, data_xfer_read_id, &handle);
    assert(ret == 0);

    /* register buffer for rdma/bulk access by server */
    hgi = HG_Get_info(handle);
    assert(hgi);
    ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &buffer_sz, 
        HG_BULK_WRITE_ONLY, &in.bulk_handle);
    assert(ret == 0);

    /* Disabled: mplex_id is not defined anywhere in this function. */
#if 0
    HG_Set_target_id(handle, mplex_id);
#endif

    /* Send rpc. Note that we are also transmitting the bulk handle in the
     * input struct.  It was set above.  The result is checked like every
     * other call here; decoding the output of a failed forward is pointless.
     */ 
    ret = margo_forward(mid, handle, &in);
    assert(ret == 0);

    /* decode response */
    ret = HG_Get_output(handle, &out);
    assert(ret == 0);

    /* clean up resources consumed by this rpc */
    HG_Bulk_free(in.bulk_handle);
    HG_Free_output(handle, &out);
    HG_Destroy(handle);
}