/* mobject-server.c */
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <mpi.h>
9
#include <abt.h>
10
#include <margo.h>
11
//#include <sds-keyval.h>
12 13
#include <bake-bulk-server.h>
#include <bake-bulk-client.h>
14 15 16 17
//#include <libpmemobj.h>
#include <ssg-mpi.h>

#include "mobject-server.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
18 19
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
20 21 22 23 24
//#include "src/server/print-write-op.h"
//#include "src/server/print-read-op.h"
#include "src/io-chain/write-op-impl.h"
#include "src/io-chain/read-op-impl.h"
#include "src/server/visitor-args.h"
25 26 27 28
//#include "src/server/fake/fake-write-op.h"
//#include "src/server/fake/fake-write-op.h"
#include "src/server/core/core-read-op.h"
#include "src/server/core/core-write-op.h"
29

Matthieu Dorier's avatar
Matthieu Dorier committed
30
struct mobject_server_context
31
{
32
    /* margo, bake, sds-keyval, ssg state */
33 34
    margo_instance_id mid;
    /* TODO bake, sds-keyval stuff */
35
    ssg_group_id_t gid;
36
    bake_target_id_t bake_id;
37 38 39 40 41
    /* server shutdown conditional logic */
    ABT_mutex shutdown_mutex;
    ABT_cond shutdown_cond;
    int shutdown_flag;
    int ref_count;
Matthieu Dorier's avatar
Matthieu Dorier committed
42
} ;
43

44
static int mobject_server_register(mobject_server_context_t *srv_ctx);
45
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx);
46

Matthieu Dorier's avatar
Matthieu Dorier committed
47 48
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
49
DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
50 51

/* mobject RPC IDs */
Matthieu Dorier's avatar
Matthieu Dorier committed
52 53
static hg_id_t mobject_write_op_rpc_id;
static hg_id_t mobject_read_op_rpc_id;
54
static hg_id_t mobject_shutdown_rpc_id;
55

Matthieu Dorier's avatar
Matthieu Dorier committed
56
static int mobject_server_is_initialized = 0;
57

Matthieu Dorier's avatar
Matthieu Dorier committed
58
/**
 * Initialize the mobject server on this process.
 *
 * Allocates the server context, creates the shutdown mutex/condition,
 * initializes SSG and creates the MPI-based server group, registers the
 * mobject RPCs, lets the process with group id 0 store the group id into
 * cluster_file, and finally sets up bake-bulk (server registration plus a
 * probe of this process's own address).
 *
 * @param mid           margo instance the RPCs are registered with
 * @param cluster_file  path the group id is stored to (used only by the
 *                      process whose group self id is 0)
 * @return the new server context, or NULL on failure or if the server has
 *         already been initialized in this process
 *
 * On failure every resource acquired so far (SSG group, SSG itself, the
 * Argobots mutex/condition and the context allocation) is released through
 * the unwind labels at the bottom of the function.
 *
 * NOTE(review): failures that happen after mobject_server_register() leave
 * the RPCs registered with a pointer to the freed context; no deregistration
 * call is made here.
 */
mobject_server_context_t* mobject_server_init(margo_instance_id mid, const char *cluster_file)
{
    mobject_server_context_t *srv_ctx;
    struct bake_pool_info *pool_info;
    hg_addr_t self_addr;
    int my_id;
    int ret;

    if (mobject_server_is_initialized)
    {
        fprintf(stderr, "Error: mobject server has already been initialized\n");
        return NULL;
    }

    srv_ctx = calloc(1, sizeof(*srv_ctx));
    if (!srv_ctx)
        return NULL;
    srv_ctx->mid = mid;
    srv_ctx->ref_count = 1;

    ret = ABT_mutex_create(&srv_ctx->shutdown_mutex);
    if (ret != ABT_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to create the shutdown mutex\n");
        goto err_free_ctx;
    }
    ret = ABT_cond_create(&srv_ctx->shutdown_cond);
    if (ret != ABT_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to create the shutdown condition\n");
        goto err_free_mutex;
    }

    /* TODO sds-keyval */

    ret = ssg_init(mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        goto err_free_cond;
    }

    /* server group create */
    srv_ctx->gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD,
        NULL, NULL); /* XXX membership update callbacks unused currently */
    if (srv_ctx->gid == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: Unable to create the mobject server group\n");
        goto err_ssg_finalize;
    }
    my_id = ssg_get_group_self_id(srv_ctx->gid);

    /* register mobject & friends RPC handlers */
    mobject_server_register(srv_ctx);

    /* one process writes cluster connect info to file for clients to find later */
    if (my_id == 0)
    {
        ret = ssg_group_id_store(cluster_file, srv_ctx->gid);
        if (ret != 0)
        {
            fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
                cluster_file);
            /* XXX: this call is performed by one process, and we do not currently
             * have an easy way to propagate this error to the entire cluster group
             */
            goto err_group_destroy;
        }
    }

    /* initialize bake-bulk: server part */
    pool_info = bake_server_makepool("/dev/shm/mobject.dat");
    if (!pool_info)
    {
        fprintf(stderr, "Error: Unable to create the bake pool\n");
        goto err_group_destroy;
    }
    bake_server_register(mid, pool_info);

    /* initialize bake-bulk: client part (probe our own address) */
    self_addr = ssg_get_addr(srv_ctx->gid, my_id);
    if (self_addr == HG_ADDR_NULL)
    {
        fprintf(stderr, "Error: Unable to look up this server's address\n");
        goto err_group_destroy;
    }
    ret = bake_probe_instance(mid, self_addr, &(srv_ctx->bake_id));
    if (ret != 0)
    {
        fprintf(stderr, "Error: Unable to probe the local bake instance\n");
        goto err_group_destroy;
    }

    mobject_server_is_initialized = 1;

    return srv_ctx;

    /* unwind in reverse order of acquisition */
