DataStoreImpl.hpp 12.2 KB
Newer Older
Matthieu Dorier's avatar
Matthieu Dorier committed
1 2 3 4 5
/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
6 7 8
#ifndef __HEPNOS_PRIVATE_DATASTORE_IMPL
#define __HEPNOS_PRIVATE_DATASTORE_IMPL

9
#include <vector>
10
#include <unordered_set>
Matthieu Dorier's avatar
Matthieu Dorier committed
11
#include <unordered_map>
12 13 14
#include <functional>
#include <iostream>
#include <yaml-cpp/yaml.h>
15
#include <sdskv-client.hpp>
16 17 18 19
#include <ch-placement.h>
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
20
#include "StringHash.hpp"
21 22 23 24 25 26 27 28 29 30

namespace hepnos {

////////////////////////////////////////////////////////////////////////////////////////////
// DataStore::Impl implementation
////////////////////////////////////////////////////////////////////////////////////////////

/**
 * @brief Private implementation of hepnos::DataStore.
 *
 * Owns the Margo instance, the Margo addresses of the SDSKV providers listed
 * in the YAML configuration, the SDSKV client, every SDSKV database opened on
 * those providers, and a ch-placement instance used to map a key to one of
 * those databases.
 *
 * Resources are acquired by init() and released by cleanup(). cleanup() resets
 * every handle it releases, so calling it more than once is harmless.
 */
class DataStore::Impl {
    public:

    margo_instance_id                         m_mid;          // Margo instance
    std::unordered_map<std::string,hg_addr_t> m_addrs;        // Addresses used by the service
    sdskv::client                             m_sdskv_client; // SDSKV client
    std::vector<sdskv::database>              m_databases;    // list of SDSKV databases
    struct ch_placement_instance*             m_chi_sdskv;    // ch-placement instance for SDSKV
    const DataStore::iterator                 m_end;          // iterator for the end() of the DataStore

    Impl()
    : m_mid(MARGO_INSTANCE_NULL)
    , m_chi_sdskv(nullptr)
    , m_end() {}

    /**
     * @brief Connects to the HEPnOS service described by a YAML file.
     *
     * Steps:
     * - read hepnos.client.protocol and initialize Margo in client mode;
     * - create the SDSKV client;
     * - for every "address: N" entry of hepnos.providers.sdskv, open the
     *   databases of provider ids 0..N-1 at that address;
     * - set up ch-placement over all the databases that were opened.
     *
     * @param configFile path to the YAML configuration file.
     * @throws Exception if the configuration is invalid, no database could be
     *         opened, or a Margo/SDSKV/ch-placement call fails. Exceptions
     *         raised by yaml-cpp itself (e.g. while loading the file or
     *         converting a provider count) are propagated unchanged.
     *         Whatever was acquired before a failure is released.
     */
    void init(const std::string& configFile) {
        YAML::Node config = YAML::LoadFile(configFile);
        checkConfig(config);
        // get protocol
        std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
        // initialize Margo
        m_mid = margo_init(proto.c_str(), MARGO_CLIENT_MODE, 0, 0);
        if(!m_mid) {
            cleanup();
            throw Exception("Could not initialized Margo");
        }
        // From this point on, any failure (ours or yaml-cpp's) must release
        // what has been acquired so far, then propagate the same exception.
        try {
            createSdskvClient();
            openDatabases(config["hepnos"]["providers"]["sdskv"]);
            initPlacement();
        } catch(...) {
            cleanup();
            throw;
        }
    }

    /**
     * @brief Releases everything acquired by init().
     *
     * Drops the database handles, resets the SDSKV client, finalizes
     * ch-placement, frees the cached Margo addresses and finalizes Margo.
     * Each handle is reset after being released, so a second call does
     * nothing more.
     */
    void cleanup() {
        m_databases.clear();
        m_sdskv_client = sdskv::client();
        if(m_chi_sdskv) {
            ch_placement_finalize(m_chi_sdskv);
            m_chi_sdskv = nullptr;
        }
        for(auto& addr : m_addrs) {
            margo_addr_free(m_mid, addr.second);
        }
        m_addrs.clear();
        if(m_mid) {
            margo_finalize(m_mid);
            m_mid = MARGO_INSTANCE_NULL;
        }
    }

    private:

