DataStoreImpl.hpp 19.9 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8
#ifndef __HEPNOS_PRIVATE_DATASTORE_IMPL
#define __HEPNOS_PRIVATE_DATASTORE_IMPL

9
#include <vector>
10
#include <unordered_set>
Matthieu Dorier's avatar
Matthieu Dorier committed
11
#include <unordered_map>
12 13 14
#include <functional>
#include <iostream>
#include <yaml-cpp/yaml.h>
15
#include <sdskv-client.hpp>
16 17 18 19
#include <ch-placement.h>
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
20
#include "StringHash.hpp"
21
#include "DataSetImpl.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
22
#include "hepnos/UUID.hpp"
23 24 25

namespace hepnos {

Matthieu Dorier's avatar
Matthieu Dorier committed
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
/**
 * Resize hook for hepnos::UUID. It does nothing: both arguments are ignored.
 * NOTE(review): presumably required by the SDSKV C++ client's generic
 * object_* interface for fixed-size values; confirm against sdskv-client.hpp.
 */
inline void object_resize(hepnos::UUID& uuid, size_t new_size) {
    static_cast<void>(uuid);
    static_cast<void>(new_size);
}

/**
 * Returns an untyped, writable pointer to the `data` member of the UUID.
 */
inline void* object_data(hepnos::UUID& uuid) {
    return static_cast<void*>(uuid.data);
}

/**
 * Returns an untyped, read-only pointer to the `data` member of the UUID.
 */
inline const void* object_data(const hepnos::UUID& uuid) {
    return static_cast<const void*>(uuid.data);
}

/**
 * Returns the size in bytes of a hepnos::UUID object. The argument's
 * value is not inspected.
 */
inline size_t object_size(const hepnos::UUID& uuid) {
    static_cast<void>(uuid);
    return sizeof(hepnos::UUID);
}

43
////////////////////////////////////////////////////////////////////////////////////////////
44
// DataStoreImpl implementation
45 46
////////////////////////////////////////////////////////////////////////////////////////////

47 48 49 50 51
/**
 * Groups a list of SDSKV database handles with the ch-placement
 * instance used to select one of them from a hash.
 */
struct DistributedDBInfo {
    /// SDSKV database handles.
    std::vector<sdskv::database>  dbs;
    /// ch-placement instance; null until one has been created.
    struct ch_placement_instance* chi{nullptr};
};

52
class DataStoreImpl {
53 54
    public:

Matthieu Dorier's avatar
Matthieu Dorier committed
55 56
    margo_instance_id                         m_mid;          // Margo instance
    std::unordered_map<std::string,hg_addr_t> m_addrs;        // Addresses used by the service
57
    sdskv::client                             m_sdskv_client; // SDSKV client
58
    DistributedDBInfo                         m_databases;    // list of SDSKV databases
59
    DistributedDBInfo                         m_dataset_dbs;  // list of SDSKV databases for DataSets
60

61
    DataStoreImpl()
62
    : m_mid(MARGO_INSTANCE_NULL)
63
    {}
64

65
    /**
     * Fills a DistributedDBInfo from one database section of the configuration.
     *
     * db_config is iterated as a map from address string to a map from
     * provider id to a sequence of database ids. Each address is looked up
     * with margo_addr_lookup the first time it is seen and then cached in
     * m_addrs. One sdskv::database handle is appended to db_info.dbs per
     * database id, and db_info.chi is set to a new ch-placement instance
     * sized to db_info.dbs.
     *
     * @param db_info   structure receiving the database handles and the
     *                  ch-placement instance.
     * @param db_config YAML node describing the databases of one category.
     *
     * @throws Exception if margo_addr_lookup fails; cleanup() is called
     *         before the exception is thrown.
     */
    void populateDatabases(DistributedDBInfo& db_info, const YAML::Node& db_config) {
        for(YAML::const_iterator address_it = db_config.begin(); address_it != db_config.end(); ++address_it) {
            std::string str_addr = address_it->first.as<std::string>();
            YAML::Node providers = address_it->second;
            // lookup the address, reusing the cached handle when there is one
            hg_addr_t addr;
            auto known = m_addrs.find(str_addr);
            if(known != m_addrs.end()) {
                addr = known->second;
            } else {
                hg_return_t hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
                if(hret != HG_SUCCESS) {
                    // the failed lookup did not hand us a valid address,
                    // so there is nothing to free here
                    cleanup();
                    throw Exception("margo_addr_lookup failed (MARGO error="+std::to_string(hret)+")");
                }
                m_addrs[str_addr] = addr;
            }
            // iterate over providers for this address
            for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); ++provider_it) {
                // get the provider id
                uint16_t provider_id = provider_it->first.as<uint16_t>();
                // create provider handle
                sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
                // get the database ids
                YAML::Node databases = provider_it->second;
                // iterate over databases for this provider
                for(unsigned i=0; i < databases.size(); i++) {
                    db_info.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
                }
            } // for each provider
        } // for each address
        // initialize ch-placement
        // NOTE(review): the meaning of the trailing arguments (4, 0) is not
        // visible here; see the ch-placement documentation.
        db_info.chi = ch_placement_initialize("hash_lookup3", db_info.dbs.size(), 4, 0);
    }

    /**
     * Initializes the datastore from a YAML configuration file.
     *
     * Loads and validates the file, starts Margo in client mode with the
     * configured protocol, creates the SDSKV client, then populates
     * m_dataset_dbs from the "datasets" section and m_databases from the
     * "products" section.
     *
     * @param configFile path of the YAML configuration file.
     *
     * @throws Exception if the configuration is invalid, if Margo cannot be
     *         initialized, or if the SDSKV client cannot be created. In the
     *         last two cases cleanup() is called first.
     */
    void init(const std::string& configFile) {
        YAML::Node config = YAML::LoadFile(configFile);
        checkConfig(config);
        // get protocol
        std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
        // initialize Margo
        m_mid = margo_init(proto.c_str(), MARGO_CLIENT_MODE, 0, 0);
        if(!m_mid) {
            cleanup();
            throw Exception("Could not initialize Margo");
        }
        // initialize SDSKV client
        try {
            m_sdskv_client = sdskv::client(m_mid);
        } catch(sdskv::exception& ex) {
            cleanup();
            throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
        }
        // populate database info structures for each type of database;
        // checkConfig() has already verified that these sections exist
        YAML::Node databases = config["hepnos"]["databases"];
        YAML::Node dataset_db = databases["datasets"];
        YAML::Node product_db = databases["products"];
        populateDatabases(m_dataset_dbs, dataset_db);
        populateDatabases(m_databases, product_db); // TODO change this
    }

    void cleanup() {
134
        m_databases.dbs.clear();
135
        m_sdskv_client = sdskv::client();
136 137
        if(m_databases.chi)
            ch_placement_finalize(m_databases.chi);
138 139
        if(m_dataset_dbs.chi)
            ch_placement_finalize(m_dataset_dbs.chi);
Matthieu Dorier's avatar
Matthieu Dorier committed
140 141 142
        for(auto& addr : m_addrs) {
            margo_addr_free(m_mid, addr.second);
        }
143 144 145 146 147 148
        if(m_mid) margo_finalize(m_mid);
    }

    private:

    /**
     * Validates the structure of a parsed configuration.
     *
     * Checks that "hepnos", "hepnos/client", "hepnos/client/protocol" and
     * "hepnos/databases" exist, that "databases" is a map containing the
     * entries "datasets", "runs", "subruns", "events" and "products", that
     * each of those is a map, and that every provider entry found under
     * them is a sequence of scalars.
     *
     * @param config root node of the configuration.
     *
     * @throws Exception describing the first violation found.
     */
    static void checkConfig(YAML::Node& config) {
        // config file starts with hepnos entry
        auto hepnosNode = config["hepnos"];
        if(!hepnosNode) {
            throw Exception("\"hepnos\" entry not found in YAML file");
        }
        // hepnos entry has client entry
        auto clientNode = hepnosNode["client"];
        if(!clientNode) {
            throw Exception("\"client\" entry not found in \"hepnos\" section");
        }
        // client entry has protocol entry
        auto protoNode = clientNode["protocol"];
        if(!protoNode) {
            throw Exception("\"protocol\" entry not found in \"client\" section");
        }
        // hepnos entry has databases entry
        auto databasesNode = hepnosNode["databases"];
        if(!databasesNode) {
            throw Exception("\"databases\" entry not found in \"hepnos\" section");
        }
        if(!databasesNode.IsMap()) {
            throw Exception("\"databases\" entry should be a map");
        }
        // database entry has keys datasets, runs, subruns, events, and products.
        const std::vector<std::string> fields = { "datasets", "runs", "subruns", "events", "products" };
        for(const auto& f : fields) {
            auto fieldNode = databasesNode[f];
            if(!fieldNode) {
                throw Exception("\""+f+"\" entry not found in databases section");
            }
            if(!fieldNode.IsMap()) {
                throw Exception("\""+f+"\" entry should be a mapping from addresses to providers");
            }
            for(auto addresses_it = fieldNode.begin(); addresses_it != fieldNode.end(); ++addresses_it) {
                auto providers = addresses_it->second;
                for(auto provider_it = providers.begin(); provider_it != providers.end(); ++provider_it) {
                    // provider entry should be a sequence
                    if(!provider_it->second.IsSequence()) {
                        throw Exception("provider entry should be a sequence");
                    }
                    for(auto db : provider_it->second) {
                        if(!db.IsScalar()) {
                            throw Exception("database id should be a scalar");
                        }
                    }
                }
            }
        }
    }

