cluster.c 11.8 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include "mobject-store-config.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>

#include <margo.h>
#include <ssg.h>

#include "libmobject-store.h"
#include "src/client/cluster.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/util/log.h"
23

24
/* Sends a shutdown request to the first server of the mobject cluster group;
 * defined at the end of this file. Returns 0 on success, non-zero on failure. */
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle);
25

26
/*
 * Allocate a new cluster handle and load the SSG group ID of the mobject
 * cluster from the file named by the MOBJECT_CLUSTER_FILE_ENV environment
 * variable (currently the only supported way to locate a cluster).
 *
 * `id` is accepted for API compatibility but ignored by mobject.
 *
 * Returns 0 and stores the handle in *cluster on success; returns -1 on
 * failure, in which case *cluster is not modified.
 */
int mobject_store_create(mobject_store_t *cluster, const char * const id)
{
    struct mobject_store_handle *handle;
    char *cluster_file;

    (void)id; /* XXX: id unused in mobject */

    handle = calloc(1, sizeof *handle);
    if (handle == NULL)
        return -1;

    /* NOTE: the env variable is the _only_ method for specifying a cluster for now */
    cluster_file = getenv(MOBJECT_CLUSTER_FILE_ENV);
    if (cluster_file == NULL) {
        fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n",
            MOBJECT_CLUSTER_FILE_ENV);
        free(handle);
        return -1;
    }

    if (ssg_group_id_load(cluster_file, &handle->gid) != 0) {
        fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n",
            cluster_file);
        free(handle);
        return -1;
    }

    /* hand ownership of the new handle to the caller */
    *cluster = handle;
    return 0;
}

/*
 * Connect a cluster handle obtained from mobject_store_create to the mobject
 * server cluster. The steps are:
 *   1. initialize margo using the transport protocol taken from the SSG
 *      group's address string;
 *   2. initialize SSG and attach to the server group;
 *   3. create the mobject client.
 *
 * Returns 0 on success, or if the handle is already connected, and -1 on
 * failure. NOTE: on failure every resource acquired so far is released,
 * including the group ID and the handle itself. The caller must not use
 * `cluster` afterwards; this matches the historical behavior of this function.
 */
int mobject_store_connect(mobject_store_t cluster)
{
    struct mobject_store_handle *cluster_handle;
    char *svr_addr_str;
    char proto[24] = {0};
    size_t i;
    margo_instance_id mid;
    int ret;

    cluster_handle = (struct mobject_store_handle *)cluster;
    assert(cluster_handle);

    if (cluster_handle->connected)
        return 0;

    /* figure out protocol to connect with using address information
     * associated with the SSG group ID
     */
    svr_addr_str = ssg_group_id_get_addr_str(cluster_handle->gid);
    if (!svr_addr_str)
    {
        fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
        goto err_free_handle;
    }

    /* we only need the proto portion of the address (everything before the
     * first ':') to initialize margo; stop one byte short of the buffer so
     * proto is always NUL-terminated, even for an unexpectedly long prefix */
    for (i = 0; i < sizeof(proto) - 1 && svr_addr_str[i] != '\0' && svr_addr_str[i] != ':'; i++)
        proto[i] = svr_addr_str[i];
    /* proto is a copy, so the address string is no longer needed */
    free(svr_addr_str);

    /* initialize margo */
    fprintf(stderr,"Client initialized with proto = %s\n",proto);
    /* XXX: probably want to expose some way of tweaking threading parameters */
    mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1);
    if (mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
        goto err_free_handle;
    }
    cluster_handle->mid = mid;

    /* initialize ssg */
    /* XXX: we need to think about how to do this once per-client... clients could connect to mult. clusters */
    ret = ssg_init(cluster_handle->mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        goto err_margo_finalize;
    }

    /* attach to the cluster group */
    ret = ssg_group_attach(cluster_handle->gid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n");
        goto err_ssg_finalize;
    }

    ret = mobject_client_init(mid, &(cluster_handle->mobject_clt));
    if (ret != 0)
    {
        fprintf(stderr, "Error: Unable to create a mobject client\n");
        goto err_group_detach;
    }

    /* only mark the handle as connected once every step has succeeded */
    cluster_handle->connected = 1;
    return 0;

    /* unwind in reverse order of acquisition (same order as mobject_store_shutdown) */
err_group_detach:
    ssg_group_detach(cluster_handle->gid);
err_ssg_finalize:
    ssg_finalize();
err_margo_finalize:
    margo_finalize(cluster_handle->mid);
err_free_handle:
    ssg_group_id_free(cluster_handle->gid);
    free(cluster_handle);
    return -1;
}