err_group_destroy:
    ssg_group_destroy(srv_ctx->gid);
err_ssg_finalize:
    ssg_finalize();
err_free_cond:
    ABT_cond_free(&srv_ctx->shutdown_cond);
err_free_mutex:
    ABT_mutex_free(&srv_ctx->shutdown_mutex);
err_free_ctx:
    free(srv_ctx);
    return NULL;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
141 142
/**
 * Request shutdown of the server.
 *
 * Raises the shutdown flag, wakes every waiter blocked on the shutdown
 * condition, and drops one reference on the context. Whoever drops the
 * last reference runs mobject_server_cleanup().
 *
 * @param srv_ctx  server context (must not be NULL)
 */
void mobject_server_shutdown(mobject_server_context_t *srv_ctx)
{
    int remaining;

    assert(srv_ctx);

    ABT_mutex_lock(srv_ctx->shutdown_mutex);

    srv_ctx->shutdown_flag = 1;
    ABT_cond_broadcast(srv_ctx->shutdown_cond);
    remaining = --srv_ctx->ref_count;

    ABT_mutex_unlock(srv_ctx->shutdown_mutex);

    /* cleanup frees the mutex, so it has to run after the unlock above */
    if (remaining != 0)
        return;

    mobject_server_cleanup(srv_ctx);
}

Matthieu Dorier's avatar
Matthieu Dorier committed
162
/**
 * Block the caller until the shutdown flag has been raised.
 *
 * The caller holds an extra reference on the context for the duration of
 * the wait; if that turns out to be the last reference once the wait is
 * over, the caller runs mobject_server_cleanup().
 *
 * @param srv_ctx  server context (must not be NULL)
 */
void mobject_server_wait_for_shutdown(mobject_server_context_t* srv_ctx)
{
    int last_ref;

    assert(srv_ctx);

    ABT_mutex_lock(srv_ctx->shutdown_mutex);

    srv_ctx->ref_count += 1;
    /* re-check the flag after every wakeup */
    while (srv_ctx->shutdown_flag == 0)
    {
        ABT_cond_wait(srv_ctx->shutdown_cond, srv_ctx->shutdown_mutex);
    }
    srv_ctx->ref_count -= 1;
    last_ref = (srv_ctx->ref_count == 0);

    ABT_mutex_unlock(srv_ctx->shutdown_mutex);

    if (last_ref)
    {
        mobject_server_cleanup(srv_ctx);
    }
}
183

184
/*
 * Register the mobject RPCs (write_op, read_op, shutdown) with the margo
 * instance stored in srv_ctx, and attach srv_ctx to each of them as
 * registered data. The resulting ids are stored in the file-scope
 * mobject_*_rpc_id variables. Always returns 0.
 *
 * TODO: sds-keyval registration (kv_server_register) is not wired in yet.
 */
static int mobject_server_register(mobject_server_context_t *srv_ctx)
{
    margo_instance_id mid = srv_ctx->mid;

    /* write_op */
    mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
        write_op_in_t, write_op_out_t, mobject_write_op_ult);
    margo_register_data(mid, mobject_write_op_rpc_id, srv_ctx, NULL);

    /* read_op */
    mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op",
        read_op_in_t, read_op_out_t, mobject_read_op_ult);
    margo_register_data(mid, mobject_read_op_rpc_id, srv_ctx, NULL);

    /* shutdown (no input, no output) */
    mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
        void, void, mobject_shutdown_ult);
    margo_register_data(mid, mobject_shutdown_rpc_id, srv_ctx, NULL);

    return 0;
}
211

Matthieu Dorier's avatar
Matthieu Dorier committed
212 213 214 215 216 217 218 219 220 221 222
/*
 * Handler body for the "mobject_write_op" RPC.
 *
 * Decodes the request, runs the write operation chain through
 * core_write_op(), replies with ret = 0, then releases the decoded input
 * and the handle. Returns the result of margo_destroy().
 */
