/*
 * (C) 2017 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

7
#include "mobject-store-config.h"

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <margo.h>
#include <ssg.h>

#include "libmobject-store.h"
#include "src/rpc-types/write-op.h"
#include "src/rpc-types/read-op.h"
#include "src/rpc-types/read-op.h"
#include "src/client/io-context.h"
#include "src/io-chain/prepare-read-op.h"
#include "src/io-chain/prepare-write-op.h"
23

24 25
/* Name of the environment variable that mobject_store_create() reads to find
 * the cluster file it passes to ssg_group_id_load(). */
#define MOBJECT_CLUSTER_FILE_ENV "MOBJECT_CLUSTER_FILE"

26 27 28 29 30 31

// global variables for RPC ids
// Assigned by mobject_store_register(); they are file-level, not per-handle,
// so each call to mobject_store_register() overwrites the previous values.
hg_id_t mobject_write_op_rpc_id;
hg_id_t mobject_read_op_rpc_id;
hg_id_t mobject_shutdown_rpc_id;

32
typedef struct mobject_store_handle
33
{
34
    margo_instance_id mid;
35
    ssg_group_id_t gid;
36
    int connected;
37 38
} mobject_store_handle_t;

39 40
/* Forward declarations of the internal helpers defined at the end of this file. */
/* Registers the mobject RPCs with a margo instance and stores their ids. */
static void mobject_store_register(margo_instance_id mid);
/* Sends the shutdown RPC to the cluster; returns 0 on success, -1 on failure. */
static int mobject_store_shutdown_servers(mobject_store_handle_t *cluster_handle);
41

42
/*
 * Allocate a cluster handle and load the cluster's group id from the file
 * named by the MOBJECT_CLUSTER_FILE environment variable.
 *
 * cluster: out parameter, receives the new handle on success
 * id:      ignored
 *
 * Returns 0 on success. Returns -1 if the allocation fails, the environment
 * variable is not set, or the group id cannot be loaded; in those cases
 * *cluster is not written and nothing stays allocated.
 */
int mobject_store_create(mobject_store_t *cluster, const char * const id)
{
    mobject_store_handle_t *handle;
    char *path;

    (void)id; /* XXX: id unused in mobject */

    /* zero-filled allocation, so 'connected' starts out as 0 */
    handle = calloc(1, sizeof(*handle));
    if (handle == NULL)
        return -1;

    /* the env variable is currently the _only_ way to specify a cluster */
    path = getenv(MOBJECT_CLUSTER_FILE_ENV);
    if (path == NULL)
    {
        fprintf(stderr, "Error: %s env variable must point to mobject cluster file\n",
            MOBJECT_CLUSTER_FILE_ENV);
        goto fail;
    }

    if (ssg_group_id_load(path, &handle->gid) != 0)
    {
        fprintf(stderr, "Error: Unable to load mobject cluster info from file %s\n",
            path);
        goto fail;
    }

    /* hand the finished handle back to the caller */
    *cluster = handle;
    return 0;

fail:
    free(handle);
    return -1;
}

/*
 * Connect a handle obtained from mobject_store_create() to its cluster:
 * initialize margo with the protocol prefix of the group's address string,
 * register the mobject RPCs, initialize SSG and attach to the cluster group.
 *
 * Returns 0 on success, or immediately if the handle is already connected.
 * Returns -1 on failure.
 *
 * NOTE: every failure path releases the group id and frees the handle itself,
 * so after a -1 return the caller must not use 'cluster' again (this includes
 * passing it to mobject_store_shutdown()).
 */
