DataStoreImpl.hpp 24.5 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8
#ifndef __HEPNOS_PRIVATE_DATASTORE_IMPL
#define __HEPNOS_PRIVATE_DATASTORE_IMPL

9
#include <vector>
10
#include <unordered_set>
Matthieu Dorier's avatar
Matthieu Dorier committed
11
#include <unordered_map>
12 13 14
#include <functional>
#include <iostream>
#include <yaml-cpp/yaml.h>
15
#include <sdskv-client.hpp>
16 17 18 19
#include <ch-placement.h>
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
20
#include "StringHash.hpp"
21
#include "DataSetImpl.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
22
#include "ItemImpl.hpp"
23 24 25

namespace hepnos {

Matthieu Dorier's avatar
Matthieu Dorier committed
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
/**
 * Resize hook for UUID objects. Both arguments are ignored, so calling
 * this function has no effect.
 * NOTE(review): presumably an overload picked up by the SDSKV C++ client
 * for value types — confirm against sdskv-client.hpp.
 */
inline void object_resize(hepnos::UUID& uuid, size_t new_size) {
    static_cast<void>(new_size);
    static_cast<void>(uuid);
}

/**
 * Returns a writable untyped pointer to the bytes held in uuid.data.
 */
inline void* object_data(hepnos::UUID& uuid) {
    return static_cast<void*>(uuid.data);
}

/**
 * Returns a read-only untyped pointer to the bytes held in uuid.data.
 */
inline const void* object_data(const hepnos::UUID& uuid) {
    return static_cast<const void*>(uuid.data);
}

/**
 * Returns the size in bytes of a UUID object (the same value for every UUID).
 */
inline size_t object_size(const hepnos::UUID& uuid) {
    static_cast<void>(uuid); // only the static type matters
    return sizeof(hepnos::UUID);
}

43
////////////////////////////////////////////////////////////////////////////////////////////
44
// DataStoreImpl implementation
45 46
////////////////////////////////////////////////////////////////////////////////////////////

47 48 49 50 51
/**
 * Groups the SDSKV database handles of one category of item together
 * with the ch-placement instance used to select one of them.
 */
struct DistributedDBInfo {
    /// Database handles, indexed by the value produced by ch_placement_find_closest.
    std::vector<sdskv::database>  dbs;
    /// ch-placement instance; nullptr until one has been created.
    struct ch_placement_instance* chi{nullptr};
};

52
class DataStoreImpl {
53 54
    public:

Matthieu Dorier's avatar
Matthieu Dorier committed
55 56
    margo_instance_id                         m_mid;          // Margo instance
    std::unordered_map<std::string,hg_addr_t> m_addrs;        // Addresses used by the service
57
    sdskv::client                             m_sdskv_client; // SDSKV client
58
    DistributedDBInfo                         m_dataset_dbs;  // list of SDSKV databases for DataSets
Matthieu Dorier's avatar
Matthieu Dorier committed
59 60 61 62
    DistributedDBInfo                         m_run_dbs;      // list of SDSKV databases for Runs
    DistributedDBInfo                         m_subrun_dbs;   // list of SDSKV databases for SubRuns
    DistributedDBInfo                         m_event_dbs;    // list of SDSKV databases for Events
    DistributedDBInfo                         m_product_dbs;  // list of SDSKV databases for Products
63

64
    DataStoreImpl()
65
    : m_mid(MARGO_INSTANCE_NULL)
66
    {}
67

68 69 70 71
    /// Releases everything held by this instance by delegating to cleanup().
    ~DataStoreImpl() { cleanup(); }

72
    /**
     * @brief Fills db_info with one sdskv::database handle per database id
     * found in db_config, then creates the ch-placement instance for them.
     *
     * db_config is walked as a map from address to a map from provider id
     * to a sequence of database ids. Each address is looked up through
     * Margo at most once; resolved addresses are cached in m_addrs.
     *
     * @param db_info   receives the database handles and placement instance.
     * @param db_config YAML node describing the databases of one category.
     *
     * @throws Exception if margo_addr_lookup fails (cleanup() is called
     * before throwing).
     */
    void populateDatabases(DistributedDBInfo& db_info, const YAML::Node& db_config) {
        for(YAML::const_iterator address_it = db_config.begin(); address_it != db_config.end(); ++address_it) {
            const std::string str_addr = address_it->first.as<std::string>();
            YAML::Node providers = address_it->second;
            // reuse the cached address when there is one, otherwise look it up
            hg_addr_t addr{};
            auto cached = m_addrs.find(str_addr);
            if(cached != m_addrs.end()) {
                addr = cached->second;
            } else {
                hg_return_t hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
                if(hret != HG_SUCCESS) {
                    // the lookup failed, so addr was never set by it:
                    // there is nothing to hand to margo_addr_free here
                    cleanup();
                    throw Exception("margo_addr_lookup failed (MARGO error="+std::to_string(hret)+")");
                }
                m_addrs[str_addr] = addr;
            }
            // iterate over providers for this address
            for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); ++provider_it) {
                // get the provider id
                uint16_t provider_id = provider_it->first.as<uint16_t>();
                // create provider handle
                sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
                // get the database ids
                YAML::Node databases = provider_it->second;
                // one handle per database id listed for this provider
                for(std::size_t i = 0; i < databases.size(); i++) {
                    db_info.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
                }
            } // for each provider
        } // for each address
        // initialize ch-placement over all the handles collected above
        db_info.chi = ch_placement_initialize("hash_lookup3", db_info.dbs.size(), 4, 0);
    }