static hg_return_t mobject_write_op_ult(hg_handle_t h)
{
    write_op_in_t request;
    write_op_out_t reply;
    server_visitor_args vargs;
    const struct hg_info *hinfo;
    hg_return_t hret;

    /* decode the request carried by the handle */
    hret = margo_get_input(h, &request);
    assert(hret == HG_SUCCESS);

    hinfo = margo_get_info(h);

    /* context handed to the operation chain */
    vargs.object_name = request.object_name;
    vargs.pool_name   = request.pool_name;
    vargs.mid         = margo_hg_handle_get_instance(h);
    vargs.client_addr = hinfo->addr;
    vargs.bulk_handle = request.write_op->bulk_handle;

    /* run the operation chain */
    core_write_op(request.write_op, &vargs);

    /* the RPC result is always reported as 0 */
    reply.ret = 0;

    hret = margo_respond(h, &reply);
    assert(hret == HG_SUCCESS);

    /* release the decoded input */
    hret = margo_free_input(h, &request);
    assert(hret == HG_SUCCESS);

    /* the handle is no longer needed */
    return margo_destroy(h);
}
DEFINE_MARGO_RPC_HANDLER(mobject_write_op_ult)

/*
 * Handler body for the "mobject_read_op" RPC.
 *
 * Decodes the request, builds a response list matching the requested read
 * actions, runs the read operation chain through core_read_op(), sends the
 * responses back, then releases the responses, the decoded input and the
 * handle. Returns the result of margo_destroy().
 */
static hg_return_t mobject_read_op_ult(hg_handle_t h)
{
    read_op_in_t request;
    read_op_out_t reply;
    read_response_t responses;
    server_visitor_args vargs;
    const struct hg_info *hinfo;
    hg_return_t hret;

    /* decode the request carried by the handle */
    hret = margo_get_input(h, &request);
    assert(hret == HG_SUCCESS);

    /* one response slot per input action */
    responses = build_matching_read_responses(request.read_op);

    hinfo = margo_get_info(h);

    /* context handed to the operation chain */
    vargs.object_name = request.object_name;
    vargs.pool_name   = request.pool_name;
    vargs.mid         = margo_hg_handle_get_instance(h);
    vargs.client_addr = hinfo->addr;
    vargs.bulk_handle = request.read_op->bulk_handle;

    /* run the operation chain */
    core_read_op(request.read_op, &vargs);

    reply.responses = responses;

    hret = margo_respond(h, &reply);
    assert(hret == HG_SUCCESS);

    /* the responses are released only after the reply has been sent */
    free_read_responses(responses);

    /* release the decoded input */
    hret = margo_free_input(h, &request);
    assert(hret == HG_SUCCESS);

    /* the handle is no longer needed */
    hret = margo_destroy(h);
    assert(hret == HG_SUCCESS);

    return hret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)
301

302
/*
 * Handler body for the "mobject_shutdown" RPC.
 *
 * Looks up the server context attached to this RPC id, acknowledges the
 * request with an empty response, destroys the handle and then triggers
 * the local shutdown.
 */
static void mobject_shutdown_ult(hg_handle_t h)
{
    margo_instance_id mid = margo_hg_handle_get_instance(h);
    const struct hg_info *hinfo = margo_get_info(h);
    /* fetched before the handle is destroyed, since hinfo comes from the handle */
    mobject_server_context_t *srv_ctx = margo_registered_data(mid, hinfo->id);
    hg_return_t hret;

    hret = margo_respond(h, NULL);
    assert(hret == HG_SUCCESS);

    hret = margo_destroy(h);
    assert(hret == HG_SUCCESS);

    /* TODO: propagate shutdown to other servers */
    mobject_server_shutdown(srv_ctx);
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
322 323 324

/*
 * Tear down everything owned by the server context and free it. Invoked by
 * mobject_server_shutdown() / mobject_server_wait_for_shutdown() by whichever
 * of them drops the last reference. srv_ctx must not be used afterwards.
 *
 * NOTE(review): mobject_server_is_initialized is not cleared here, so a
 * later mobject_server_init() in the same process is refused -- confirm
 * this is intended.
 */
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx)
{
    /* bake-bulk teardown */
    /* XXX: the return values of these two calls are not checked */
    bake_shutdown_service(srv_ctx->bake_id);
    bake_release_instance(srv_ctx->bake_id);

    /* SSG teardown: the group first, then the library */
    ssg_group_destroy(srv_ctx->gid);
    ssg_finalize();

    /* Argobots primitives, then the context itself */
    ABT_mutex_free(&srv_ctx->shutdown_mutex);
    ABT_cond_free(&srv_ctx->shutdown_cond);
    free(srv_ctx);
}