cluster.c 9.41 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include "mobject-store-config.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

#include <margo.h>
#include <ssg.h>

#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/util/log.h"
23

24 25 26 27 28 29

// global variables for RPC ids
hg_id_t mobject_write_op_rpc_id;
hg_id_t mobject_read_op_rpc_id;
hg_id_t mobject_shutdown_rpc_id;

30
static void mobject_store_register(margo_instance_id mid);
31
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle);
32

33
/*
 * Allocate a (not yet connected) cluster handle for the mobject cluster whose
 * SSG group file is named by the MOBJECT_CLUSTER_FILE_ENV environment
 * variable.
 *
 * cluster: out-parameter that receives the new handle on success.
 * id:      ignored by mobject.
 *
 * Returns 0 on success.  Returns -1 on failure, in which case the handle
 * allocated here is freed and *cluster is left untouched.
 */
int mobject_store_create(mobject_store_t *cluster, const char * const id)
{
    struct mobject_store_handle *handle;
    char *group_file;

    (void)id; /* XXX: id unused in mobject */

    /* zeroed allocation, so the handle starts out with connected == 0 */
    handle = calloc(1, sizeof(*handle));
    if (handle == NULL)
        return -1;

    /* the environment variable is currently the only way to locate a cluster */
    group_file = getenv(MOBJECT_CLUSTER_FILE_ENV);
    if (group_file == NULL)
    {
        fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n",
            MOBJECT_CLUSTER_FILE_ENV);
        goto fail;
    }

    if (ssg_group_id_load(group_file, &handle->gid) != 0)
    {
        fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n",
            group_file);
        goto fail;
    }

    *cluster = handle;
    return 0;

fail:
    free(handle);
    return -1;
}

/*
 * Connect a handle created by mobject_store_create() to its server cluster.
 *
 * Steps: derive the transport protocol from the SSG group's address string,
 * initialize a margo client instance, register the mobject RPCs, initialize
 * SSG, and attach to the cluster group.  Calling this on an already-connected
 * handle is a no-op that returns 0.
 *
 * Returns 0 on success.  On failure it prints an error and returns -1.  It
 * also releases everything the handle owns, including the SSG group id and
 * the handle itself, so the caller must not use `cluster` afterwards.
 */
int mobject_store_connect(mobject_store_t cluster)
{
    struct mobject_store_handle *cluster_handle;
    char *svr_addr_str;
    char proto[24] = {0};
    size_t i;
    int ret;

    cluster_handle = (struct mobject_store_handle *)cluster;
    assert(cluster_handle);

    if (cluster_handle->connected)
        return 0;

    /* figure out protocol to connect with using address information
     * associated with the SSG group ID
     */
    svr_addr_str = ssg_group_id_get_addr_str(cluster_handle->gid);
    if (!svr_addr_str)
    {
        fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
        goto err_free_handle;
    }

    /* we only need the proto portion of the address (up to the first ':') to
     * initialize margo; stop one byte short of the buffer so that proto is
     * always NUL-terminated even for an over-long prefix */
    for (i = 0; i < sizeof(proto) - 1 && svr_addr_str[i] != '\0' && svr_addr_str[i] != ':'; i++)
        proto[i] = svr_addr_str[i];

    /* initialize margo */
    /* XXX: probably want to expose some way of tweaking threading parameters */
    cluster_handle->mid = margo_init(proto, MARGO_CLIENT_MODE, 0, -1);
    if (cluster_handle->mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
        goto err_free_addr;
    }

    /* register mobject RPCs for this cluster */
    mobject_store_register(cluster_handle->mid);

    /* initialize ssg */
    /* XXX: we need to think about how to do this once per-client... clients could connect to mult. clusters */
    ret = ssg_init(cluster_handle->mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        goto err_margo;
    }

    /* attach to the cluster group */
    ret = ssg_group_attach(cluster_handle->gid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n");
        goto err_ssg;
    }

    cluster_handle->connected = 1;
    free(svr_addr_str);
    return 0;

    /* unwind in reverse order of acquisition */
err_ssg:
    ssg_finalize();
err_margo:
    margo_finalize(cluster_handle->mid);
err_free_addr:
    free(svr_addr_str);
err_free_handle:
    ssg_group_id_free(cluster_handle->gid);
    free(cluster_handle);
    return -1;
}
/*
 * Tear down a cluster handle and release everything it owns.
 *
 * For a connected handle:
 *   - If the MOBJECT_SHUTDOWN_KILL_SERVERS environment variable equals
 *     "true", first send a shutdown RPC to the server cluster.  A failure
 *     there only produces a warning.
 *   - Detach from the SSG group and finalize SSG and margo.
 *
 * In every case the SSG group id and the handle itself are freed, so
 * `cluster` must not be used after this call.
 */
