mobject-server.c 7.27 KB
Newer Older
1 2 3 4 5 6 7 8
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <mpi.h>
9
#include <abt.h>
10
#include <margo.h>
11 12 13 14 15 16
//#include <sds-keyval.h>
//#include <bake-bulk-server.h>
//#include <libpmemobj.h>
#include <ssg-mpi.h>

#include "mobject-server.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
17 18 19 20
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/server/exec-write-op.h"
#include "src/server/exec-read-op.h"
21

22 23
typedef struct mobject_server_context
{
24
    /* margo, bake, sds-keyval, ssg state */
25 26
    margo_instance_id mid;
    /* TODO bake, sds-keyval stuff */
27
    ssg_group_id_t gid;
28 29 30 31 32 33

    /* server shutdown conditional logic */
    ABT_mutex shutdown_mutex;
    ABT_cond shutdown_cond;
    int shutdown_flag;
    int ref_count;
34 35
} mobject_server_context_t;

36
static int mobject_server_register(mobject_server_context_t *srv_ctx);
37
static void mobject_server_cleanup(mobject_server_context_t *srv_ctx);
38

Matthieu Dorier's avatar
Matthieu Dorier committed
39 40
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
41
DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
42 43

/* mobject RPC IDs */
Matthieu Dorier's avatar
Matthieu Dorier committed
44 45
static hg_id_t mobject_write_op_rpc_id;
static hg_id_t mobject_read_op_rpc_id;
46
static hg_id_t mobject_shutdown_rpc_id;
47

48
/* XXX one global mobject server state struct */
49 50
static mobject_server_context_t *g_srv_ctx = NULL;

51

52
int mobject_server_init(margo_instance_id mid, const char *cluster_file)
53
{
54
    mobject_server_context_t *srv_ctx;
55
    int my_id;
56 57 58 59 60 61 62 63
    int ret;

    if (g_srv_ctx)
    {
        fprintf(stderr, "Error: mobject server has already been initialized\n");
        return -1;
    }

64 65
    srv_ctx = calloc(1, sizeof(*srv_ctx));
    if (!srv_ctx)
66
        return -1;
67 68 69 70
    srv_ctx->mid = mid;
    srv_ctx->ref_count = 1;
    ABT_mutex_create(&srv_ctx->shutdown_mutex);
    ABT_cond_create(&srv_ctx->shutdown_cond);
71 72 73

    /* TODO bake-bulk */
    /* TODO sds-keyval */
74 75 76 77 78
# if 0
    kv_context *metadata;
    struct bake_pool_info *pool_info;
    pool_info = bake_server_makepool(poolname);
#endif
79

80 81 82
    ret = ssg_init(mid);
    if (ret != SSG_SUCCESS)
    {
83
        free(srv_ctx);
84 85 86
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        return -1;
    }
87

88
    /* server group create */
89
    srv_ctx->gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD,
90
        NULL, NULL); /* XXX membership update callbacks unused currently */
91
    if (srv_ctx->gid == SSG_GROUP_ID_NULL)
92 93 94
    {
        fprintf(stderr, "Error: Unable to create the mobject server group\n");
        ssg_finalize();
95
        free(srv_ctx);
96 97
        return -1;
    }
98
    my_id = ssg_get_group_self_id(srv_ctx->gid);
99 100

    /* register mobject & friends RPC handlers */
101
    mobject_server_register(srv_ctx);
102

103 104
    /* one proccess writes cluster connect info to file for clients to find later */
    if (my_id == 0)
105
    {
106
        ret = ssg_group_id_store(cluster_file, srv_ctx->gid);
107 108 109 110 111 112 113
        if (ret != 0)
        {
            fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
                cluster_file);
            /* XXX: this call is performed by one process, and we do not currently
             * have an easy way to propagate this error to the entire cluster group
             */
114
            ssg_group_destroy(srv_ctx->gid);
115
            ssg_finalize();
116
            free(srv_ctx);
117 118
            return -1;
        }
119 120
    }

121
    g_srv_ctx = srv_ctx;
122 123 124 125 126

    return 0;
}

