cluster.c 13.2 KB
Newer Older
1 2 3 4 5 6
/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include "mobject-store-config.h"
8 9

#include <stdio.h>
Shane Snyder's avatar
Shane Snyder committed
10
#include <stdlib.h>
11 12
#include <assert.h>
#include <string.h>

13 14 15
#include <margo.h>
#include <ssg.h>

16
#include "libmobject-store.h"
17 18 19
#include "src/client/cluster.h"
#include "src/io-chain/prepare-write-op.h"
#include "src/io-chain/prepare-read-op.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
20 21
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
22
#include "src/util/log.h"
Shane Snyder's avatar
Shane Snyder committed
23

24 25
/* Forward declarations of file-local helpers defined later in this file. */
static unsigned long sdbm_hash(const char *str);
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle);
27

Shane Snyder's avatar
Shane Snyder committed
28
/*
 * Allocate a cluster handle and load the SSG group ID of the mobject cluster.
 *
 * The group file path is read from the environment variable named by
 * MOBJECT_CLUSTER_FILE_ENV; this is currently the only way to locate a
 * cluster. 'id' is accepted for API compatibility and ignored.
 *
 * Returns 0 and stores the new (not yet connected) handle in *cluster on
 * success; returns -1 on failure, leaving *cluster untouched.
 */
int mobject_store_create(mobject_store_t *cluster, const char * const id)
{
    struct mobject_store_handle *handle;
    char *group_file;

    (void)id; /* XXX: id unused in mobject */

    /* zeroed, so e.g. 'connected' starts out false */
    handle = calloc(1, sizeof *handle);
    if (handle == NULL)
        return -1;

    /* NOTE: the env variable is the _only_ method for specifying a cluster for now... */
    group_file = getenv(MOBJECT_CLUSTER_FILE_ENV);
    if (group_file == NULL)
    {
        fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n",
                MOBJECT_CLUSTER_FILE_ENV);
        goto fail;
    }

    if (ssg_group_id_load(group_file, &handle->gid) != 0)
    {
        fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n",
                group_file);
        goto fail;
    }

    *cluster = handle;
    return 0;

fail:
    free(handle);
    return -1;
}

/*
 * Connect a cluster handle obtained from mobject_store_create() to the
 * mobject servers. The steps are:
 *   1. initialize margo, using the protocol prefix of the group's address
 *      string (everything before the first ':');
 *   2. initialize SSG and attach to the cluster group;
 *   3. build a ch-placement instance sized to the group;
 *   4. create the mobject client.
 *
 * Returns 0 on success, or immediately if the handle is already connected.
 * Returns -1 on error.
 *
 * NOTE(review): on error, everything set up so far is torn down in reverse
 * order. The SSG group ID and the handle itself are also freed, so the caller
 * must not use 'cluster' afterwards (not even with mobject_store_shutdown()).
 * This keeps the original contract; confirm it is what callers expect.
 */