int mobject_store_connect(mobject_store_t cluster)
{
    mobject_store_handle_t *cluster_handle = (mobject_store_handle_t *)cluster;
    char *svr_addr_str;
    char proto[24] = {0};
    size_t i;
    int ret;

    if (cluster_handle->connected)
        return 0;

    /* figure out protocol to connect with using address information
     * associated with the SSG group ID
     */
    svr_addr_str = ssg_group_id_get_addr_str(cluster_handle->gid);
    if (!svr_addr_str)
    {
        fprintf(stderr, "Error: Unable to obtain cluster group server address\n");
        goto err_handle;
    }

    /* we only need the proto portion of the address (everything before the
     * first ':') to initialize margo; stop one byte short of the buffer so
     * the zero-initialized last byte always terminates the string
     */
    for (i = 0;
         i < sizeof(proto) - 1 && svr_addr_str[i] != '\0' && svr_addr_str[i] != ':';
         i++)
    {
        proto[i] = svr_addr_str[i];
    }

    /* initialize margo */
    /* XXX: probably want to expose some way of tweaking threading parameters */
    cluster_handle->mid = margo_init(proto, MARGO_CLIENT_MODE, 0, -1);
    if (cluster_handle->mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
        goto err_handle;
    }

    /* register mobject RPCs for this cluster */
    mobject_store_register(cluster_handle->mid);

    /* initialize ssg */
    /* XXX: we need to think about how to do this once per-client... clients could connect to mult. clusters */
    ret = ssg_init(cluster_handle->mid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to initialize SSG\n");
        goto err_margo;
    }

    /* attach to the cluster group */
    ret = ssg_group_attach(cluster_handle->gid);
    if (ret != SSG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to attach to the mobject cluster group\n");
        goto err_ssg;
    }

    cluster_handle->connected = 1;
    free(svr_addr_str);

    return 0;

    /* unwind in reverse order of setup; each label falls through to the next */
err_ssg:
    ssg_finalize();
err_margo:
    margo_finalize(cluster_handle->mid);
err_handle:
    free(svr_addr_str); /* NULL when the address lookup itself failed */
    ssg_group_id_free(cluster_handle->gid);
    free(cluster_handle);
    return -1;
}

154
/*
 * Disconnect from the cluster and destroy the handle.
 *
 * If the MOBJECT_SHUTDOWN_KILL_SERVERS environment variable is set to exactly
 * "true", a shutdown RPC is first sent to the server cluster; a failure to
 * send it is reported as a warning and does not stop the teardown.
 *
 * 'cluster' must not be NULL (enforced with assert) and must not be used
 * after this call returns from the connected path, since the handle is freed.
 */
void mobject_store_shutdown(mobject_store_t cluster)
{
    mobject_store_handle_t *cluster_handle =
        (mobject_store_handle_t *)cluster;
    const char *svr_kill_env_str;

    assert(cluster_handle != NULL);

    /* NOTE(review): a handle that was created but never connected is left
     * untouched here, so this call releases neither its group id nor its
     * memory -- confirm whether that is intended.
     */
    if (!cluster_handle->connected)
        return;

    svr_kill_env_str = getenv("MOBJECT_SHUTDOWN_KILL_SERVERS");
    if (svr_kill_env_str && strcmp(svr_kill_env_str, "true") == 0)
    {
        /* kill server cluster if requested */
        if (mobject_store_shutdown_servers(cluster_handle) != 0)
        {
            /* adjacent literals instead of a backslash continuation, which
             * would embed the source indentation in the message
             */
            fprintf(stderr, "Warning: Unable to send shutdown signal "
                            "to mobject server cluster\n");
        }
    }

    /* tear down in reverse order of mobject_store_connect() */
    ssg_group_detach(cluster_handle->gid);
    ssg_finalize();
    margo_finalize(cluster_handle->mid);
    ssg_group_id_free(cluster_handle->gid);
    free(cluster_handle);
}
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208

/*
 * Send a write operation for object 'oid' to the server recorded in the io
 * context and wait for its response.
 *
 * write_op: operation to send
 * io:       io context; its mid, svr_addr and pool_name fields are used
 * oid:      object name
 * mtime:    currently unused
 * flags:    currently unused
 *
 * Returns 0 once a response has been received, -1 if the RPC handle could not
 * be created, the request could not be forwarded, or the response could not
 * be decoded.
 */
int mobject_store_write_op_operate(mobject_store_write_op_t write_op,
                                   mobject_store_ioctx_t io,
                                   const char *oid,
                                   time_t *mtime,
                                   int flags)
{
    write_op_in_t in;
    write_op_out_t resp;
    hg_handle_t h;
    hg_return_t hret;

    (void)mtime; /* XXX: unused for now */
    (void)flags; /* XXX: unused for now */

    in.object_name = oid;
    in.pool_name   = io->pool_name;
    in.write_op    = write_op;

    prepare_write_op(io->mid, write_op);

    hret = margo_create(io->mid, io->svr_addr, mobject_write_op_rpc_id, &h);
    if (hret != HG_SUCCESS)
    {
        /* no handle was created, so there is nothing to destroy */
        fprintf(stderr, "Error: Unable to create margo handle\n");
        return -1;
    }

    hret = margo_forward(h, &in);
    if (hret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to forward margo handle\n");
        margo_destroy(h);
        return -1;
    }

    hret = margo_get_output(h, &resp);
    if (hret != HG_SUCCESS)
    {
        /* resp was not filled in, so it must not be passed to margo_free_output */
        fprintf(stderr, "Error: Unable to get margo output\n");
        margo_destroy(h);
        return -1;
    }

    margo_free_output(h, &resp);
    margo_destroy(h);

    return 0;
}

