mobject-server.c 9.1 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7 8
//#define FAKE_CPP_SERVER

9 10
#include <assert.h>
#include <mpi.h>
11
#include <abt.h>
12
#include <margo.h>
13
//#include <sds-keyval.h>
14 15
#include <bake-bulk-server.h>
#include <bake-bulk-client.h>
16 17 18 19
//#include <libpmemobj.h>
#include <ssg-mpi.h>

#include "mobject-server.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
20 21
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
22 23 24 25 26
//#include "src/server/print-write-op.h"
//#include "src/server/print-read-op.h"
#include "src/io-chain/write-op-impl.h"
#include "src/io-chain/read-op-impl.h"
#include "src/server/visitor-args.h"
27 28 29 30
#ifdef FAKE_CPP_SERVER
#include "src/server/fake/fake-read-op.h"
#include "src/server/fake/fake-write-op.h"
#else
31 32
#include "src/server/core/core-read-op.h"
#include "src/server/core/core-write-op.h"
33
#endif
34

Matthieu Dorier's avatar
Matthieu Dorier committed
35
struct mobject_server_context
36
{
37
    /* margo, bake, sds-keyval, ssg state */
38 39
    margo_instance_id mid;
    /* TODO bake, sds-keyval stuff */
40
    ssg_group_id_t gid;
41
    bake_target_id_t bake_id;
42 43 44 45 46
    /* server shutdown conditional logic */
    ABT_mutex shutdown_mutex;
    ABT_cond shutdown_cond;
    int shutdown_flag;
    int ref_count;
Matthieu Dorier's avatar
Matthieu Dorier committed
47
} ;
48

49
static int mobject_server_register(mobject_server_context_t *srv_ctx);
50
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx);
51

Matthieu Dorier's avatar
Matthieu Dorier committed
52 53
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
54
DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
55 56

/* mobject RPC IDs */
Matthieu Dorier's avatar
Matthieu Dorier committed
57 58
static hg_id_t mobject_write_op_rpc_id;
static hg_id_t mobject_read_op_rpc_id;
59
static hg_id_t mobject_shutdown_rpc_id;
60

Matthieu Dorier's avatar
Matthieu Dorier committed
61
static int mobject_server_is_initialized = 0;
62

Matthieu Dorier's avatar
Matthieu Dorier committed
63
/*
 * Initialize the mobject server on the calling process.
 *
 * Allocates the server context, initializes SSG, creates the mobject server
 * group over MPI_COMM_WORLD, registers the mobject RPCs, has the group member
 * with id 0 write the group id to cluster_file, and sets up bake-bulk on
 * "/dev/shm/mobject.dat".
 *
 * mid          - margo instance the RPCs are registered with
 * cluster_file - path the group connection info is stored to (used only by
 *                the member whose group self id is 0)
 *
 * Returns the new server context, or NULL if the server was already
 * initialized or any checked step failed. On failure everything acquired so
 * far (SSG group, SSG itself, shutdown cond/mutex, context memory) is
 * released again.
 */
mobject_server_context_t* mobject_server_init(margo_instance_id mid, const char *cluster_file)
{
    mobject_server_context_t *srv_ctx;
    struct bake_pool_info *pool_info;
    hg_addr_t self_addr;
    int my_id;
    int ret;

    if (mobject_server_is_initialized)
    {
        fprintf(stderr, "Error: mobject server has already been initialized\n");
        return NULL;
    }

    srv_ctx = calloc(1, sizeof(*srv_ctx));
    if (!srv_ctx)
        return NULL;
    srv_ctx->mid = mid;
    /* this initial reference is dropped by mobject_server_shutdown() */
    srv_ctx->ref_count = 1;

    if (ABT_mutex_create(&srv_ctx->shutdown_mutex) != ABT_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to create the shutdown mutex\n");
        goto err_free_ctx;
    }
    if (ABT_cond_create(&srv_ctx->shutdown_cond) != ABT_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to create the shutdown condition variable\n");
        goto err_free_mutex;
    }

    ret = ssg_init(mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        goto err_free_cond;
    }

    /* server group create */
    srv_ctx->gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD,
        NULL, NULL); /* XXX membership update callbacks unused currently */
    if (srv_ctx->gid == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: Unable to create the mobject server group\n");
        goto err_ssg_finalize;
    }
    my_id = ssg_get_group_self_id(srv_ctx->gid);

    /* register mobject & friends RPC handlers */
    /* NOTE(review): srv_ctx is attached to the registered RPCs here; the
     * error paths below free it without detaching it again -- confirm no RPC
     * can be served after a failed init. */
    mobject_server_register(srv_ctx);

    /* one process writes cluster connect info to file for clients to find later */
    if (my_id == 0)
    {
        ret = ssg_group_id_store(cluster_file, srv_ctx->gid);
        if (ret != 0)
        {
            fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
                cluster_file);
            /* XXX: this call is performed by one process, and we do not currently
             * have an easy way to propagate this error to the entire cluster group
             */
            goto err_group_destroy;
        }
    }

    /* initialize bake-bulk */
    /* server part */
    pool_info = bake_server_makepool("/dev/shm/mobject.dat");
    if (!pool_info)
    {
        fprintf(stderr, "Error: unable to create the bake pool\n");
        goto err_group_destroy;
    }
    bake_server_register(mid, pool_info);
    /* XXX: check return value of bake_server_register */
    /* client part */
    self_addr = ssg_get_addr(srv_ctx->gid, my_id);
    bake_probe_instance(mid, self_addr, &(srv_ctx->bake_id));
    /* XXX: check return value of the above calls */

    /* TODO setup sds-keyval */

    mobject_server_is_initialized = 1;

    return srv_ctx;

    /* Error unwinding: each label releases one resource and falls through to
     * the labels for everything acquired before it. */
