cluster.c 10.5 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include "mobject-store-config.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

#include <margo.h>
#include <ssg.h>

#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/util/log.h"
23

24 25 26 27 28 29

// global variables for RPC ids
hg_id_t mobject_write_op_rpc_id;
hg_id_t mobject_read_op_rpc_id;
hg_id_t mobject_shutdown_rpc_id;

30
static void mobject_store_register(margo_instance_id mid);
31
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle);
32

33
/*
 * Allocate a new, not-yet-connected mobject cluster handle.
 *
 * The cluster is located through the file named by the
 * MOBJECT_CLUSTER_FILE_ENV environment variable (currently the only supported
 * method); the SSG group ID stored in that file is loaded into the handle.
 * Call mobject_store_connect() afterwards to actually reach the cluster.
 *
 * @param cluster  output: receives the new handle on success; left untouched
 *                 on failure
 * @param id       unused by mobject
 * @return 0 on success, -1 on failure (an error is printed to stderr for
 *         environment / cluster-file problems)
 */
int mobject_store_create(mobject_store_t *cluster, const char * const id)
{
    struct mobject_store_handle *cluster_handle;
    char *cluster_file;
    int ret;

    (void)id; /* XXX: id unused in mobject */

    /* allocate a new cluster handle; calloc zero-fills it, so it starts out
     * not connected */
    cluster_handle = calloc(1, sizeof(*cluster_handle));
    if (!cluster_handle)
        return -1;

    /* use env variable to determine how to connect to the cluster */
    /* NOTE: this is the _only_ method for specifying a cluster for now... */
    cluster_file = getenv(MOBJECT_CLUSTER_FILE_ENV);
    if (!cluster_file)
    {
        fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n",
            MOBJECT_CLUSTER_FILE_ENV);
        free(cluster_handle);
        return -1;
    }

    ret = ssg_group_id_load(cluster_file, &cluster_handle->gid);
    if (ret != 0)
    {
        fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n",
            cluster_file);
        free(cluster_handle);
        return -1;
    }

    /* the client address is only known after mobject_store_connect(); set it
     * on the new handle -- *cluster has not been assigned yet, so it must not
     * be dereferenced here */
    cluster_handle->my_address = NULL;

    /* set the returned cluster handle */
    *cluster = cluster_handle;

    return 0;
}

int mobject_store_connect(mobject_store_t cluster)
{
75
    struct mobject_store_handle *cluster_handle;
76
    char *svr_addr_str;
77 78 79 80
    char proto[24] = {0};
    int i;
    int ret;

81 82 83
    cluster_handle = (struct mobject_store_handle *)cluster;
    assert(cluster_handle);

84 85 86
    if (cluster_handle->connected)
        return 0;

Shane Snyder's avatar
Shane Snyder committed
87
    /* figure out protocol to connect with using address information 
88 89
     * associated with the SSG group ID
     */
90 91
    svr_addr_str = ssg_group_id_get_addr_str(cluster_handle->gid);
    if (!svr_addr_str)
92 93 94 95 96 97
    {
        fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
        ssg_group_id_free(cluster_handle->gid);
        free(cluster_handle);
        return -1;
    }
98
    /* we only need to get the proto portion of the address to initialize */
99 100
    for(i=0; i<24 && svr_addr_str[i] != '\0' && svr_addr_str[i] != ':'; i++)
        proto[i] = svr_addr_str[i];
101 102

    /* intialize margo */
Matthieu Dorier's avatar
Matthieu Dorier committed
103
    fprintf(stderr,"Client initialized with proto = %s\n",proto);
104
    /* XXX: probably want to expose some way of tweaking threading parameters */
105
    cluster_handle->mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1);
106 107 108
    if (cluster_handle->mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
109
        free(svr_addr_str);
110 111 112 113 114
        ssg_group_id_free(cluster_handle->gid);
        free(cluster_handle);
        return -1;
    }

115 116 117
    /* register mobject RPCs for this cluster */
    mobject_store_register(cluster_handle->mid);

118
    /* initialize ssg */
119
    /* XXX: we need to think about how to do this once per-client... clients could connect to mult. clusters */
120 121 122 123 124
    ret = ssg_init(cluster_handle->mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        margo_finalize(cluster_handle->mid);
125
        free(svr_addr_str);
126 127 128 129 130 131 132 133 134 135 136 137
        ssg_group_id_free(cluster_handle->gid);
        free(cluster_handle);
        return -1;
    }

    /* attach to the cluster group */
    ret = ssg_group_attach(cluster_handle->gid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n");
        ssg_finalize();
        margo_finalize(cluster_handle->mid);
138
        free(svr_addr_str);
139 140 141 142 143 144
        ssg_group_id_free(cluster_handle->gid);
        free(cluster_handle);
        return -1;
    }
    cluster_handle->connected = 1;