149
/*
 * Tear down a cluster handle created by mobject_store_create.
 *
 * If the handle is connected:
 *   - when the MOBJECT_CLUSTER_SHUTDOWN_KILL_ENV environment variable is
 *     exactly "true", ask the server cluster to shut down (a failure only
 *     produces a warning);
 *   - finalize the mobject client;
 *   - detach from the SSG group and finalize SSG and margo.
 *
 * In every case the group ID and the handle itself are released, so `cluster`
 * must not be used after this call.
 */
void mobject_store_shutdown(mobject_store_t cluster)
{
    struct mobject_store_handle *cluster_handle;
    char *svr_kill_env_str;
    int ret;

    cluster_handle = (struct mobject_store_handle *)cluster;
    assert(cluster_handle != NULL);

    if (cluster_handle->connected)
    {
        svr_kill_env_str = getenv(MOBJECT_CLUSTER_SHUTDOWN_KILL_ENV);
        if (svr_kill_env_str && strcmp(svr_kill_env_str, "true") == 0)
        {
            /* kill server cluster if requested */
            ret = mobject_store_shutdown_servers(cluster_handle);
            if (ret != 0)
            {
                /* adjacent literals are concatenated; a backslash continuation
                 * would embed the next line's indentation in the message */
                fprintf(stderr, "Warning: Unable to send shutdown signal "
                    "to mobject server cluster\n");
            }
        }

        mobject_client_finalize(cluster_handle->mobject_clt);
        ssg_group_detach(cluster_handle->gid);
        ssg_finalize();
        margo_finalize(cluster_handle->mid);
    }

    /* a handle that was created but never connected still owns the group ID
     * loaded by mobject_store_create and its own allocation */
    ssg_group_id_free(cluster_handle->gid);
    free(cluster_handle);
}
182

183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
/*
 * Pools are not implemented by mobject. This is a no-op kept for API
 * compatibility, and it always reports success.
 */
int mobject_store_pool_create(mobject_store_t cluster, const char * pool_name)
{
    (void)pool_name;
    (void)cluster;
    /* XXX: NOOP -- pools are not implemented currently */
    return 0;
}

/*
 * Pools are not implemented by mobject. This is a no-op kept for API
 * compatibility, and it always reports success.
 */
int mobject_store_pool_delete(mobject_store_t cluster, const char * pool_name)
{
    (void)pool_name;
    (void)cluster;
    /* XXX: NOOP -- pools are not implemented currently */
    return 0;
}

199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
/*
 * Create an I/O context bound to `cluster` and the pool named `pool_name`.
 * The pool name is copied into the context; pools themselves are not
 * implemented by mobject.
 *
 * Returns 0 on success. Returns -1 if `pool_name` is NULL or an allocation
 * fails; in that case *ioctx is set to NULL and nothing is leaked.
 */
int mobject_store_ioctx_create(
    mobject_store_t cluster,
    const char * pool_name,
    mobject_store_ioctx_t *ioctx)
{
    mobject_store_ioctx_t ctx;

    *ioctx = NULL;

    /* strdup(NULL) is undefined behavior */
    if (pool_name == NULL)
        return -1;

    ctx = calloc(1, sizeof(*ctx));
    if (ctx == NULL)
        return -1;

    ctx->pool_name = strdup(pool_name);
    if (ctx->pool_name == NULL)
    {
        free(ctx);
        return -1;
    }
    ctx->cluster = cluster;

    *ioctx = ctx;
    return 0;
}

/* Release an I/O context and its copied pool name. Passing NULL is a no-op. */
void mobject_store_ioctx_destroy(mobject_store_ioctx_t ioctx)
{
    if (ioctx == NULL)
        return;

    free(ioctx->pool_name);
    free(ioctx);
}

216
/* Create an empty write operation by delegating to the mobject client library. */
mobject_store_write_op_t mobject_store_create_write_op(void)
{
    mobject_store_write_op_t op = mobject_create_write_op();
    return op;
}
220

221 222 223 224
/* Release a write operation created by mobject_store_create_write_op. */
void mobject_store_release_write_op(mobject_store_write_op_t write_op)
{
    mobject_release_write_op(
        write_op);
}
225

226 227 228 229 230 231
/* Add an object-creation action to `write_op`. The arguments are forwarded unchanged. */
void mobject_store_write_op_create(mobject_store_write_op_t write_op,
        int exclusive,
        const char* category)
{
    mobject_write_op_create(write_op,
                            exclusive,
                            category);
}
232

233 234 235 236 237 238 239 240
/*
 * Add a write of `len` bytes from `buffer` at `offset` to `write_op`.
 * The mobject client API takes offset before length, which is the reverse
 * of this API's order, so the two arguments are intentionally swapped.
 */
void mobject_store_write_op_write(mobject_store_write_op_t write_op,
        const char *buffer,
        size_t len,
        uint64_t offset)
{
    mobject_write_op_write(write_op,
                           buffer,
                           offset, /* offset first: mobject client order */
                           len);
}
241

