/*
 * (C) 2018 The University of Chicago
 * 
 * See COPYRIGHT in top-level directory.
 */
#ifndef __HEPNOS_PRIVATE_DATASTORE_IMPL
#define __HEPNOS_PRIVATE_DATASTORE_IMPL

9
#include <vector>
10
#include <unordered_set>
Matthieu Dorier's avatar
Matthieu Dorier committed
11
#include <unordered_map>
12 13 14
#include <functional>
#include <iostream>
#include <yaml-cpp/yaml.h>
15
#include <sdskv-client.hpp>
16 17 18 19
#include <ch-placement.h>
#include "hepnos/Exception.hpp"
#include "hepnos/DataStore.hpp"
#include "hepnos/DataSet.hpp"
Matthieu Dorier's avatar
Matthieu Dorier committed
20
#include "StringHash.hpp"
21 22 23 24

namespace hepnos {

////////////////////////////////////////////////////////////////////////////////////////////
25
// DataStoreImpl implementation
26 27
////////////////////////////////////////////////////////////////////////////////////////////

28 29 30 31 32
/// Groups a set of SDSKV databases with the ch-placement instance that is
/// used to select one of them from a hash value.
struct DistributedDBInfo {
    /// Handles of the SDSKV databases.
    std::vector<sdskv::database> dbs;
    /// ch-placement instance; null until one has been created.
    struct ch_placement_instance* chi{nullptr};
};

33
class DataStoreImpl {
34 35
    public:

Matthieu Dorier's avatar
Matthieu Dorier committed
36 37
    margo_instance_id                         m_mid;          // Margo instance
    std::unordered_map<std::string,hg_addr_t> m_addrs;        // Addresses used by the service
38
    sdskv::client                             m_sdskv_client; // SDSKV client
39
    DistributedDBInfo                         m_databases;    // list of SDSKV databases
Matthieu Dorier's avatar
Matthieu Dorier committed
40
    const DataStore::iterator                 m_end;          // iterator for the end() of the DataStore
41

42
    DataStoreImpl()
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
    : m_mid(MARGO_INSTANCE_NULL)
    , m_end() {}

    void init(const std::string& configFile) {
        int ret;
        hg_return_t hret;
        YAML::Node config = YAML::LoadFile(configFile);
        checkConfig(config);
        // get protocol
        std::string proto = config["hepnos"]["client"]["protocol"].as<std::string>();
        // initialize Margo
        m_mid = margo_init(proto.c_str(), MARGO_CLIENT_MODE, 0, 0);
        if(!m_mid) {
            cleanup();
            throw Exception("Could not initialized Margo");
        }
        // initialize SDSKV client
60 61
        try {
            m_sdskv_client = sdskv::client(m_mid);
62
        } catch(sdskv::exception& ex) {
63
            cleanup();
64
            throw Exception("Could not create SDSKV client (SDSKV error="+std::to_string(ex.error())+")");
65 66
        }
        // create list of sdskv provider handles
67 68 69 70 71 72
        YAML::Node databases = config["hepnos"]["databases"];
        YAML::Node dataset_db = databases["datasets"];
        for(YAML::const_iterator address_it = dataset_db.begin(); address_it != dataset_db.end(); address_it++) {
            std::string str_addr = address_it->first.as<std::string>();
            YAML::Node providers = address_it->second;
            // lookup the address
73
            hg_addr_t addr;
Matthieu Dorier's avatar
Matthieu Dorier committed
74 75 76 77 78 79 80
            if(m_addrs.count(str_addr) != 0) {
                addr = m_addrs[str_addr];
            } else {
                hret = margo_addr_lookup(m_mid, str_addr.c_str(), &addr);
                if(hret != HG_SUCCESS) {
                    margo_addr_free(m_mid,addr);
                    cleanup();
81
                    throw Exception("margo_addr_lookup failed (MARGO error="+std::to_string(hret)+")");
Matthieu Dorier's avatar
Matthieu Dorier committed
82 83
                }
                m_addrs[str_addr] = addr;
84
            }
85 86 87 88 89 90 91 92 93 94 95
            // iterate over providers for this address
            for(YAML::const_iterator provider_it = providers.begin(); provider_it != providers.end(); provider_it++) {
                // get the provider id
                uint16_t provider_id = provider_it->first.as<uint16_t>();
                // create provider handle
                sdskv::provider_handle ph(m_sdskv_client, addr, provider_id);
                // get the database ids
                YAML::Node databases = provider_it->second;
                // iterate over databases for this provider
                for(unsigned i=0; i < databases.size(); i++) {
                    m_databases.dbs.push_back(sdskv::database(ph, databases[i].as<uint64_t>()));
96
                }
97 98
            } // for each provider
        } // for each address
99
        // initialize ch-placement for the SDSKV providers
100
        m_databases.chi = ch_placement_initialize("hash_lookup3", m_databases.dbs.size(), 4, 0);
101 102 103
    }