Matthieu Dorier's avatar
Matthieu Dorier committed
145 146 147 148 149 150 151 152 153 154 155
    /* get client's address */
    {
        hg_addr_t my_addr;
        margo_addr_self(cluster_handle->mid, &my_addr);
        hg_size_t addr_str_size;
        margo_addr_to_string(cluster_handle->mid, NULL, &addr_str_size, my_addr);
        cluster_handle->my_address = calloc(1,addr_str_size+1);
        margo_addr_to_string(cluster_handle->mid, (char*)(cluster_handle->my_address), &addr_str_size, my_addr);
        margo_addr_free(cluster_handle->mid, my_addr);
    }

156
    free(svr_addr_str);
157 158

    return 0;
159 160
}

161
/*
 * Disconnect from the cluster and release a connected handle.
 *
 * If the MOBJECT_CLUSTER_SHUTDOWN_KILL_ENV environment variable is exactly
 * "true", a shutdown RPC is first sent to the server cluster (a failure there
 * only prints a warning). Then the client detaches from the SSG group,
 * finalizes SSG and margo, and frees the group ID, the client address string
 * and the handle itself.
 *
 * NOTE(review): a handle that was never connected is returned from early and
 * nothing is freed (its SSG group ID and the handle itself are kept).
 */
void mobject_store_shutdown(mobject_store_t cluster)
{
    struct mobject_store_handle *cluster_handle;
    char *svr_kill_env_str;
    int ret;

    cluster_handle = (struct mobject_store_handle *)cluster;
    assert(cluster_handle != NULL);

    if (!cluster_handle->connected)
        return;

    svr_kill_env_str = getenv(MOBJECT_CLUSTER_SHUTDOWN_KILL_ENV);
    if (svr_kill_env_str && strcmp(svr_kill_env_str, "true") == 0)
    {
        /* kill server cluster if requested */
        ret = mobject_store_shutdown_servers(cluster_handle);
        if (ret != 0)
        {
            /* adjacent literals, not a backslash continuation, so the source
             * indentation does not leak into the message */
            fprintf(stderr, "Warning: Unable to send shutdown signal "
                "to mobject server cluster\n");
        }
    }

    ssg_group_detach(cluster_handle->gid);
    ssg_finalize();
    margo_finalize(cluster_handle->mid);
    ssg_group_id_free(cluster_handle->gid);
    free((char*)cluster_handle->my_address);
    free(cluster_handle);
}
194

195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
/*
 * Create a pool. mobject has no pool support yet, so both arguments are
 * ignored and success is always reported.
 *
 * @return 0, always
 */
int mobject_store_pool_create(mobject_store_t cluster, const char * pool_name)
{
    (void)pool_name; /* XXX: pools are a NOOP for now */
    (void)cluster;
    return 0;
}

/*
 * Delete a pool. mobject has no pool support yet, so both arguments are
 * ignored and success is always reported.
 *
 * @return 0, always
 */
int mobject_store_pool_delete(mobject_store_t cluster, const char * pool_name)
{
    (void)pool_name; /* XXX: pools are a NOOP for now */
    (void)cluster;
    return 0;
}

211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
/*
 * Create an I/O context that binds `cluster` to the pool named `pool_name`.
 * The pool name is copied (the caller keeps ownership of `pool_name`).
 * Release the context with mobject_store_ioctx_destroy().
 *
 * @param cluster    cluster handle the context refers to (not copied)
 * @param pool_name  pool name; must not be NULL
 * @param ioctx      output: the new context, or NULL on failure
 * @return 0 on success, -1 if pool_name is NULL or an allocation fails
 */
int mobject_store_ioctx_create(
    mobject_store_t cluster,
    const char * pool_name,
    mobject_store_ioctx_t *ioctx)
{
    mobject_store_ioctx_t ctx;

    *ioctx = NULL;
    /* strdup(NULL) is undefined behavior, so reject it up front */
    if (pool_name == NULL)
        return -1;

    ctx = calloc(1, sizeof(*ctx));
    if (ctx == NULL)
        return -1;

    ctx->pool_name = strdup(pool_name);
    if (ctx->pool_name == NULL)
    {
        free(ctx);
        return -1;
    }
    ctx->cluster = cluster;

    *ioctx = ctx;
    return 0;
}

/*
 * Free an I/O context created by mobject_store_ioctx_create(), including its
 * copied pool name. Passing NULL does nothing.
 */
void mobject_store_ioctx_destroy(mobject_store_ioctx_t ioctx)
{
    if (ioctx == NULL)
        return;

    free(ioctx->pool_name);
    free(ioctx);
}

