leveldb_datastore.cc 8.29 KB
Newer Older
1 2 3
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "leveldb_datastore.h"
#include "fs_util.h"
#include "kv-config.h"
#include <chrono>
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream>
10 11 12 13

using namespace std::chrono;

// Default constructor: configures the base class with Duplicates::IGNORE and
// both boolean options off, installs no custom comparison function, and
// starts without a database handle.
LevelDBDataStore::LevelDBDataStore()
  : AbstractDataStore(Duplicates::IGNORE, false, false),
    _less(nullptr),
    _keycmp(this)
{
  // No LevelDB handle is held until one is assigned elsewhere.
  _dbm = nullptr;
}

// Constructor forwarding the duplicate policy, erase-on-get flag and debug
// flag to the base class. No custom comparison function is installed and no
// database handle is held yet.
LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
  : AbstractDataStore(duplicates, eraseOnGet, debug),
    _less(nullptr),
    _keycmp(this)
{
  // No LevelDB handle is held until one is assigned elsewhere.
  _dbm = nullptr;
}
  
// Copies the bytes of `bulk_val` into a std::string and returns it.
std::string LevelDBDataStore::toString(const ds_bulk_t &bulk_val) {
  return std::string(bulk_val.begin(), bulk_val.end());
}

// Copies the characters of `str_val` into a ds_bulk_t and returns it.
ds_bulk_t LevelDBDataStore::fromString(const std::string &str_val) {
  return ds_bulk_t(str_val.begin(), str_val.end());
}

// Destructor: releases the LevelDB handle owned by this object
// (deleting a null handle is a no-op).
LevelDBDataStore::~LevelDBDataStore() {
  //leveldb::Env::Shutdown(); // Riak version only (kept disabled; would follow the delete)
  delete _dbm;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
38 39 40 41
// No-op: this implementation performs no work when asked to sync.
void LevelDBDataStore::sync() {
  // Deliberately empty.
}

42
bool LevelDBDataStore::openDatabase(const std::string& db_name, const std::string& db_path) {
43 44 45
    _name = db_name;
    _path = db_path;

46 47 48
  leveldb::Options options;
  leveldb::Status status;
  
49
  if (!db_path.empty()) {
50
    mkdirs(db_path.c_str());
51
  }
52
  options.comparator = &_keycmp;
53
  options.create_if_missing = true;
54 55 56 57
  std::string fullname = db_path;
  if(!fullname.empty()) fullname += std::string("/");
  fullname += db_name;
  status = leveldb::DB::Open(options, fullname, &_dbm);
58 59 60 61
  
  if (!status.ok()) {
    // error
    std::cerr << "LevelDBDataStore::createDatabase: LevelDB error on Open = " << status.ToString() << std::endl;
62
    return false;
63
  }
64
  return true;
65 66
};

67 68
void LevelDBDataStore::set_comparison_function(const std::string& name, comparator_fn less) {
    _comp_fun_name = name;
69
   _less = less; 
70 71
}

72 73 74
bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
  leveldb::Status status;
  bool success = false;
75 76 77 78 79 80

  if(_no_overwrite) {
      if(exists(key)) return false;
  }

  //high_resolution_clock::time_point start = high_resolution_clock::now();
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
  // IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
  // redundant put simply overwrites previous value which is fine when key/value is the same.
  if (_duplicates == Duplicates::IGNORE) {
    status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data));
    if (status.ok()) {
      success = true;
    }
    else {
      std::cerr << "LevelDBDataStore::put: LevelDB error on Put = " << status.ToString() << std::endl;
    }
  }
  else if (_duplicates == Duplicates::ALLOW) {
    std::cerr << "LevelDBDataStore::put: Duplicates::ALLOW set, LevelDB does not support duplicates" << std::endl;
  }
  else {
    std::cerr << "LevelDBDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
  }
98 99
//  uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
//  std::cout << "LevelDBDataStore::put time = " << elapsed << " microseconds" << std::endl;
100 101 102 103

  return success;
};

Matthieu Dorier's avatar
Matthieu Dorier committed
104 105 106 107 108 109
bool LevelDBDataStore::erase(const ds_bulk_t &key) {
    leveldb::Status status;
    status = _dbm->Delete(leveldb::WriteOptions(), toString(key));
    return status.ok();
}

110 111 112 113 114 115 116
bool LevelDBDataStore::exists(const ds_bulk_t &key) {
    leveldb::Status status;
    std::string value;
    status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
    return status.ok();
}

117 118 119 120
bool LevelDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
  leveldb::Status status;
  bool success = false;

121
  //high_resolution_clock::time_point start = high_resolution_clock::now();
122 123 124 125 126 127 128 129 130 131
  data.clear();
  std::string value;
  status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
  if (status.ok()) {
    data = fromString(value);
    success = true;
  }
  else if (!status.IsNotFound()) {
    std::cerr << "LevelDBDataStore::get: LevelDB error on Get = " << status.ToString() << std::endl;
  }
132 133
//  uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
//  std::cout << "LevelDBDataStore::get time = " << elapsed << " microseconds" << std::endl;
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150

  return success;
};