    void cleanup() {
104
        m_databases.dbs.clear();
105
        m_sdskv_client = sdskv::client();
106 107
        if(m_databases.chi)
            ch_placement_finalize(m_databases.chi);
Matthieu Dorier's avatar
Matthieu Dorier committed
108 109 110
        for(auto& addr : m_addrs) {
            margo_addr_free(m_mid, addr.second);
        }
111 112 113 114 115 116
        if(m_mid) margo_finalize(m_mid);
    }

    private:

    /**
     * Checks the top-level layout of the configuration.
     *
     * Verifies, in this order, that the document has a "hepnos" entry, that
     * it contains a "client" entry with a "protocol" entry, and that it
     * contains a "databases" entry which is a map. The content of the
     * "databases" map is not inspected.
     *
     * @param config root node of the loaded YAML document.
     * @throws Exception describing the first check that fails.
     */
    static void checkConfig(YAML::Node& config) {
        // config file starts with hepnos entry
        auto hepnosNode = config["hepnos"];
        if(!hepnosNode) {
            throw Exception("\"hepnos\" entry not found in YAML file");
        }
        // hepnos entry has client entry
        auto clientNode = hepnosNode["client"];
        if(!clientNode) {
            throw Exception("\"client\" entry not found in \"hepnos\" section");
        }
        // client entry has protocol entry
        auto protoNode = clientNode["protocol"];
        if(!protoNode) {
            throw Exception("\"protocol\" entry not found in \"client\" section");
        }
        // hepnos entry has databases entry
        auto databasesNode = hepnosNode["databases"];
        if(!databasesNode) {
            throw Exception("\"databases\" entry not found in \"hepnos\" section");
        }
        if(!databasesNode.IsMap()) {
            throw Exception("\"databases\" entry should be a map");
        }
        // NOTE: validation of the provider entries (sequences of scalar
        // database ids) is not performed here.
    }

154 155
    public:

156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
    /**
     * Builds the raw database key of an object.
     *
     * Layout: one byte holding the level, then the container name followed
     * by '/', then the object name. The container name and its '/' are
     * left out when the container name is empty.
     *
     * @param level          level stored in the first byte of the key.
     * @param containerName  name of the enclosing container (may be empty).
     * @param objectName     name of the object inside the container.
     * @return the assembled key.
     */
    static inline std::string buildKey(
            uint8_t level,
            const std::string& containerName,
            const std::string& objectName) {
        const bool hasContainer = !containerName.empty();
        std::string key;
        key.reserve(1 + objectName.size()
                      + (hasContainer ? containerName.size() + 1 : 0));
        key.push_back(static_cast<char>(level));
        if(hasContainer) {
            key.append(containerName);
            key.push_back('/');
        }
        key.append(objectName);
        return key;
    }

175
    unsigned long computeDbIndex(uint8_t level, const std::string& containerName, const std::string& key) const {
176
        // hash the name to get the provider id
177
        long unsigned sdskv_db_idx = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
178
        uint64_t name_hash;
179
        if(level != 0) {
Matthieu Dorier's avatar
Matthieu Dorier committed
180
            name_hash = hashString(containerName);
181
        } else {
182
            // use the complete name for final objects (level 0)
Matthieu Dorier's avatar
Matthieu Dorier committed
183
            name_hash = hashString(key);
184
        }
185
        ch_placement_find_closest(m_databases.chi, name_hash, 1, &sdskv_db_idx);
186 187 188 189 190 191 192 193 194 195
        return sdskv_db_idx;
    }

    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, std::string& data) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
196
        // make corresponding datastore entry
197
        auto& db = m_databases.dbs[sdskv_db_idx];
198
        // read the value
199 200
        if(data.size() == 0)
            data.resize(2048); // eagerly allocate 2KB
201
        try {
202
            db.get(key, data);
203 204 205 206
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else
207
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
208 209 210 211
        }
        return true;
    }

