mobject-server.c 5.57 KB
Newer Older
1
2
3
4
5
6
7
8
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mpi.h>
#include <margo.h>
//#include <sds-keyval.h>
//#include <bake-bulk-server.h>
//#include <libpmemobj.h>
#include <ssg-mpi.h>

#include "mobject-server.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/server/exec-write-op.h"
#include "src/server/exec-read-op.h"
20

21
22
typedef struct mobject_server_context
{
23
24
    margo_instance_id mid;
    /* TODO bake, sds-keyval stuff */
25
26
27
    ssg_group_id_t gid;
} mobject_server_context_t;

28
29
30
static int mobject_server_register(mobject_server_context_t *srv_ctx);

DECLARE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
Matthieu Dorier's avatar
Matthieu Dorier committed
31
32
DECLARE_MARGO_RPC_HANDLER(mobject_write_op_ult)
DECLARE_MARGO_RPC_HANDLER(mobject_read_op_ult)
33
34
35

/* mobject RPC IDs */
static hg_id_t mobject_shutdown_rpc_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
36
37
static hg_id_t mobject_write_op_rpc_id;
static hg_id_t mobject_read_op_rpc_id;
38

39
/* XXX one global mobject server state struct */
40
41
static mobject_server_context_t *g_srv_ctx = NULL;

42

43
/**
 * Initialize the process-wide mobject server state.
 *
 * Allocates the global server context, initializes SSG, creates the mobject
 * server group over MPI_COMM_WORLD, registers the mobject RPC handlers and,
 * on the process whose group self id is 0, stores the group id in
 * cluster_file so that clients can find the servers later.
 *
 * On any failure everything acquired so far is released and the global
 * context is reset to NULL, so initialization may be attempted again.
 *
 * @param mid          margo instance the server registers its RPCs with
 * @param cluster_file path the group id is stored to (only used by the
 *                     process with group self id 0)
 * @return 0 on success, -1 on failure (including when the server has
 *         already been initialized)
 */
int mobject_server_init(margo_instance_id mid, const char *cluster_file)
{
    int my_id;
    int ret;

    if (g_srv_ctx)
    {
        fprintf(stderr, "Error: mobject server has already been initialized\n");
        return -1;
    }

    /* calloc so that every field starts out zeroed */
    g_srv_ctx = calloc(1, sizeof(*g_srv_ctx));
    if (!g_srv_ctx)
        return -1;
    g_srv_ctx->mid = mid;

    /* TODO bake-bulk */
    /* TODO sds-keyval */
#if 0
    kv_context *metadata;
    struct bake_pool_info *pool_info;
    pool_info = bake_server_makepool(poolname);
#endif

    ret = ssg_init(mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        goto err_free_ctx;
    }

    /* server group create */
    g_srv_ctx->gid = ssg_group_create_mpi(MOBJECT_SERVER_GROUP_NAME, MPI_COMM_WORLD,
        NULL, NULL); /* XXX membership update callbacks unused currently */
    if (g_srv_ctx->gid == SSG_GROUP_ID_NULL)
    {
        fprintf(stderr, "Error: Unable to create the mobject server group\n");
        goto err_ssg_finalize;
    }
    my_id = ssg_get_group_self_id(g_srv_ctx->gid);

    /* register mobject & friends RPC handlers */
    ret = mobject_server_register(g_srv_ctx);
    if (ret != 0)
    {
        fprintf(stderr, "Error: Unable to register the mobject RPC handlers\n");
        goto err_group_destroy;
    }

    /* one process writes cluster connect info to file for clients to find later */
    if (my_id == 0)
    {
        ret = ssg_group_id_store(cluster_file, g_srv_ctx->gid);
        if (ret != 0)
        {
            fprintf(stderr, "Error: unable to store mobject cluster info to file %s\n",
                cluster_file);
            /* XXX: this call is performed by one process, and we do not currently
             * have an easy way to propagate this error to the entire cluster group
             */
            goto err_group_destroy;
        }
    }

    return 0;

    /* Unwind in reverse order of acquisition. */
err_group_destroy:
    ssg_group_destroy(g_srv_ctx->gid);
err_ssg_finalize:
    ssg_finalize();
err_free_ctx:
    free(g_srv_ctx);
    g_srv_ctx = NULL; /* allow a later re-initialization attempt */
    return -1;
}

/**
 * Tear down the state created by mobject_server_init().
 *
 * Destroys the server SSG group, finalizes SSG and releases the global
 * server context, resetting it to NULL so that the server can be
 * initialized again afterwards.
 *
 * @param mid margo instance; currently unused (the margo_wait_for_finalize
 *            call below is disabled)
 */
void mobject_server_shutdown(margo_instance_id mid)
{
    (void)mid;

    assert(g_srv_ctx);
    /* Keep release (NDEBUG) builds from dereferencing a NULL context. */
    if (!g_srv_ctx)
        return;

    ssg_group_destroy(g_srv_ctx->gid);
    ssg_finalize();

    //margo_wait_for_finalize(mid);
    //pmemobj_close(NULL);

    free(g_srv_ctx);
    g_srv_ctx = NULL;
}
123

