mobject-server.c 7.9 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
//#define FAKE_CPP_SERVER
8

9 10
#include <assert.h>
#include <mpi.h>
11
#include <abt.h>
12
#include <margo.h>
13 14 15
#include <ssg-mpi.h>

#include "mobject-server.h"
16
#include "src/server/mobject-server-context.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
17 18
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
19 20 21 22 23
//#include "src/server/print-write-op.h"
//#include "src/server/print-read-op.h"
#include "src/io-chain/write-op-impl.h"
#include "src/io-chain/read-op-impl.h"
#include "src/server/visitor-args.h"
24 25 26 27
#ifdef FAKE_CPP_SERVER
#include "src/server/fake/fake-read-op.h"
#include "src/server/fake/fake-write-op.h"
#else
28 29
#include "src/server/core/core-read-op.h"
#include "src/server/core/core-write-op.h"
30
#endif
31

Matthieu Dorier's avatar
Matthieu Dorier committed
32 33
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
34

35
static void mobject_finalize_cb(void* data);
36

37 38
int mobject_provider_register(
        margo_instance_id mid,
Matthieu Dorier's avatar
Matthieu Dorier committed
39
        uint16_t provider_id,
40 41
        ABT_pool pool,
        bake_provider_handle_t bake_ph,
Matthieu Dorier's avatar
Matthieu Dorier committed
42
        sdskv_provider_handle_t sdskv_ph,
43
        ssg_group_id_t gid,
44 45
        const char *cluster_file,
        mobject_provider_t* provider)
46
{
47
    mobject_provider_t srv_ctx;
48
    int my_id;
49 50
    int ret;

51
    /* check if a provider with the same multiplex id already exists */
52
    {
53 54
        hg_id_t id;
        hg_bool_t flag;
Matthieu Dorier's avatar
Matthieu Dorier committed
55
        margo_provider_registered_name(mid, "mobject_write_op", provider_id, &id, &flag);
56
        if(flag == HG_TRUE) {
Matthieu Dorier's avatar
Matthieu Dorier committed
57
            fprintf(stderr, "mobject_provider_register(): a provider with the same id (%d) already exists\n", provider_id);
58 59
            return -1;
        }
60 61
    }

62

63 64
    srv_ctx = calloc(1, sizeof(*srv_ctx));
    if (!srv_ctx)
65
        return -1;
66
    srv_ctx->mid = mid;
Matthieu Dorier's avatar
Matthieu Dorier committed
67
    srv_ctx->provider_id = provider_id;
68
    srv_ctx->pool = pool;
69
    srv_ctx->ref_count = 1;
70

71
    srv_ctx->gid = gid; 
72
    my_id = ssg_get_group_self_id(srv_ctx->gid);
73 74 75

    /* one proccess writes cluster connect info to file for clients to find later */
    if (my_id == 0)
76
    {
77
        ret = ssg_group_id_store(cluster_file, srv_ctx->gid);
78 79 80 81 82 83 84
        if (ret != 0)
        {
            fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
                cluster_file);
            /* XXX: this call is performed by one process, and we do not currently
             * have an easy way to propagate this error to the entire cluster group
             */
85
            free(srv_ctx);
86
            return -1;
87
        }
88 89
    }

90 91 92 93 94 95 96 97 98 99 100 101 102 103
    /* Bake settings initialization */
    bake_provider_handle_ref_incr(bake_ph);
    srv_ctx->bake_ph = bake_ph;
    uint64_t num_targets;
    ret = bake_probe(bake_ph, 1, &(srv_ctx->bake_tid), &num_targets);
    if(ret != 0) {
        fprintf(stderr, "Error: unable to probe bake server for targets\n");
        return -1;
    }
    if(num_targets < 1) {
        fprintf(stderr, "Error: unable to find a target on bake provider\n");
        free(srv_ctx);
        return -1;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
    /* SDSKV settings initialization */
    sdskv_provider_handle_ref_incr(sdskv_ph);
    srv_ctx->sdskv_ph = sdskv_ph;
    ret = sdskv_open(sdskv_ph, "oid_map", &(srv_ctx->oid_db_id));
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "Error: unable to open oid_map from SDSKV provider\n");
        bake_provider_handle_release(srv_ctx->bake_ph);
        sdskv_provider_handle_release(srv_ctx->sdskv_ph);
        free(srv_ctx);
    }
    ret = sdskv_open(sdskv_ph, "name_map", &(srv_ctx->name_db_id));
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "Error: unable to open name_map from SDSKV provider\n");
        bake_provider_handle_release(srv_ctx->bake_ph);
        sdskv_provider_handle_release(srv_ctx->sdskv_ph);
        free(srv_ctx);
    }
    ret = sdskv_open(sdskv_ph, "seg_map", &(srv_ctx->segment_db_id));
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "Error: unable to open seg_map from SDSKV provider\n");
        bake_provider_handle_release(srv_ctx->bake_ph);
        sdskv_provider_handle_release(srv_ctx->sdskv_ph);
        free(srv_ctx);
    }
    ret = sdskv_open(sdskv_ph, "omap_map", &(srv_ctx->omap_db_id));
    if(ret != SDSKV_SUCCESS) {
        fprintf(stderr, "Error: unable to open omap_map from SDSKV provider\n");
        bake_provider_handle_release(srv_ctx->bake_ph);
        sdskv_provider_handle_release(srv_ctx->sdskv_ph);
        free(srv_ctx);
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
135

136
    hg_id_t rpc_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
137

Matthieu Dorier's avatar
Matthieu Dorier committed
138
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_write_op", 
139
            write_op_in_t, write_op_out_t, mobject_write_op_ult,
Matthieu Dorier's avatar
Matthieu Dorier committed
140 141
            provider_id, pool);
    margo_register_data(mid, rpc_id, srv_ctx, NULL);