/*
 * Send a read operation for object 'oid' to the server recorded in the io
 * context, wait for the response, and pass the returned responses to
 * feed_read_op_pointers_from_response() for this read_op.
 *
 * read_op: operation to send
 * ioctx:   io context; its mid, svr_addr and pool_name fields are used
 * oid:     object name
 * flags:   currently unused
 *
 * Returns 0 once the response has been processed, -1 if the RPC handle could
 * not be created, the request could not be forwarded, or the response could
 * not be decoded (in which case read_op is not fed any response).
 */
int mobject_store_read_op_operate(mobject_store_read_op_t read_op,
                                  mobject_store_ioctx_t ioctx,
                                  const char *oid,
                                  int flags)
{
    read_op_in_t in;
    read_op_out_t resp;
    hg_handle_t h;
    hg_return_t hret;

    (void)flags; /* XXX: unused for now */

    in.object_name = oid;
    in.pool_name   = ioctx->pool_name;
    in.read_op     = read_op;

    prepare_read_op(ioctx->mid, read_op);

    // TODO: svr_addr should be computed based on the pool name, object name,
    // and SSG structures accessible via the io context
    hret = margo_create(ioctx->mid, ioctx->svr_addr, mobject_read_op_rpc_id, &h);
    if (hret != HG_SUCCESS)
    {
        /* no handle was created, so there is nothing to destroy */
        fprintf(stderr, "Error: Unable to create margo handle\n");
        return -1;
    }

    hret = margo_forward(h, &in);
    if (hret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to forward margo handle\n");
        margo_destroy(h);
        return -1;
    }

    hret = margo_get_output(h, &resp);
    if (hret != HG_SUCCESS)
    {
        /* resp.responses is not valid here, so it must not be consumed */
        fprintf(stderr, "Error: Unable to get margo output\n");
        margo_destroy(h);
        return -1;
    }

    /* consume the responses before the output is released */
    feed_read_op_pointers_from_response(read_op, resp.responses);

    margo_free_output(h, &resp);
    margo_destroy(h);

    return 0;
}
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290

/* internal helper routines */

/*
 * Register the three mobject client RPCs ("mobject_write_op",
 * "mobject_read_op", "mobject_shutdown") with the given margo instance and
 * store the returned ids in the file-level mobject_*_rpc_id variables.
 * NULL is passed as the last argument of each registration.
 */
static void mobject_store_register(margo_instance_id mid)
{
    /* XXX i think ultimately these need to be stored in per-mid containers instead of global... */
    mobject_write_op_rpc_id = MARGO_REGISTER(mid, "mobject_write_op",
                                             write_op_in_t, write_op_out_t, NULL);
    mobject_read_op_rpc_id  = MARGO_REGISTER(mid, "mobject_read_op",
                                             read_op_in_t, read_op_out_t, NULL);
    mobject_shutdown_rpc_id = MARGO_REGISTER(mid, "mobject_shutdown",
                                             void, void, NULL);
}

/*
 * Send the shutdown RPC to the first server of the cluster group (the address
 * that ssg_get_addr() returns for index 0).
 *
 * Returns 0 if the RPC was forwarded, -1 if the server address could not be
 * obtained, the handle could not be created, or the forward failed.
 */
static int mobject_store_shutdown_servers(mobject_store_handle_t *cluster_handle)
{
    hg_addr_t first_svr;
    hg_handle_t rpc;
    hg_return_t hret;
    int rc = -1;

    first_svr = ssg_get_addr(cluster_handle->gid, 0);
    if (first_svr == HG_ADDR_NULL)
    {
        fprintf(stderr, "Error: Unable to obtain address for mobject server\n");
        return -1;
    }

    hret = margo_create(cluster_handle->mid, first_svr, mobject_shutdown_rpc_id, &rpc);
    if (hret != HG_SUCCESS)
    {
        fprintf(stderr, "Error: Unable to create margo handle\n");
        return -1;
    }

    /* the shutdown RPC carries no input, hence the NULL argument */
    hret = margo_forward(rpc, NULL);
    if (hret == HG_SUCCESS)
        rc = 0;
    else
        fprintf(stderr, "Error: Unable to forward margo handle\n");

    /* the handle is released whether or not the forward succeeded */
    margo_destroy(rpc);

    return rc;
}