124
/**
 * Register the mobject RPCs (shutdown, write_op, read_op) with the margo
 * instance stored in srv_ctx and record the resulting ids in the file-level
 * mobject_*_rpc_id variables.
 *
 * @param srv_ctx server context whose mid field is the margo instance to use
 * @return 0; no failure is currently detected or reported here
 */
static int mobject_server_register(mobject_server_context_t *srv_ctx)
{
    int ret = 0;
    margo_instance_id mid = srv_ctx->mid;

    mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
        void, void, mobject_shutdown_ult);

    mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
        write_op_in_t, write_op_out_t, mobject_write_op_ult);

    mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op",
        read_op_in_t, read_op_out_t, mobject_read_op_ult);

#if 0
    bake_server_register(mid, pool_info);
    metadata = kv_server_register(mid);
#endif

    return ret;
}
146
147
148
149
150
151
152
153

/*
 * Handler body for the "mobject_shutdown" RPC.
 *
 * Performs no shutdown work at present: it only releases the incoming
 * handle and sends no response.
 */
static void mobject_shutdown_ult(hg_handle_t handle)
{
    /* Nothing else to do with the request, so drop the handle right away. */
    margo_destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(mobject_shutdown_ult)
Matthieu Dorier's avatar
Matthieu Dorier committed
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225

/*
 * Handler body for the "mobject_write_op" RPC.
 *
 * Decodes the request, runs the write operation chain on the named object,
 * responds with out.ret = 0 (the outcome of execute_write_op is not
 * reflected in the response), then releases the input and the handle.
 *
 * Failures are reported on stderr instead of through assert(), so that a
 * single bad request neither aborts the server nor, in NDEBUG builds,
 * leads to reading an undecoded input structure.
 *
 * Returns HG_SUCCESS, or the first error reported by a margo call.
 */
static hg_return_t mobject_write_op_ult(hg_handle_t h)
{
    hg_return_t ret;
    hg_return_t status = HG_SUCCESS;

    write_op_in_t in;
    write_op_out_t out;

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_write_op unable to get RPC input\n");
        /* `in` was not decoded, so there is nothing to execute or free. */
        margo_destroy(h);
        return ret;
    }

    /* Execute the operation chain */
    execute_write_op(in.write_op, in.object_name);

    /* set the return value of the RPC */
    out.ret = 0;

    ret = margo_respond(h, &out);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_write_op unable to send RPC response\n");
        status = ret;
    }

    /* Free the input data. */
    ret = margo_free_input(h, &in);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_write_op unable to free RPC input\n");
        if (status == HG_SUCCESS)
            status = ret;
    }

    /* We are not going to use the handle anymore, so we should destroy it. */
    ret = margo_destroy(h);
    if (ret != HG_SUCCESS && status == HG_SUCCESS)
        status = ret;

    return status;
}
DEFINE_MARGO_RPC_HANDLER(mobject_write_op_ult)

/*
 * Handler body for the "mobject_read_op" RPC.
 *
 * Decodes the request, builds a response list matching the requested read
 * actions, runs the read operation chain on the named object, sends the
 * response list back, then releases the responses, the input and the handle.
 *
 * Failures are reported on stderr instead of through assert(), so that a
 * single bad request neither aborts the server nor, in NDEBUG builds,
 * leads to reading an undecoded input structure.
 *
 * Returns HG_SUCCESS, or the first error reported by a margo call.
 */
static hg_return_t mobject_read_op_ult(hg_handle_t h)
{
    hg_return_t ret;
    hg_return_t status = HG_SUCCESS;

    read_op_in_t in;
    read_op_out_t out;

    /* Deserialize the input from the received handle. */
    ret = margo_get_input(h, &in);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_read_op unable to get RPC input\n");
        /* `in` was not decoded, so there is nothing to execute or free. */
        margo_destroy(h);
        return ret;
    }

    /* Create a response list matching the input actions */
    read_response_t resp = build_matching_read_responses(in.read_op);

    /* Compute the result. */
    execute_read_op(in.read_op, in.object_name);

    out.responses = resp;

    ret = margo_respond(h, &out);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_read_op unable to send RPC response\n");
        status = ret;
    }

    /* The response list is released only after the respond attempt. */
    free_read_responses(resp);

    /* Free the input data. */
    ret = margo_free_input(h, &in);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: mobject_read_op unable to free RPC input\n");
        if (status == HG_SUCCESS)
            status = ret;
    }

    /* We are not going to use the handle anymore, so we should destroy it. */
    ret = margo_destroy(h);
    if (ret != HG_SUCCESS && status == HG_SUCCESS)
        status = ret;

    return status;
}
DEFINE_MARGO_RPC_HANDLER(mobject_read_op_ult)