void mobject_store_shutdown(mobject_store_t cluster)
{
    struct mobject_store_handle *cluster_handle;
    char *svr_kill_env_str;
    int ret;

    cluster_handle = (struct mobject_store_handle *)cluster;
    assert(cluster_handle != NULL);

    if (cluster_handle->connected)
    {
        svr_kill_env_str = getenv("MOBJECT_SHUTDOWN_KILL_SERVERS");
        if (svr_kill_env_str && strcmp(svr_kill_env_str, "true") == 0)
        {
            /* kill server cluster if requested */
            ret = mobject_store_shutdown_servers(cluster_handle);
            if (ret != 0)
            {
                /* adjacent literals, not a backslash-newline continuation,
                 * so no indentation leaks into the message */
                fprintf(stderr, "Warning: Unable to send shutdown signal "
                    "to mobject server cluster\n");
            }
        }

        ssg_group_detach(cluster_handle->gid);
        ssg_finalize();
        margo_finalize(cluster_handle->mid);
    }

    /* a handle that was created but never connected still owns the group id
     * loaded by mobject_store_create(), so release it on that path as well */
    ssg_group_id_free(cluster_handle->gid);
    free(cluster_handle);
}
/*
 * Create an I/O context bound to pool `pool_name` on `cluster`.
 *
 * The pool name is copied, so the caller keeps ownership of `pool_name`.
 * Release the context with mobject_store_ioctx_destroy().
 *
 * Returns 0 on success.  Returns -1 and sets *ioctx to NULL if `pool_name`
 * is NULL or an allocation fails.
 */
int mobject_store_ioctx_create(
    mobject_store_t cluster,
    const char * pool_name,
    mobject_store_ioctx_t *ioctx)
{
    mobject_store_ioctx_t ctx;

    *ioctx = NULL;
    if (pool_name == NULL)
        return -1;   /* strdup(NULL) would be undefined behavior */

    ctx = (mobject_store_ioctx_t)calloc(1, sizeof(*ctx));
    if (ctx == NULL)
        return -1;

    ctx->pool_name = strdup(pool_name);
    if (ctx->pool_name == NULL)
    {
        free(ctx);
        return -1;
    }
    ctx->cluster = cluster;

    *ioctx = ctx;
    return 0;
}

/*
 * Release an I/O context returned by mobject_store_ioctx_create(), including
 * the private copy of the pool name it holds.  Passing NULL is a no-op.
 */
