DataStoreImpl.hpp 18.6 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8
#ifndef __HEPNOS_PRIVATE_DATASTORE_IMPL
#define __HEPNOS_PRIVATE_DATASTORE_IMPL

9
#include <vector>
10
#include <unordered_set>
Matthieu Dorier's avatar
Matthieu Dorier committed
11
#include <unordered_map>
12 13 14
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <yaml-cpp/yaml.h>
15
#include <sdskv-client.hpp>
16 17 18 19
#include <ch-placement.h>
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
20
#include "StringHash.hpp"
21
#include "DataSetImpl.hpp"
22 23 24 25

namespace hepnos {

////////////////////////////////////////////////////////////////////////////////////////////
// DataStoreImpl implementation
////////////////////////////////////////////////////////////////////////////////////////////

/**
 * A set of SDSKV databases forming one distributed key/value space,
 * together with the ch-placement instance used to map a hash to an
 * index into dbs.
 */
struct DistributedDBInfo {
    std::vector<sdskv::database>  dbs;          // database handles, indexed via ch-placement
    struct ch_placement_instance* chi{nullptr}; // placement instance; null until initialized
};

34
class DataStoreImpl {
35 36
    public:

Matthieu Dorier's avatar
Matthieu Dorier committed
37 38
    margo_instance_id                         m_mid;          // Margo instance
    std::unordered_map<std::string,hg_addr_t> m_addrs;        // Addresses used by the service
39
    sdskv::client                             m_sdskv_client; // SDSKV client
40
    DistributedDBInfo                         m_databases;    // list of SDSKV databases
41
    DistributedDBInfo                         m_dataset_dbs;  // list of SDSKV databases for DataSets
Matthieu Dorier's avatar
Matthieu Dorier committed
42
    const DataStore::iterator                 m_end;          // iterator for the end() of the DataStore
43

44
    DataStoreImpl()
45 46 47
    : m_mid(MARGO_INSTANCE_NULL)
    , m_end() {}

48
    void populateDatabases(DistributedDBInfo& db_info, const YAML::Node& db_config) {
49 50
        int ret;
        hg_return_t hret;
51
        for(YAML::const_iterator address_it = db_config.begin(); address_it != db_config.end(); address_it++) {
52 53 54
            std::string str_addr = address_it->first.as<std::string>();
            YAML::Node providers = address_it->second;
            // lookup the address
55
            hg_addr_t addr;
Matthieu Dorier's avatar
Matthieu Dorier committed
56 57 58 59 60 61 62
            if(m_addrs.count(str_addr) != 0) {
                addr = m_addrs[str_addr];
            } else {
                hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
                if(hret != HG_SUCCESS) {
                    margo_addr_free(m_mid,addr);
                    cleanup();
63
                    throw Exception("margo_addr_lookup failed (MARGO error="+std::to_string(hret)+")");
Matthieu Dorier's avatar
Matthieu Dorier committed
64 65
                }
                m_addrs[str_addr] = addr;
66
            }
67 68 69 70 71 72 73 74 75 76
            // iterate over providers for this address
            for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
                // get the provider id
                uint16_t provider_id = provider_it->first.as<uint16_t>();
                // create provider handle
                sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
                // get the database ids
                YAML::Node databases = provider_it->second;
                // iterate over databases for this provider
                for(unsigned i=0; i < databases.size(); i++) {
77
                    db_info.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
78
                }
79 80
            } // for each provider
        } // for each address
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113
        // initialize ch-placement
        db_info.chi = ch_placement_initialize("hash_lookup3", db_info.dbs.size(), 4, 0);
    }