242 243 244 245 246 247
/*
 * Add a full-object write of `len` bytes from `buffer` to `write_op`.
 * This function returns void, so the call is made as a plain statement:
 * C11 6.8.6.4 forbids `return <expr>;` in a void function.
 */
void mobject_store_write_op_write_full(mobject_store_write_op_t write_op,
        const char *buffer,
        size_t len)
{
    mobject_write_op_write_full(write_op, buffer, len);
}
248

249 250 251 252 253 254 255 256 257
/*
 * Add a "write same" action to `write_op`: `data_len` bytes from `buffer`
 * are forwarded with a total `write_len` starting at `offset`.
 * The mobject client API expects the offset before the lengths, so the
 * arguments are intentionally reordered.
 */
void mobject_store_write_op_writesame(mobject_store_write_op_t write_op,
        const char *buffer,
        size_t data_len,
        size_t write_len,
        uint64_t offset)
{
    mobject_write_op_write_same(write_op,
                                buffer,
                                offset, /* offset first: mobject client order */
                                data_len,
                                write_len);
}
258

259 260 261 262 263 264
/* Add an append of `len` bytes from `buffer` to `write_op`. */
void mobject_store_write_op_append(mobject_store_write_op_t write_op,
        const char *buffer,
        size_t len)
{
    mobject_write_op_append(write_op,
                            buffer,
                            len);
}
265

266 267 268 269
/* Add an object-removal action to `write_op`. */
void mobject_store_write_op_remove(mobject_store_write_op_t write_op)
{
    mobject_write_op_remove(
        write_op);
}
270

271 272 273 274
/* Add a truncation to `offset` bytes to `write_op`. */
void mobject_store_write_op_truncate(mobject_store_write_op_t write_op,
        uint64_t offset)
{
    mobject_write_op_truncate(write_op,
                              offset);
}

277 278 279
/* Add a zeroing of `len` bytes starting at `offset` to `write_op`. */
void mobject_store_write_op_zero(mobject_store_write_op_t write_op,
        uint64_t offset,
        uint64_t len)
{
    mobject_write_op_zero(write_op,
                          offset,
                          len);
}
283

284 285 286 287 288 289 290 291
/*
 * Add an omap "set" of `num` key/value pairs to `write_op`. `keys`, `vals`
 * and `lens` are forwarded unchanged to the mobject client.
 */
void mobject_store_write_op_omap_set(mobject_store_write_op_t write_op,
        char const* const* keys,
        char const* const* vals,
        const size_t *lens,
        size_t num)
{
    mobject_write_op_omap_set(write_op,
                              keys,
                              vals,
                              lens,
                              num);
}
292

293 294 295 296 297 298
/* Add removal of `keys_len` omap keys to `write_op`. */
void mobject_store_write_op_omap_rm_keys(mobject_store_write_op_t write_op,
        char const* const* keys,
        size_t keys_len)
{
    mobject_write_op_omap_rm_keys(write_op,
                                  keys,
                                  keys_len);
}
299

300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
/*
 * Execute `write_op` on object `oid` in the pool of I/O context `io`.
 *
 * For now every operation is sent to the provider with multiplex id 1 on
 * the first member (rank 0) of the cluster group. A temporary provider
 * handle is created for the call and released afterwards.
 *
 * Returns -1 if the server address cannot be obtained. Otherwise returns the
 * non-zero result of provider-handle creation, or the result of the operation.
 */
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
                                   mobject_store_ioctx_t io,
                                   const char *oid,
                                   time_t *mtime,
                                   int flags)
{
    mobject_provider_handle_t mph;
    hg_addr_t svr_addr;
    int r;

    // TODO choose the target server based on ch-placement
    // remember that multiple providers may be in the same node (with distinct mplex ids)
    svr_addr = ssg_get_addr(io->cluster->gid, 0);
    if (svr_addr == HG_ADDR_NULL)
    {
        fprintf(stderr, "Error: Unable to obtain address for mobject server\n");
        return -1;
    }

    // TODO for now multiplex id is hard-coded as 1
    r = mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
    if (r != 0)
        return r;

    r = mobject_write_op_operate(mph, write_op, io->pool_name, oid, mtime, flags);
    mobject_provider_handle_release(mph);
    return r;
}
318

319 320 321 322
/* Create an empty read operation by delegating to the mobject client library. */
mobject_store_read_op_t mobject_store_create_read_op(void)
{
    mobject_store_read_op_t op = mobject_create_read_op();
    return op;
}
323

324 325 326 327
/* Release a read operation created by mobject_store_create_read_op. */
void mobject_store_release_read_op(mobject_store_read_op_t read_op)
{
    mobject_release_read_op(
        read_op);
}
328