Matthieu Dorier's avatar
Matthieu Dorier committed
142

Matthieu Dorier's avatar
Matthieu Dorier committed
143
    rpc_id = MARGO_REGISTER_PROVIDER(mid, "mobject_read_op",
144
            read_op_in_t, read_op_out_t, mobject_read_op_ult,
Matthieu Dorier's avatar
Matthieu Dorier committed
145 146
            provider_id, pool);
    margo_register_data(mid, rpc_id, srv_ctx, NULL);
Matthieu Dorier's avatar
Matthieu Dorier committed
147

148
    margo_push_finalize_callback(mid, mobject_finalize_cb, (void*)srv_ctx);
149

150
    *provider = srv_ctx;
Matthieu Dorier's avatar
Matthieu Dorier committed
151

152
    return 0;
153
}
154

Matthieu Dorier's avatar
Matthieu Dorier committed
155 156 157 158 159 160 161 162 163 164 165
static hg_return_t mobject_write_op_ult(hg_handle_t h)
{
    hg_return_t ret;

    write_op_in_t in;
    write_op_out_t out;

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

166
    const struct hg_info* info = margo_get_info(h);
167
    margo_instance_id mid = margo_hg_handle_get_instance(h);
168 169 170

    server_visitor_args vargs;
    vargs.object_name = in.object_name;
171
    vargs.oid         = 0;
172
    vargs.pool_name   = in.pool_name;
Matthieu Dorier's avatar
Matthieu Dorier committed
173
    vargs.srv_ctx     = margo_registered_data(mid, info->id);
174
    if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR;
175 176
    vargs.client_addr_str = in.client_addr;
    vargs.client_addr = info->addr;
177 178
    vargs.bulk_handle = in.write_op->bulk_handle;

Matthieu Dorier's avatar
Matthieu Dorier committed
179
    /* Execute the operation chain */
180
    //print_write_op(in.write_op, in.object_name);
181 182 183
#ifdef FAKE_CPP_SERVER
    fake_write_op(in.write_op, &vargs);
#else
184
    core_write_op(in.write_op, &vargs);
185
#endif
Matthieu Dorier's avatar
Matthieu Dorier committed
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218

    // set the return value of the RPC
    out.ret = 0;

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    /* Free the input data. */
    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* We are not going to use the handle anymore, so we should destroy it. */
    ret = margo_destroy(h);
 
    return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_write_op_ult)

/* Implementation of the RPC. */
static hg_return_t mobject_read_op_ult(hg_handle_t h)
{
    hg_return_t ret;

    read_op_in_t in;
    read_op_out_t out;

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* Create a response list matching the input actions */
    read_response_t resp = build_matching_read_responses(in.read_op);

219
    const struct hg_info* info = margo_get_info(h);
220
    margo_instance_id mid = margo_hg_handle_get_instance(h);
221 222 223

    server_visitor_args vargs;
    vargs.object_name = in.object_name;
224
    vargs.oid         = 0;
225
    vargs.pool_name   = in.pool_name;
Matthieu Dorier's avatar
Matthieu Dorier committed
226
    vargs.srv_ctx     = margo_registered_data(mid, info->id);
227
    if(vargs.srv_ctx == NULL) return HG_OTHER_ERROR;
228 229
    vargs.client_addr_str = in.client_addr;
    vargs.client_addr = info->addr;
230 231
    vargs.bulk_handle = in.read_op->bulk_handle;

Matthieu Dorier's avatar
Matthieu Dorier committed
232
    /* Compute the result. */
233
    //print_read_op(in.read_op, in.object_name);
234 235 236
#ifdef FAKE_CPP_SERVER
    fake_read_op(in.read_op, &vargs);
#else
237
    core_read_op(in.read_op, &vargs);
238
#endif
Matthieu Dorier's avatar
Matthieu Dorier committed
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257

    out.responses = resp;

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    free_read_responses(resp);

    /* Free the input data. */
    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* We are not going to use the handle anymore, so we should destroy it. */
    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)
258

259
static void mobject_finalize_cb(void* data)
260
{
261
    mobject_provider_t srv_ctx = (mobject_provider_t)data;
262

263
    sdskv_provider_handle_release(srv_ctx->sdskv_ph);
264
    bake_provider_handle_release(srv_ctx->bake_ph);
265 266 267

    free(srv_ctx);
}