212 213 214 215 216 217 218 219
    bool load(uint8_t level, const std::string& containerName,
            const std::string& objectName, char* value, size_t* vsize) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
        // make corresponding datastore entry
220
        auto& db = m_databases.dbs[sdskv_db_idx];
221 222 223 224 225 226 227 228 229 230 231 232 233 234
        // read the value
        try {
            db.get(key.data(), key.size(), value, vsize);
        } catch(sdskv::exception& ex) {
            if(ex.error() == SDSKV_ERR_UNKNOWN_KEY)
                return false;
            else if(ex.error() == SDSKV_ERR_SIZE)
                return false;
            else
                throw Exception("Error occured when calling sdskv::database::get (SDSKV error="+std::to_string(ex.error())+")");
        }
        return true;
    }

235 236 237 238 239 240 241 242
    bool exists(uint8_t level, const std::string& containerName,
            const std::string& objectName) const {
        int ret;
        // build key
        auto key = buildKey(level, containerName, objectName);
        // find out which DB to access
        long unsigned sdskv_db_idx = computeDbIndex(level, containerName, key);
        // make corresponding datastore entry
243
        auto& db = m_databases.dbs[sdskv_db_idx];
244 245 246 247 248 249 250 251
        try {
            return db.exists(key);
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::exists (SDSKV error="+std::to_string(ex.error())+")");
        }
        return false;
    }

252
    /**
     * Writes an object into the database selected for its key.
     *
     * @param level          level of the object.
     * @param containerName  name of the enclosing container.
     * @param objectName     name of the object.
     * @param data           value to store (defaults to nullptr).
     * @param data_size      size of the value in bytes (defaults to 0).
     * @return ProductID(level, containerName, objectName) on success, or a
     *         default-constructed ProductID if SDSKV reports that the key
     *         already exists.
     * @throws Exception for any other SDSKV error.
     */
    ProductID store(uint8_t level, const std::string& containerName,
            const std::string& objectName, const char* data=nullptr, size_t data_size=0) {
        auto key = buildKey(level, containerName, objectName);
        // database in charge of this key
        const auto& db = m_databases.dbs[computeDbIndex(level, containerName, key)];
        try {
            db.put(key.data(), key.size(), data, data_size);
        } catch(sdskv::exception& ex) {
            if(ex.error() != SDSKV_ERR_KEYEXISTS) {
                throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
            }
            // the key is already taken: report it with an empty product id
            return ProductID();
        }
        return ProductID(level, containerName, objectName);
    }

    void storeMultiple(unsigned long db_index, 
273 274
            const std::vector<std::string>& keys,
            const std::vector<std::string>& values) {
275
        // Create the product id
276
        const auto& db = m_databases.dbs[db_index];
277
        try {
278
            db.put_multi(keys, values);
279 280 281
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::put (SDSKV error=" +std::to_string(ex.error()) + ")");
        }
282 283 284 285 286 287 288 289
    }

    size_t nextKeys(uint8_t level, const std::string& containerName,
            const std::string& lower,
            std::vector<std::string>& keys, size_t maxKeys) const {
        int ret;
        if(level == 0) return 0; // cannot iterate at object level
        // hash the name to get the provider id
290
        long unsigned db_idx = 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
291
        uint64_t h = hashString(containerName);
292
        ch_placement_find_closest(m_databases.chi, h, 1, &db_idx);
293
        // make an entry for the lower bound
294
        auto lb_entry = buildKey(level, containerName, lower);
295
        // get provider and database
296
        const auto& db = m_databases.dbs[db_idx];
297 298 299 300 301 302 303 304 305
        // ignore keys that don't have the same level or the same prefix
        std::string prefix(2+containerName.size(), '\0');
        prefix[0] = level;
        if(containerName.size() != 0) {
            std::memcpy(&prefix[1], containerName.data(), containerName.size());
            prefix[prefix.size()-1] = '/';
        } else {
            prefix.resize(1);
        }
306
        // issue an sdskv_list_keys
307 308 309
        std::vector<std::string> entries(maxKeys);
        try {
            db.list_keys(lb_entry, prefix, entries);
310 311
        } catch(sdskv::exception& ex) {
            throw Exception("Error occured when calling sdskv::database::list_keys (SDSKV error="+std::string(ex.what()) + ")");
312 313
        }
        keys.resize(0);
314 315
        for(const auto& entry : entries) {
            keys.emplace_back(&entry[1], entry.size()-1);
316
        }
317
        return keys.size();
318 319 320 321
    }
};

}
322 323

#endif