109
    void init(const std::string& configFile, bool use_progress_thread) {
110 111 112 113 114 115 116
        int ret;
        hg_return_t hret;
        YAML::Node config = YAML::LoadFile(configFile);
        checkConfig(config);
        // get protocol
        std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
        // initialize Margo
117
        m_mid = margo_init(proto.c_str(), MARGO_CLIENT_MODE, use_progress_thread, 0);
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
        if(!m_mid) {
            cleanup();
            throw Exception("Could not initialized Margo");
        }
        // initialize SDSKV client
        try {
            m_sdskv_client = sdskv::client(m_mid);
        } catch(sdskv::exception& ex) {
            cleanup();
            throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
        }
        // populate database info structures for each type of database
        YAML::Node databases = config["hepnos"]["databases"];
        YAML::Node dataset_db = databases["datasets"];
        YAML::Node run_db     = databases["runs"];
        YAML::Node subrun_db  = databases["subruns"];
        YAML::Node event_db   = databases["events"];
        YAML::Node product_db = databases["products"];
        populateDatabases(m_dataset_dbs, dataset_db);
Matthieu Dorier's avatar
Matthieu Dorier committed
137 138 139 140
        populateDatabases(m_run_dbs, run_db);
        populateDatabases(m_subrun_dbs, subrun_db);
        populateDatabases(m_event_dbs, event_db);
        populateDatabases(m_product_dbs, product_db);
141 142 143
    }

    void cleanup() {
Matthieu Dorier's avatar
Matthieu Dorier committed
144 145 146 147 148
        m_dataset_dbs.dbs.clear();
        m_run_dbs.dbs.clear();
        m_subrun_dbs.dbs.clear();
        m_event_dbs.dbs.clear();
        m_product_dbs.dbs.clear();
149
        m_sdskv_client = sdskv::client();
Matthieu Dorier's avatar
Matthieu Dorier committed
150 151 152 153 154
        if(m_dataset_dbs.chi) ch_placement_finalize(m_dataset_dbs.chi);
        if(m_run_dbs.chi)     ch_placement_finalize(m_run_dbs.chi);
        if(m_subrun_dbs.chi)  ch_placement_finalize(m_subrun_dbs.chi);
        if(m_event_dbs.chi)   ch_placement_finalize(m_event_dbs.chi);
        if(m_product_dbs.chi)   ch_placement_finalize(m_product_dbs.chi);
Matthieu Dorier's avatar
Matthieu Dorier committed
155 156 157
        for(auto& addr : m_addrs) {
            margo_addr_free(m_mid, addr.second);
        }
158 159 160
        if(m_mid) margo_finalize(m_mid);
    }