err_group_destroy:
    ssg_group_destroy(srv_ctx->gid);
err_ssg_finalize:
    ssg_finalize();
err_free_cond:
    ABT_cond_free(&srv_ctx->shutdown_cond);
err_free_mutex:
    ABT_mutex_free(&srv_ctx->shutdown_mutex);
err_free_ctx:
    free(srv_ctx);
    return NULL;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
141 142
/*
 * Request shutdown of the server.
 *
 * Sets the shutdown flag, wakes every caller blocked in
 * mobject_server_wait_for_shutdown(), and drops the reference taken by
 * mobject_server_init(). If that was the last reference the context is
 * cleaned up (and freed) before returning, so srv_ctx must not be used
 * afterwards.
 *
 * The init reference is dropped only by the first request: a repeated call
 * on a context that is still alive just re-broadcasts, instead of
 * decrementing ref_count a second time and triggering cleanup underneath a
 * waiter that still holds a reference.
 */
void mobject_server_shutdown(mobject_server_context_t *srv_ctx)
{
    int do_cleanup = 0;

    assert(srv_ctx);

    ABT_mutex_lock(srv_ctx->shutdown_mutex);

    if (!srv_ctx->shutdown_flag)
    {
        srv_ctx->shutdown_flag = 1;
        srv_ctx->ref_count--;
        do_cleanup = (srv_ctx->ref_count == 0);
    }
    ABT_cond_broadcast(srv_ctx->shutdown_cond);

    ABT_mutex_unlock(srv_ctx->shutdown_mutex);

    /* cleanup frees the mutex, so it has to run after the unlock */
    if (do_cleanup)
    {
        mobject_server_cleanup(srv_ctx);
    }
}

Matthieu Dorier's avatar
Matthieu Dorier committed
162
/*
 * Block the caller until a shutdown has been requested.
 *
 * The caller holds a reference on the context while it waits. When the
 * shutdown flag is observed the reference is released, and if it was the
 * last one the context is cleaned up before returning.
 */
void mobject_server_wait_for_shutdown(mobject_server_context_t* srv_ctx)
{
    int last_reference;

    assert(srv_ctx);

    ABT_mutex_lock(srv_ctx->shutdown_mutex);

    srv_ctx->ref_count += 1;
    for (;;)
    {
        /* re-check the flag after every wakeup */
        if (srv_ctx->shutdown_flag)
            break;
        ABT_cond_wait(srv_ctx->shutdown_cond, srv_ctx->shutdown_mutex);
    }
    srv_ctx->ref_count -= 1;
    last_reference = (srv_ctx->ref_count == 0);

    ABT_mutex_unlock(srv_ctx->shutdown_mutex);

    if (last_reference)
    {
        mobject_server_cleanup(srv_ctx);
    }
}
183

184
/*
 * Register the write_op, read_op and shutdown RPCs with the margo instance
 * stored in srv_ctx, and attach srv_ctx to each of them as registered data.
 * The resulting ids are kept in the file-scope *_rpc_id variables.
 *
 * Always returns 0.
 */
static int mobject_server_register(mobject_server_context_t *srv_ctx)
{
    margo_instance_id mid = srv_ctx->mid;

    /* write operations */
    mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
        write_op_in_t, write_op_out_t, mobject_write_op_ult);
    margo_register_data(mid, mobject_write_op_rpc_id, srv_ctx, NULL);

    /* read operations */
    mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op",
        read_op_in_t, read_op_out_t, mobject_read_op_ult);
    margo_register_data(mid, mobject_read_op_rpc_id, srv_ctx, NULL);

    /* remote shutdown request, no input or output payload */
    mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
        void, void, mobject_shutdown_ult);
    margo_register_data(mid, mobject_shutdown_rpc_id, srv_ctx, NULL);

