aio-operate.c 4.77 KB
Newer Older
1 2 3 4 5 6 7 8 9
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdlib.h>
#include "mobject-store-config.h"
#include "libmobject-store.h"
10
#include "src/client/mobject-client-impl.h"
11 12 13 14
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
15 16
#include "src/util/log.h"

17 18 19 20
/**
 * Starts a non-blocking write operation on an object.
 *
 * Fills a write_op_in_t with the pool name, object name and write operation,
 * creates a Margo handle for the write RPC on the provider's address, and
 * forwards it with margo_iforward(). On success, *req receives a newly
 * allocated request recording the operation, the Margo request and the
 * handle; mobject_aio_wait() destroys the handle and frees the request once
 * the wait succeeds.
 *
 * @param mph        provider handle; supplies the Margo instance, target
 *                   address, RPC id and multiplex id
 * @param write_op   write operation to submit
 * @param pool_name  name of the pool
 * @param oid        name of the object
 * @param mtime      currently ignored (see TODO in the body)
 * @param flags      currently ignored
 * @param req        output: receives the new request on success; left
 *                   untouched on failure
 *
 * @return 0 on success, -1 on failure
 */
int mobject_aio_write_op_operate(
        mobject_provider_handle_t mph,
        mobject_store_write_op_t write_op,
        const char *pool_name,
        const char *oid,
        time_t *mtime,
        int flags,
        mobject_request_t* req)
{
    hg_return_t ret;

    /* Both pointers are dereferenced unconditionally below. */
    if(mph == NULL || req == NULL) {
        fprintf(stderr, "[MOBJECT] NULL argument passed to mobject_aio_write_op_operate\n");
        return -1;
    }

    /* Accepted for interface compatibility but not used yet. */
    (void)mtime;
    (void)flags;

    write_op_in_t in;
    in.object_name = oid;
    in.pool_name   = pool_name;
    in.write_op    = write_op;
    // TODO take mtime into account

    prepare_write_op(mph->client->mid, write_op);

    hg_addr_t svr_addr = mph->addr;
    if(svr_addr == HG_ADDR_NULL) {
        fprintf(stderr, "[MOBJECT] NULL provider address passed to mobject_aio_write_op_operate\n");
        return -1;
    }

    /* Allocate the request before anything is sent, so that an allocation
     * failure can be reported while there is still nothing in flight to
     * unwind. */
    mobject_request_t tmp_req = calloc(1, sizeof(*tmp_req));
    if(tmp_req == NULL) {
        fprintf(stderr, "[MOBJECT] calloc() failed in mobject_aio_write_op_operate()\n");
        return -1;
    }

    hg_handle_t h;
    ret = margo_create(mph->client->mid, svr_addr, mph->client->mobject_write_op_rpc_id, &h);
    if(ret != HG_SUCCESS) {
        fprintf(stderr, "[MOBJECT] margo_create() failed in mobject_aio_write_op_operate()\n");
        free(tmp_req);
        return -1;
    }

    margo_set_target_id(h, mph->mplex_id);

    margo_request mreq;
    ret = margo_iforward(h, &in, &mreq);
    if(ret != HG_SUCCESS) {
        fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_write_op_operate()\n");
        margo_destroy(h);
        free(tmp_req);
        return -1;
    }

    tmp_req->type             = MOBJECT_AIO_WRITE;
    tmp_req->op.write_op      = write_op;
    tmp_req->request          = mreq;
    tmp_req->handle           = h;

    *req = tmp_req;

    return 0;
}
69

70 71 72 73
/**
 * Starts a non-blocking read operation on an object.
 *
 * Fills a read_op_in_t with the pool name, object name and read operation,
 * creates a Margo handle for the read RPC on the provider's address, and
 * forwards it with margo_iforward(). On success, *req receives a newly
 * allocated request recording the operation, the Margo request and the
 * handle; mobject_aio_wait() destroys the handle and frees the request once
 * the wait succeeds.
 *
 * @param mph        provider handle; supplies the Margo instance, target
 *                   address, RPC id and multiplex id
 * @param read_op    read operation to submit
 * @param pool_name  name of the pool
 * @param oid        name of the object
 * @param flags      currently ignored
 * @param req        output: receives the new request on success; left
 *                   untouched on failure
 *
 * @return 0 on success, -1 on failure
 */