161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
    size_t numTargets(const ItemType& type) const {
        switch(type) {
            case ItemType::DATASET:
                return m_dataset_dbs.dbs.size();
            case ItemType::RUN:
                return m_run_dbs.dbs.size();
            case ItemType::SUBRUN:
                return m_subrun_dbs.dbs.size();
            case ItemType::EVENT:
                return m_event_dbs.dbs.size();
            case ItemType::PRODUCT:
                return m_product_dbs.dbs.size();
        }
        return 0;
    }

177 178 179
    private:

    /**
     * @brief Validates the structure of a loaded configuration.
     *
     * Requires hepnos.client.protocol to exist, hepnos.databases to be a
     * map, and each of datasets/runs/subruns/events/products inside it to
     * be a map whose provider entries are sequences of scalars.
     *
     * @param config root node of the YAML configuration.
     * @throws Exception describing the first violation found.
     */
    static void checkConfig(YAML::Node& config) {
        // config file starts with hepnos entry
        auto hepnosNode = config["hepnos"];
        if(!hepnosNode) {
            throw Exception("\"hepnos\" entry not found in YAML file");
        }
        // hepnos entry has client entry
        auto clientNode = hepnosNode["client"];
        if(!clientNode) {
            throw Exception("\"client\" entry not found in \"hepnos\" section");
        }
        // client entry has protocol entry
        auto protoNode = clientNode["protocol"];
        if(!protoNode) {
            throw Exception("\"protocol\" entry not found in \"client\" section");
        }
        // hepnos entry has databases entry
        auto databasesNode = hepnosNode["databases"];
        if(!databasesNode) {
            throw Exception("\"databases\" entry not found in \"hepnos\" section");
        }
        if(!databasesNode.IsMap()) {
            throw Exception("\"databases\" entry should be a map");
        }
        // database entry has keys datasets, runs, subruns, events, and products.
        const std::vector<std::string> fields = { "datasets", "runs", "subruns", "events", "products" };
        for(const auto& f : fields) {
            auto fieldNode = databasesNode[f];
            if(!fieldNode) {
                throw Exception("\""+f+"\" entry not found in databases section");
            }
            if(!fieldNode.IsMap()) {
                throw Exception("\""+f+"\" entry should be a mapping from addresses to providers");
            }
            for(auto addresses_it = fieldNode.begin(); addresses_it != fieldNode.end(); ++addresses_it) {
                auto providers = addresses_it->second;
                for(auto provider_it = providers.begin(); provider_it != providers.end(); ++provider_it) {
                    // provider entry should be a sequence
                    if(!provider_it->second.IsSequence()) {
                        throw Exception("provider entry should be a sequence");
                    }
                    // and every database id in that sequence a scalar
                    for(auto db : provider_it->second) {
                        if(!db.IsScalar()) {
                            throw Exception("database id should be a scalar");
                        }
                    }
                }
            }
        }
    }

230 231
    public:

Matthieu Dorier's avatar
Matthieu Dorier committed
232 233 234 235 236 237 238 239 240
    ///////////////////////////////////////////////////////////////////////////
    // Product access functions
    ///////////////////////////////////////////////////////////////////////////

    /**
     * @brief Builds the ProductID of a product attached to an item.
     *
     * The key is the raw bytes of the ItemDescriptor immediately followed
     * by the bytes of the product name.
     * NOTE(review): the whole descriptor object is copied byte-for-byte;
     * if ItemDescriptor has padding, those bytes end up in the key — confirm.
     *
     * @param id          descriptor of the item owning the product.
     * @param productName name of the product.
     * @return ProductID whose m_key holds the concatenation.
     */
    static inline ProductID buildProductID(const ItemDescriptor& id, const std::string& productName) {
        ProductID result;
        result.m_key.resize(sizeof(id)+productName.size());
        // write through operator[] rather than casting away the constness of data()
        char* dst = &result.m_key[0];
        std::memcpy(dst, &id, sizeof(id));
        std::memcpy(dst+sizeof(id), productName.data(), productName.size());
        return result;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
244
    /**
     * @brief Returns the product database selected for productID: the key
     * is hashed with hashString and the hash is handed to ch-placement to
     * obtain an index into m_product_dbs.dbs.
     */
    const sdskv::database& locateProductDb(const ProductID& productID) const {
        const uint64_t hash = hashString(productID.m_key);
        long unsigned db_idx = 0;
        ch_placement_find_closest(m_product_dbs.chi, hash, 1, &db_idx);
        return m_product_dbs.dbs[db_idx];
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
253 254
    bool loadRawProduct(const ProductID& key,
                        std::string& data) const {
255
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
256
        auto& db =  locateProductDb(key);
257
        // read the value
258 259
        if(data.size() == 0)
            data.resize(2048); // eagerly allocate 2KB
260
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
261
            db.get(key.m_key, data);
262 263 264 265
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else
266
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
267 268 269 270
        }
        return true;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
271 272 273 274 275 276 277 278 279 280
    /**
     * @brief Convenience overload: builds the ProductID from the item
     * descriptor and product name, then loads the product into data.
     */
    bool loadRawProduct(const ItemDescriptor& id,
                        const std::string& productName,
                        std::string& data) const {
        return loadRawProduct(buildProductID(id, productName), data);
    }

    bool loadRawProduct(const ProductID& key,
                        char* value, size_t* vsize) const {
281
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
282
        auto& db =  locateProductDb(key);
283
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
284
            db.get(key.m_key.data(), key.m_key.size(), value, vsize);
285 286 287 288 289 290 291 292 293 294 295
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else if(ex.error() == SDSKV_ERR_SIZE)
                return false;
            else
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return true;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
296 297 298 299 300 301 302 303 304 305 306 307 308
    /**
     * @brief Convenience overload: builds the ProductID from the item
     * descriptor and product name, then loads the product into the
     * caller-provided buffer.
     */
    bool loadRawProduct(const ItemDescriptor& id,
                        const std::string& productName,
                        char* value, size_t* vsize) const {
        return loadRawProduct(buildProductID(id, productName), value, vsize);
    }

    /**
     * @brief Stores a product given as a raw buffer.
     *
     * @param id          descriptor of the item owning the product.
     * @param productName name of the product.
     * @param value       pointer to the bytes to store.
     * @param vsize       number of bytes to store.
     * @return the ProductID under which the value was stored, or a
     * default-constructed ProductID if SDSKV reports that the key exists.
     * @throws Exception for any other SDSKV error.
     */
    ProductID storeRawProduct(const ItemDescriptor& id,
                              const std::string& productName,
                              const char* value, size_t vsize) const {
        ProductID key = buildProductID(id, productName);
        // pick the database responsible for this key
        const sdskv::database& db = locateProductDb(key);
        // write the value
        try {
            db.put(key.m_key.data(), key.m_key.size(), value, vsize);
        } catch(sdskv::exception& ex) {
            if(ex.error() != SDSKV_ERR_KEYEXISTS) {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
            return ProductID();
        }
        return key;
    }

Matthieu Dorier's avatar
Matthieu Dorier committed
324 325 326 327 328
    /**
     * @brief Stores a product given as a string of bytes.
     *
     * @param id          descriptor of the item owning the product.
     * @param productName name of the product.
     * @param data        bytes to store.
     * @return the ProductID under which the value was stored, or a
     * default-constructed ProductID if SDSKV reports that the key exists.
     * @throws Exception for any other SDSKV error.
     */
    ProductID storeRawProduct(const ItemDescriptor& id,
                              const std::string& productName,
                              const std::string& data) const {
        ProductID key = buildProductID(id, productName);
        // pick the database responsible for this key
        const sdskv::database& db = locateProductDb(key);
        // write the value
        try {
            db.put(key.m_key, data);
        } catch(sdskv::exception& ex) {
            if(ex.error() != SDSKV_ERR_KEYEXISTS) {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
            return ProductID();
        }
        return key;
    }
343

344 345 346 347 348 349 350
    ///////////////////////////////////////////////////////////////////////////
    // DataSet access functions
    ///////////////////////////////////////////////////////////////////////////

    /**
     * @brief Builds the database key of a particular DataSet.
     *
     * The key is one byte holding the level, followed by
     * "<containerName>/<objectName>" when the container name is not
     * empty, or by just "<objectName>" when it is.
     */
    static inline std::string buildDataSetKey(uint8_t level, const std::string& containerName, const std::string& objectName) {
        std::string result;
        result.reserve(2 + containerName.size() + objectName.size());
        result.push_back(static_cast<char>(level));
        if(!containerName.empty()) {
            result.append(containerName);
            result.push_back('/');
        }
        result.append(objectName);
        return result;
    }

    /**
     * Locates and return the database in charge of the provided DataSet info.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
370
    const sdskv::database& locateDataSetDb(const std::string& containerName) const {
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
        // hash the name to get the provider id
        long unsigned db_idx = 0;
        uint64_t hash;
        hash = hashString(containerName);
        ch_placement_find_closest(m_dataset_dbs.chi, hash, 1, &db_idx);
        return m_dataset_dbs.dbs[db_idx];
    }

    /**
     * @brief Fills the result vector with a sequence of up to
     * maxDataSets shared_ptr to DataSetImpl coming after the
     * current dataset. Returns the number of DataSets read.
     */
    size_t nextDataSets(const std::shared_ptr<DataSetImpl>& current, 
            std::vector<std::shared_ptr<DataSetImpl>>& result,
            size_t maxDataSets) const {
        int ret;
        result.resize(0);
        auto& level = current->m_level;
        auto& containerName = *current->m_container;
        auto& currentName = current->m_name;
        if(current->m_level == 0) return 0; // cannot iterate at object level
Matthieu Dorier's avatar
Matthieu Dorier committed
393
        auto& db = locateDataSetDb(containerName);
394
        // make an entry for the lower bound
Matthieu Dorier's avatar
Matthieu Dorier committed
395
        auto lb_entry = buildDataSetKey(level, containerName, currentName);
396 397 398 399 400 401 402 403 404 405 406
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(2+containerName.size(), '\0');
        prefix[0] = level;
        if(containerName.size() != 0) {
            std::memcpy(&prefix[1], containerName.data(), containerName.size());
            prefix[prefix.size()-1] = '/';
        } else {
            prefix.resize(1);
        }
        // issue an sdskv_list_keys
        std::vector<std::string> entries(maxDataSets, std::string(1024,'\0'));
Matthieu Dorier's avatar
Matthieu Dorier committed
407
        std::vector<UUID> uuids(maxDataSets);
408
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
409
            db.list_keyvals(lb_entry, prefix, entries, uuids);
410 411 412 413
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
        }
        result.resize(0);
414
        unsigned j=0;
415 416 417 418 419 420 421 422 423
        for(const auto& entry : entries) {
            size_t i = entry.find_last_of('/');
            if(i == std::string::npos) i = 1;
            else i += 1;
            result.push_back(
                    std::make_shared<DataSetImpl>(
                        current->m_datastore,
                        level,
                        current->m_container,
Matthieu Dorier's avatar
Matthieu Dorier committed
424
                        entry.substr(i),
425
                        uuids[j]
426 427
                    )
                );
428
            j += 1;
429 430 431 432 433 434 435 436 437 438
        }
        return result.size();
    }

    /**
     * @brief Checks if a particular dataset exists.
     */
    bool dataSetExists(uint8_t level, const std::string& containerName, const std::string& objectName) const {
        int ret;
        // build key
Matthieu Dorier's avatar
Matthieu Dorier committed
439
        auto key = buildDataSetKey(level, containerName, objectName);
440
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
441
        auto& db = locateDataSetDb(containerName);
442
        try {
443 444
            bool b = db.exists(key);
            return b;
445 446 447 448 449
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
450 451 452 453 454 455 456
    
    /*
     * @brief Loads a dataset.
     */
    bool loadDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) const {
        int ret;
        // build key
Matthieu Dorier's avatar
Matthieu Dorier committed
457
        auto key = buildDataSetKey(level, containerName, objectName);
Matthieu Dorier's avatar
Matthieu Dorier committed
458
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
459
        auto& db = locateDataSetDb(containerName);
Matthieu Dorier's avatar
Matthieu Dorier committed
460 461 462 463 464 465 466 467 468 469 470 471 472 473 474
        try {
            size_t s = sizeof(uuid);
            db.get(static_cast<const void*>(key.data()), 
                   key.size(),
                   static_cast<void*>(uuid.data),
                   &s);
            return s == sizeof(uuid);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY) {
                return false;
            }
            throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }
475 476 477 478

    /**
     * Creates a DataSet
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
479
    bool createDataSet(uint8_t level, const std::string& containerName, const std::string& objectName, UUID& uuid) {
480
        // build full name
Matthieu Dorier's avatar
Matthieu Dorier committed
481
        auto key = buildDataSetKey(level, containerName, objectName);
482
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
483
        auto& db = locateDataSetDb(containerName);
Matthieu Dorier's avatar
Matthieu Dorier committed
484
        uuid.randomize();
485
        try {
Matthieu Dorier's avatar
Matthieu Dorier committed
486
            db.put(key.data(), key.size(), uuid.data, sizeof(uuid));
487
        } catch(sdskv::exception& ex) {
Matthieu Dorier's avatar
Matthieu Dorier committed
488 489 490 491 492 493 494 495 496 497 498 499 500
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return false;
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
        }
        return true;
    }

    ///////////////////////////////////////////////////////////////////////////
    // Access functions for numbered items (Runs, SubRuns, and Events)
    ///////////////////////////////////////////////////////////////////////////

501
    /**
     * @brief Returns the database in charge of a Run, SubRun or Event.
     *
     * When target is non-negative and type is RUN, SUBRUN or EVENT, the
     * database at index target of the matching list is returned directly
     * (the index is not range-checked). Otherwise the database is picked
     * by ch-placement from a hash: the dataset hash alone for a Run, or
     * that hash multiplied by a prime and XORed with the run number (for
     * a SubRun) or the subrun number (for any other type).
     */
    const sdskv::database& locateItemDb(const ItemType& type, const ItemDescriptor& id, int target=-1) const {
        const bool is_run    = (type == ItemType::RUN);
        const bool is_subrun = (type == ItemType::SUBRUN);
        // explicit target requested: bypass hashing
        if(target >= 0) {
            if(is_run)                  return m_run_dbs.dbs[target];
            if(is_subrun)               return m_subrun_dbs.dbs[target];
            if(type == ItemType::EVENT) return m_event_dbs.dbs[target];
        }
        uint64_t hash = id.dataset.hash();
        const DistributedDBInfo* info = &m_run_dbs;
        if(!is_run) {
            const size_t prime = 1099511628211ULL;
            hash *= prime;
            if(is_subrun) {
                hash ^= id.run;
                info = &m_subrun_dbs;
            } else {
                hash ^= id.subrun;
                info = &m_event_dbs;
            }
        }
        long unsigned db_idx = 0;
        ch_placement_find_closest(info->chi, hash, 1, &db_idx);
        return info->dbs[db_idx];
    }

    /**
     * @brief Fills the result vector with a sequence of up to
     * maxRuns shared_ptr to RunImpl coming after the
     * current run. Returns the number of Runs read.
     */
Matthieu Dorier's avatar
Matthieu Dorier committed
532
    size_t nextItems(
533 534
            const ItemType& item_type,
            const ItemType& prefix_type,
Matthieu Dorier's avatar
Matthieu Dorier committed
535
            const std::shared_ptr<ItemImpl>& current, 
Matthieu Dorier's avatar
Matthieu Dorier committed
536
            std::vector<std::shared_ptr<ItemImpl>>& result,
537 538
            size_t maxItems,
            int target=-1) const {
Matthieu Dorier's avatar
Matthieu Dorier committed
539 540 541
        int ret;
        result.resize(0);
        const ItemDescriptor& start_key   = current->m_descriptor;
542
        auto& db = locateItemDb(item_type, start_key, target);
Matthieu Dorier's avatar
Matthieu Dorier committed
543 544 545 546 547 548 549 550 551 552 553
        // ignore keys that don't have the same uuid
        // issue an sdskv_list_keys
        std::vector<ItemDescriptor> descriptors(maxItems);
        std::vector<void*> keys_addr(maxItems);
        std::vector<hg_size_t> keys_sizes(maxItems, sizeof(ItemDescriptor));
        for(auto i=0; i < maxItems; i++) {
            keys_addr[i] = static_cast<void*>(&descriptors[i]);
        }
        try {
            hg_size_t s = maxItems;
            db.list_keys(&start_key, sizeof(start_key),
554
                         &start_key, ItemImpl::descriptorSize(prefix_type),
Matthieu Dorier's avatar
Matthieu Dorier committed
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
                         keys_addr.data(), keys_sizes.data(), &s);
            maxItems = s;
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
        }
        descriptors.resize(maxItems);
        for(const auto& key : descriptors) {
            result.push_back(std::make_shared<ItemImpl>(current->m_datastore, key));
        }
        return result.size();
    }

    /**
     * @brief Checks if a particular Run/SubRun/Event exists.
     */
570
    bool itemExists(const ItemDescriptor& descriptor,
571
                    int target = -1) const {
Matthieu Dorier's avatar
Matthieu Dorier committed
572
        ItemType type = ItemType::RUN;
573
        if(descriptor.subrun != InvalidSubRunNumber) {
Matthieu Dorier's avatar
Matthieu Dorier committed
574
            type = ItemType::SUBRUN;
575
            if(descriptor.event != InvalidEventNumber)
Matthieu Dorier's avatar
Matthieu Dorier committed
576 577
                type = ItemType::EVENT;
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
578
        // find out which DB to access
579
        auto& db = locateItemDb(type, descriptor, target);
Matthieu Dorier's avatar
Matthieu Dorier committed
580
        try {
581
            bool b = db.exists(&descriptor, sizeof(descriptor));
Matthieu Dorier's avatar
Matthieu Dorier committed
582 583 584 585 586 587
            return b;
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604

    /**
     * @brief Checks if a particular Run/SubRun/Event exists.
     */
    bool itemExists(const UUID& containerUUID,
                    const RunNumber& run_number,
                    const SubRunNumber& subrun_number = InvalidSubRunNumber,
                    const EventNumber& event_number = InvalidEventNumber,
                    int target = -1) const {
        // build the key
        ItemDescriptor k;
        k.dataset = containerUUID;
        k.run     = run_number;
        k.subrun  = subrun_number;
        k.event   = event_number;
        return itemExists(k, target);
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
605 606 607 608 609 610 611 612 613 614 615 616 617 618
    
    /**
     * Creates a Run, SubRun, or Event
     */
    bool createItem(const UUID& containerUUID,
                    const RunNumber& run_number,
                    const SubRunNumber& subrun_number = InvalidSubRunNumber,
                    const EventNumber& event_number = InvalidEventNumber) {
        // build the key
        ItemDescriptor k;
        k.dataset = containerUUID;
        k.run     = run_number;
        k.subrun  = subrun_number;
        k.event   = event_number;
Matthieu Dorier's avatar
Matthieu Dorier committed
619 620 621 622 623 624
        ItemType type = ItemType::RUN;
        if(subrun_number != InvalidSubRunNumber) {
            type = ItemType::SUBRUN;
            if(event_number != InvalidEventNumber)
                type = ItemType::EVENT;
        }
Matthieu Dorier's avatar
Matthieu Dorier committed
625
        // find out which DB to access
Matthieu Dorier's avatar
Matthieu Dorier committed
626
        auto& db = locateItemDb(type, k);
Matthieu Dorier's avatar
Matthieu Dorier committed
627 628 629 630
        try {
            db.put(&k, sizeof(k), nullptr, 0);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
631 632 633 634 635 636 637
                return false;
            } else {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
        }
        return true;
    }
Matthieu Dorier's avatar
Matthieu Dorier committed
638

639 640 641
};

}
642 643

#endif