#if 0
    bake_server_register(mid, pool_info);
    metadata = kv_server_register(mid);
#endif

    return 0;
}
211

Matthieu Dorier's avatar
Matthieu Dorier committed
212 213 214 215 216 217 218 219 220 221 222
/*
 * Handler body for the "mobject_write_op" RPC.
 *
 * Decodes the request, runs the write operation chain against the object
 * named in the request, replies with ret = 0, and then releases the input
 * and the handle. Every margo call is checked with assert(), in the same way
 * as the read and shutdown handlers.
 */
static hg_return_t mobject_write_op_ult(hg_handle_t h)
{
    hg_return_t ret;

    write_op_in_t in;
    write_op_out_t out;

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    const struct hg_info* info = margo_get_info(h);

    /* Context handed to every step of the operation chain. */
    server_visitor_args vargs;
    vargs.object_name = in.object_name;
    vargs.pool_name   = in.pool_name;
    vargs.mid         = margo_hg_handle_get_instance(h);
    vargs.client_addr = info->addr;
    vargs.bulk_handle = in.write_op->bulk_handle;

    /* Execute the operation chain */
#ifdef FAKE_CPP_SERVER
    fake_write_op(in.write_op, &vargs);
#else
    core_write_op(in.write_op, &vargs);
#endif

    /* set the return value of the RPC */
    out.ret = 0;

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    /* Free the input data. */
    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* We are not going to use the handle anymore, so we should destroy it. */
    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_write_op_ult)

/*
 * Handler body for the "mobject_read_op" RPC.
 *
 * Decodes the request, builds a response list matching the requested read
 * actions, runs the read operation chain, sends the response list back, and
 * then releases the responses, the input and the handle.
 */
static hg_return_t mobject_read_op_ult(hg_handle_t h)
{
    read_op_in_t in;
    read_op_out_t out;
    hg_return_t hret;

    /* Decode the request carried by the handle. */
    hret = margo_get_input(h, &in);
    assert(hret == HG_SUCCESS);

    const struct hg_info* info = margo_get_info(h);

    /* One response slot per action in the incoming read op. */
    read_response_t responses = build_matching_read_responses(in.read_op);

    /* Context handed to every step of the operation chain. */
    server_visitor_args vargs;
    vargs.mid         = margo_hg_handle_get_instance(h);
    vargs.client_addr = info->addr;
    vargs.pool_name   = in.pool_name;
    vargs.object_name = in.object_name;
    vargs.bulk_handle = in.read_op->bulk_handle;

    /* Run the read chain. */
#ifdef FAKE_CPP_SERVER
    fake_read_op(in.read_op, &vargs);
#else
    core_read_op(in.read_op, &vargs);
#endif

    /* Reply with the response list, then drop it. */
    out.responses = responses;
    hret = margo_respond(h, &out);
    assert(hret == HG_SUCCESS);
    free_read_responses(responses);

    /* Release the decoded input. */
    hret = margo_free_input(h, &in);
    assert(hret == HG_SUCCESS);

    /* The handle is no longer needed. */
    hret = margo_destroy(h);
    assert(hret == HG_SUCCESS);

    return hret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)
307

308
/*
 * Handler body for the "mobject_shutdown" RPC.
 *
 * Looks up the server context registered with this RPC id, acknowledges the
 * request with an empty response, destroys the handle, and then triggers the
 * local server shutdown.
 */
static void mobject_shutdown_ult(hg_handle_t h)
{
    hg_return_t hret;
    margo_instance_id mid = margo_hg_handle_get_instance(h);
    const struct hg_info *info = margo_get_info(h);

    /* Fetch the context before the handle (and its info) is destroyed. */
    mobject_server_context_t* srv_ctx = margo_registered_data(mid, info->id);

    hret = margo_respond(h, NULL);
    assert(hret == HG_SUCCESS);

    hret = margo_destroy(h);
    assert(hret == HG_SUCCESS);

    /* TODO: propagate shutdown to other servers */
    mobject_server_shutdown(srv_ctx);
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
328 329 330

/*
 * Tear down everything owned by the server context and free the context.
 * Called by whichever of mobject_server_shutdown() or
 * mobject_server_wait_for_shutdown() drops the last reference; srv_ctx is
 * invalid once this returns.
 */
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx)
{
    /* bake-bulk teardown */
    bake_shutdown_service(srv_ctx->bake_id);
    bake_release_instance(srv_ctx->bake_id);
    /* XXX: check the return value of these calls */

    /* leave the server group, then shut SSG down */
    ssg_group_destroy(srv_ctx->gid);
    ssg_finalize();

    //pmemobj_close(NULL);

    /* shutdown synchronization objects, then the context itself */
    ABT_mutex_free(&srv_ctx->shutdown_mutex);
    ABT_cond_free(&srv_ctx->shutdown_cond);
    free(srv_ctx);
}