void mobject_server_shutdown(margo_instance_id mid)
127
{
128 129
    mobject_server_context_t *srv_ctx = g_srv_ctx;
    int do_cleanup;
130

131
    assert(srv_ctx);
132

133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
    ABT_mutex_lock(srv_ctx->shutdown_mutex);
    srv_ctx->shutdown_flag = 1;
    ABT_cond_broadcast(srv_ctx->shutdown_cond);

    srv_ctx->ref_count--;
    do_cleanup = srv_ctx->ref_count == 0;

    ABT_mutex_unlock(srv_ctx->shutdown_mutex);

    if (do_cleanup)
        mobject_server_cleanup(srv_ctx);

    return;
}

void mobject_server_wait_for_shutdown()
{
    mobject_server_context_t *srv_ctx = g_srv_ctx;
    int do_cleanup;

    assert(srv_ctx);

    ABT_mutex_lock(srv_ctx->shutdown_mutex);

    srv_ctx->ref_count++;
    while(!srv_ctx->shutdown_flag)
        ABT_cond_wait(srv_ctx->shutdown_cond, srv_ctx->shutdown_mutex);
    srv_ctx->ref_count--;
    do_cleanup = srv_ctx->ref_count == 0;

    ABT_mutex_unlock(srv_ctx->shutdown_mutex);

    if (do_cleanup)
        mobject_server_cleanup(srv_ctx);
167

168
    return;
169
}
170

171
static int mobject_server_register(mobject_server_context_t *srv_ctx)
172
{
173
    int ret=0;
Matthieu Dorier's avatar
Matthieu Dorier committed
174 175 176 177 178 179 180 181
    margo_instance_id mid = srv_ctx->mid;

    mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op", 
	write_op_in_t, write_op_out_t, mobject_write_op_ult);

    mobject_read_op_rpc_id  = MARGO_REGISTER(mid, "mobject_read_op",
        read_op_in_t, read_op_out_t, mobject_read_op_ult)

182 183
    mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
        void, void, mobject_shutdown_ult);
184

185 186 187 188
#if 0
    bake_server_register(mid, pool_info);
    metadata = kv_server_register(mid);
#endif
189 190 191

    return ret;
}
192

Matthieu Dorier's avatar
Matthieu Dorier committed
193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
static hg_return_t mobject_write_op_ult(hg_handle_t h)
{
    hg_return_t ret;

    write_op_in_t in;
    write_op_out_t out;

    margo_instance_id mid = margo_hg_handle_get_instance(h);

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* Execute the operation chain */
    execute_write_op(in.write_op, in.object_name);

    // set the return value of the RPC
    out.ret = 0;

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    /* Free the input data. */
    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* We are not going to use the handle anymore, so we should destroy it. */
    ret = margo_destroy(h);
 
    return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_write_op_ult)

/* Implementation of the RPC. */
static hg_return_t mobject_read_op_ult(hg_handle_t h)
{
    hg_return_t ret;

    read_op_in_t in;
    read_op_out_t out;

    margo_instance_id mid = margo_hg_handle_get_instance(h);

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* Create a response list matching the input actions */
    read_response_t resp = build_matching_read_responses(in.read_op);

    /* Compute the result. */
    execute_read_op(in.read_op, in.object_name);

    out.responses = resp;

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    free_read_responses(resp);

    /* Free the input data. */
    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* We are not going to use the handle anymore, so we should destroy it. */
    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)
264

265
static void mobject_shutdown_ult(hg_handle_t h)
266
{
267 268 269 270 271 272 273 274 275 276 277 278 279
    hg_return_t ret;

    margo_instance_id mid = margo_hg_handle_get_instance(h);

    printf("Got shutdown signal!\n");

    ret = margo_respond(h, NULL);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    mobject_server_shutdown(mid);
280 281 282 283

    return;
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
284 285 286 287 288 289 290 291 292 293 294 295 296 297

static void mobject_server_cleanup(mobject_server_context_t *srv_ctx)
{
    ssg_group_destroy(srv_ctx->gid);
    ssg_finalize();

    //pmemobj_close(NULL);

    ABT_mutex_free(&srv_ctx->shutdown_mutex);
    ABT_cond_free(&srv_ctx->shutdown_cond);
    free(srv_ctx);

    return;
}