    void init(const std::string& configFile) {
        int ret;
        hg_return_t hret;
        YAML::Node config = YAML::LoadFile(configFile);
        checkConfig(config);
        // get protocol
        std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
        // initialize Margo
        m_mid = margo_init(proto.c_str(), MARGO_CLIENT_MODE, 0, 0);
        if(!m_mid) {
            cleanup();
            throw Exception("Could not initialized Margo");
        }
        // initialize SDSKV client
        try {
            m_sdskv_client = sdskv::client(m_mid);
        } catch(sdskv::exception& ex) {
            cleanup();
            throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
        }
        // populate database info structures for each type of database
        YAML::Node databases = config["hepnos"]["databases"];
        YAML::Node dataset_db = databases["datasets"];
        YAML::Node run_db     = databases["runs"];
        YAML::Node subrun_db  = databases["subruns"];
        YAML::Node event_db   = databases["events"];
        YAML::Node product_db = databases["products"];
        populateDatabases(m_dataset_dbs, dataset_db);
        populateDatabases(m_databases, product_db); // TODO change this
114 115 116
    }

    void cleanup() {
117
        m_databases.dbs.clear();
118
        m_sdskv_client = sdskv::client();
119 120
        if(m_databases.chi)
            ch_placement_finalize(m_databases.chi);
Matthieu Dorier's avatar
Matthieu Dorier committed
121 122 123
        for(auto& addr : m_addrs) {
            margo_addr_free(m_mid, addr.second);
        }
124 125 126 127 128 129
        if(m_mid) margo_finalize(m_mid);
    }

    private:

    static void checkConfig(YAML::Node& config) {
Matthieu Dorier's avatar
Matthieu Dorier committed
130
        // config file starts with hepnos entry
131 132 133 134
        auto hepnosNode = config["hepnos"];
        if(!hepnosNode) {
            throw Exception("\"hepnos\" entry not found in YAML file");
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
135
        // hepnos entry has client entry
136 137 138 139
        auto clientNode = hepnosNode["client"];
        if(!clientNode) {
            throw Exception("\"client\" entry not found in \"hepnos\" section");
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
140
        // client entry has protocol entry
141 142 143 144
        auto protoNode = clientNode["protocol"];
        if(!protoNode) {
            throw Exception("\"protocol\" entry not found in \"client\" section");
        }
145 146 147 148
        // hepnos entry has databases entry
        auto databasesNode = hepnosNode["databases"];
        if(!databasesNode) {
            throw Exception("\"databasess\" entry not found in \"hepnos\" section");
149
        }
150 151
        if(!databasesNode.IsMap()) {
            throw Exception("\"databases\" entry should be a map");
152 153 154 155 156 157 158 159 160 161
        }
        // database entry has keys datasets, runs, subruns, events, and products.
        std::vector<std::string> fields = { "datasets", "runs", "subruns", "events", "products" };
        for(auto& f : fields) {
            auto fieldNode = databasesNode[f];
            if(!fieldNode) {
                throw Exception("\""+f+"\" entry not found in databases section");
            }
            if(!fieldNode.IsMap()) {
                throw Exception("\""+f+"\" entry should be a mapping from addresses to providers");
162
            }
163 164 165 166 167 168 169 170 171 172 173 174 175
            for(auto addresses_it = fieldNode.begin(); addresses_it != fieldNode.end(); addresses_it++) {
                auto providers = addresses_it->second;
                for(auto provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
                    // provider entry should be a sequence
                    if(!provider_it->second.IsSequence()) {
                        std::cerr << "Error,  provider_it->second is " << provider_it->second << std::endl;
                        throw Exception("provider entry should be a sequence");
                    }
                    for(auto db : provider_it->second) {
                        if(!db.IsScalar()) {
                            throw Exception("database id should be a scalar");
                        }
                    }
176
                }
177 178 179 180
            }
        }
    }

181 182
    public:

183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
    static inline std::string buildKey(
            uint8_t level,
            const std::string& containerName,
            const std::string& objectName) {
        size_t c = 1 + objectName.size();
        if(!containerName.empty()) c += containerName.size() + 1;
        std::string result(c,'\0');
        result[0] = level;
        if(!containerName.empty()) {
            std::memcpy(&result[1], containerName.data(), containerName.size());
            size_t x = 1+containerName.size();
            result[x] = '/';
            std::memcpy(&result[x+1], objectName.data(), objectName.size());
        } else {
            std::memcpy(&result[1], objectName.data(), objectName.size());
        }
        return result;
    }

202
    unsigned long computeDbIndex(uint8_t level, const std::string& containerName, const std::string& key) const {
203
        // hash the name to get the provider id
204
        long unsigned sdskv_db_idx = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
205
        uint64_t name_hash;
206
        if(level != 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
207
            name_hash = hashString(containerName);
208
        } else {
209
            // use the complete name for final objects (level 0)
Matthieu Dorier's avatar
Matthieu Dorier committed
210
            name_hash = hashString(key);
211
        }
212
        ch_placement_find_closest(m_databases.chi, name_hash, 1, &sdskv_db_idx);
213 214 215 216 217 218 219 220 221 222
        return sdskv_db_idx;
    }

    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, std::string& data) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
223
        // make corresponding datastore entry
224
        auto& db = m_databases.dbs[sdskv_db_idx];
225
        // read the value
226 227
        if(data.size() == 0)
            data.resize(2048); // eagerly allocate 2KB
228
        try {
229
            db.get(key, data);
230 231 232 233
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else
234
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
235 236 237 238
        }
        return true;
    }

239 240 241 242 243 244 245 246
    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, char* value, size_t* vsize) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
        // make corresponding datastore entry