228 229 230 231
/*
 * Execute a write operation on object `oid` and wait for the server's reply.
 *
 * The operation is first passed to prepare_write_op(), then sent together
 * with the object name, the context's pool name and this client's address to
 * the server at index 0 of the cluster's SSG group. Every Margo failure is
 * checked with MOBJECT_ASSERT (defined in src/util/log.h).
 *
 * @param write_op  operation to execute
 * @param io        I/O context supplying the pool name and cluster handle
 * @param oid       name of the target object
 * @param mtime     currently ignored (TODO)
 * @param flags     currently ignored
 * @return 0; the contents of the server's write_op_out_t reply are not
 *         inspected
 */
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
                                   mobject_store_ioctx_t io,
                                   const char *oid,
                                   time_t *mtime,
                                   int flags)
{
    hg_return_t ret;

    /* not supported yet; cast to void to silence unused-parameter warnings */
    (void)mtime; // TODO take mtime into account
    (void)flags;

    /* designated initializer: any other field of the RPC input is zeroed */
    write_op_in_t in = {
        .object_name = oid,
        .pool_name   = io->pool_name,
        .write_op    = write_op,
        .client_addr = io->cluster->my_address,
    };

    prepare_write_op(io->cluster->mid, write_op);

    hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, 0); // XXX pick other servers using ch-placement
    MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");

    hg_handle_t h;
    ret = margo_create(io->cluster->mid, svr_addr, mobject_write_op_rpc_id, &h);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");

    ret = margo_forward(h, &in);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");

    write_op_out_t resp;
    ret = margo_get_output(h, &resp);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");

    ret = margo_free_output(h, &resp);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");

    ret = margo_destroy(h);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not destroy RPC handle");

    return 0;
}

/*
 * Execute a read operation on object `oid` and wait for the server's reply.
 *
 * The operation is first passed to prepare_read_op(), then sent together with
 * the object name, the context's pool name and this client's address to the
 * server at index 0 of the cluster's SSG group. The reply's `responses` are
 * handed to feed_read_op_pointers_from_response() before the output is freed
 * (presumably to fill the caller's result pointers -- TODO confirm). Every
 * Margo failure is checked with MOBJECT_ASSERT (defined in src/util/log.h).
 *
 * @param read_op  operation to execute
 * @param ioctx    I/O context supplying the pool name and cluster handle
 * @param oid      name of the target object
 * @param flags    currently ignored
 * @return 0
 */
int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
                                  mobject_store_ioctx_t ioctx,
                                  const char *oid,
                                  int flags)
{
    hg_return_t ret;

    /* not supported yet; cast to void to silence unused-parameter warnings */
    (void)flags;

    /* designated initializer: any other field of the RPC input is zeroed */
    read_op_in_t in = {
        .object_name = oid,
        .pool_name   = ioctx->pool_name,
        .read_op     = read_op,
        .client_addr = ioctx->cluster->my_address,
    };

    prepare_read_op(ioctx->cluster->mid, read_op);

    hg_addr_t svr_addr = ssg_get_addr(ioctx->cluster->gid, 0); // XXX pick other servers using ch-placement
    MOBJECT_ASSERT(svr_addr != HG_ADDR_NULL, "NULL server address");

    hg_handle_t h;
    ret = margo_create(ioctx->cluster->mid, svr_addr, mobject_read_op_rpc_id, &h);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not create RPC handle");

    ret = margo_forward(h, &in);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not forward RPC");

    read_op_out_t resp;
    ret = margo_get_output(h, &resp);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not get RPC output");

    feed_read_op_pointers_from_response(read_op, resp.responses);

    ret = margo_free_output(h, &resp);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not free RPC output");

    ret = margo_destroy(h);
    MOBJECT_ASSERT(ret == HG_SUCCESS, "Could not destroy RPC handle");

    return 0;
}
304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319

/* internal helper routines */

/*
 * Register the three mobject RPCs with margo instance `mid`, storing their ids
 * in the file-scope globals. No handler callback is attached (NULL): this
 * file only issues these RPCs via margo_forward().
 */
static void mobject_store_register(margo_instance_id mid)
{
    /* XXX i think ultimately these need to be stored in per-mid containers instead of global... */
    mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
            write_op_in_t, write_op_out_t, NULL);

    mobject_read_op_rpc_id = MARGO_REGISTER(mid, "mobject_read_op",
            read_op_in_t, read_op_out_t, NULL);

    /* shutdown carries no input and no output */
    mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
            void, void, NULL);
}

// send a shutdown signal to a server cluster
320
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle)
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
{
    hg_addr_t svr_addr;
    hg_handle_t h;
    hg_return_t ret;

    /* get the address of the first server */
    svr_addr = ssg_get_addr(cluster_handle->gid, 0);
    if (svr_addr == HG_ADDR_NULL)
    {
        fprintf(stderr, "Error: Unable to obtain address for mobject server\n");
        return -1;
    }

    ret = margo_create(cluster_handle->mid, svr_addr, mobject_shutdown_rpc_id, &h);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to create margo handle\n");
        return -1;
    }

    /* send shutdown signal */
    ret = margo_forward(h, NULL);
    if (ret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to forward margo handle\n");
        margo_destroy(h);
        return -1;
    }

    margo_destroy(h);

    return 0;
}