/*
 * (C) 2018 The University of Chicago
 *
 * See COPYRIGHT in top-level directory.
 */
#ifndef __HEPNOS_PRIVATE_DATASTORE_IMPL
#define __HEPNOS_PRIVATE_DATASTORE_IMPL

9
#include <vector>
10
#include <unordered_set>
Matthieu Dorier's avatar
Matthieu Dorier committed
11
#include <unordered_map>
12 13 14
#include <functional>
#include <iostream>
#include <yaml-cpp/yaml.h>
15
#include <sdskv-client.hpp>
16 17 18 19
#include <ch-placement.h>
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
20
#include "StringHash.hpp"
21 22 23 24

namespace hepnos {

////////////////////////////////////////////////////////////////////////////////////////////
// DataStoreImpl implementation
////////////////////////////////////////////////////////////////////////////////////////////

/**
 * Collection of SDSKV databases the client spreads its keys over, paired with
 * the ch-placement instance that picks one of them (by index into `dbs`).
 */
struct DistributedDBInfo {
    std::vector<sdskv::database>  dbs;           // candidate databases; placement yields an index in here
    struct ch_placement_instance* chi{nullptr};  // placement instance; stays nullptr until initialized
};

33
class DataStoreImpl {
34 35
    public:

Matthieu Dorier's avatar
Matthieu Dorier committed
36 37
    margo_instance_id                         m_mid;          // Margo instance
    std::unordered_map<std::string,hg_addr_t> m_addrs;        // Addresses used by the service
38
    sdskv::client                             m_sdskv_client; // SDSKV client
39
    DistributedDBInfo                         m_databases;    // list of SDSKV databases
Matthieu Dorier's avatar
Matthieu Dorier committed
40
    const DataStore::iterator                 m_end;          // iterator for the end() of the DataStore
41

42
    DataStoreImpl()
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
    : m_mid(MARGO_INSTANCE_NULL)
    , m_end() {}

    void init(const std::string& configFile) {
        int ret;
        hg_return_t hret;
        YAML::Node config = YAML::LoadFile(configFile);
        checkConfig(config);
        // get protocol
        std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
        // initialize Margo
        m_mid = margo_init(proto.c_str(), MARGO_CLIENT_MODE, 0, 0);
        if(!m_mid) {
            cleanup();
            throw Exception("Could not initialized Margo");
        }
        // initialize SDSKV client
60 61
        try {
            m_sdskv_client = sdskv::client(m_mid);
62
        } catch(sdskv::exception& ex) {
63
            cleanup();
64
            throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
65 66
        }
        // create list of sdskv provider handles
67 68 69 70 71 72
        YAML::Node databases = config["hepnos"]["databases"];
        YAML::Node dataset_db = databases["datasets"];
        for(YAML::const_iterator address_it = dataset_db.begin(); address_it != dataset_db.end(); address_it++) {
            std::string str_addr = address_it->first.as<std::string>();
            YAML::Node providers = address_it->second;
            // lookup the address
73
            hg_addr_t addr;
Matthieu Dorier's avatar
Matthieu Dorier committed
74 75 76 77 78 79 80
            if(m_addrs.count(str_addr) != 0) {
                addr = m_addrs[str_addr];
            } else {
                hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
                if(hret != HG_SUCCESS) {
                    margo_addr_free(m_mid,addr);
                    cleanup();
81
                    throw Exception("margo_addr_lookup failed (MARGO error="+std::to_string(hret)+")");
Matthieu Dorier's avatar
Matthieu Dorier committed
82 83
                }
                m_addrs[str_addr] = addr;
84
            }
85 86 87 88 89 90 91 92 93 94 95
            // iterate over providers for this address
            for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
                // get the provider id
                uint16_t provider_id = provider_it->first.as<uint16_t>();
                // create provider handle
                sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
                // get the database ids
                YAML::Node databases = provider_it->second;
                // iterate over databases for this provider
                for(unsigned i=0; i < databases.size(); i++) {
                    m_databases.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
96
                }
97 98
            } // for each provider
        } // for each address
99
        // initialize ch-placement for the SDSKV providers
100
        m_databases.chi = ch_placement_initialize("hash_lookup3", m_databases.dbs.size(), 4, 0);
101 102 103
    }