int mobject_aio_read_op_operate(
        mobject_provider_handle_t mph,
        mobject_store_read_op_t read_op,
        const char *pool_name,
        const char *oid,
        int flags,
        mobject_request_t* req)
{
    hg_return_t ret;

    /* Both pointers are dereferenced unconditionally below. */
    if(mph == NULL || req == NULL) {
        fprintf(stderr, "[MOBJECT] NULL argument passed to mobject_aio_read_op_operate\n");
        return -1;
    }

    /* Accepted for interface compatibility but not used yet. */
    (void)flags;

    read_op_in_t in;
    in.object_name = oid;
    in.pool_name   = pool_name;
    in.read_op     = read_op;

    prepare_read_op(mph->client->mid, read_op);

    hg_addr_t svr_addr = mph->addr;
    if(svr_addr == HG_ADDR_NULL) {
        fprintf(stderr, "[MOBJECT] NULL provider address passed to mobject_aio_read_op_operate\n");
        return -1;
    }

    /* Allocate the request before anything is sent, so that an allocation
     * failure can be reported while there is still nothing in flight to
     * unwind. */
    mobject_request_t tmp_req = calloc(1, sizeof(*tmp_req));
    if(tmp_req == NULL) {
        fprintf(stderr, "[MOBJECT] calloc() failed in mobject_aio_read_op_operate()\n");
        return -1;
    }

    hg_handle_t h;
    ret = margo_create(mph->client->mid, svr_addr, mph->client->mobject_read_op_rpc_id, &h);
    if(ret != HG_SUCCESS) {
        fprintf(stderr, "[MOBJECT] margo_create() failed in mobject_aio_read_op_operate()\n");
        free(tmp_req);
        return -1;
    }

    margo_set_target_id(h, mph->mplex_id);

    margo_request mreq;
    ret = margo_iforward(h, &in, &mreq);
    if(ret != HG_SUCCESS) {
        fprintf(stderr, "[MOBJECT] margo_iforward() failed in mobject_aio_read_op_operate()\n");
        margo_destroy(h);
        free(tmp_req);
        return -1;
    }

    tmp_req->type             = MOBJECT_AIO_READ;
    tmp_req->op.read_op       = read_op;
    tmp_req->request          = mreq;
    tmp_req->handle           = h;

    *req = tmp_req;

    return 0;
}
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178

/**
 * Waits for an asynchronous read or write request to complete and releases it.
 *
 * Blocks in margo_wait() on the request, then decodes the RPC output
 * according to the request type:
 *   - MOBJECT_AIO_WRITE: the `ret` field of the response is reported
 *     through *ret.
 *   - MOBJECT_AIO_READ: the responses are handed to
 *     feed_read_op_pointers_from_response() for the stored read op, and
 *     *ret is set to 0.
 * If decoding or freeing the output fails, *ret receives that error code
 * instead. Once the wait has succeeded, the handle is destroyed and the
 * request is freed on every path, so `req` must not be used afterwards.
 *
 * If req is MOBJECT_REQUEST_NULL, or if margo_wait() itself fails, the
 * function returns early: *ret is not written and the request is not
 * released.
 *
 * @param req  request produced by one of the aio *_operate functions
 * @param ret  output: result of the operation; may be NULL if the caller
 *             does not need it
 *
 * @return HG_SUCCESS (as int) on success, -1 for a null request or an
 *         unknown request type, otherwise the failing Margo return code
 */
int mobject_aio_wait(mobject_request_t req, int* ret)
{
    if(req == MOBJECT_REQUEST_NULL)
        return -1;

    int r = margo_wait(req->request);
    if(r != HG_SUCCESS) {
        return r;
    }
    req->request = MARGO_REQUEST_NULL;

    /* Value reported through *ret; overwritten by r below on any failure. */
    int op_ret = 0;

    switch(req->type) {

    case MOBJECT_AIO_WRITE: {
        write_op_out_t resp;
        r = margo_get_output(req->handle, &resp);
        if(r == HG_SUCCESS) {
            op_ret = resp.ret;
            r = margo_free_output(req->handle, &resp);
        }
        break;
    }

    case MOBJECT_AIO_READ: {
        read_op_out_t resp;
        r = margo_get_output(req->handle, &resp);
        if(r == HG_SUCCESS) {
            feed_read_op_pointers_from_response(req->op.read_op, resp.responses);
            r = margo_free_output(req->handle, &resp);
        }
        break;
    }

    default:
        /* Unknown request type: report an error rather than falling off
         * the end of a non-void function. */
        r = -1;
        break;
    }

    if(r != HG_SUCCESS) {
        op_ret = r;
    }
    if(ret != NULL) {
        *ret = op_ret;
    }

    /* Single cleanup point shared by every path after a successful wait. */
    margo_destroy(req->handle);
    free(req);
    return r;
}

/**
 * Tests an asynchronous request without waiting on it.
 *
 * Forwards the underlying Margo request and `flag` to margo_test().
 *
 * @param req   request to test
 * @param flag  output passed through to margo_test()
 *
 * @return -1 if req is MOBJECT_REQUEST_NULL, otherwise the value returned
 *         by margo_test()
 */
int mobject_aio_test(mobject_request_t req, int* flag)
{
    int status = -1;

    if(req != MOBJECT_REQUEST_NULL) {
        status = margo_test(req->request, flag);
    }

    return status;
}