    /**
     * @brief Validates the structure of the YAML configuration.
     *
     * Requires hepnos.client.protocol and a non-empty hepnos.providers.sdskv
     * map whose values are all scalars (the number of providers).
     *
     * @throws Exception describing the first missing or invalid entry.
     */
    static void checkConfig(YAML::Node& config) {
        // config file starts with hepnos entry
        auto hepnosNode = config["hepnos"];
        if(!hepnosNode) {
            throw Exception("\"hepnos\" entry not found in YAML file");
        }
        // hepnos entry has client entry
        auto clientNode = hepnosNode["client"];
        if(!clientNode) {
            throw Exception("\"client\" entry not found in \"hepnos\" section");
        }
        // client entry has protocol entry
        auto protoNode = clientNode["protocol"];
        if(!protoNode) {
            throw Exception("\"protocol\" entry not found in \"client\" section");
        }
        // hepnos entry has providers entry
        auto providersNode = hepnosNode["providers"];
        if(!providersNode) {
            throw Exception("\"providers\" entry not found in \"hepnos\" section");
        }
        // provider entry has sdskv entry
        auto sdskvNode = providersNode["sdskv"];
        if(!sdskvNode) {
            throw Exception("\"sdskv\" entry not found in \"providers\" section");
        }
        // sdskv entry is not empty
        if(sdskvNode.size() == 0) {
            throw Exception("No provider found in \"sdskv\" section");
        }
        // each sdskv entry must give the number of providers as a scalar
        for(auto it = sdskvNode.begin(); it != sdskvNode.end(); ++it) {
            if(!it->second.IsScalar()) {
                throw Exception("Invalid value type for provider in \"sdskv\" section");
            }
        }
    }

    // Creates the SDSKV client on top of m_mid (m_mid must be initialized).
    // Throws Exception without releasing anything; init() handles cleanup.
    void createSdskvClient() {
        try {
            m_sdskv_client = sdskv::client(m_mid);
        } catch(sdskv::exception& ex) {
            throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
        }
    }

    // Returns the Margo address for str_addr, looking it up at most once per
    // distinct string. Successful lookups are cached in m_addrs, which is
    // where cleanup() frees them.
    hg_addr_t lookupAddress(const std::string& str_addr) {
        auto found = m_addrs.find(str_addr);
        if(found != m_addrs.end()) {
            return found->second;
        }
        hg_addr_t addr;
        hg_return_t hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
        if(hret != HG_SUCCESS) {
            // addr is not guaranteed to hold a valid handle after a failed
            // lookup, so it must not be passed to margo_addr_free.
            throw Exception("margo_addr_lookup failed (MARGO error="+std::to_string(hret)+")");
        }
        m_addrs.emplace(str_addr, addr);
        return addr;
    }

    // Opens the databases of every provider listed in the "sdskv" YAML map
    // (address -> number of providers, provider ids 0..N-1) and appends them
    // to m_databases. Providers exposing no database contribute nothing.
    void openDatabases(const YAML::Node& sdskvNode) {
        for(auto it = sdskvNode.begin(); it != sdskvNode.end(); ++it) {
            hg_addr_t addr = lookupAddress(it->first.as<std::string>());
            // get the number of providers
            uint16_t num_providers = it->second.as<uint16_t>();
            for(uint16_t provider_id = 0; provider_id < num_providers; provider_id++) {
                std::vector<sdskv::database> dbs;
                try {
                    sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
                    dbs = m_sdskv_client.open(ph);
                } catch(sdskv::exception& ex) {
                    throw Exception("Could not open databases (SDSKV error="+std::to_string(ex.error())+")");
                }
                m_databases.insert(m_databases.end(), dbs.begin(), dbs.end());
            }
        }
    }

    // Initializes ch-placement over m_databases. An empty database list is
    // rejected: every key lookup would otherwise index an empty vector.
    void initPlacement() {
        if(m_databases.empty()) {
            throw Exception("No SDSKV database found");
        }
        m_chi_sdskv = ch_placement_initialize("hash_lookup3", m_databases.size(), 4, 0);
        if(!m_chi_sdskv) {
            throw Exception("Could not initialize ch-placement");
        }
    }

    public:

    /**
     * @brief Builds the SDSKV key of an object.
     *
     * Layout: one byte holding @p level, then "containerName/" if
     * @p containerName is not empty, then @p objectName.
     */
    static inline std::string buildKey(
            uint8_t level,
            const std::string& containerName,
            const std::string& objectName) {
        std::string result;
        result.reserve(2 + containerName.size() + objectName.size());
        result += static_cast<char>(level);
        if(!containerName.empty()) {
            result += containerName;
            result += '/';
        }
        result += objectName;
        return result;
    }

    /**
     * @brief Index in m_databases of the database responsible for a key.
     *
     * For level != 0 only @p containerName is hashed, so every key of a given
     * container maps to the same database (nextKeys relies on this). For
     * level 0 the complete @p key is hashed.
     */
    unsigned long computeDbIndex(uint8_t level, const std::string& containerName, const std::string& key) const {
        // hash the name to get the provider id
        long unsigned sdskv_db_idx = 0;
        uint64_t name_hash;
        if(level != 0) {
            name_hash = hashString(containerName);
        } else {
            // use the complete name for final objects (level 0)
            name_hash = hashString(key);
        }
        ch_placement_find_closest(m_chi_sdskv, name_hash, 1, &sdskv_db_idx);
        return sdskv_db_idx;
    }

