bake-bulk-server-daemon.c 3.88 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * (C) 2015 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */

#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <uuid/uuid.h>
#include <margo.h>
#include <libpmemobj.h>
#include <bake-bulk-server.h>

15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
struct options
{
    char *listen_addr;
    char *pmem_pool;
    char *host_file;
};

static void usage(int argc, char **argv)
{
    fprintf(stderr, "Usage: bake-bulk-server-daemon [OPTIONS] <listen_addr> <pmem_pool>\n");
    fprintf(stderr, "       listen_addr is the Mercury address to listen on\n");
    fprintf(stderr, "       pmem_pool is the path to the pmemobj pool\n");
    fprintf(stderr, "       [-f filename] to write the server address to a file\n");
    fprintf(stderr, "Example: ./bake-bulk-server-daemon tcp://localhost:1234 /dev/shm/foo.dat\n");
    return;
}

static void parse_args(int argc, char **argv, struct options *opts)
{
    int opt;

    memset(opts, 0, sizeof(*opts));

    /* get options */
    while((opt = getopt(argc, argv, "f:")) != -1)
    {
        switch(opt)
        {
            case 'f':
                opts->host_file = optarg;
                break;
            default:
                usage(argc, argv);
                exit(EXIT_FAILURE);
        }
    }

    /* get required arguments after options */
    if((argc - optind) != 2)
    {
        usage(argc, argv);
        exit(EXIT_FAILURE);
    }
    opts->listen_addr = argv[optind++];
    opts->pmem_pool = argv[optind++];

    return;
}

64 65
int main(int argc, char **argv) 
{
66
    struct options opts;
67
    struct bake_pool_info * pool_info;
68
    margo_instance_id mid;
69

70 71 72 73 74 75 76 77
    parse_args(argc, argv, &opts);

    pool_info = bake_server_makepool(opts.pmem_pool);

    /* start margo */
    /* use the main xstream for driving progress and executing rpc handlers */
    mid = margo_init(opts.listen_addr, MARGO_SERVER_MODE, 0, -1);
    if(mid == MARGO_INSTANCE_NULL)
78
    {
79
        fprintf(stderr, "Error: margo_init()\n");
80 81 82
        return(-1);
    }

83 84 85 86 87 88 89 90
    if(opts.host_file)
    {
        /* write the server address to file if requested */
        FILE *fp;
        hg_addr_t self_addr;
        char self_addr_str[128];
        hg_size_t self_addr_str_sz = 128;
        hg_return_t hret;
91

92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
        /* figure out what address this server is listening on */
        hret = margo_addr_self(mid, &self_addr);
        if(hret != HG_SUCCESS)
        {
            fprintf(stderr, "Error: margo_addr_self()\n");
            margo_finalize(mid);
            return(-1);
        }
        hret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_sz, self_addr);
        if(hret != HG_SUCCESS)
        {
            fprintf(stderr, "Error: margo_addr_to_string()\n");
            margo_addr_free(mid, self_addr);
            margo_finalize(mid);
            return(-1);
        }
        margo_addr_free(mid, self_addr);

        fp = fopen(opts.host_file, "w");
        if(!fp)
        {
            perror("fopen");
            margo_finalize(mid);
            return(-1);
        }

        fprintf(fp, "%s", self_addr_str);
        fclose(fp);
    }
121

122
    /* register the bake bulk server */
123
    bake_server_register(mid, pool_info);
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140

    /* NOTE: at this point this server ULT has two options.  It can wait on
     * whatever mechanism it wants to (however long the daemon should run and
     * then call margo_finalize().  Otherwise, it can call
     * margo_wait_for_finalize() on the assumption that it should block until
     * some other entity calls margo_finalize().
     *
     * This example does the latter.  Margo will be finalized by a special
     * RPC from the client.
     *
     * This approach will allow the server to idle gracefully even when
     * executed in "single" mode, in which the main thread of the server
     * daemon and the progress thread for Mercury are executing in the same
     * ABT pool.
     */
    margo_wait_for_finalize(mid);

141
    pmemobj_close(pool_info->bb_pmem_pool);
142 143 144 145

    return(0);
}