199 200
    public:

201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
    /**
     * Builds a database key from a level, a container name and an object name.
     *
     * The key is the level byte followed by "<containerName>/<objectName>",
     * or by just "<objectName>" when containerName is empty.
     *
     * @param level         value stored as the first byte of the key.
     * @param containerName name of the container; may be empty.
     * @param objectName    name of the object; may be empty.
     *
     * @return the assembled key.
     */
    static inline std::string buildKey(
            uint8_t level,
            const std::string& containerName,
            const std::string& objectName) {
        std::string key;
        key.reserve(2 + containerName.size() + objectName.size());
        key.push_back(static_cast<char>(level));
        if(!containerName.empty()) {
            key.append(containerName);
            key.push_back('/');
        }
        key.append(objectName);
        return key;
    }

220
    unsigned long computeDbIndex(uint8_t level, const std::string& containerName, const std::string& key) const {
221
        // hash the name to get the provider id
222
        long unsigned sdskv_db_idx = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
223
        uint64_t name_hash;
224
        if(level != 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
225
            name_hash = hashString(containerName);
226
        } else {
227
            // use the complete name for final objects (level 0)
Matthieu Dorier's avatar
Matthieu Dorier committed
228
            name_hash = hashString(key);
229
        }
230
        ch_placement_find_closest(m_databases.chi, name_hash, 1, &sdskv_db_idx);
231 232 233 234 235 236 237 238 239 240
        return sdskv_db_idx;
    }

    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, std::string& data) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