bool LevelDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
  bool success = false;

  data.clear();
  ds_bulk_t value;
  if (get(key, value)) {
    data.push_back(value);
    success = true;
  }
  
  return success;
};

151
// No-op: the `enable` argument is not used by this implementation.
void LevelDBDataStore::set_in_memory(bool enable)
{
    // Deliberately empty.
}

154 155
std::vector<ds_bulk_t> LevelDBDataStore::vlist_keys(
        const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
156 157 158 159
{
    std::vector<ds_bulk_t> keys;

    leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
160 161
    leveldb::Slice start_slice(start.data(), start.size());

162 163
    int c = 0;

164
    if (start.size() > 0) {
165 166 167 168
        it->Seek(start_slice);
        /* we treat 'start' the way RADOS treats it: excluding it from returned
         * keys. LevelDB treats start inclusively, so skip over it if we found
         * an exact match */
Shane Snyder's avatar
Shane Snyder committed
169
        if ( it->Valid() && (start.size() == it->key().size()) &&
170 171
                (memcmp(it->key().data(), start.data(), start.size()) == 0))
            it->Next();
172
    } else {
173
        it->SeekToFirst();
174 175 176
    }
    /* note: iterator initialized above, not in for loop */
    for (; it->Valid() && keys.size() < count; it->Next() ) {
177 178
        ds_bulk_t k(it->key().size());
        memcpy(k.data(), it->key().data(), it->key().size() );
179 180 181 182 183 184
        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            keys.push_back(std::move(k));
        } else if(c < 0) {
            break;
        }
185 186 187 188
    }
    delete it;
    return keys;
}
189

190 191
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyvals(
        const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
192 193 194 195 196 197 198 199 200 201 202 203 204
{
    std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;

    leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
    leveldb::Slice start_slice(start.data(), start.size());

    int c = 0;

    if (start.size() > 0) {
        it->Seek(start_slice);
        /* we treat 'start' the way RADOS treats it: excluding it from returned
         * keys. LevelDB treats start inclusively, so skip over it if we found
         * an exact match */
205
        if ( it->Valid() && (start.size() == it->key().size()) &&
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
                (memcmp(it->key().data(), start.data(), start.size()) == 0))
            it->Next();
    } else {
        it->SeekToFirst();
    }
    /* note: iterator initialized above, not in for loop */
    for (; it->Valid() && result.size() < count; it->Next() ) {
        ds_bulk_t k(it->key().size());
        ds_bulk_t v(it->value().size());
        memcpy(k.data(), it->key().data(), it->key().size());
        memcpy(v.data(), it->value().data(), it->value().size());

        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            result.push_back(std::make_pair(std::move(k), std::move(v)));
        } else if(c < 0) {
            break;
        }
    }
    delete it;
    return result;
}
228

229 230 231 232 233 234 235 236 237 238 239 240 241 242
// Range listing of keys is not implemented for this backend: every call
// throws SDSKV_OP_NOT_IMPL and none of the arguments are examined.
std::vector<ds_bulk_t> LevelDBDataStore::vlist_key_range(
        const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const
{
    // TODO implement this function
    throw SDSKV_OP_NOT_IMPL;
}

// Range listing of key/value pairs is not implemented for this backend:
// every call throws SDSKV_OP_NOT_IMPL and none of the arguments are examined.
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyval_range(
        const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const
{
    // TODO implement this function
    throw SDSKV_OP_NOT_IMPL;
}
244

245
#ifdef USE_REMI
246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261
// Builds a REMI fileset describing this database.
//
// The fileset has class "sdskv" and is rooted at `_path` (with a trailing
// '/' appended when `_path` is non-empty and lacks one). The directory
// "<_name>/" is registered in it, along with the metadata entries
// "database_type" ("leveldb"), "comparison_function", "database_name" and,
// when `_no_overwrite` is set, an empty "no_overwrite" entry.
//
// Returns the populated fileset, or REMI_FILESET_NULL when the fileset could
// not be created.
remi_fileset_t LevelDBDataStore::create_and_populate_fileset() const {
    remi_fileset_t fileset = REMI_FILESET_NULL;

    std::string local_root = _path;
    // Only look at the last character when there is one: indexing
    // size()-1 on an empty path would be out of range.
    if(!local_root.empty() && local_root.back() != '/')
        local_root += "/";

    // NOTE(review): an empty root is passed through unchanged; whether REMI
    // accepts it is not visible here, so the return code is checked.
    const int ret = remi_fileset_create("sdskv", local_root.c_str(), &fileset);
    if(ret != REMI_SUCCESS) {
        return REMI_FILESET_NULL;
    }

    remi_fileset_register_directory(fileset, (_name+"/").c_str());
    remi_fileset_register_metadata(fileset, "database_type", "leveldb");
    remi_fileset_register_metadata(fileset, "comparison_function", _comp_fun_name.c_str());
    remi_fileset_register_metadata(fileset, "database_name", _name.c_str());
    if(_no_overwrite) {
        remi_fileset_register_metadata(fileset, "no_overwrite", "");
    }
    return fileset;
}
262
#endif