247
        auto& db = m_databases.dbs[sdskv_db_idx];
248 249 250 251 252 253 254 255 256 257 258 259 260 261
        // read the value
        try {
            db.get(key.data(), key.size(), value, vsize);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else if(ex.error() == SDSKV_ERR_SIZE)
                return false;
            else
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return true;
    }

262 263 264 265 266 267 268 269
    bool exists(uint8_t level, const std::string& containerName,
            const std::string& objectName) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
        // make corresponding datastore entry
270
        auto& db = m_databases.dbs[sdskv_db_idx];
271 272 273 274 275 276 277 278
        try {
            return db.exists(key);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }

279
    ProductID store(uint8_t level, const std::string& containerName,
280
            const std::string& objectName, const char* data=nullptr, size_t data_size=0) {
281
        // build full name
282
        auto key = buildKey(level, containerName, objectName);
283 284
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
285
        // Create the product id
286
        const auto& db = m_databases.dbs[sdskv_db_idx];
287
        try {
288
            db.put(key.data(), key.size(), data, data_size);
289
        } catch(sdskv::exception& ex) {
290 291 292
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return ProductID();
            } else {
293
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
294
            }
295
        }
296 297 298 299
        return ProductID(level, containerName, objectName);
    }

    void storeMultiple(unsigned long db_index, 
300 301
            const std::vector<std::string>& keys,
            const std::vector<std::string>& values) {
302
        // Create the product id
303
        const auto& db = m_databases.dbs[db_index];
304
        try {
305
            db.put_multi(keys, values);
306 307 308
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
        }
309 310 311 312 313 314 315 316
    }

    size_t nextKeys(uint8_t level, const std::string& containerName,
            const std::string& lower,
            std::vector<std::string>& keys, size_t maxKeys) const {
        int ret;
        if(level == 0) return 0; // cannot iterate at object level
        // hash the name to get the provider id
317
        long unsigned db_idx = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
318
        uint64_t h = hashString(containerName);
319
        ch_placement_find_closest(m_databases.chi, h, 1, &db_idx);
320
        // make an entry for the lower bound
321
        auto lb_entry = buildKey(level, containerName, lower);
322
        // get provider and database
323
        const auto& db = m_databases.dbs[db_idx];
324 325 326 327 328 329 330 331 332
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(2+containerName.size(), '\0');
        prefix[0] = level;
        if(containerName.size() != 0) {
            std::memcpy(&prefix[1], containerName.data(), containerName.size());
            prefix[prefix.size()-1] = '/';
        } else {
            prefix.resize(1);
        }
333
        // issue an sdskv_list_keys
334
        std::vector<std::string> entries(maxKeys, std::string(1024,'\0'));
335 336
        try {
            db.list_keys(lb_entry, prefix, entries);
337 338
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
339 340
        }
        keys.resize(0);
341 342
        for(const auto& entry : entries) {
            keys.emplace_back(&entry[1], entry.size()-1);
343
        }
344
        return keys.size();
345
    }
