DataStoreImpl.hpp 24.8 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8
#ifndef __HEPNOS_PRIVATE_DATASTORE_IMPL
#define __HEPNOS_PRIVATE_DATASTORE_IMPL

9
#include <vector>
10
#include <unordered_set>
Matthieu Dorier's avatar
Matthieu Dorier committed
11
#include <unordered_map>
12 13 14
#include <functional>
#include <iostream>
#include <yaml-cpp/yaml.h>
15
#include <sdskv-client.hpp>
16 17 18 19
#include <ch-placement.h>
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
20
#include "StringHash.hpp"
21
#include "DataSetImpl.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
22
#include "ItemImpl.hpp"
23 24 25

namespace hepnos {

Matthieu Dorier's avatar
Matthieu Dorier committed
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
inline void object_resize(hepnos::UUID& uuid, size_t new_size) {
    (void)uuid;
    (void)new_size;
}

inline void* object_data(hepnos::UUID& uuid) {
    return uuid.data;
}

inline const void* object_data(const hepnos::UUID& uuid) {
    return uuid.data;
}

inline size_t object_size(const hepnos::UUID& uuid) {
    return sizeof(uuid);
}

43
////////////////////////////////////////////////////////////////////////////////////////////
44
// DataStoreImpl implementation
45 46
////////////////////////////////////////////////////////////////////////////////////////////

47 48 49 50 51
struct DistributedDBInfo {
    std::vector<sdskv::database>  dbs;
    struct ch_placement_instance* chi = nullptr;
};

52
class DataStoreImpl {
53 54
    public:

Matthieu Dorier's avatar
Matthieu Dorier committed
55 56
    margo_instance_id                         m_mid;          // Margo instance
    std::unordered_map<std::string,hg_addr_t> m_addrs;        // Addresses used by the service
57
    sdskv::client                             m_sdskv_client; // SDSKV client
58
    DistributedDBInfo                         m_dataset_dbs;  // list of SDSKV databases for DataSets
Matthieu Dorier's avatar
Matthieu Dorier committed
59 60 61 62
    DistributedDBInfo                         m_run_dbs;      // list of SDSKV databases for Runs
    DistributedDBInfo                         m_subrun_dbs;   // list of SDSKV databases for Runs
    DistributedDBInfo                         m_event_dbs;    // list of SDSKV databases for Runs
    DistributedDBInfo                         m_product_dbs;  // list of SDSKV databases for Products
63

64
    DataStoreImpl()
65
    : m_mid(MARGO_INSTANCE_NULL)
66
    {}
67

68 69 70 71
    ~DataStoreImpl() {
        cleanup();
    }

72
    void populateDatabases(DistributedDBInfo& db_info, const YAML::Node& db_config) {
73 74
        int ret;
        hg_return_t hret;
75
        for(YAML::const_iterator address_it = db_config.begin(); address_it != db_config.end(); address_it++) {
76 77 78
            std::string str_addr = address_it->first.as<std::string>();
            YAML::Node providers = address_it->second;
            // lookup the address
79
            hg_addr_t addr;
Matthieu Dorier's avatar
Matthieu Dorier committed
80 81 82 83 84 85 86
            if(m_addrs.count(str_addr) != 0) {
                addr = m_addrs[str_addr];
            } else {
                hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
                if(hret != HG_SUCCESS) {
                    margo_addr_free(m_mid,addr);
                    cleanup();
87
                    throw Exception("margo_addr_lookup failed (MARGO error="+std::to_string(hret)+")");
Matthieu Dorier's avatar
Matthieu Dorier committed
88 89
                }
                m_addrs[str_addr] = addr;
90
            }
91 92 93 94 95 96 97 98 99 100
            // iterate over providers for this address
            for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
                // get the provider id
                uint16_t provider_id = provider_it->first.as<uint16_t>();
                // create provider handle
                sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
                // get the database ids
                YAML::Node databases = provider_it->second;
                // iterate over databases for this provider
                for(unsigned i=0; i < databases.size(); i++) {
101
                    db_info.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
102
                }
103 104
            } // for each provider
        } // for each address
105 106 107 108
        // initialize ch-placement
        db_info.chi = ch_placement_initialize("hash_lookup3", db_info.dbs.size(), 4, 0);
    }

109
    void init(const std::string& configFile, bool use_progress_thread) {
110 111 112 113 114 115
        int ret;
        hg_return_t hret;
        YAML::Node config = YAML::LoadFile(configFile);
        checkConfig(config);
        // get protocol
        std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
116 117
        // get busy spin
        bool busySpin = config["hepnos"]["client"]["busy-spin"].as<bool>();
118
        // initialize Margo
119 120 121 122 123
        hg_init_info hg_opt;
        memset(&hg_opt, 0, sizeof(hg_opt));
        if(busySpin)
            hg_opt.na_init_info.progress_mode = NA_NO_BLOCK;
        m_mid = margo_init_opt(proto.c_str(), MARGO_CLIENT_MODE, &hg_opt, use_progress_thread, 0);
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
        if(!m_mid) {
            cleanup();
            throw Exception("Could not initialized Margo");
        }
        // initialize SDSKV client
        try {
            m_sdskv_client = sdskv::client(m_mid);
        } catch(sdskv::exception& ex) {
            cleanup();
            throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
        }
        // populate database info structures for each type of database
        YAML::Node databases = config["hepnos"]["databases"];
        YAML::Node dataset_db = databases["datasets"];
        YAML::Node run_db     = databases["runs"];
        YAML::Node subrun_db  = databases["subruns"];
        YAML::Node event_db   = databases["events"];
        YAML::Node product_db = databases["products"];
        populateDatabases(m_dataset_dbs, dataset_db);
Matthieu Dorier's avatar
Matthieu Dorier committed
143 144 145 146
        populateDatabases(m_run_dbs, run_db);
        populateDatabases(m_subrun_dbs, subrun_db);
        populateDatabases(m_event_dbs, event_db);
        populateDatabases(m_product_dbs, product_db);
147 148 149
    }

