mobject-server.c 9.17 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
//#define FAKE_CPP_SERVER
8

9 10
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

#include <mpi.h>
#include <abt.h>
#include <margo.h>
//#include <sds-keyval.h>
#include <bake-bulk-server.h>
#include <bake-bulk-client.h>
//#include <libpmemobj.h>
#include <ssg-mpi.h>

#include "mobject-server.h"
#include "src/server/mobject-server-context.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
//#include "src/server/print-write-op.h"
//#include "src/server/print-read-op.h"
#include "src/io-chain/write-op-impl.h"
#include "src/io-chain/read-op-impl.h"
#include "src/server/visitor-args.h"
#ifdef FAKE_CPP_SERVER
#include "src/server/fake/fake-read-op.h"
#include "src/server/fake/fake-write-op.h"
#else
#include "src/server/core/core-read-op.h"
#include "src/server/core/core-write-op.h"
#endif
35

36
static int mobject_server_register(mobject_server_context_t *srv_ctx);
37
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx);
38

Matthieu Dorier's avatar
Matthieu Dorier committed
39 40
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
41
DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
42 43

/* mobject RPC IDs */
Matthieu Dorier's avatar
Matthieu Dorier committed
44 45
static hg_id_t mobject_write_op_rpc_id;
static hg_id_t mobject_read_op_rpc_id;
46
static hg_id_t mobject_shutdown_rpc_id;
47

Matthieu Dorier's avatar
Matthieu Dorier committed
48
static int mobject_server_is_initialized = 0;
49

Matthieu Dorier's avatar
Matthieu Dorier committed
50
/**
 * Initialize the mobject server on the calling process.
 *
 * Allocates the server context, initializes SSG, creates the MPI-based
 * server group over MPI_COMM_WORLD, registers the mobject RPC handlers,
 * lets group member 0 store the group id in cluster_file, and finally sets
 * up the bake-bulk server and client sides.
 *
 * Only one server may be initialized at a time; a second call while the
 * server is marked as initialized fails.
 *
 * @param mid          margo instance used by the server
 * @param cluster_file path of the file in which group member 0 stores the
 *                     group id for clients to find later
 * @return the new server context, or NULL on any failure; on failure every
 *         resource acquired so far is released again
 */
mobject_server_context_t* mobject_server_init(margo_instance_id mid, const char *cluster_file)
{
    mobject_server_context_t *srv_ctx;
    hg_addr_t self_addr;
    int my_id;
    int ret;

    if (mobject_server_is_initialized)
    {
        fprintf(stderr, "Error: mobject server has already been initialized\n");
        return NULL;
    }

    srv_ctx = calloc(1, sizeof(*srv_ctx));
    if (!srv_ctx)
        return NULL;
    srv_ctx->mid = mid;
    /* the initial reference is dropped by mobject_server_shutdown() */
    srv_ctx->ref_count = 1;

    ret = ABT_mutex_create(&srv_ctx->shutdown_mutex);
    if (ret != ABT_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to create the shutdown mutex\n");
        goto err_free_ctx;
    }

    ret = ABT_cond_create(&srv_ctx->shutdown_cond);
    if (ret != ABT_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to create the shutdown condition variable\n");
        goto err_free_mutex;
    }

    ret = ssg_init(mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        goto err_free_cond;
    }

    /* server group create */
    srv_ctx->gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD,
        NULL, NULL); /* XXX membership update callbacks unused currently */
    if (srv_ctx->gid == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: Unable to create the mobject server group\n");
        goto err_ssg_finalize;
    }
    my_id = ssg_get_group_self_id(srv_ctx->gid);

    /* register mobject & friends RPC handlers */
    /* NOTE(review): srv_ctx is attached to the RPC ids here; the error paths
     * below free srv_ctx without detaching it -- confirm no RPC can be
     * served after a failed init. */
    ret = mobject_server_register(srv_ctx);
    if (ret != 0)
    {
        fprintf(stderr, "Error: Unable to register the mobject RPC handlers\n");
        goto err_group_destroy;
    }

    /* one process writes cluster connect info to file for clients to find later */
    if (my_id == 0)
    {
        ret = ssg_group_id_store(cluster_file, srv_ctx->gid);
        if (ret != 0)
        {
            fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
                cluster_file);
            /* XXX: this call is performed by one process, and we do not currently
             * have an easy way to propagate this error to the entire cluster group
             */
            goto err_group_destroy;
        }
    }

    /* initialize bake-bulk */
    /* server part */
    /* assumes bake_server_init() returns 0 on success -- TODO confirm */
    ret = bake_server_init(mid, "/dev/shm/mobject.dat");
    if (ret != 0)
    {
        fprintf(stderr, "Error: Unable to initialize the bake-bulk server\n");
        goto err_group_destroy;
    }

    /* client part */
    self_addr = ssg_get_addr(srv_ctx->gid, my_id);
    if (self_addr == HG_ADDR_NULL)
    {
        fprintf(stderr, "Error: Unable to look up this server's own address\n");
        goto err_group_destroy;
    }
    /* assumes bake_probe_instance() returns 0 on success -- TODO confirm */
    ret = bake_probe_instance(mid, self_addr, &(srv_ctx->bake_id));
    if (ret != 0)
    {
        fprintf(stderr, "Error: Unable to probe the local bake-bulk instance\n");
        goto err_group_destroy;
    }

    /* TODO setup sds-keyval */

    mobject_server_is_initialized = 1;

    return srv_ctx;

    /* Unwind in reverse order of acquisition. */