int mobject_store_connect(mobject_store_t cluster)
{
    struct mobject_store_handle *cluster_handle;
    char *svr_addr_str = NULL;
    char proto[24] = {0};
    margo_instance_id mid;
    size_t i;
    int gsize;
    int ret;

    cluster_handle = (struct mobject_store_handle *)cluster;
    assert(cluster_handle);

    if (cluster_handle->connected)
        return 0;

    /* figure out protocol to connect with using address information
     * associated with the SSG group ID
     */
    svr_addr_str = ssg_group_id_get_addr_str(cluster_handle->gid);
    if (!svr_addr_str)
    {
        fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
        goto err_free_handle;
    }

    /* We only need the proto portion of the address to initialize margo.
     * Stop one byte short of the buffer so proto is always NUL-terminated;
     * an over-long prefix gets truncated instead of overrunning. */
    for (i = 0; i < sizeof(proto) - 1 && svr_addr_str[i] != '\0' && svr_addr_str[i] != ':'; i++)
        proto[i] = svr_addr_str[i];

    /* intialize margo */
    fprintf(stderr,"Client initialized with proto = %s\n",proto);
    /* XXX: probably want to expose some way of tweaking threading parameters */
    mid = margo_init(proto, MARGO_SERVER_MODE, 0, -1);
    if (mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
        goto err_free_addr;
    }
    cluster_handle->mid = mid;

    /* initialize ssg */
    /* XXX: we need to think about how to do this once per-client... clients could connect to mult. clusters */
    ret = ssg_init(cluster_handle->mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        goto err_margo;
    }

    /* attach to the cluster group */
    ret = ssg_group_attach(cluster_handle->gid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n");
        goto err_ssg;
    }

    /* get number of servers; reject non-positive sizes as well as 0 */
    gsize = ssg_get_group_size(cluster_handle->gid);
    if (gsize <= 0)
    {
        fprintf(stderr, "Error: Unable to get SSG group size\n");
        goto err_detach;
    }

    /* initialize ch-placement over the gsize servers */
    cluster_handle->ch_instance =
        ch_placement_initialize("hash_lookup3", gsize, 4, 0);
    if (!cluster_handle->ch_instance)
    {
        fprintf(stderr, "Error: Unable to initialize ch-placement instance\n");
        goto err_detach;
    }

    /* initialize mobject client */
    ret = mobject_client_init(mid, &(cluster_handle->mobject_clt));
    if (ret != 0)
    {
        fprintf(stderr, "Error: Unable to create a mobject client\n");
        goto err_placement;
    }

    free(svr_addr_str);
    /* only flag the handle as connected once every component is up */
    cluster_handle->connected = 1;
    return 0;

    /* error unwinding: release resources in reverse order of acquisition */
err_placement:
    ch_placement_finalize(cluster_handle->ch_instance);
err_detach:
    ssg_group_detach(cluster_handle->gid);
err_ssg:
    ssg_finalize();
err_margo:
    margo_finalize(cluster_handle->mid);
err_free_addr:
    free(svr_addr_str);
err_free_handle:
    ssg_group_id_free(cluster_handle->gid);
    free(cluster_handle);
    return -1;
}

Shane Snyder's avatar
Shane Snyder committed
178
/*
 * Tear down a cluster handle created by mobject_store_create().
 *
 * If the handle is connected, this first optionally asks the servers to shut
 * down. That happens only when the env variable named by
 * MOBJECT_CLUSTER_SHUTDOWN_KILL_ENV is exactly "true". It then finalizes, in
 * order: the mobject client, the SSG group attachment, SSG, margo and the
 * ch-placement instance.
 *
 * In every case the SSG group ID loaded by mobject_store_create() and the
 * handle itself are released. 'cluster' must not be used afterwards.
 */