    void cleanup() {
104
        m_databases.dbs.clear();
105
        m_sdskv_client = sdskv::client();
106 107
        if(m_databases.chi)
            ch_placement_finalize(m_databases.chi);
Matthieu Dorier's avatar
Matthieu Dorier committed
108 109 110
        for(auto& addr : m_addrs) {
            margo_addr_free(m_mid, addr.second);
        }
111 112 113 114 115 116
        if(m_mid) margo_finalize(m_mid);
    }

    private:

    static void checkConfig(YAML::Node& config) {
Matthieu Dorier's avatar
Matthieu Dorier committed
117
        // config file starts with hepnos entry
118 119 120 121
        auto hepnosNode = config["hepnos"];
        if(!hepnosNode) {
            throw Exception("\"hepnos\" entry not found in YAML file");
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
122
        // hepnos entry has client entry
123 124 125 126
        auto clientNode = hepnosNode["client"];
        if(!clientNode) {
            throw Exception("\"client\" entry not found in \"hepnos\" section");
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
127
        // client entry has protocol entry
128 129 130 131
        auto protoNode = clientNode["protocol"];
        if(!protoNode) {
            throw Exception("\"protocol\" entry not found in \"client\" section");
        }
132 133 134 135
        // hepnos entry has databases entry
        auto databasesNode = hepnosNode["databases"];
        if(!databasesNode) {
            throw Exception("\"databasess\" entry not found in \"hepnos\" section");
136
        }
137 138
        if(!databasesNode.IsMap()) {
            throw Exception("\"databases\" entry should be a map");
139 140 141 142 143 144 145 146 147 148
        }
        // database entry has keys datasets, runs, subruns, events, and products.
        std::vector<std::string> fields = { "datasets", "runs", "subruns", "events", "products" };
        for(auto& f : fields) {
            auto fieldNode = databasesNode[f];
            if(!fieldNode) {
                throw Exception("\""+f+"\" entry not found in databases section");
            }
            if(!fieldNode.IsMap()) {
                throw Exception("\""+f+"\" entry should be a mapping from addresses to providers");
149
            }
150 151 152 153 154 155 156 157 158 159 160 161 162
            for(auto addresses_it = fieldNode.begin(); addresses_it != fieldNode.end(); addresses_it++) {
                auto providers = addresses_it->second;
                for(auto provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
                    // provider entry should be a sequence
                    if(!provider_it->second.IsSequence()) {
                        std::cerr << "Error,  provider_it->second is " << provider_it->second << std::endl;
                        throw Exception("provider entry should be a sequence");
                    }
                    for(auto db : provider_it->second) {
                        if(!db.IsScalar()) {
                            throw Exception("database id should be a scalar");
                        }
                    }
163
                }
164 165 166 167
            }
        }
    }

168 169
    public:

170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
    static inline std::string buildKey(
            uint8_t level,
            const std::string& containerName,
            const std::string& objectName) {
        size_t c = 1 + objectName.size();
        if(!containerName.empty()) c += containerName.size() + 1;
        std::string result(c,'\0');
        result[0] = level;
        if(!containerName.empty()) {
            std::memcpy(&result[1], containerName.data(), containerName.size());
            size_t x = 1+containerName.size();
            result[x] = '/';
            std::memcpy(&result[x+1], objectName.data(), objectName.size());
        } else {
            std::memcpy(&result[1], objectName.data(), objectName.size());
        }
        return result;
    }

189
    unsigned long computeDbIndex(uint8_t level, const std::string& containerName, const std::string& key) const {
190
        // hash the name to get the provider id
191
        long unsigned sdskv_db_idx = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
192
        uint64_t name_hash;
193
        if(level != 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
194
            name_hash = hashString(containerName);
195
        } else {
196
            // use the complete name for final objects (level 0)
Matthieu Dorier's avatar
Matthieu Dorier committed
197
            name_hash = hashString(key);
198
        }
199
        ch_placement_find_closest(m_databases.chi, name_hash, 1, &sdskv_db_idx);
200 201 202 203 204 205 206 207 208 209
        return sdskv_db_idx;
    }

    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, std::string& data) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
210
        // make corresponding datastore entry
211
        auto& db = m_databases.dbs[sdskv_db_idx];
212
        // read the value
213 214
        if(data.size() == 0)
            data.resize(2048); // eagerly allocate 2KB
215
        try {
216
            db.get(key, data);
217 218 219 220
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else
221
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
222 223 224 225
        }
        return true;
    }

226 227 228 229 230 231 232 233
    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, char* value, size_t* vsize) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
        // make corresponding datastore entry
