HEPnOSService.cpp 4.88 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#include <string>
#include <iostream>
#include <fstream>
#include <vector>
#include <unistd.h>
#include <mpi.h>
#include <margo.h>
#include <bake-server.h>
#include <sdskv-server.h>
15
#include "ServiceConfig.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
16
#include "ConnectionInfoGenerator.hpp"
17
#include "hepnos-service.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
18 19 20

/*
 * ASSERT(cond, fmt, ...): if cond evaluates to false, prints
 * "[<file>:<line>] " followed by the printf-style message to stderr and
 * terminates the process with exit(-1).
 *
 * At least one argument must follow the format string, because the
 * expansion ends with ", __VA_ARGS__".
 *
 * The body is wrapped in do { } while(0) so that "ASSERT(...);" is a single
 * statement and can be used safely in an un-braced if/else. The parameter
 * names avoid leading double underscores, which are reserved identifiers.
 */
#define ASSERT(cond_, msg_, ...) \
    do { \
        if(!(cond_)) { \
            fprintf(stderr, "[%s:%d] " msg_, __FILE__, __LINE__, __VA_ARGS__); \
            exit(-1); \
        } \
    } while(0)

Matthieu Dorier's avatar
Matthieu Dorier committed
21
//static void generate_connection_file(MPI_Comm comm, const char* addr, const char* filename);
Matthieu Dorier's avatar
Matthieu Dorier committed
22

23
/**
 * Runs the HEPnOS service on the calling process and blocks until the Margo
 * instance is finalized.
 *
 * The function performs the following steps, in order:
 *   1. loads the service configuration for this process' rank in comm;
 *   2. initializes Margo in server mode on the configured address and
 *      enables remote shutdown;
 *   3. if the configuration declares storage, registers a Bake provider and
 *      adds the storage target (creating the pool file when it is missing);
 *   4. if the configuration declares a database, registers an SDSKV provider
 *      and adds the database to it;
 *   5. writes the connection file through hepnos::ConnectionInfoGenerator;
 *   6. waits for Margo to be finalized.
 *
 * Any failure terminates the program, either through MPI_Abort or through
 * the ASSERT macro (which calls exit).
 *
 * @param comm            Communicator of the service processes. Its rank is
 *                        handed to the configuration loader and it is the
 *                        communicator used to generate the connection file.
 * @param config_file     Path of the configuration file, forwarded to
 *                        hepnos::ServiceConfig.
 * @param connection_file Path forwarded to
 *                        hepnos::ConnectionInfoGenerator::generateFile.
 */
