HEPnOSService.cpp 3.61 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#include <string>
#include <iostream>
#include <fstream>
#include <vector>
#include <unistd.h>
#include <mpi.h>
#include <margo.h>
#include <bake-server.h>
#include <sdskv-server.h>
#include <yaml-cpp/yaml.h>
16
#include "hepnos-service.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
17
18
19
20
21

#define ASSERT(__cond, __msg, ...) { if(!(__cond)) { fprintf(stderr, "[%s:%d] " __msg, __FILE__, __LINE__, __VA_ARGS__); exit(-1); } }

static void generate_config_file(MPI_Comm comm, const char* addr, const char* config_file);

22
void hepnos_run_service(MPI_Comm comm, const char* listen_addr, const char* config_file)
Matthieu Dorier's avatar
Matthieu Dorier committed
23
24
25
26
{
    margo_instance_id mid;
    int ret;
    int rank;
27
28

    MPI_Comm_rank(comm, &rank);
Matthieu Dorier's avatar
Matthieu Dorier committed
29
30
31
32
33
34

    /* Margo initialization */
    mid = margo_init(listen_addr, MARGO_SERVER_MODE, 0, -1);
    if (mid == MARGO_INSTANCE_NULL)
    {
        fprintf(stderr, "Error: Unable to initialize margo\n");
35
36
        MPI_Abort(MPI_COMM_WORLD, -1);
        return;
Matthieu Dorier's avatar
Matthieu Dorier committed
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
    }
    margo_enable_remote_shutdown(mid);

    /* Get self address as string */
    hg_addr_t self_addr;
    margo_addr_self(mid, &self_addr);
    char self_addr_str[128];
    hg_size_t self_addr_str_size = 128;
    margo_addr_to_string(mid, self_addr_str, &self_addr_str_size, self_addr);

    /* Bake provider initialization */
    uint16_t bake_mplex_id = 1;
    char bake_target_name[128];
    sprintf(bake_target_name, "/dev/shm/hepnos.%d.dat", rank);
    /* create the bake target if it does not exist */
    if(-1 == access(bake_target_name, F_OK)) {
        // XXX creating a pool of 10MB - this should come from a config file
        ret = bake_makepool(bake_target_name, 10*1024*1024, 0664);
        ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
    }
    bake_provider_t bake_prov;
    bake_target_id_t bake_tid;
    ret = bake_provider_register(mid, bake_mplex_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
    ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
    ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
    ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
            bake_target_name, ret);

    /* SDSKV provider initialization */
    uint8_t sdskv_mplex_id = 1;
    sdskv_provider_t sdskv_prov;
    ret = sdskv_provider_register(mid, sdskv_mplex_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
    ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret);

71
    // XXX creating the database - this should come from a config file
72
    sdskv_database_id_t db_id;
Matthieu Dorier's avatar
Matthieu Dorier committed
73
    ret = sdskv_provider_add_database(sdskv_prov, "hepnosdb", "", KVDB_MAP, SDSKV_COMPARE_DEFAULT,  &db_id);
74
    ASSERT(ret == 0, "sdskv_provider_add_database() failed (ret = %d)\n", ret);
Matthieu Dorier's avatar
Matthieu Dorier committed
75
76
77

    margo_addr_free(mid, self_addr);

78
    generate_config_file(MPI_COMM_WORLD, self_addr_str, config_file);
Matthieu Dorier's avatar
Matthieu Dorier committed
79

80
    margo_wait_for_finalize(mid);
Matthieu Dorier's avatar
Matthieu Dorier committed
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
}

static void generate_config_file(MPI_Comm comm, const char* addr, const char* config_file)
{
    int rank, size;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);

    unsigned j=0;
    while(addr[j] != '\0' && addr[j] != ':') j++;
    std::string proto(addr, j);

    std::vector<char> buf(128*size);

    MPI_Gather(addr, 128, MPI_BYTE, buf.data(), 128, MPI_BYTE, 0, comm);
    
    if(rank != 0) return;

    std::vector<std::string> addresses;
    for(unsigned i=0; i < size; i++) {
        addresses.emplace_back(&buf[128*i]);
    }

    YAML::Node config;
    config["hepnos"]["client"]["protocol"] = proto;
    for(auto& s :  addresses)
        config["hepnos"]["providers"]["sdskv"][s] = 1;

    std::ofstream fout(config_file);
    fout << config;
}