241
        // make corresponding datastore entry
242
        auto& db = m_databases.dbs[sdskv_db_idx];
243
        // read the value
244 245
        if(data.size() == 0)
            data.resize(2048); // eagerly allocate 2KB
246
        try {
247
            db.get(key, data);
248 249 250 251
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else
252
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
253 254 255 256
        }
        return true;
    }

257 258 259 260 261 262 263 264
    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, char* value, size_t* vsize) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
        // make corresponding datastore entry
265
        auto& db = m_databases.dbs[sdskv_db_idx];
266 267 268 269 270 271 272 273 274 275 276 277 278 279
        // read the value
        try {
            db.get(key.data(), key.size(), value, vsize);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else if(ex.error() == SDSKV_ERR_SIZE)
                return false;
            else
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return true;
    }

280 281 282 283 284 285 286 287
    bool exists(uint8_t level, const std::string& containerName,
            const std::string& objectName) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
        // make corresponding datastore entry
288
        auto& db = m_databases.dbs[sdskv_db_idx];
289 290 291 292 293 294 295 296
        try {
            return db.exists(key);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }

297
    /**
     * Stores a value under the key built from level, container and object name.
     *
     * @param level         level of the entry.
     * @param containerName name of the container of the entry.
     * @param objectName    name of the entry.
     * @param data          pointer to the value (defaults to nullptr).
     * @param data_size     size of the value in bytes (defaults to 0).
     *
     * @return ProductID(level, containerName, objectName) if the put
     *         succeeded, or a default-constructed ProductID if SDSKV
     *         reported SDSKV_ERR_KEYEXISTS.
     *
     * @throws Exception for any other SDSKV error.
     */
    ProductID store(uint8_t level, const std::string& containerName,
            const std::string& objectName, const char* data=nullptr, size_t data_size=0) {
        // build the key and locate the database in charge of it
        auto key = buildKey(level, containerName, objectName);
        const auto& db = m_databases.dbs[computeDbIndex(level, containerName, key)];
        try {
            db.put(key.data(), key.size(), data, data_size);
        } catch(sdskv::exception& ex) {
            if(ex.error() != SDSKV_ERR_KEYEXISTS) {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
            // the key is already present: report it with an empty id
            return ProductID();
        }
        // Create the product id
        return ProductID(level, containerName, objectName);
    }

    void storeMultiple(unsigned long db_index, 
318 319
            const std::vector<std::string>& keys,
            const std::vector<std::string>& values) {
320
        // Create the product id
321
        const auto& db = m_databases.dbs[db_index];
322
        try {
323
            db.put_multi(keys, values);
324 325 326
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
        }
327 328 329 330 331 332 333 334
    }

    size_t nextKeys(uint8_t level, const std::string& containerName,
            const std::string& lower,
            std::vector<std::string>& keys, size_t maxKeys) const {
        int ret;
        if(level == 0) return 0; // cannot iterate at object level
        // hash the name to get the provider id
335
        long unsigned db_idx = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
336
        uint64_t h = hashString(containerName);
337
        ch_placement_find_closest(m_databases.chi, h, 1, &db_idx);
338
        // make an entry for the lower bound
339
        auto lb_entry = buildKey(level, containerName, lower);
340
        // get provider and database
341
        const auto& db = m_databases.dbs[db_idx];
342 343 344 345 346 347 348 349 350
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(2+containerName.size(), '\0');
        prefix[0] = level;
        if(containerName.size() != 0) {
            std::memcpy(&prefix[1], containerName.data(), containerName.size());
            prefix[prefix.size()-1] = '/';
        } else {
            prefix.resize(1);
        }
351
        // issue an sdskv_list_keys
352
        std::vector<std::string> entries(maxKeys, std::string(1024,'\0'));
353 354
        try {
            db.list_keys(lb_entry, prefix, entries);
355 356
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
357 358
        }
        keys.resize(0);
359 360
        for(const auto& entry : entries) {
            keys.emplace_back(&entry[1], entry.size()-1);
361
        }
362
        return keys.size();
363
    }
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427

    ///////////////////////////////////////////////////////////////////////////
    // DataSet access functions
    ///////////////////////////////////////////////////////////////////////////

    /**
     * Builds the database key of a particular DataSet.
     *
     * The key is the level byte followed by "<containerName>/<objectName>",
     * or by just "<objectName>" when containerName is empty.
     */
    static inline std::string _buildDataSetKey(uint8_t level, const std::string& containerName, const std::string& objectName) {
        std::string key;
        key.reserve(2 + containerName.size() + objectName.size());
        key.push_back(static_cast<char>(level));
        if(!containerName.empty()) {
            key.append(containerName);
            key.push_back('/');
        }
        key.append(objectName);
        return key;
    }

    /**
     * Locates and returns the DataSet database in charge of a container.
     *
     * The container name is hashed and the hash is handed to the
     * ch-placement instance of m_dataset_dbs to select one database.
     */
    const sdskv::database& _locateDataSetDb(const std::string& containerName) const {
        // hash the name to get the provider id
        const uint64_t hash = hashString(containerName);
        long unsigned db_idx = 0;
        ch_placement_find_closest(m_dataset_dbs.chi, hash, 1, &db_idx);
        return m_dataset_dbs.dbs[db_idx];
    }

    /**
     * @brief Fills the result vector with a sequence of up to
     * maxDataSets shared_ptr to DataSetImpl coming after the
     * current dataset. Returns the number of DataSets read.
     */
    size_t nextDataSets(const std::shared_ptr<DataSetImpl>& current, 
            std::vector<std::shared_ptr<DataSetImpl>>& result,
            size_t maxDataSets) const {
        int ret;
        result.resize(0);
        auto& level = current->m_level;
        auto& containerName = *current->m_container;
        auto& currentName = current->m_name;
        if(current->m_level == 0) return 0; // cannot iterate at object level
        auto& db = _locateDataSetDb(containerName);
        // make an entry for the lower bound
        auto lb_entry = _buildDataSetKey(level, containerName, currentName);
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(2+containerName.size(), '\0');
        prefix[0] = level;
        if(containerName.size() != 0) {
            std::memcpy(&prefix[1], containerName.data(), containerName.size());
            prefix[prefix.size()-1] = '/';
        } else {
            prefix.resize(1);
        }
        // issue an sdskv_list_keys
        std::vector<std::string> entries(maxDataSets, std::string(1024,'\0'));
Matthieu Dorier's avatar
Matthieu Dorier committed
428
        std::vector<UUID> uuids(maxDataSets);
429
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
430
            db.list_keyvals(lb_entry, prefix, entries, uuids);
431 432 433 434 435 436 437 438 439 440 441 442 443
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
        }
        result.resize(0);
        for(const auto& entry : entries) {
            size_t i = entry.find_last_of('/');
            if(i == std::string::npos) i = 1;
            else i += 1;
            result.push_back(
                    std::make_shared<DataSetImpl>(
                        current->m_datastore,
                        level,
                        current->m_container,
Matthieu Dorier's avatar
Matthieu Dorier committed
444 445
                        entry.substr(i),
                        uuids[i]
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
                    )
                );
        }
        return result.size();
    }

    /**
     * @brief Checks if a particular dataset exists.
     *
     * @param level         level of the dataset.
     * @param containerName name of the container of the dataset.
     * @param objectName    name of the dataset.
     *
     * @return the result of sdskv::database::exists for the dataset's key.
     *
     * @throws Exception if SDSKV reports an error.
     */
    bool dataSetExists(uint8_t level, const std::string& containerName, const std::string& objectName) const {
        // build the key and find out which DB to access
        auto key = buildKey(level, containerName, objectName);
        auto& db = _locateDataSetDb(containerName);
        bool found = false;
        try {
            found = db.exists(key);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return found;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493
    
    /**
     * @brief Loads a dataset.
     *
     * Reads the value stored under the dataset's key into uuid.data.
     *
     * @param level         level of the dataset.
     * @param containerName name of the container of the dataset.
     * @param objectName    name of the dataset.
     * @param uuid          UUID whose data member receives the value.
     *
     * @return true if the size reported by the get equals sizeof(uuid);
     *         false otherwise, or if the key is unknown.
     *
     * @throws Exception for any SDSKV error other than SDSKV_ERR_UNKNOWN_KEY.
     */
    bool loadDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) const {
        // build the key and find out which DB to access
        auto key = buildKey(level, containerName, objectName);
        auto& db = _locateDataSetDb(containerName);
        size_t s = sizeof(uuid);
        try {
            db.get(static_cast<const void*>(key.data()), 
                   key.size(),
                   static_cast<void*>(uuid.data),
                   &s);
        } catch(sdskv::exception& ex) {
            if(ex.error() != SDSKV_ERR_UNKNOWN_KEY) {
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
            }
            return false;
        }
        return s == sizeof(uuid);
    }
494 495 496 497

    /**
     * Creates a DataSet
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
498
    bool createDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) {
499 500 501 502
        // build full name
        auto key = _buildDataSetKey(level, containerName, objectName);
        // find out which DB to access
        auto& db = _locateDataSetDb(containerName);
Matthieu Dorier's avatar
Matthieu Dorier committed
503
        uuid.randomize();
504
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
505
            db.put(key.data(), key.size(), uuid.data, sizeof(uuid));
506 507 508 509 510 511 512 513 514
        } catch(sdskv::exception& ex) {
            if(!ex.error() == SDSKV_ERR_KEYEXISTS) {
                return false;
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
        }
        return true;
    }
515 516 517
};

}
518 519

#endif