DataStoreImpl.hpp 26.3 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2
/*
 * (C) 2018 The University of Chicago
3
 *
Matthieu Dorier's avatar
Matthieu Dorier committed
4 5
 * See COPYRIGHT in top-level directory.
 */
6 7 8
#ifndef __HEPNOS_PRIVATE_DATASTORE_IMPL
#define __HEPNOS_PRIVATE_DATASTORE_IMPL

9
#include <vector>
10
#include <unordered_set>
Matthieu Dorier's avatar
Matthieu Dorier committed
11
#include <unordered_map>
12 13
#include <functional>
#include <iostream>
14
#include <thallium.hpp>
15
#include <yaml-cpp/yaml.h>
16
#include <sdskv-client.hpp>
17 18 19 20
#include <ch-placement.h>
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
21
#include "StringHash.hpp"
22
#include "DataSetImpl.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
23
#include "ItemImpl.hpp"
24 25 26

namespace hepnos {

Matthieu Dorier's avatar
Matthieu Dorier committed
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
inline void object_resize(hepnos::UUID& uuid, size_t new_size) {
    (void)uuid;
    (void)new_size;
}

inline void* object_data(hepnos::UUID& uuid) {
    return uuid.data;
}

inline const void* object_data(const hepnos::UUID& uuid) {
    return uuid.data;
}

inline size_t object_size(const hepnos::UUID& uuid) {
    return sizeof(uuid);
}

44
////////////////////////////////////////////////////////////////////////////////////////////
45
// DataStoreImpl implementation
46 47
////////////////////////////////////////////////////////////////////////////////////////////

48 49 50 51 52
struct DistributedDBInfo {
    std::vector<sdskv::database>  dbs;
    struct ch_placement_instance* chi = nullptr;
};

53 54 55
using namespace std::string_literals;
namespace tl = thallium;

56
class DataStoreImpl {
57 58
    public:

59 60 61 62 63 64 65 66 67
    tl::engine                                   m_engine;       // Thallium engine
    bool                                         m_engine_initialized = false;
    std::unordered_map<std::string,tl::endpoint> m_addrs;        // Addresses used by the service
    sdskv::client                                m_sdskv_client; // SDSKV client
    DistributedDBInfo                            m_dataset_dbs;  // list of SDSKV databases for DataSets
    DistributedDBInfo                            m_run_dbs;      // list of SDSKV databases for Runs
    DistributedDBInfo                            m_subrun_dbs;   // list of SDSKV databases for Runs
    DistributedDBInfo                            m_event_dbs;    // list of SDSKV databases for Runs
    DistributedDBInfo                            m_product_dbs;  // list of SDSKV databases for Products
68

69
    DataStoreImpl()
70
    {}
71

72 73 74 75
    ~DataStoreImpl() {
        cleanup();
    }

76
    void populateDatabases(DistributedDBInfo& db_info, const YAML::Node& db_config) {
77 78
        int ret;
        hg_return_t hret;
79
        for(YAML::const_iterator address_it = db_config.begin(); address_it != db_config.end(); address_it++) {
80 81 82
            std::string str_addr = address_it->first.as<std::string>();
            YAML::Node providers = address_it->second;
            // lookup the address
83
            tl::endpoint addr;
Matthieu Dorier's avatar
Matthieu Dorier committed
84 85 86
            if(m_addrs.count(str_addr) != 0) {
                addr = m_addrs[str_addr];
            } else {
87 88 89 90 91
                try {
                    addr = m_engine.lookup(str_addr);
                    m_addrs[str_addr] = addr;
                } catch(const std::exception& ex) {
                    throw Exception("Address lookup failed: "s + ex.what());
Matthieu Dorier's avatar
Matthieu Dorier committed
92
                }
93
            }
94 95 96 97 98
            // iterate over providers for this address
            for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
                // get the provider id
                uint16_t provider_id = provider_it->first.as<uint16_t>();
                // create provider handle
99 100 101 102 103 104
                sdskv::provider_handle ph;
                try {
                    ph = sdskv::provider_handle(m_sdskv_client, addr.get_addr(), provider_id);
                } catch(const std::exception& ex) {
                    throw Exception("Could not create SDSKV provider handle: "s + ex.what());
                }
105 106 107 108
                // get the database ids
                YAML::Node databases = provider_it->second;
                // iterate over databases for this provider
                for(unsigned i=0; i < databases.size(); i++) {
109
                    db_info.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
110
                }
111 112
            } // for each provider
        } // for each address
113 114 115 116
        // initialize ch-placement
        db_info.chi = ch_placement_initialize("hash_lookup3", db_info.dbs.size(), 4, 0);
    }

117
    void init(const std::string& configFile, bool use_progress_thread) {
118 119 120 121 122 123
        int ret;
        hg_return_t hret;
        YAML::Node config = YAML::LoadFile(configFile);
        checkConfig(config);
        // get protocol
        std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
124 125
        // get busy spin
        bool busySpin = config["hepnos"]["client"]["busy-spin"].as<bool>();
126
        // initialize Margo
127 128 129 130
        hg_init_info hg_opt;
        memset(&hg_opt, 0, sizeof(hg_opt));
        if(busySpin)
            hg_opt.na_init_info.progress_mode = NA_NO_BLOCK;
131 132 133 134
        try {
            m_engine = tl::engine(proto, THALLIUM_SERVER_MODE, use_progress_thread, -1, &hg_opt);
            m_engine_initialized = true;
        } catch(const std::exception& ex) {
135
            cleanup();
136
            throw Exception("Could not initialized Thallium: "s + ex.what());
137 138 139
        }
        // initialize SDSKV client
        try {
140
            m_sdskv_client = sdskv::client(m_engine.get_margo_instance());
141 142 143 144 145 146 147 148 149 150 151 152
        } catch(sdskv::exception& ex) {
            cleanup();
            throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
        }
        // populate database info structures for each type of database
        YAML::Node databases = config["hepnos"]["databases"];
        YAML::Node dataset_db = databases["datasets"];
        YAML::Node run_db     = databases["runs"];
        YAML::Node subrun_db  = databases["subruns"];
        YAML::Node event_db   = databases["events"];
        YAML::Node product_db = databases["products"];
        populateDatabases(m_dataset_dbs, dataset_db);
Matthieu Dorier's avatar
Matthieu Dorier committed
153 154 155 156
        populateDatabases(m_run_dbs, run_db);
        populateDatabases(m_subrun_dbs, subrun_db);
        populateDatabases(m_event_dbs, event_db);
        populateDatabases(m_product_dbs, product_db);
157 158 159
    }