    /**
     * @brief Loads the value of an object into a std::string.
     *
     * If @p data is empty it is first resized to 2048 bytes before calling
     * sdskv::database::get (presumably sdskv then resizes it to the actual
     * value size; confirm against sdskv-client.hpp).
     *
     * @return true if the key was found, false if SDSKV reports an unknown key.
     * @throws Exception on any other SDSKV error.
     */
    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, std::string& data) const {
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        auto& db = m_databases[computeDbIndex(level, containerName, key)];
        // read the value
        if(data.size() == 0)
            data.resize(2048); // eagerly allocate 2KB
        try {
            db.get(key, data);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return true;
    }

    /**
     * @brief Loads the value of an object into a caller-provided buffer.
     *
     * @param value buffer receiving the value.
     * @param vsize in: capacity of @p value. Out: presumably set by sdskv to
     *        the value size (TODO confirm in sdskv-client.hpp).
     * @return false if the key is unknown OR the buffer is too small
     *         (SDSKV_ERR_SIZE); the two cases are not distinguished here.
     * @throws Exception on any other SDSKV error.
     */
    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, char* value, size_t* vsize) const {
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        auto& db = m_databases[computeDbIndex(level, containerName, key)];
        // read the value
        try {
            db.get(key.data(), key.size(), value, vsize);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY || ex.error() == SDSKV_ERR_SIZE)
                return false;
            throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return true;
    }

    /**
     * @brief Tells whether an object exists in its database.
     * @throws Exception if the SDSKV call fails.
     */
    bool exists(uint8_t level, const std::string& containerName,
            const std::string& objectName) const {
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        auto& db = m_databases[computeDbIndex(level, containerName, key)];
        try {
            return db.exists(key);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
    }

    /**
     * @brief Stores an object (optionally with a value) in its database.
     *
     * @return the ProductID of the stored object, or a default-constructed
     *         ProductID if SDSKV reports that the key already exists.
     * @throws Exception on any other SDSKV error.
     */
    ProductID store(uint8_t level, const std::string& containerName,
            const std::string& objectName, const char* data=nullptr, size_t data_size=0) {
        // build full name
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        const auto& db = m_databases[computeDbIndex(level, containerName, key)];
        try {
            db.put(key.data(), key.size(), data, data_size);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_KEYEXISTS) {
                return ProductID();
            }
            throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
        }
        return ProductID(level, containerName, objectName);
    }

    /**
     * @brief Stores a batch of already-built keys/values in one database.
     *
     * @param db_index index in m_databases (e.g. obtained from computeDbIndex).
     * @throws Exception if the SDSKV call fails.
     */
    void storeMultiple(unsigned long db_index,
            const std::vector<std::string>& keys,
            const std::vector<std::string>& values) {
        const auto& db = m_databases[db_index];
        try {
            db.put_multi(keys, values);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::put_multi (SDSKV error=" +std::to_string(ex.error()) + ")");
        }
    }

    /**
     * @brief Lists up to @p maxKeys keys of a container, starting from @p lower.
     *
     * Only keys whose prefix is the level byte followed by "containerName/"
     * (or just the level byte if @p containerName is empty) are listed. The
     * leading level byte is stripped from each returned key.
     *
     * @param keys cleared, then filled with the keys found (unchanged if
     *        level == 0).
     * @return the number of keys written to @p keys; 0 for level 0, which
     *         cannot be iterated.
     * @throws Exception if the SDSKV call fails.
     */
    size_t nextKeys(uint8_t level, const std::string& containerName,
            const std::string& lower,
            std::vector<std::string>& keys, size_t maxKeys) const {
        if(level == 0) return 0; // cannot iterate at object level
        // make an entry for the lower bound
        auto lb_entry = buildKey(level, containerName, lower);
        // level != 0, so the database depends only on containerName
        const auto& db = m_databases[computeDbIndex(level, containerName, lb_entry)];
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(1, static_cast<char>(level));
        if(!containerName.empty()) {
            prefix += containerName;
            prefix += '/';
        }
        // issue an sdskv_list_keys
        std::vector<std::string> entries(maxKeys);
        try {
            db.list_keys(lb_entry, prefix, entries);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
        }
        keys.clear();
        keys.reserve(entries.size());
        for(const auto& entry : entries) {
            // an empty entry has no level byte to strip
            if(entry.empty()) continue;
            keys.emplace_back(entry, 1);
        }
        return keys.size();
    }
};

}
321 322

#endif