void mobject_store_shutdown(mobject_store_t cluster)
{
    struct mobject_store_handle *cluster_handle;
    char *svr_kill_env_str;
    int ret;

    cluster_handle = (struct mobject_store_handle *)cluster;
    assert(cluster_handle != NULL);

    if (cluster_handle->connected)
    {
        svr_kill_env_str = getenv(MOBJECT_CLUSTER_SHUTDOWN_KILL_ENV);
        if (svr_kill_env_str && !strcmp(svr_kill_env_str, "true"))
        {
            /* kill server cluster if requested */
            ret = mobject_store_shutdown_servers(cluster_handle);
            if (ret != 0)
            {
                /* adjacent literals rather than a backslash continuation, so
                 * the message does not absorb the next line's indentation */
                fprintf(stderr, "Warning: Unable to send shutdown signal "
                                "to mobject server cluster\n");
            }
        }

        mobject_client_finalize(cluster_handle->mobject_clt);
        ssg_group_detach(cluster_handle->gid);
        ssg_finalize();
        margo_finalize(cluster_handle->mid);
        ch_placement_finalize(cluster_handle->ch_instance);
    }

    /* The group ID and handle were allocated by mobject_store_create().
     * Release them even if the handle never connected; otherwise they leak. */
    ssg_group_id_free(cluster_handle->gid);
    free(cluster_handle);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
212

213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
/*
 * Create a pool. mobject does not implement pools, so this ignores its
 * arguments and always reports success (0).
 */
int mobject_store_pool_create(mobject_store_t cluster, const char * pool_name)
{
    (void)pool_name;
    (void)cluster;

    /* XXX: this is a NOOP -- we don't implement pools currently */
    return 0;
}

/*
 * Delete a pool. mobject does not implement pools, so this ignores its
 * arguments and always reports success (0).
 */
int mobject_store_pool_delete(mobject_store_t cluster, const char * pool_name)
{
    (void)pool_name;
    (void)cluster;

    /* XXX: this is a NOOP -- we don't implement pools currently */
    return 0;
}

229
int mobject_store_ioctx_create(
230 231 232
        mobject_store_t cluster,
        const char * pool_name,
        mobject_store_ioctx_t *ioctx)
233 234 235 236 237 238 239 240 241 242 243 244 245
{
    *ioctx = (mobject_store_ioctx_t)calloc(1, sizeof(**ioctx));
    (*ioctx)->pool_name = strdup(pool_name);
    (*ioctx)->cluster   = cluster;
    return 0;
}

/*
 * Release an I/O context created by mobject_store_ioctx_create(), including
 * its copy of the pool name. Passing NULL is a no-op.
 */
void mobject_store_ioctx_destroy(mobject_store_ioctx_t ioctx)
{
    if (ioctx == NULL)
        return;

    free(ioctx->pool_name);
    free(ioctx);
}

246
/* Allocate a new write operation via mobject_create_write_op(). */
mobject_store_write_op_t
mobject_store_create_write_op(void)
{
    mobject_store_write_op_t op = mobject_create_write_op();
    return op;
}
250

251 252 253 254
/* Release a write operation by handing it to mobject_release_write_op(). */
void
mobject_store_release_write_op(mobject_store_write_op_t write_op)
{
    mobject_release_write_op(write_op);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
255

256 257 258 259 260 261
/*
 * Add an object-creation step to 'write_op'. 'exclusive' and 'category' are
 * forwarded unchanged to mobject_write_op_create().
 */
void
mobject_store_write_op_create(mobject_store_write_op_t write_op, int exclusive, const char* category)
{
    mobject_write_op_create(write_op, exclusive, category);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
262

263 264 265 266 267 268 269 270
/*
 * Add a write of 'len' bytes from 'buffer' at 'offset' to 'write_op'.
 * mobject_write_op_write() takes the offset before the length, so the two
 * are intentionally swapped in the call below.
 */
void
mobject_store_write_op_write(mobject_store_write_op_t write_op,
                             const char *buffer, size_t len, uint64_t offset)
{
    mobject_write_op_write(write_op, buffer, offset, len);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
271

272 273 274 275 276 277
/*
 * Add a full-object write of 'len' bytes from 'buffer' to 'write_op'
 * (forwarded to mobject_write_op_write_full()).
 */
void mobject_store_write_op_write_full(mobject_store_write_op_t write_op,
        const char *buffer,
        size_t len)
{
    /* This function returns void, so the call must not be wrapped in
     * 'return <expr>;' -- that is a constraint violation in C (C11 6.8.6.4). */
    mobject_write_op_write_full(write_op, buffer, len);
}
278

279 280 281 282 283 284 285 286 287
/*
 * Add a write-same step to 'write_op' by forwarding to
 * mobject_write_op_write_same(). That function takes 'offset' before the two
 * lengths, so the argument order in the call is intentionally different.
 */
void
mobject_store_write_op_writesame(mobject_store_write_op_t write_op,
                                 const char *buffer,
                                 size_t data_len, size_t write_len,
                                 uint64_t offset)
{
    mobject_write_op_write_same(write_op, buffer, offset, data_len, write_len);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
288

289 290 291 292 293 294
/* Add an append of 'len' bytes from 'buffer' to 'write_op'. */
void
mobject_store_write_op_append(mobject_store_write_op_t write_op,
                              const char *buffer, size_t len)
{
    mobject_write_op_append(write_op, buffer, len);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
295

296 297 298 299
/* Add an object-removal step to 'write_op'. */
void
mobject_store_write_op_remove(mobject_store_write_op_t write_op)
{
    mobject_write_op_remove(write_op);
}
300

301 302 303 304
/* Add a truncate-to-'offset' step to 'write_op'. */
void
mobject_store_write_op_truncate(mobject_store_write_op_t write_op, uint64_t offset)
{
    mobject_write_op_truncate(write_op, offset);
}

307 308 309
/* Add a step to 'write_op' that zeroes 'len' bytes starting at 'offset'. */
void
mobject_store_write_op_zero(mobject_store_write_op_t write_op,
                            uint64_t offset, uint64_t len)
{
    mobject_write_op_zero(write_op, offset, len);
}
313

314 315 316 317 318 319 320 321
/*
 * Add an omap "set" step to 'write_op'. 'keys', 'vals' and 'lens' are
 * presumably parallel arrays of 'num' entries, with lens[i] the length of
 * vals[i] -- TODO confirm. All are forwarded unchanged to
 * mobject_write_op_omap_set().
 */
void
mobject_store_write_op_omap_set(mobject_store_write_op_t write_op,
                                char const* const* keys, char const* const* vals,
                                const size_t *lens, size_t num)
{
    mobject_write_op_omap_set(write_op, keys, vals, lens, num);
}
322

323 324 325 326 327 328
/* Add a step to 'write_op' removing the 'keys_len' omap keys in 'keys'. */
void
mobject_store_write_op_omap_rm_keys(mobject_store_write_op_t write_op,
                                    char const* const* keys, size_t keys_len)
{
    mobject_write_op_omap_rm_keys(write_op, keys, keys_len);
}
329

330
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
331 332 333 334
        mobject_store_ioctx_t io,
        const char *oid,
        time_t *mtime,
        int flags)
335
{
336 337 338 339 340 341 342
    mobject_provider_handle_t mph = MOBJECT_PROVIDER_HANDLE_NULL;
    uint64_t oid_hash = sdbm_hash(oid);
    unsigned long server_idx;
    ch_placement_find_closest(io->cluster->ch_instance, oid_hash, 1, &server_idx);
    // XXX multiple providers may be in the same node (with distinct mplex ids)
    hg_addr_t svr_addr = ssg_get_addr(io->cluster->gid, server_idx);

343 344
    fprintf(stderr,"Object oid=%s will go to server %ld\n", oid, server_idx);

345 346 347 348 349 350 351 352
    // TODO for now multiplex id is hard-coded as 1
    int r = mobject_provider_handle_create(io->cluster->mobject_clt, svr_addr, 1, &mph);
    if(r != 0) return r;

    r = mobject_write_op_operate(mph, write_op, io->pool_name, oid, mtime, flags);
    mobject_provider_handle_release(mph);
    return r;
}
Matthieu Dorier's avatar
Matthieu Dorier committed
353

354 355 356 357
/* Allocate a new read operation via mobject_create_read_op(). */
mobject_store_read_op_t
mobject_store_create_read_op(void)
{
    mobject_store_read_op_t op = mobject_create_read_op();
    return op;
}
358

359 360 361 362
/* Release a read operation by handing it to mobject_release_read_op(). */
void
mobject_store_release_read_op(mobject_store_read_op_t read_op)
{
    mobject_release_read_op(read_op);
}
363

364 365 366 367 368 369 370
/*
 * Add a stat step to 'read_op'. The out-pointers are forwarded to
 * mobject_read_op_stat(). Presumably they are filled when the operation runs
 * via mobject_store_read_op_operate() -- TODO confirm.
 */
void
mobject_store_read_op_stat(mobject_store_read_op_t read_op,
                           uint64_t *psize, time_t *pmtime, int *prval)
{
    mobject_read_op_stat(read_op, psize, pmtime, prval);
}
371

372 373 374 375 376 377 378 379 380
/*
 * Add a read of up to 'len' bytes at 'offset' into 'buffer' to 'read_op'.
 * mobject_read_op_read() takes 'buffer' before 'offset' and 'len', so the
 * argument order in the call is intentionally different.
 */
void
mobject_store_read_op_read(mobject_store_read_op_t read_op,
                           uint64_t offset, size_t len, char *buffer,
                           size_t *bytes_read, int *prval)
{
    mobject_read_op_read(read_op, buffer, offset, len, bytes_read, prval);
}
381

382 383 384 385 386 387 388
/*
 * Add an omap key listing to 'read_op'. All arguments are forwarded
 * unchanged to mobject_read_op_omap_get_keys().
 */
void
mobject_store_read_op_omap_get_keys(mobject_store_read_op_t read_op,
                                    const char *start_after, uint64_t max_return,
                                    mobject_store_omap_iter_t *iter, int *prval)
{
    mobject_read_op_omap_get_keys(read_op, start_after, max_return, iter, prval);
}
390

391 392 393 394 395 396 397 398 399
/*
 * Add an omap key/value listing to 'read_op'. All arguments are forwarded
 * unchanged to mobject_read_op_omap_get_vals().
 */
void
mobject_store_read_op_omap_get_vals(mobject_store_read_op_t read_op,
                                    const char *start_after, const char *filter_prefix,
                                    uint64_t max_return,
                                    mobject_store_omap_iter_t *iter, int *prval)
{
    mobject_read_op_omap_get_vals(read_op, start_after, filter_prefix,
                                  max_return, iter, prval);
}
400

401 402 403 404 405
/*
 * Add a lookup of the 'keys_len' omap keys in 'keys' to 'read_op'. All
 * arguments are forwarded unchanged to mobject_read_op_omap_get_vals_by_keys().
 */
void
mobject_store_read_op_omap_get_vals_by_keys(mobject_store_read_op_t read_op,
                                            char const* const* keys, size_t keys_len,
                                            mobject_store_omap_iter_t *iter, int *prval)
{
    mobject_read_op_omap_get_vals_by_keys(read_op, keys, keys_len, iter, prval);
}

int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
411 412 413
        mobject_store_ioctx_t ioctx,
        const char *oid,
        int flags)
414
{
415 416 417 418 419 420
    mobject_provider_handle_t mph = MOBJECT_PROVIDER_HANDLE_NULL;
    uint64_t oid_hash = sdbm_hash(oid);
    unsigned long server_idx;
    ch_placement_find_closest(ioctx->cluster->ch_instance, oid_hash, 1, &server_idx);
    // XXX multiple providers may be in the same node (with distinct mplex ids)
    hg_addr_t svr_addr = ssg_get_addr(ioctx->cluster->gid, server_idx);
421
    fprintf(stderr,"Object oid=%s is read from server %ld\n", oid, server_idx);
422 423 424
    // TODO for now multiplex id is hard-coded as 1
    int r = mobject_provider_handle_create(ioctx->cluster->mobject_clt, svr_addr, 1, &mph);
    if(r != 0) return r;
425

426
    r = mobject_read_op_operate(mph,read_op, ioctx->pool_name, oid, flags);
427 428
    mobject_provider_handle_release(mph);
    return r;
429 430 431
}

// send a shutdown signal to a server cluster
432
static int mobject_store_shutdown_servers(struct mobject_store_handle *cluster_handle)
433 434 435 436 437 438 439 440 441 442
{
    hg_addr_t svr_addr;

    /* get the address of the first server */
    svr_addr = ssg_get_addr(cluster_handle->gid, 0);
    if (svr_addr == HG_ADDR_NULL)
    {
        fprintf(stderr, "Error: Unable to obtain address for mobject server\n");
        return -1;
    }
443 444
    // TODO we should actually call that for all the members of the group
    return mobject_shutdown(cluster_handle->mobject_clt, svr_addr);
445 446
}

447 448 449 450 451 452 453 454 455 456
/*
 * sdbm string hash over the NUL-terminated string 'str'. For each byte c:
 *     hash = c + (hash << 6) + (hash << 16) - hash
 * The result selects the server an object is placed on.
 *
 * Bytes are read as unsigned char, as in the reference sdbm algorithm. The
 * hash of names containing bytes >= 0x80 therefore no longer depends on
 * whether plain 'char' is signed on the host, which matters because every
 * client must compute the same placement for the same name.
 *
 * NOTE(review): on signed-char hosts this changes the hash of such non-ASCII
 * names compared with the previous code; pure-ASCII names are unaffected.
 * The width of unsigned long is still ABI-dependent.
 */
static unsigned long sdbm_hash(const char* str)
{
    const unsigned char *p = (const unsigned char *)str;
    unsigned long hash = 0;
    unsigned int c;

    while ((c = *p++) != '\0')
        hash = c + (hash << 6) + (hash << 16) - hash;

    return hash;
}