mobject-server.c 5.57 KB
Newer Older
1 2 3 4 5 6 7 8
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mpi.h>
#include <margo.h>
//#include <sds-keyval.h>
//#include <bake-bulk-server.h>
//#include <libpmemobj.h>
#include <ssg-mpi.h>

#include "mobject-server.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/server/exec-write-op.h"
#include "src/server/exec-read-op.h"

21 22
typedef struct mobject_server_context
{
23 24
    margo_instance_id mid;
    /* TODO bake, sds-keyval stuff */
25 26 27
    ssg_group_id_t gid;
} mobject_server_context_t;

28 29 30
static int mobject_server_register(mobject_server_context_t *srv_ctx);

DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
Matthieu Dorier's avatar
Matthieu Dorier committed
31 32
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
33 34 35

/* mobject RPC IDs */
static hg_id_t mobject_shutdown_rpc_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
36 37
static hg_id_t mobject_write_op_rpc_id;
static hg_id_t mobject_read_op_rpc_id;
38

39
/* XXX one global mobject server state struct */
40 41
static mobject_server_context_t *g_srv_ctx = NULL;

42

43
/*
 * Initialize the process-wide mobject server.
 *
 * Allocates the global server context, initializes SSG, creates the mobject
 * server group over MPI_COMM_WORLD, registers the mobject RPC handlers and,
 * on the group member whose SSG self id is 0, stores the group id in
 * cluster_file so that clients can find the servers later.
 *
 * mid           margo instance recorded in the server context and passed
 *               to ssg_init()
 * cluster_file  path the group id is stored to (used by member 0 only)
 *
 * Returns 0 on success and -1 on failure. On every failure path whatever
 * was set up so far is undone and the global context is released and reset
 * to NULL, so a later call can retry instead of being rejected as
 * "already initialized".
 */
int mobject_server_init(margo_instance_id mid, const char *cluster_file)
{
    int my_id;
    int ret;

    if (g_srv_ctx)
    {
        fprintf(stderr, "Error: mobject server has already been initialized\n");
        return -1;
    }

    /* zero-initialized context */
    g_srv_ctx = calloc(1, sizeof(*g_srv_ctx));
    if (!g_srv_ctx)
        return -1;
    g_srv_ctx->mid = mid;

    /* TODO bake-bulk */
    /* TODO sds-keyval */
#if 0
    kv_context *metadata;
    struct bake_pool_info *pool_info;
    pool_info = bake_server_makepool(poolname);
#endif

    ret = ssg_init(mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        goto err_free_ctx;
    }

    /* server group create */
    g_srv_ctx->gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD,
        NULL, NULL); /* XXX membership update callbacks unused currently */
    if (g_srv_ctx->gid == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: Unable to create the mobject server group\n");
        goto err_ssg_finalize;
    }
    my_id = ssg_get_group_self_id(g_srv_ctx->gid);

    /* register mobject & friends RPC handlers */
    ret = mobject_server_register(g_srv_ctx);
    if (ret != 0)
    {
        fprintf(stderr, "Error: unable to register mobject RPC handlers\n");
        goto err_group_destroy;
    }

    /* one process writes cluster connect info to file for clients to find later */
    if (my_id == 0)
    {
        ret = ssg_group_id_store(cluster_file, g_srv_ctx->gid);
        if (ret != 0)
        {
            fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
                cluster_file);
            /* XXX: this call is performed by one process, and we do not currently
             * have an easy way to propagate this error to the entire cluster group
             */
            goto err_group_destroy;
        }
    }

    return 0;

    /* unwind in reverse order of setup */
err_group_destroy:
    ssg_group_destroy(g_srv_ctx->gid);
err_ssg_finalize:
    ssg_finalize();
err_free_ctx:
    free(g_srv_ctx);
    g_srv_ctx = NULL;
    return -1;
}

/*
 * Shut down the mobject server set up by mobject_server_init().
 *
 * Destroys the server group, finalizes SSG, then frees the global server
 * context and resets it to NULL so the memory is not leaked and the server
 * can be initialized again. The server must have been initialized
 * (asserted).
 *
 * mid  currently unused
 */
void mobject_server_shutdown(margo_instance_id mid)
{
    (void)mid;

    assert(g_srv_ctx);

    ssg_group_destroy(g_srv_ctx->gid);
    ssg_finalize();

    //margo_wait_for_finalize(mid);
    //pmemobj_close(NULL);

    free(g_srv_ctx);
    g_srv_ctx = NULL;
}
123

124
/*
 * Register the mobject RPCs ("mobject_shutdown", "mobject_write_op" and
 * "mobject_read_op") with the margo instance stored in srv_ctx, and record
 * the resulting ids in the file-scope *_rpc_id variables.
 *
 * Always returns 0.
 */
static int mobject_server_register(mobject_server_context_t *srv_ctx)
{
    margo_instance_id mid = srv_ctx->mid;

    mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
        void, void, mobject_shutdown_ult);

    mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
        write_op_in_t, write_op_out_t, mobject_write_op_ult);

    mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op",
        read_op_in_t, read_op_out_t, mobject_read_op_ult);

#if 0
    bake_server_register(mid, pool_info);
    metadata = kv_server_register(mid);
#endif

    return 0;
}
146 147 148 149 150 151 152 153

/*
 * Handler body for the shutdown RPC: it only releases the incoming handle.
 */
static void mobject_shutdown_ult(hg_handle_t handle)
{
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
Matthieu Dorier's avatar
Matthieu Dorier committed
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225

/*
 * Handler body for the write-op RPC.
 *
 * Decodes the request, runs the write operation chain on the named object,
 * replies with out.ret = 0, then releases the decoded input and the handle.
 * Every margo call is checked with assert(), as in the read-op handler.
 *
 * Returns the status of the final margo_destroy() call.
 */
static hg_return_t mobject_write_op_ult(hg_handle_t h)
{
    hg_return_t ret;

    write_op_in_t in;
    write_op_out_t out;

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* Execute the operation chain */
    execute_write_op(in.write_op, in.object_name);

    /* set the return value of the RPC */
    out.ret = 0;

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    /* Free the input data. */
    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* We are not going to use the handle anymore, so we should destroy it. */
    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_write_op_ult)

/*
 * Handler body for the read-op RPC.
 *
 * Decodes the request, builds a response list matching the requested read
 * actions, runs the read operation on the named object, and replies with
 * that response list. The response list, the decoded input and the handle
 * are released afterwards. Every margo call is checked with assert().
 *
 * Returns the status of the final margo_destroy() call.
 */
static hg_return_t mobject_read_op_ult(hg_handle_t h)
{
    hg_return_t ret;

    read_op_in_t in;
    read_op_out_t out;

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* Create a response list matching the input actions */
    read_response_t resp = build_matching_read_responses(in.read_op);

    /* Compute the result. */
    execute_read_op(in.read_op, in.object_name);

    out.responses = resp;

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    /* The response has been sent; the list is no longer needed. */
    free_read_responses(resp);

    /* Free the input data. */
    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    /* We are not going to use the handle anymore, so we should destroy it. */
    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);

    return ret;
}
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)