err_group_destroy:
    ssg_group_destroy(srv_ctx->gid);
err_ssg_finalize:
    ssg_finalize();
err_free_cond:
    ABT_cond_free(&srv_ctx->shutdown_cond);
err_free_mutex:
    ABT_mutex_free(&srv_ctx->shutdown_mutex);
err_free_ctx:
    free(srv_ctx);
    return NULL;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
128 129
/*
 * Mark the server as shutting down, wake everything blocked on the shutdown
 * condition variable, and drop one reference on the context. If that was the
 * last reference, the context is cleaned up before returning.
 *
 * srv_ctx must not be NULL.
 */
void mobject_server_shutdown(mobject_server_context_t *srv_ctx)
{
    int last_ref;

    assert(srv_ctx);

    ABT_mutex_lock(srv_ctx->shutdown_mutex);

    /* raise the flag first so that woken waiters see it */
    srv_ctx->shutdown_flag = 1;
    ABT_cond_broadcast(srv_ctx->shutdown_cond);

    srv_ctx->ref_count -= 1;
    last_ref = (srv_ctx->ref_count == 0);

    ABT_mutex_unlock(srv_ctx->shutdown_mutex);

    /* cleanup frees the mutex, so it has to run after the unlock */
    if (!last_ref)
        return;

    mobject_server_cleanup(srv_ctx);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
149
/*
 * Block the caller until the shutdown flag of srv_ctx is set.
 *
 * The caller holds a reference on the context for the duration of the wait.
 * If the caller turns out to hold the last reference once the wait is over,
 * it cleans up the context before returning.
 *
 * srv_ctx must not be NULL.
 */
void mobject_server_wait_for_shutdown(mobject_server_context_t* srv_ctx)
{
    int last_ref;

    assert(srv_ctx);

    ABT_mutex_lock(srv_ctx->shutdown_mutex);

    srv_ctx->ref_count += 1;
    /* re-test the flag after every wake-up */
    while (srv_ctx->shutdown_flag == 0)
    {
        ABT_cond_wait(srv_ctx->shutdown_cond, srv_ctx->shutdown_mutex);
    }
    srv_ctx->ref_count -= 1;
    last_ref = (srv_ctx->ref_count == 0);

    ABT_mutex_unlock(srv_ctx->shutdown_mutex);

    if (last_ref)
    {
        mobject_server_cleanup(srv_ctx);
    }
}
170

171
/*
 * Register the write-op, read-op and shutdown RPCs with the margo instance
 * of srv_ctx, and attach srv_ctx to each RPC id so that the handlers can
 * retrieve it with margo_registered_data().
 *
 * Always returns 0.
 */
static int mobject_server_register(mobject_server_context_t *srv_ctx)
{
    margo_instance_id mid = srv_ctx->mid;

    /* write operations */
    mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
        write_op_in_t, write_op_out_t, mobject_write_op_ult);
    margo_register_data(mid, mobject_write_op_rpc_id, srv_ctx, NULL);

    /* read operations */
    mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op",
        read_op_in_t, read_op_out_t, mobject_read_op_ult);
    margo_register_data(mid, mobject_read_op_rpc_id, srv_ctx, NULL);

    /* shutdown request, carries no input and no output */
    mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
        void, void, mobject_shutdown_ult);
    margo_register_data(mid, mobject_shutdown_rpc_id, srv_ctx, NULL);