    void cleanup() {
Matthieu Dorier's avatar
Matthieu Dorier committed
160 161 162 163 164
        m_dataset_dbs.dbs.clear();
        m_run_dbs.dbs.clear();
        m_subrun_dbs.dbs.clear();
        m_event_dbs.dbs.clear();
        m_product_dbs.dbs.clear();
165
        m_sdskv_client = sdskv::client();
Matthieu Dorier's avatar
Matthieu Dorier committed
166 167 168 169
        if(m_dataset_dbs.chi) ch_placement_finalize(m_dataset_dbs.chi);
        if(m_run_dbs.chi)     ch_placement_finalize(m_run_dbs.chi);
        if(m_subrun_dbs.chi)  ch_placement_finalize(m_subrun_dbs.chi);
        if(m_event_dbs.chi)   ch_placement_finalize(m_event_dbs.chi);
170 171 172 173
        if(m_product_dbs.chi) ch_placement_finalize(m_product_dbs.chi);
        m_addrs.clear();
        if(m_engine_initialized) m_engine.finalize();
        m_engine_initialized = false;
174 175
    }

176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
    size_t numTargets(const ItemType& type) const {
        switch(type) {
            case ItemType::DATASET:
                return m_dataset_dbs.dbs.size();
            case ItemType::RUN:
                return m_run_dbs.dbs.size();
            case ItemType::SUBRUN:
                return m_subrun_dbs.dbs.size();
            case ItemType::EVENT:
                return m_event_dbs.dbs.size();
            case ItemType::PRODUCT:
                return m_product_dbs.dbs.size();
        }
        return 0;
    }

192 193 194
    private:

    static void checkConfig(YAML::Node& config) {
Matthieu Dorier's avatar
Matthieu Dorier committed
195
        // config file starts with hepnos entry
196 197 198 199
        auto hepnosNode = config["hepnos"];
        if(!hepnosNode) {
            throw Exception("\"hepnos\" entry not found in YAML file");
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
200
        // hepnos entry has client entry
201 202 203 204
        auto clientNode = hepnosNode["client"];
        if(!clientNode) {
            throw Exception("\"client\" entry not found in \"hepnos\" section");
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
205
        // client entry has protocol entry
206 207 208 209
        auto protoNode = clientNode["protocol"];
        if(!protoNode) {
            throw Exception("\"protocol\" entry not found in \"client\" section");
        }
210 211 212 213
        // hepnos entry has databases entry
        auto databasesNode = hepnosNode["databases"];
        if(!databasesNode) {
            throw Exception("\"databasess\" entry not found in \"hepnos\" section");
214
        }
215 216
        if(!databasesNode.IsMap()) {
            throw Exception("\"databases\" entry should be a map");
217 218 219 220 221 222 223 224 225 226
        }
        // database entry has keys datasets, runs, subruns, events, and products.
        std::vector<std::string> fields = { "datasets", "runs", "subruns", "events", "products" };
        for(auto& f : fields) {
            auto fieldNode = databasesNode[f];
            if(!fieldNode) {
                throw Exception("\""+f+"\" entry not found in databases section");
            }
            if(!fieldNode.IsMap()) {
                throw Exception("\""+f+"\" entry should be a mapping from addresses to providers");
227
            }
228 229 230 231 232 233 234 235 236 237 238 239
            for(auto addresses_it = fieldNode.begin(); addresses_it != fieldNode.end(); addresses_it++) {
                auto providers = addresses_it->second;
                for(auto provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
                    // provider entry should be a sequence
                    if(!provider_it->second.IsSequence()) {
                        throw Exception("provider entry should be a sequence");
                    }
                    for(auto db : provider_it->second) {
                        if(!db.IsScalar()) {
                            throw Exception("database id should be a scalar");
                        }
                    }
240
                }
241 242 243 244
            }
        }
    }

245 246
    public:

Matthieu Dorier's avatar
Matthieu Dorier committed
247 248 249 250 251 252 253 254 255
    ///////////////////////////////////////////////////////////////////////////
    // Product access functions
    ///////////////////////////////////////////////////////////////////////////

    static inline ProductID buildProductID(const ItemDescriptor& id, const std::string& productName) {
        ProductID result;
        result.m_key.resize(sizeof(id)+productName.size());
        std::memcpy(const_cast<char*>(result.m_key.data()), &id, sizeof(id));
        std::memcpy(const_cast<char*>(result.m_key.data()+sizeof(id)), productName.data(), productName.size());
256 257 258
        return result;
    }

259
    long unsigned computeProductDbIndex(const ProductID& productID) const {
260
        // hash the name to get the provider id
Matthieu Dorier's avatar
Matthieu Dorier committed
261 262
        long unsigned db_idx = 0;
        uint64_t hash;
263 264
        // we are taking only the dataset+run+subrun part of the productID
        hash = hashString(productID.m_key.c_str(), SubRunDescriptorLength);
Matthieu Dorier's avatar
Matthieu Dorier committed
265
        ch_placement_find_closest(m_product_dbs.chi, hash, 1, &db_idx);
266 267 268 269 270 271 272 273 274
        return db_idx;
    }

    const sdskv::database& locateProductDb(const ProductID& productID) const {
        return m_product_dbs.dbs[computeProductDbIndex(productID)];
    }

    const sdskv::database& getProductDatabase(unsigned long index) const {
        return m_product_dbs.dbs[index];
275 276
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
277 278
    bool loadRawProduct(const ProductID& key,
                        std::string& data) const {
279
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
280
        auto& db =  locateProductDb(key);
281
        // read the value
282 283
        if(data.size() == 0)
            data.resize(2048); // eagerly allocate 2KB
284
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
285
            db.get(key.m_key, data);
286 287 288 289
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else
290
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
291 292 293 294
        }
        return true;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
295 296 297 298 299 300 301 302 303 304
    bool loadRawProduct(const ItemDescriptor& id,
                        const std::string& productName,
                        std::string& data) const {
        // build product id
        auto key = buildProductID(id, productName);
        return loadRawProduct(key, data);
    }

    bool loadRawProduct(const ProductID& key,
                        char* value, size_t* vsize) const {
305
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
306
        auto& db =  locateProductDb(key);
307
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
308
            db.get(key.m_key.data(), key.m_key.size(), value, vsize);
309 310 311 312 313 314 315 316 317 318 319
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else if(ex.error() == SDSKV_ERR_SIZE)
                return false;
            else
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return true;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
320 321 322 323 324 325 326 327 328 329 330 331 332
    bool loadRawProduct(const ItemDescriptor& id,
                        const std::string& productName,
                        char* value, size_t* vsize) const {
        // build product id
        auto key = buildProductID(id, productName);
        return loadRawProduct(key, value, vsize);
    }

    ProductID storeRawProduct(const ItemDescriptor& id,
                              const std::string& productName,
                              const char* value, size_t vsize) const {
        // build product id
        auto key = buildProductID(id, productName);
333
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
334 335
        auto& db =  locateProductDb(key);
        // read the value
336
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
337
            db.put(key.m_key.data(), key.m_key.size(), value, vsize);
338
        } catch(sdskv::exception& ex) {
Matthieu Dorier's avatar
Matthieu Dorier committed
339 340 341 342 343
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return ProductID();
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
344
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
345
        return key;
346 347
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
348 349 350 351 352
    ProductID storeRawProduct(const ItemDescriptor& id,
                              const std::string& productName,
                              const std::string& data) const {
        // build product id
        auto key = buildProductID(id, productName);
353
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
354
        auto& db = locateProductDb(key);
355
        // store the value
356
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
357
            db.put(key.m_key, data);
358
        } catch(sdskv::exception& ex) {
359 360 361
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return ProductID();
            } else {
362
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
363
            }
364
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
365
        return key;
366
    }
367

368 369 370 371 372 373 374
    ///////////////////////////////////////////////////////////////////////////
    // DataSet access functions
    ///////////////////////////////////////////////////////////////////////////

    /**
     * Builds the database key of a particular DataSet.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
375
    static inline std::string buildDataSetKey(uint8_t level, const std::string& containerName, const std::string& objectName) {
376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393
        size_t c = 1 + objectName.size();
        if(!containerName.empty()) c += containerName.size() + 1;
        std::string result(c,'\0');
        result[0] = level;
        if(!containerName.empty()) {
            std::memcpy(&result[1], containerName.data(), containerName.size());
            size_t x = 1+containerName.size();
            result[x] = '/';
            std::memcpy(&result[x+1], objectName.data(), objectName.size());
        } else {
            std::memcpy(&result[1], objectName.data(), objectName.size());
        }
        return result;
    }

    /**
     * Locates and return the database in charge of the provided DataSet info.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
394
    const sdskv::database& locateDataSetDb(const std::string& containerName) const {
395 396 397
        // hash the name to get the provider id
        long unsigned db_idx = 0;
        uint64_t hash;
398
        hash = hashString(containerName.c_str(), containerName.size());
399 400 401 402 403 404 405 406 407
        ch_placement_find_closest(m_dataset_dbs.chi, hash, 1, &db_idx);
        return m_dataset_dbs.dbs[db_idx];
    }

    /**
     * @brief Fills the result vector with a sequence of up to
     * maxDataSets shared_ptr to DataSetImpl coming after the
     * current dataset. Returns the number of DataSets read.
     */
408
    size_t nextDataSets(const std::shared_ptr<DataSetImpl>& current,
409 410 411 412 413 414 415 416
            std::vector<std::shared_ptr<DataSetImpl>>& result,
            size_t maxDataSets) const {
        int ret;
        result.resize(0);
        auto& level = current->m_level;
        auto& containerName = *current->m_container;
        auto& currentName = current->m_name;
        if(current->m_level == 0) return 0; // cannot iterate at object level
Matthieu Dorier's avatar
Matthieu Dorier committed
417
        auto& db = locateDataSetDb(containerName);
418
        // make an entry for the lower bound
Matthieu Dorier's avatar
Matthieu Dorier committed
419
        auto lb_entry = buildDataSetKey(level, containerName, currentName);
420 421 422 423 424 425 426 427 428 429 430
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(2+containerName.size(), '\0');
        prefix[0] = level;
        if(containerName.size() != 0) {
            std::memcpy(&prefix[1], containerName.data(), containerName.size());
            prefix[prefix.size()-1] = '/';
        } else {
            prefix.resize(1);
        }
        // issue an sdskv_list_keys
        std::vector<std::string> entries(maxDataSets, std::string(1024,'\0'));
Matthieu Dorier's avatar
Matthieu Dorier committed
431
        std::vector<UUID> uuids(maxDataSets);
432
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
433
            db.list_keyvals(lb_entry, prefix, entries, uuids);
434 435 436 437
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
        }
        result.resize(0);
438
        unsigned j=0;
439 440 441 442 443 444 445 446 447
        for(const auto& entry : entries) {
            size_t i = entry.find_last_of('/');
            if(i == std::string::npos) i = 1;
            else i += 1;
            result.push_back(
                    std::make_shared<DataSetImpl>(
                        current->m_datastore,
                        level,
                        current->m_container,
Matthieu Dorier's avatar
Matthieu Dorier committed
448
                        entry.substr(i),
449
                        uuids[j]
450 451
                    )
                );
452
            j += 1;
453 454 455 456 457 458 459 460 461 462
        }
        return result.size();
    }

    /**
     * @brief Checks if a particular dataset exists.
     */
    bool dataSetExists(uint8_t level, const std::string& containerName, const std::string& objectName) const {
        int ret;
        // build key
Matthieu Dorier's avatar
Matthieu Dorier committed
463
        auto key = buildDataSetKey(level, containerName, objectName);
464
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
465
        auto& db = locateDataSetDb(containerName);
466
        try {
467 468
            bool b = db.exists(key);
            return b;
469 470 471 472 473
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }
474

Matthieu Dorier's avatar
Matthieu Dorier committed
475 476 477 478 479 480
    /*
     * @brief Loads a dataset.
     */
    bool loadDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) const {
        int ret;
        // build key
Matthieu Dorier's avatar
Matthieu Dorier committed
481
        auto key = buildDataSetKey(level, containerName, objectName);
Matthieu Dorier's avatar
Matthieu Dorier committed
482
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
483
        auto& db = locateDataSetDb(containerName);
Matthieu Dorier's avatar
Matthieu Dorier committed
484 485
        try {
            size_t s = sizeof(uuid);
486
            db.get(static_cast<const void*>(key.data()),
Matthieu Dorier's avatar
Matthieu Dorier committed
487 488 489 490 491 492 493 494 495 496 497 498
                   key.size(),
                   static_cast<void*>(uuid.data),
                   &s);
            return s == sizeof(uuid);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY) {
                return false;
            }
            throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }
499 500 501 502

    /**
     * Creates a DataSet
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
503
    bool createDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) {
504
        // build full name
Matthieu Dorier's avatar
Matthieu Dorier committed
505
        auto key = buildDataSetKey(level, containerName, objectName);
506
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
507
        auto& db = locateDataSetDb(containerName);
Matthieu Dorier's avatar
Matthieu Dorier committed
508
        uuid.randomize();
509
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
510
            db.put(key.data(), key.size(), uuid.data, sizeof(uuid));
511
        } catch(sdskv::exception& ex) {
Matthieu Dorier's avatar
Matthieu Dorier committed
512 513 514 515 516 517 518 519 520 521 522 523 524
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return false;
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
        }
        return true;
    }

    ///////////////////////////////////////////////////////////////////////////
    // Access functions for numbered items (Runs, SubRuns, and Events)
    ///////////////////////////////////////////////////////////////////////////

525
    const sdskv::database& locateItemDb(const ItemType& type, const ItemDescriptor& id, int target=-1) const {
Matthieu Dorier's avatar
Matthieu Dorier committed
526
        long unsigned db_idx = 0;
527 528 529 530 531
        if(target >= 0) {
            if(type == ItemType::RUN)    return m_run_dbs.dbs[target];
            if(type == ItemType::SUBRUN) return m_subrun_dbs.dbs[target];
            if(type == ItemType::EVENT)  return m_event_dbs.dbs[target];
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
532 533 534
        uint64_t hash;
        size_t prime = 1099511628211ULL;
        hash = id.dataset.hash();
Matthieu Dorier's avatar
Matthieu Dorier committed
535
        if(type == ItemType::RUN) { // we are locating a Run
Matthieu Dorier's avatar
Matthieu Dorier committed
536 537
            ch_placement_find_closest(m_run_dbs.chi, hash, 1, &db_idx);
            return m_run_dbs.dbs[db_idx];
Matthieu Dorier's avatar
Matthieu Dorier committed
538
        } else if(type == ItemType::SUBRUN) { // we are locating a SubRun
Matthieu Dorier's avatar
Matthieu Dorier committed
539 540 541 542 543 544 545 546 547 548 549 550 551 552
            hash *= prime;
            hash = hash ^ id.run;
            ch_placement_find_closest(m_subrun_dbs.chi, hash, 1, &db_idx);
            return m_subrun_dbs.dbs[db_idx];
        } else { // we are locating an Event
            hash *= prime;
            hash = hash ^ id.subrun;
            ch_placement_find_closest(m_event_dbs.chi, hash, 1, &db_idx);
            return m_event_dbs.dbs[db_idx];
        }
    }

    /**
     * @brief Fills the result vector with a sequence of up to
553
     * maxItems descriptors coming after provided current descriptor.
Matthieu Dorier's avatar
Matthieu Dorier committed
554
     */
555
    size_t nextItemDescriptors(
556 557
            const ItemType& item_type,
            const ItemType& prefix_type,
558 559
            const ItemDescriptor& current,
            std::vector<ItemDescriptor>& descriptors,
560 561
            size_t maxItems,
            int target=-1) const {
Matthieu Dorier's avatar
Matthieu Dorier committed
562
        int ret;
563
        const ItemDescriptor& start_key   = current;
564
        auto& db = locateItemDb(item_type, start_key, target);
Matthieu Dorier's avatar
Matthieu Dorier committed
565 566
        // ignore keys that don't have the same uuid
        // issue an sdskv_list_keys
567
        descriptors.resize(maxItems);
Matthieu Dorier's avatar
Matthieu Dorier committed
568 569 570 571 572
        std::vector<void*> keys_addr(maxItems);
        std::vector<hg_size_t> keys_sizes(maxItems, sizeof(ItemDescriptor));
        for(auto i=0; i < maxItems; i++) {
            keys_addr[i] = static_cast<void*>(&descriptors[i]);
        }
573
        size_t numItems = maxItems;
Matthieu Dorier's avatar
Matthieu Dorier committed
574 575
        try {
            db.list_keys(&start_key, sizeof(start_key),
576
                         &start_key, ItemImpl::descriptorSize(prefix_type),
577
                         keys_addr.data(), keys_sizes.data(), &numItems);
Matthieu Dorier's avatar
Matthieu Dorier committed
578 579 580
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
        }
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603
        descriptors.resize(numItems);
        return numItems;
    }

    /**
     * @brief Fills the result vector with a sequence of up to
     * maxRuns shared_ptr to RunImpl coming after the
     * current run. Returns the number of Runs read.
     */
    size_t nextItems(
            const ItemType& item_type,
            const ItemType& prefix_type,
            const std::shared_ptr<ItemImpl>& current,
            std::vector<std::shared_ptr<ItemImpl>>& result,
            size_t maxItems,
            int target=-1) const {
        const ItemDescriptor& start_key = current->m_descriptor;
        std::vector<ItemDescriptor> descriptors;
        size_t numDescriptors = nextItemDescriptors(item_type, prefix_type,
                start_key, descriptors, maxItems, target);
        descriptors.resize(numDescriptors);
        result.resize(0);
        result.reserve(numDescriptors);
Matthieu Dorier's avatar
Matthieu Dorier committed
604 605 606 607 608 609 610 611 612
        for(const auto& key : descriptors) {
            result.push_back(std::make_shared<ItemImpl>(current->m_datastore, key));
        }
        return result.size();
    }

    /**
     * @brief Checks if a particular Run/SubRun/Event exists.
     */
613
    bool itemExists(const ItemDescriptor& descriptor,
614
                    int target = -1) const {
Matthieu Dorier's avatar
Matthieu Dorier committed
615
        ItemType type = ItemType::RUN;
616
        if(descriptor.subrun != InvalidSubRunNumber) {
Matthieu Dorier's avatar
Matthieu Dorier committed
617
            type = ItemType::SUBRUN;
618
            if(descriptor.event != InvalidEventNumber)
Matthieu Dorier's avatar
Matthieu Dorier committed
619 620
                type = ItemType::EVENT;
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
621
        // find out which DB to access
622
        auto& db = locateItemDb(type, descriptor, target);
Matthieu Dorier's avatar
Matthieu Dorier committed
623
        try {
624
            bool b = db.exists(&descriptor, sizeof(descriptor));
Matthieu Dorier's avatar
Matthieu Dorier committed
625 626 627 628 629 630
            return b;
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }
631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647

    /**
     * @brief Checks if a particular Run/SubRun/Event exists.
     */
    bool itemExists(const UUID& containerUUID,
                    const RunNumber& run_number,
                    const SubRunNumber& subrun_number = InvalidSubRunNumber,
                    const EventNumber& event_number = InvalidEventNumber,
                    int target = -1) const {
        // build the key
        ItemDescriptor k;
        k.dataset = containerUUID;
        k.run     = run_number;
        k.subrun  = subrun_number;
        k.event   = event_number;
        return itemExists(k, target);
    }
648

Matthieu Dorier's avatar
Matthieu Dorier committed
649 650 651 652 653 654 655 656 657 658 659 660 661
    /**
     * Creates a Run, SubRun, or Event
     */
    bool createItem(const UUID& containerUUID,
                    const RunNumber& run_number,
                    const SubRunNumber& subrun_number = InvalidSubRunNumber,
                    const EventNumber& event_number = InvalidEventNumber) {
        // build the key
        ItemDescriptor k;
        k.dataset = containerUUID;
        k.run     = run_number;
        k.subrun  = subrun_number;
        k.event   = event_number;
Matthieu Dorier's avatar
Matthieu Dorier committed
662 663 664 665 666 667
        ItemType type = ItemType::RUN;
        if(subrun_number != InvalidSubRunNumber) {
            type = ItemType::SUBRUN;
            if(event_number != InvalidEventNumber)
                type = ItemType::EVENT;
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
668
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
669
        auto& db = locateItemDb(type, k);
Matthieu Dorier's avatar
Matthieu Dorier committed
670 671 672 673
        try {
            db.put(&k, sizeof(k), nullptr, 0);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
674 675 676 677 678 679 680
                return false;
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
        }
        return true;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
681

682 683 684
};

}
685 686

#endif