234
        auto& db = m_databases.dbs[sdskv_db_idx];
235 236 237 238 239 240 241 242 243 244 245 246 247 248
        // read the value
        try {
            db.get(key.data(), key.size(), value, vsize);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else if(ex.error() == SDSKV_ERR_SIZE)
                return false;
            else
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return true;
    }

249 250 251 252 253 254 255 256
    bool exists(uint8_t level, const std::string& containerName,
            const std::string& objectName) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
        // make corresponding datastore entry
257
        auto& db = m_databases.dbs[sdskv_db_idx];
258 259 260 261 262 263 264 265
        try {
            return db.exists(key);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }

266
    ProductID store(uint8_t level, const std::string& containerName,
267
            const std::string& objectName, const char* data=nullptr, size_t data_size=0) {
268
        // build full name
269
        auto key = buildKey(level, containerName, objectName);
270 271
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
272
        // Create the product id
273
        const auto& db = m_databases.dbs[sdskv_db_idx];
274
        try {
275
            db.put(key.data(), key.size(), data, data_size);
276
        } catch(sdskv::exception& ex) {
277 278 279
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return ProductID();
            } else {
280
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
281
            }
282
        }
283 284 285 286
        return ProductID(level, containerName, objectName);
    }

    void storeMultiple(unsigned long db_index, 
287 288
            const std::vector<std::string>& keys,
            const std::vector<std::string>& values) {
289
        // Create the product id
290
        const auto& db = m_databases.dbs[db_index];
291
        try {
292
            db.put_multi(keys, values);
293 294 295
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
        }
296 297 298 299 300 301 302 303
    }

    size_t nextKeys(uint8_t level, const std::string& containerName,
            const std::string& lower,
            std::vector<std::string>& keys, size_t maxKeys) const {
        int ret;
        if(level == 0) return 0; // cannot iterate at object level
        // hash the name to get the provider id
304
        long unsigned db_idx = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
305
        uint64_t h = hashString(containerName);
306
        ch_placement_find_closest(m_databases.chi, h, 1, &db_idx);
307
        // make an entry for the lower bound
308
        auto lb_entry = buildKey(level, containerName, lower);
309
        // get provider and database
310
        const auto& db = m_databases.dbs[db_idx];
311 312 313 314 315 316 317 318 319
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(2+containerName.size(), '\0');
        prefix[0] = level;
        if(containerName.size() != 0) {
            std::memcpy(&prefix[1], containerName.data(), containerName.size());
            prefix[prefix.size()-1] = '/';
        } else {
            prefix.resize(1);
        }
320
        // issue an sdskv_list_keys
321
        std::vector<std::string> entries(maxKeys, std::string(1024,'\0'));
322 323
        try {
            db.list_keys(lb_entry, prefix, entries);
324 325
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
326 327
        }
        keys.resize(0);
328 329
        for(const auto& entry : entries) {
            keys.emplace_back(&entry[1], entry.size()-1);
330
        }
331
        return keys.size();
332 333 334 335
    }
};

}
336 337

#endif