#if 0
    bake_server_register(mid, pool_info);
    metadata = kv_server_register(mid);
#endif

    return 0;
}
198

Matthieu Dorier's avatar
Matthieu Dorier committed
199 200 201 202 203 204 205 206 207 208 209
/*
 * ULT serving the "mobject_write_op" RPC.
 *
 * Deserializes the request, runs the write operation chain on it with the
 * server context attached to this RPC id, responds with out.ret = 0, then
 * releases the input and the handle.
 *
 * Failures of the margo calls are reported on stderr instead of aborting the
 * server, and the remaining cleanup steps still run. Returns HG_SUCCESS, or
 * the first error encountered.
 */
static hg_return_t mobject_write_op_ult(hg_handle_t h)
{
    hg_return_t ret;
    hg_return_t hret;

    write_op_in_t in;
    write_op_out_t out;

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    if (ret != HG_SUCCESS)
    {
        /* 'in' is not valid here, so nothing can be executed or freed */
        fprintf(stderr, "Error: mobject_write_op: unable to deserialize the RPC input\n");
        margo_destroy(h);
        return ret;
    }

    const struct hg_info* info = margo_get_info(h);
    margo_instance_id mid = margo_hg_handle_get_instance(h);

    server_visitor_args vargs;
    vargs.object_name = in.object_name;
    vargs.pool_name   = in.pool_name;
    vargs.srv_ctx     = margo_registered_data(mid, info->id);
    vargs.client_addr_type = MOBJECT_ADDR_STRING;
    vargs.client_addr.as_string = in.client_addr;
    vargs.bulk_handle = in.write_op->bulk_handle;

    /* Execute the operation chain */
    //print_write_op(in.write_op, in.object_name);
#ifdef FAKE_CPP_SERVER
    vargs.client_addr_type = MOBJECT_ADDR_HANDLE;
    vargs.client_addr.as_handle = info->addr;
    fake_write_op(in.write_op, &vargs);
#else
    core_write_op(in.write_op, &vargs);
#endif

    // set the return value of the RPC
    out.ret = 0;

    ret = margo_respond(h, &out);
    if (ret != HG_SUCCESS)
        fprintf(stderr, "Error: mobject_write_op: unable to send the RPC response\n");

    /* Free the input data. */
    hret = margo_free_input(h, &in);
    if (hret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_write_op: unable to free the RPC input\n");
        if (ret == HG_SUCCESS)
            ret = hret;
    }

    /* We are not going to use the handle anymore, so we should destroy it. */
    hret = margo_destroy(h);
    if (hret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_write_op: unable to destroy the RPC handle\n");
        if (ret == HG_SUCCESS)
            ret = hret;
    }

    return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_write_op_ult)

/*
 * ULT serving the "mobject_read_op" RPC.
 *
 * Deserializes the request, builds a response list matching the requested
 * read actions, runs the read operation chain with the server context
 * attached to this RPC id, sends the response list back, then releases the
 * responses, the input and the handle.
 *
 * Failures of the margo calls are reported on stderr instead of aborting the
 * server, and the remaining cleanup steps still run. Returns HG_SUCCESS, or
 * the first error encountered.
 */