void mobject_store_ioctx_destroy(mobject_store_ioctx_t ioctx)
{
    if (ioctx == NULL)
        return;

    free(ioctx->pool_name);
    free(ioctx);
}
/*
 * Execute a prepared write operation on object `oid` in the pool bound to `io`.
 *
 * Steps:
 *   - prepare the write op for transfer with prepare_write_op();
 *   - send it as a "mobject_write_op" RPC to member 0 of the cluster group;
 *   - receive the reply, then release it.
 *
 * Each margo/SSG failure is checked with MOBJECT_ASSERT; the function itself
 * always returns 0.  `mtime` and `flags` are currently ignored.
 */
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
                                   mobject_store_ioctx_t io,
                                   const char *oid,
                                   time_t *mtime,
                                   int flags)
{
    margo_instance_id mid = io->cluster->mid;
    write_op_in_t args;
    write_op_out_t reply;
    hg_addr_t target;
    hg_handle_t rpc;
    hg_return_t hret;

    // TODO take mtime into account

    args.object_name = oid;
    args.pool_name   = io->pool_name;
    args.write_op    = write_op;

    prepare_write_op(mid, write_op);

    // XXX pick other servers using ch-placement; for now always member 0
    target = ssg_get_addr(io->cluster->gid, 0);
    MOBJECT_ASSERT(target != HG_ADDR_NULL, "NULL server address");

    hret = margo_create(mid, target, mobject_write_op_rpc_id, &rpc);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not create RPC handle");

    hret = margo_forward(rpc, &args);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not forward RPC");

    hret = margo_get_output(rpc, &reply);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not get RPC output");

    hret = margo_free_output(rpc, &reply);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not free RPC output");

    hret = margo_destroy(rpc);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not destroy RPC handle");

    return 0;
}
/*
 * Execute a prepared read operation on object `oid` in the pool bound to
 * `ioctx`.
 *
 * Steps:
 *   - prepare the read op with prepare_read_op();
 *   - send it as a "mobject_read_op" RPC to member 0 of the cluster group;
 *   - pass the reply's responses to feed_read_op_pointers_from_response();
 *   - free the RPC output.
 *
 * Each margo/SSG failure is checked with MOBJECT_ASSERT; the function itself
 * always returns 0.  `flags` is currently ignored.
 */
int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
                                  mobject_store_ioctx_t ioctx,
                                  const char *oid,
                                  int flags)
{
    margo_instance_id mid = ioctx->cluster->mid;
    read_op_in_t args;
    read_op_out_t reply;
    hg_addr_t target;
    hg_handle_t rpc;
    hg_return_t hret;

    args.object_name = oid;
    args.pool_name   = ioctx->pool_name;
    args.read_op     = read_op;

    prepare_read_op(mid, read_op);

    // XXX pick other servers using ch-placement; for now always member 0
    target = ssg_get_addr(ioctx->cluster->gid, 0);
    MOBJECT_ASSERT(target != HG_ADDR_NULL, "NULL server address");

    hret = margo_create(mid, target, mobject_read_op_rpc_id, &rpc);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not create RPC handle");
    hret = margo_forward(rpc, &args);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not forward RPC");

    hret = margo_get_output(rpc, &reply);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not get RPC output");

    /* name suggests this fills the read op's output pointers from the
     * decoded responses (TODO confirm); it must run before
     * margo_free_output() releases `reply` */
    feed_read_op_pointers_from_response(read_op, reply.responses);

    hret = margo_free_output(rpc, &reply);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not free RPC output");
    hret = margo_destroy(rpc);
    MOBJECT_ASSERT(hret == HG_SUCCESS, "Could not destroy RPC handle");

    return 0;
}
/* internal helper routines */

/*
 * Register the three mobject RPCs with margo instance `mid` and record the
 * returned ids in the file-level globals used when issuing requests.  No
 * handler function is attached (NULL) to any of them; the shutdown RPC
 * carries no input or output (void, void).
 */
static void mobject_store_register(margo_instance_id mid)
{
    /* XXX i think ultimately these need to be stored in per-mid containers instead of global... */
    mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
                                             write_op_in_t, write_op_out_t, NULL);
    mobject_read_op_rpc_id  = MARGO_REGISTER(mid, "mobject_read_op",
                                             read_op_in_t, read_op_out_t, NULL);
    mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
                                             void, void, NULL);
}

// send a shutdown signal to a server cluster
288
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle)
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
{
    hg_addr_t svr_addr;
    hg_handle_t h;
    hg_return_t ret;

    /* get the address of the first server */
    svr_addr = ssg_get_addr(cluster_handle->gid, 0);
    if (svr_addr == HG_ADDR_NULL)
    {
        fprintf(stderr, "Error: Unable to obtain address for mobject server\n");
        return -1;
    }

    ret = margo_create(cluster_handle->mid, svr_addr, mobject_shutdown_rpc_id, &h);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to create margo handle\n");
        return -1;
    }

    /* send shutdown signal */
    ret = margo_forward(h, NULL);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to forward margo handle\n");
        margo_destroy(h);
        return -1;
    }

    margo_destroy(h);

    return 0;
}