    void cleanup() {
Matthieu Dorier's avatar
Matthieu Dorier committed
150 151 152 153 154
        m_dataset_dbs.dbs.clear();
        m_run_dbs.dbs.clear();
        m_subrun_dbs.dbs.clear();
        m_event_dbs.dbs.clear();
        m_product_dbs.dbs.clear();
155
        m_sdskv_client = sdskv::client();
Matthieu Dorier's avatar
Matthieu Dorier committed
156 157 158 159 160
        if(m_dataset_dbs.chi) ch_placement_finalize(m_dataset_dbs.chi);
        if(m_run_dbs.chi)     ch_placement_finalize(m_run_dbs.chi);
        if(m_subrun_dbs.chi)  ch_placement_finalize(m_subrun_dbs.chi);
        if(m_event_dbs.chi)   ch_placement_finalize(m_event_dbs.chi);
        if(m_product_dbs.chi)   ch_placement_finalize(m_product_dbs.chi);
Matthieu Dorier's avatar
Matthieu Dorier committed
161 162 163
        for(auto& addr : m_addrs) {
            margo_addr_free(m_mid, addr.second);
        }
164 165 166
        if(m_mid) margo_finalize(m_mid);
    }

167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
    size_t numTargets(const ItemType& type) const {
        switch(type) {
            case ItemType::DATASET:
                return m_dataset_dbs.dbs.size();
            case ItemType::RUN:
                return m_run_dbs.dbs.size();
            case ItemType::SUBRUN:
                return m_subrun_dbs.dbs.size();
            case ItemType::EVENT:
                return m_event_dbs.dbs.size();
            case ItemType::PRODUCT:
                return m_product_dbs.dbs.size();
        }
        return 0;
    }

183 184 185
    private:

    static void checkConfig(YAML::Node& config) {
Matthieu Dorier's avatar
Matthieu Dorier committed
186
        // config file starts with hepnos entry
187 188 189 190
        auto hepnosNode = config["hepnos"];
        if(!hepnosNode) {
            throw Exception("\"hepnos\" entry not found in YAML file");
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
191
        // hepnos entry has client entry
192 193 194 195
        auto clientNode = hepnosNode["client"];
        if(!clientNode) {
            throw Exception("\"client\" entry not found in \"hepnos\" section");
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
196
        // client entry has protocol entry
197 198 199 200
        auto protoNode = clientNode["protocol"];
        if(!protoNode) {
            throw Exception("\"protocol\" entry not found in \"client\" section");
        }
201 202 203 204
        // hepnos entry has databases entry
        auto databasesNode = hepnosNode["databases"];
        if(!databasesNode) {
            throw Exception("\"databasess\" entry not found in \"hepnos\" section");
205
        }
206 207
        if(!databasesNode.IsMap()) {
            throw Exception("\"databases\" entry should be a map");
208 209 210 211 212 213 214 215 216 217
        }
        // database entry has keys datasets, runs, subruns, events, and products.
        std::vector<std::string> fields = { "datasets", "runs", "subruns", "events", "products" };
        for(auto& f : fields) {
            auto fieldNode = databasesNode[f];
            if(!fieldNode) {
                throw Exception("\""+f+"\" entry not found in databases section");
            }
            if(!fieldNode.IsMap()) {
                throw Exception("\""+f+"\" entry should be a mapping from addresses to providers");
218
            }
219 220 221 222 223 224 225 226 227 228 229 230
            for(auto addresses_it = fieldNode.begin(); addresses_it != fieldNode.end(); addresses_it++) {
                auto providers = addresses_it->second;
                for(auto provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
                    // provider entry should be a sequence
                    if(!provider_it->second.IsSequence()) {
                        throw Exception("provider entry should be a sequence");
                    }
                    for(auto db : provider_it->second) {
                        if(!db.IsScalar()) {
                            throw Exception("database id should be a scalar");
                        }
                    }
231
                }
232 233 234 235
            }
        }
    }

236 237
    public:

Matthieu Dorier's avatar
Matthieu Dorier committed
238 239 240 241 242 243 244 245 246
    ///////////////////////////////////////////////////////////////////////////
    // Product access functions
    ///////////////////////////////////////////////////////////////////////////

    static inline ProductID buildProductID(const ItemDescriptor& id, const std::string& productName) {
        ProductID result;
        result.m_key.resize(sizeof(id)+productName.size());
        std::memcpy(const_cast<char*>(result.m_key.data()), &id, sizeof(id));
        std::memcpy(const_cast<char*>(result.m_key.data()+sizeof(id)), productName.data(), productName.size());
247 248 249
        return result;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
250
    const sdskv::database& locateProductDb(const ProductID& productID) const {
251
        // hash the name to get the provider id
Matthieu Dorier's avatar
Matthieu Dorier committed
252 253 254 255 256
        long unsigned db_idx = 0;
        uint64_t hash;
        hash = hashString(productID.m_key);
        ch_placement_find_closest(m_product_dbs.chi, hash, 1, &db_idx);
        return m_product_dbs.dbs[db_idx];
257 258
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
259 260
    bool loadRawProduct(const ProductID& key,
                        std::string& data) const {
261
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
262
        auto& db =  locateProductDb(key);
263
        // read the value
264 265
        if(data.size() == 0)
            data.resize(2048); // eagerly allocate 2KB
266
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
267
            db.get(key.m_key, data);
268 269 270 271
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else
272
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
273 274 275 276
        }
        return true;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
277 278 279 280 281 282 283 284 285 286
    bool loadRawProduct(const ItemDescriptor& id,
                        const std::string& productName,
                        std::string& data) const {
        // build product id
        auto key = buildProductID(id, productName);
        return loadRawProduct(key, data);
    }

    bool loadRawProduct(const ProductID& key,
                        char* value, size_t* vsize) const {
287
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
288
        auto& db =  locateProductDb(key);
289
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
290
            db.get(key.m_key.data(), key.m_key.size(), value, vsize);
291 292 293 294 295 296 297 298 299 300 301
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else if(ex.error() == SDSKV_ERR_SIZE)
                return false;
            else
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return true;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
302 303 304 305 306 307 308 309 310 311 312 313 314
    bool loadRawProduct(const ItemDescriptor& id,
                        const std::string& productName,
                        char* value, size_t* vsize) const {
        // build product id
        auto key = buildProductID(id, productName);
        return loadRawProduct(key, value, vsize);
    }

    ProductID storeRawProduct(const ItemDescriptor& id,
                              const std::string& productName,
                              const char* value, size_t vsize) const {
        // build product id
        auto key = buildProductID(id, productName);
315
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
316 317
        auto& db =  locateProductDb(key);
        // read the value
318
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
319
            db.put(key.m_key.data(), key.m_key.size(), value, vsize);
320
        } catch(sdskv::exception& ex) {
Matthieu Dorier's avatar
Matthieu Dorier committed
321 322 323 324 325
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return ProductID();
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
326
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
327
        return key;
328 329
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
330 331 332 333 334
    ProductID storeRawProduct(const ItemDescriptor& id,
                              const std::string& productName,
                              const std::string& data) const {
        // build product id
        auto key = buildProductID(id, productName);
335
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
336 337
        auto& db = locateProductDb(key);
        // read the value
338
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
339
            db.put(key.m_key, data);
340
        } catch(sdskv::exception& ex) {
341 342 343
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return ProductID();
            } else {
344
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
345
            }
346
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
347
        return key;
348
    }
349

350 351 352 353 354 355 356
    ///////////////////////////////////////////////////////////////////////////
    // DataSet access functions
    ///////////////////////////////////////////////////////////////////////////

    /**
     * Builds the database key of a particular DataSet.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
357
    static inline std::string buildDataSetKey(uint8_t level, const std::string& containerName, const std::string& objectName) {
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375
        size_t c = 1 + objectName.size();
        if(!containerName.empty()) c += containerName.size() + 1;
        std::string result(c,'\0');
        result[0] = level;
        if(!containerName.empty()) {
            std::memcpy(&result[1], containerName.data(), containerName.size());
            size_t x = 1+containerName.size();
            result[x] = '/';
            std::memcpy(&result[x+1], objectName.data(), objectName.size());
        } else {
            std::memcpy(&result[1], objectName.data(), objectName.size());
        }
        return result;
    }

    /**
     * Locates and return the database in charge of the provided DataSet info.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
376
    const sdskv::database& locateDataSetDb(const std::string& containerName) const {
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
        // hash the name to get the provider id
        long unsigned db_idx = 0;
        uint64_t hash;
        hash = hashString(containerName);
        ch_placement_find_closest(m_dataset_dbs.chi, hash, 1, &db_idx);
        return m_dataset_dbs.dbs[db_idx];
    }

    /**
     * @brief Fills the result vector with a sequence of up to
     * maxDataSets shared_ptr to DataSetImpl coming after the
     * current dataset. Returns the number of DataSets read.
     */
    size_t nextDataSets(const std::shared_ptr<DataSetImpl>& current, 
            std::vector<std::shared_ptr<DataSetImpl>>& result,
            size_t maxDataSets) const {
        int ret;
        result.resize(0);
        auto& level = current->m_level;
        auto& containerName = *current->m_container;
        auto& currentName = current->m_name;
        if(current->m_level == 0) return 0; // cannot iterate at object level
Matthieu Dorier's avatar
Matthieu Dorier committed
399
        auto& db = locateDataSetDb(containerName);
400
        // make an entry for the lower bound
Matthieu Dorier's avatar
Matthieu Dorier committed
401
        auto lb_entry = buildDataSetKey(level, containerName, currentName);
402 403 404 405 406 407 408 409 410 411 412
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(2+containerName.size(), '\0');
        prefix[0] = level;
        if(containerName.size() != 0) {
            std::memcpy(&prefix[1], containerName.data(), containerName.size());
            prefix[prefix.size()-1] = '/';
        } else {
            prefix.resize(1);
        }
        // issue an sdskv_list_keys
        std::vector<std::string> entries(maxDataSets, std::string(1024,'\0'));
Matthieu Dorier's avatar
Matthieu Dorier committed
413
        std::vector<UUID> uuids(maxDataSets);
414
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
415
            db.list_keyvals(lb_entry, prefix, entries, uuids);
416 417 418 419
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
        }
        result.resize(0);
420
        unsigned j=0;
421 422 423 424 425 426 427 428 429
        for(const auto& entry : entries) {
            size_t i = entry.find_last_of('/');
            if(i == std::string::npos) i = 1;
            else i += 1;
            result.push_back(
                    std::make_shared<DataSetImpl>(
                        current->m_datastore,
                        level,
                        current->m_container,
Matthieu Dorier's avatar
Matthieu Dorier committed
430
                        entry.substr(i),
431
                        uuids[j]
432 433
                    )
                );
434
            j += 1;
435 436 437 438 439 440 441 442 443 444
        }
        return result.size();
    }

    /**
     * @brief Checks if a particular dataset exists.
     */
    bool dataSetExists(uint8_t level, const std::string& containerName, const std::string& objectName) const {
        int ret;
        // build key
Matthieu Dorier's avatar
Matthieu Dorier committed
445
        auto key = buildDataSetKey(level, containerName, objectName);
446
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
447
        auto& db = locateDataSetDb(containerName);
448
        try {
449 450
            bool b = db.exists(key);
            return b;
451 452 453 454 455
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
456 457 458 459 460 461 462
    
    /*
     * @brief Loads a dataset.
     */
    bool loadDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) const {
        int ret;
        // build key
Matthieu Dorier's avatar
Matthieu Dorier committed
463
        auto key = buildDataSetKey(level, containerName, objectName);
Matthieu Dorier's avatar
Matthieu Dorier committed
464
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
465
        auto& db = locateDataSetDb(containerName);
Matthieu Dorier's avatar
Matthieu Dorier committed
466 467 468 469 470 471 472 473 474 475 476 477 478 479 480
        try {
            size_t s = sizeof(uuid);
            db.get(static_cast<const void*>(key.data()), 
                   key.size(),
                   static_cast<void*>(uuid.data),
                   &s);
            return s == sizeof(uuid);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY) {
                return false;
            }
            throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }
481 482 483 484

    /**
     * Creates a DataSet
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
485
    bool createDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) {
486
        // build full name
Matthieu Dorier's avatar
Matthieu Dorier committed
487
        auto key = buildDataSetKey(level, containerName, objectName);
488
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
489
        auto& db = locateDataSetDb(containerName);
Matthieu Dorier's avatar
Matthieu Dorier committed
490
        uuid.randomize();
491
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
492
            db.put(key.data(), key.size(), uuid.data, sizeof(uuid));
493
        } catch(sdskv::exception& ex) {
Matthieu Dorier's avatar
Matthieu Dorier committed
494 495 496 497 498 499 500 501 502 503 504 505 506
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return false;
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
        }
        return true;
    }

    ///////////////////////////////////////////////////////////////////////////
    // Access functions for numbered items (Runs, SubRuns, and Events)
    ///////////////////////////////////////////////////////////////////////////

507
    const sdskv::database& locateItemDb(const ItemType& type, const ItemDescriptor& id, int target=-1) const {
Matthieu Dorier's avatar
Matthieu Dorier committed
508
        long unsigned db_idx = 0;
509 510 511 512 513
        if(target >= 0) {
            if(type == ItemType::RUN)    return m_run_dbs.dbs[target];
            if(type == ItemType::SUBRUN) return m_subrun_dbs.dbs[target];
            if(type == ItemType::EVENT)  return m_event_dbs.dbs[target];
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
514 515 516
        uint64_t hash;
        size_t prime = 1099511628211ULL;
        hash = id.dataset.hash();
Matthieu Dorier's avatar
Matthieu Dorier committed
517
        if(type == ItemType::RUN) { // we are locating a Run
Matthieu Dorier's avatar
Matthieu Dorier committed
518 519
            ch_placement_find_closest(m_run_dbs.chi, hash, 1, &db_idx);
            return m_run_dbs.dbs[db_idx];
Matthieu Dorier's avatar
Matthieu Dorier committed
520
        } else if(type == ItemType::SUBRUN) { // we are locating a SubRun
Matthieu Dorier's avatar
Matthieu Dorier committed
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537
            hash *= prime;
            hash = hash ^ id.run;
            ch_placement_find_closest(m_subrun_dbs.chi, hash, 1, &db_idx);
            return m_subrun_dbs.dbs[db_idx];
        } else { // we are locating an Event
            hash *= prime;
            hash = hash ^ id.subrun;
            ch_placement_find_closest(m_event_dbs.chi, hash, 1, &db_idx);
            return m_event_dbs.dbs[db_idx];
        }
    }

    /**
     * @brief Fills the result vector with a sequence of up to
     * maxRuns shared_ptr to RunImpl coming after the
     * current run. Returns the number of Runs read.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
538
    size_t nextItems(
539 540
            const ItemType& item_type,
            const ItemType& prefix_type,
Matthieu Dorier's avatar
Matthieu Dorier committed
541
            const std::shared_ptr<ItemImpl>& current, 
Matthieu Dorier's avatar
Matthieu Dorier committed
542
            std::vector<std::shared_ptr<ItemImpl>>& result,
543 544
            size_t maxItems,
            int target=-1) const {
Matthieu Dorier's avatar
Matthieu Dorier committed
545 546 547
        int ret;
        result.resize(0);
        const ItemDescriptor& start_key   = current->m_descriptor;
548
        auto& db = locateItemDb(item_type, start_key, target);
Matthieu Dorier's avatar
Matthieu Dorier committed
549 550 551 552 553 554 555 556 557 558 559
        // ignore keys that don't have the same uuid
        // issue an sdskv_list_keys
        std::vector<ItemDescriptor> descriptors(maxItems);
        std::vector<void*> keys_addr(maxItems);
        std::vector<hg_size_t> keys_sizes(maxItems, sizeof(ItemDescriptor));
        for(auto i=0; i < maxItems; i++) {
            keys_addr[i] = static_cast<void*>(&descriptors[i]);
        }
        try {
            hg_size_t s = maxItems;
            db.list_keys(&start_key, sizeof(start_key),
560
                         &start_key, ItemImpl::descriptorSize(prefix_type),
Matthieu Dorier's avatar
Matthieu Dorier committed
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
                         keys_addr.data(), keys_sizes.data(), &s);
            maxItems = s;
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
        }
        descriptors.resize(maxItems);
        for(const auto& key : descriptors) {
            result.push_back(std::make_shared<ItemImpl>(current->m_datastore, key));
        }
        return result.size();
    }

    /**
     * @brief Checks if a particular Run/SubRun/Event exists.
     */
576
    bool itemExists(const ItemDescriptor& descriptor,
577
                    int target = -1) const {
Matthieu Dorier's avatar
Matthieu Dorier committed
578
        ItemType type = ItemType::RUN;
579
        if(descriptor.subrun != InvalidSubRunNumber) {
Matthieu Dorier's avatar
Matthieu Dorier committed
580
            type = ItemType::SUBRUN;
581
            if(descriptor.event != InvalidEventNumber)
Matthieu Dorier's avatar
Matthieu Dorier committed
582 583
                type = ItemType::EVENT;
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
584
        // find out which DB to access
585
        auto& db = locateItemDb(type, descriptor, target);
Matthieu Dorier's avatar
Matthieu Dorier committed
586
        try {
587
            bool b = db.exists(&descriptor, sizeof(descriptor));
Matthieu Dorier's avatar
Matthieu Dorier committed
588 589 590 591 592 593
            return b;
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }
594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610

    /**
     * @brief Checks if a particular Run/SubRun/Event exists.
     */
    bool itemExists(const UUID& containerUUID,
                    const RunNumber& run_number,
                    const SubRunNumber& subrun_number = InvalidSubRunNumber,
                    const EventNumber& event_number = InvalidEventNumber,
                    int target = -1) const {
        // build the key
        ItemDescriptor k;
        k.dataset = containerUUID;
        k.run     = run_number;
        k.subrun  = subrun_number;
        k.event   = event_number;
        return itemExists(k, target);
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
611 612 613 614 615 616 617 618 619 620 621 622 623 624
    
    /**
     * Creates a Run, SubRun, or Event
     */
    bool createItem(const UUID& containerUUID,
                    const RunNumber& run_number,
                    const SubRunNumber& subrun_number = InvalidSubRunNumber,
                    const EventNumber& event_number = InvalidEventNumber) {
        // build the key
        ItemDescriptor k;
        k.dataset = containerUUID;
        k.run     = run_number;
        k.subrun  = subrun_number;
        k.event   = event_number;
Matthieu Dorier's avatar
Matthieu Dorier committed
625 626 627 628 629 630
        ItemType type = ItemType::RUN;
        if(subrun_number != InvalidSubRunNumber) {
            type = ItemType::SUBRUN;
            if(event_number != InvalidEventNumber)
                type = ItemType::EVENT;
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
631
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
632
        auto& db = locateItemDb(type, k);
Matthieu Dorier's avatar
Matthieu Dorier committed
633 634 635 636
        try {
            db.put(&k, sizeof(k), nullptr, 0);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
637 638 639 640 641 642 643
                return false;
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
        }
        return true;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
644

645 646 647
};

}
648 649

#endif