void hepnos_run_service(MPI_Comm comm, const char* config_file, const char* connection_file)
{
    margo_instance_id mid;
    int ret;
    int rank;

    MPI_Comm_rank(comm, &rank);

    /* load configuration */
    std::unique_ptr<hepnos::ServiceConfig> config;
    try {
        config = std::make_unique<hepnos::ServiceConfig>(config_file, rank);
    } catch(const std::exception& e) {
        std::cerr << "Error: when reading configuration:" << std::endl;
        std::cerr << "  " << e.what() << std::endl;
        std::cerr << "Aborting." << std::endl;
        MPI_Abort(MPI_COMM_WORLD, -1);
        return;
    }

    /* Margo initialization */
    mid = margo_init(config->getAddress().c_str(), MARGO_SERVER_MODE, 0, -1);
    if (mid == MARGO_INSTANCE_NULL)
    {
        std::cerr << "Error: unable to initialize margo" << std::endl;
        std::cerr << "Aborting." << std::endl;
        MPI_Abort(MPI_COMM_WORLD, -1);
        return;
    }
    margo_enable_remote_shutdown(mid);

    /* Get self address as string.
     * Both calls report errors through their return value (0 on success);
     * ignoring them would leave self_addr_str uninitialized. */
    hg_addr_t self_addr;
    ret = margo_addr_self(mid, &self_addr);
    ASSERT(ret == 0, "margo_addr_self() failed (ret = %d)\n", ret);
    char self_addr_str[128];
    hg_size_t self_addr_str_size = sizeof(self_addr_str);
    ret = margo_addr_to_string(mid, self_addr_str, &self_addr_str_size, self_addr);
    ASSERT(ret == 0, "margo_addr_to_string() failed (ret = %d)\n", ret);
    /* the address handle is only needed for the conversion above */
    margo_addr_free(mid, self_addr);

    uint16_t bake_provider_id = 0;
    if(config->hasStorage()) {
        /* Bake provider initialization */
        bake_provider_id = 1; // XXX we can make that come from the config file
        /* Keep an owned copy of the path: a pointer obtained from c_str() on
         * the getter's result would dangle if the getter returns by value. */
        const std::string bake_target_path = config->getStoragePath();
        const char* bake_target_name = bake_target_path.c_str();
        /* The configured size is scaled by 1024*1024; widen first so the
         * multiplication is done in size_t rather than in a narrower type. */
        const size_t bake_target_size =
            static_cast<size_t>(config->getStorageSize()) * 1024 * 1024;
        /* create the bake target if it does not exist */
        if(-1 == access(bake_target_name, F_OK)) {
            ret = bake_makepool(bake_target_name, bake_target_size, 0664);
            ASSERT(ret == 0, "bake_makepool() failed (ret = %d)\n", ret);
        }
        bake_provider_t bake_prov;
        bake_target_id_t bake_tid;
        ret = bake_provider_register(mid, bake_provider_id, BAKE_ABT_POOL_DEFAULT, &bake_prov);
        ASSERT(ret == 0, "bake_provider_register() failed (ret = %d)\n", ret);
        ret = bake_provider_add_storage_target(bake_prov, bake_target_name, &bake_tid);
        ASSERT(ret == 0, "bake_provider_add_storage_target() failed to add target %s (ret = %d)\n",
                bake_target_name, ret);
    }

    uint8_t sdskv_provider_id = 0;
    if(config->hasDatabase()) {
        /* SDSKV provider initialization */
        sdskv_provider_id = 1; // XXX we can make that come from the config file
        sdskv_provider_t sdskv_prov;
        ret = sdskv_provider_register(mid, sdskv_provider_id, SDSKV_ABT_POOL_DEFAULT, &sdskv_prov);
        ASSERT(ret == 0, "sdskv_provider_register() failed (ret = %d)\n", ret);

        /* creating the database; owned copies for the same lifetime reason
         * as for the storage path above */
        const std::string db_path = config->getDatabasePath();
        const std::string db_name = config->getDatabaseName();
        const std::string db_type_name = config->getDatabaseType();

        /* Map the configured type name to the SDSKV backend; an unknown name
         * is a fatal error instead of leaving db_type uninitialized. */
        sdskv_db_type_t db_type = KVDB_MAP;
        if(db_type_name == "map") {
            db_type = KVDB_MAP;
        } else if(db_type_name == "ldb") {
            db_type = KVDB_LEVELDB;
        } else if(db_type_name == "bdb") {
            db_type = KVDB_BERKELEYDB;
        } else {
            ASSERT(false, "unknown database type \"%s\" (expected map, ldb or bdb)\n",
                    db_type_name.c_str());
        }

        sdskv_database_id_t db_id;
        ret = sdskv_provider_add_database(sdskv_prov, db_name.c_str(), db_path.c_str(),
                db_type, SDSKV_COMPARE_DEFAULT, &db_id);
        ASSERT(ret == 0, "sdskv_provider_add_database() failed (ret = %d)\n", ret);
    }

    /* Generate the connection file over the communicator this service was
     * started on (the same one the rank above was taken from), not over
     * MPI_COMM_WORLD.
     * NOTE(review): generateFile presumably involves all ranks of the
     * communicator it is given - confirm against ConnectionInfoGenerator. */
    hepnos::ConnectionInfoGenerator fileGen(self_addr_str, sdskv_provider_id, bake_provider_id);
    fileGen.generateFile(comm, connection_file);

    margo_wait_for_finalize(mid);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
108
/*
109
static void generate_connection_file(MPI_Comm comm, const char* addr, const char* filename)
Matthieu Dorier's avatar
Matthieu Dorier committed
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
{
    int rank, size;
    MPI_Comm_rank(comm, &rank);
    MPI_Comm_size(comm, &size);

    unsigned j=0;
    while(addr[j] != '\0' && addr[j] != ':') j++;
    std::string proto(addr, j);

    std::vector<char> buf(128*size);

    MPI_Gather(addr, 128, MPI_BYTE, buf.data(), 128, MPI_BYTE, 0, comm);
    
    if(rank != 0) return;

    std::vector<std::string> addresses;
    for(unsigned i=0; i < size; i++) {
        addresses.emplace_back(&buf[128*i]);
    }

    YAML::Node config;
    config["hepnos"]["client"]["protocol"] = proto;
    for(auto& s :  addresses)
        config["hepnos"]["providers"]["sdskv"][s] = 1;

135
    std::ofstream fout(filename);
Matthieu Dorier's avatar
Matthieu Dorier committed
136 137
    fout << config;
}
Matthieu Dorier's avatar
Matthieu Dorier committed
138
*/