346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467

    ///////////////////////////////////////////////////////////////////////////
    // DataSet access functions
    ///////////////////////////////////////////////////////////////////////////

    /**
     * Builds the database key of a particular DataSet.
     */
    static inline std::string _buildDataSetKey(uint8_t level, const std::string& containerName, const std::string& objectName) {
        size_t c = 1 + objectName.size();
        if(!containerName.empty()) c += containerName.size() + 1;
        std::string result(c,'\0');
        result[0] = level;
        if(!containerName.empty()) {
            std::memcpy(&result[1], containerName.data(), containerName.size());
            size_t x = 1+containerName.size();
            result[x] = '/';
            std::memcpy(&result[x+1], objectName.data(), objectName.size());
        } else {
            std::memcpy(&result[1], objectName.data(), objectName.size());
        }
        return result;
    }

    /**
     * Locates and return the database in charge of the provided DataSet info.
     */
    const sdskv::database& _locateDataSetDb(const std::string& containerName) const {
        // hash the name to get the provider id
        long unsigned db_idx = 0;
        uint64_t hash;
        hash = hashString(containerName);
        ch_placement_find_closest(m_dataset_dbs.chi, hash, 1, &db_idx);
        return m_dataset_dbs.dbs[db_idx];
    }

    /**
     * @brief Fills the result vector with a sequence of up to
     * maxDataSets shared_ptr to DataSetImpl coming after the
     * current dataset. Returns the number of DataSets read.
     */
    size_t nextDataSets(const std::shared_ptr<DataSetImpl>& current, 
            std::vector<std::shared_ptr<DataSetImpl>>& result,
            size_t maxDataSets) const {
        int ret;
        result.resize(0);
        auto& level = current->m_level;
        auto& containerName = *current->m_container;
        auto& currentName = current->m_name;
        if(current->m_level == 0) return 0; // cannot iterate at object level
        auto& db = _locateDataSetDb(containerName);
        // make an entry for the lower bound
        auto lb_entry = _buildDataSetKey(level, containerName, currentName);
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(2+containerName.size(), '\0');
        prefix[0] = level;
        if(containerName.size() != 0) {
            std::memcpy(&prefix[1], containerName.data(), containerName.size());
            prefix[prefix.size()-1] = '/';
        } else {
            prefix.resize(1);
        }
        // issue an sdskv_list_keys
        std::vector<std::string> entries(maxDataSets, std::string(1024,'\0'));
        try {
            db.list_keys(lb_entry, prefix, entries);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
        }
        result.resize(0);
        for(const auto& entry : entries) {
            size_t i = entry.find_last_of('/');
            if(i == std::string::npos) i = 1;
            else i += 1;
            result.push_back(
                    std::make_shared<DataSetImpl>(
                        current->m_datastore,
                        level,
                        current->m_container,
                        entry.substr(i)
                    )
                );
        }
        return result.size();
    }

    /**
     * @brief Checks if a particular dataset exists.
     */
    bool dataSetExists(uint8_t level, const std::string& containerName, const std::string& objectName) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        auto& db = _locateDataSetDb(containerName);
        try {
            return db.exists(key);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }

    /**
     * Creates a DataSet
     */
    bool createDataSet(uint8_t level, const std::string& containerName, const std::string& objectName) {
        // build full name
        auto key = _buildDataSetKey(level, containerName, objectName);
        // find out which DB to access
        auto& db = _locateDataSetDb(containerName);
        try {
            db.put(key.data(), key.size(), nullptr, 0);
        } catch(sdskv::exception& ex) {
            if(!ex.error() == SDSKV_ERR_KEYEXISTS) {
                return false;
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
        }
        return true;
    }
468 469 470
};

}
471 472

#endif