static hg_return_t mobject_read_op_ult(hg_handle_t h)
{
    hg_return_t ret;
    hg_return_t hret;

    read_op_in_t in;
    read_op_out_t out;

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    if (ret != HG_SUCCESS)
    {
        /* 'in' is not valid here, so nothing can be executed or freed */
        fprintf(stderr, "Error: mobject_read_op: unable to deserialize the RPC input\n");
        margo_destroy(h);
        return ret;
    }

    /* Create a response list matching the input actions */
    read_response_t resp = build_matching_read_responses(in.read_op);

    const struct hg_info* info = margo_get_info(h);
    margo_instance_id mid = margo_hg_handle_get_instance(h);

    server_visitor_args vargs;
    vargs.object_name = in.object_name;
    vargs.pool_name   = in.pool_name;
    vargs.srv_ctx     = margo_registered_data(mid, info->id);
    vargs.client_addr_type = MOBJECT_ADDR_STRING;
    vargs.client_addr.as_string = in.client_addr;
    vargs.bulk_handle = in.read_op->bulk_handle;

    /* Compute the result. */
    //print_read_op(in.read_op, in.object_name);
#ifdef FAKE_CPP_SERVER
    vargs.client_addr_type = MOBJECT_ADDR_HANDLE;
    vargs.client_addr.as_handle = info->addr;
    fake_read_op(in.read_op, &vargs);
#else
    core_read_op(in.read_op, &vargs);
#endif

    out.responses = resp;

    ret = margo_respond(h, &out);
    if (ret != HG_SUCCESS)
        fprintf(stderr, "Error: mobject_read_op: unable to send the RPC response\n");

    /* the response list is released whether or not it could be sent */
    free_read_responses(resp);

    /* Free the input data. */
    hret = margo_free_input(h, &in);
    if (hret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_read_op: unable to free the RPC input\n");
        if (ret == HG_SUCCESS)
            ret = hret;
    }

    /* We are not going to use the handle anymore, so we should destroy it. */
    hret = margo_destroy(h);
    if (hret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_read_op: unable to destroy the RPC handle\n");
        if (ret == HG_SUCCESS)
            ret = hret;
    }

    return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)
302

303
/*
 * ULT serving the "mobject_shutdown" RPC.
 *
 * Looks up the server context attached to this RPC id, answers the caller
 * with an empty response, destroys the handle, and only then starts the
 * shutdown of the local server.
 */
static void mobject_shutdown_ult(hg_handle_t h)
{
    margo_instance_id mid = margo_hg_handle_get_instance(h);
    const struct hg_info *info = margo_get_info(h);
    /* fetch the context while the handle is still alive */
    mobject_server_context_t* srv_ctx = margo_registered_data(mid, info->id);
    hg_return_t hret;

    hret = margo_respond(h, NULL);
    assert(hret == HG_SUCCESS);

    hret = margo_destroy(h);
    assert(hret == HG_SUCCESS);

    /* TODO: propagate shutdown to other servers */
    mobject_server_shutdown(srv_ctx);
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
323 324 325

/*
 * Tear down everything mobject_server_init() set up and free the context.
 *
 * Called once the last reference on srv_ctx has been dropped; srv_ctx must
 * not be used after this call. The module-level "initialized" flag is
 * cleared so that mobject_server_init() can be called again afterwards.
 */
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx)
{
    // cleanup bake-bulk
    bake_shutdown_service(srv_ctx->bake_id);
    bake_release_instance(srv_ctx->bake_id);
    // XXX: check the return value of these calls

    ssg_group_destroy(srv_ctx->gid);
    ssg_finalize();

    //pmemobj_close(NULL);

    ABT_mutex_free(&srv_ctx->shutdown_mutex);
    ABT_cond_free(&srv_ctx->shutdown_cond);
    free(srv_ctx);

    /* without this reset, a later mobject_server_init() would be refused */
    mobject_server_is_initialized = 0;
}