329 330 331 332 333 334 335
/*
 * Add a stat action to `read_op`. The output pointers `psize`, `pmtime`
 * and `prval` are forwarded unchanged to the mobject client.
 */
void mobject_store_read_op_stat(mobject_store_read_op_t read_op,
        uint64_t *psize,
        time_t *pmtime,
        int *prval)
{
    mobject_read_op_stat(read_op,
                         psize,
                         pmtime,
                         prval);
}
336

337 338 339 340 341 342 343 344 345
/*
 * Add a read of up to `len` bytes at `offset` into `buffer` to `read_op`.
 * The mobject client API takes the buffer first, then offset and length,
 * so the arguments are reordered relative to this API.
 */
void mobject_store_read_op_read(mobject_store_read_op_t read_op,
        uint64_t offset,
        size_t len,
        char *buffer,
        size_t *bytes_read,
        int *prval)
{
    mobject_read_op_read(read_op,
                         buffer, /* buffer first: mobject client order */
                         offset,
                         len,
                         bytes_read,
                         prval);
}
346

347 348 349 350 351 352 353
/*
 * Add an omap key listing to `read_op`: at most `max_return` keys after
 * `start_after`. The results are delivered through `iter` and `prval`.
 */
void mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op,
        const char *start_after,
        uint64_t max_return,
        mobject_store_omap_iter_t *iter,
        int *prval)
{
    mobject_read_op_omap_get_keys(read_op,
                                  start_after,
                                  max_return,
                                  iter,
                                  prval);
}
355

356 357 358 359 360 361 362 363 364
/*
 * Add an omap key/value listing to `read_op`: at most `max_return` entries
 * after `start_after`, optionally restricted to keys starting with
 * `filter_prefix`. The results are delivered through `iter` and `prval`.
 */
void mobject_store_read_op_omap_get_vals(mobject_store_read_op_t read_op,
        const char *start_after,
        const char *filter_prefix,
        uint64_t max_return,
        mobject_store_omap_iter_t *iter,
        int *prval)
{
    mobject_read_op_omap_get_vals(read_op,
                                  start_after,
                                  filter_prefix,
                                  max_return,
                                  iter,
                                  prval);
}
365

366 367 368 369 370
/*
 * Add a lookup of the omap values for the `keys_len` keys in `keys` to
 * `read_op`. The results are delivered through `iter` and `prval`.
 */
void mobject_store_read_op_omap_get_vals_by_keys(mobject_store_read_op_t read_op,
        char const* const* keys,
        size_t keys_len,
        mobject_store_omap_iter_t *iter,
        int *prval)
{
    mobject_read_op_omap_get_vals_by_keys(read_op,
                                          keys,
                                          keys_len,
                                          iter,
                                          prval);
}

/*
 * Execute `read_op` on object `oid` in the pool of I/O context `ioctx`.
 *
 * For now every operation is sent to the provider with multiplex id 1 on
 * the first member (rank 0) of the cluster group. A temporary provider
 * handle is created for the call and always released afterwards (it used to
 * be leaked).
 *
 * Returns -1 if the server address cannot be obtained. Otherwise returns the
 * non-zero result of provider-handle creation, or the result of the operation
 * (which used to be discarded in favor of an unconditional 0).
 */
int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
                                  mobject_store_ioctx_t ioctx,
                                  const char *oid,
                                  int flags)
{
    mobject_provider_handle_t mph;
    hg_addr_t svr_addr;
    int r;

    // TODO choose the target server based on ch-placement
    // remember that multiple providers may be in the same node (with distinct mplex ids)
    svr_addr = ssg_get_addr(ioctx->cluster->gid, 0);
    if (svr_addr == HG_ADDR_NULL)
    {
        fprintf(stderr, "Error: Unable to obtain address for mobject server\n");
        return -1;
    }

    // TODO for now multiplex id is hard-coded as 1
    r = mobject_provider_handle_create(ioctx->cluster->mobject_clt, svr_addr, 1, &mph);
    if (r != 0)
        return r;

    r = mobject_read_op_operate(mph, read_op, ioctx->pool_name, oid, flags);
    mobject_provider_handle_release(mph);
    return r;
}

// send a shutdown signal to a server cluster
393
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle)
394 395 396 397 398 399 400 401 402 403
{
    hg_addr_t svr_addr;

    /* get the address of the first server */
    svr_addr = ssg_get_addr(cluster_handle->gid, 0);
    if (svr_addr == HG_ADDR_NULL)
    {
        fprintf(stderr, "Error: Unable to obtain address for mobject server\n");
        return -1;
    }
404 405
    // TODO we should actually call that for all the members of the group
    return mobject_shutdown(cluster_handle->mobject_clt, svr_addr);
406 407
}