leveldb_datastore.cc 8.25 KB
Newer Older
1 2 3
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "leveldb_datastore.h"
#include "fs_util.h"
#include "kv-config.h"
#include <algorithm>
#include <chrono>
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream>
10 11 12 13

using namespace std::chrono;

// Default construction: Duplicates::IGNORE, with the eraseOnGet and debug
// flags both off. Delegates to the three-argument constructor, which performs
// the exact same member initialisation.
LevelDBDataStore::LevelDBDataStore()
    : LevelDBDataStore(Duplicates::IGNORE, false, false)
{
}

// Builds a data store with the given options forwarded to AbstractDataStore.
// No comparison function is installed (_less is null), the key comparator is
// bound to this instance, and no database is open yet (_dbm is null).
LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
    : AbstractDataStore(duplicates, eraseOnGet, debug)
    , _less(nullptr)
    , _keycmp(this)
{
    _dbm = nullptr;
}
  
// Copies the bytes of a bulk buffer into a std::string.
std::string LevelDBDataStore::toString(const ds_bulk_t &bulk_val) {
    return std::string(bulk_val.begin(), bulk_val.end());
}

// Copies the characters of a std::string into a bulk buffer.
ds_bulk_t LevelDBDataStore::fromString(const std::string &str_val) {
    return ds_bulk_t(str_val.begin(), str_val.end());
}

// Destroys the LevelDB handle, if any (deleting a null pointer is a no-op).
LevelDBDataStore::~LevelDBDataStore()
{
    // leveldb::Env::Shutdown() would only be needed with the Riak version.
    delete _dbm;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
38 39 40 41
// Intentionally a no-op for this backend.
void LevelDBDataStore::sync() {}

42
bool LevelDBDataStore::openDatabase(const std::string& db_name, const std::string& db_path) {
43 44 45
    _name = db_name;
    _path = db_path;

46 47 48
  leveldb::Options options;
  leveldb::Status status;
  
49
  if (!db_path.empty()) {
50
    mkdirs(db_path.c_str());
51
  }
52
  options.comparator = &_keycmp;
53
  options.create_if_missing = true;
54 55 56 57
  std::string fullname = db_path;
  if(!fullname.empty()) fullname += std::string("/");
  fullname += db_name;
  status = leveldb::DB::Open(options, fullname, &_dbm);
58 59 60 61
  
  if (!status.ok()) {
    // error
    std::cerr << "LevelDBDataStore::createDatabase: LevelDB error on Open = " << status.ToString() << std::endl;
62
    return false;
63
  }
64
  return true;
65 66
};

67 68
void LevelDBDataStore::set_comparison_function(const std::string& name, comparator_fn less) {
    _comp_fun_name = name;
69
   _less = less; 
70 71
}

72 73 74
bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
  leveldb::Status status;
  bool success = false;
75 76 77 78 79 80

  if(_no_overwrite) {
      if(exists(key)) return false;
  }

  //high_resolution_clock::time_point start = high_resolution_clock::now();
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
  // IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
  // redundant put simply overwrites previous value which is fine when key/value is the same.
  if (_duplicates == Duplicates::IGNORE) {
    status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data));
    if (status.ok()) {
      success = true;
    }
    else {
      std::cerr << "LevelDBDataStore::put: LevelDB error on Put = " << status.ToString() << std::endl;
    }
  }
  else if (_duplicates == Duplicates::ALLOW) {
    std::cerr << "LevelDBDataStore::put: Duplicates::ALLOW set, LevelDB does not support duplicates" << std::endl;
  }
  else {
    std::cerr << "LevelDBDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
  }
98 99
//  uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
//  std::cout << "LevelDBDataStore::put time = " << elapsed << " microseconds" << std::endl;
100 101 102 103

  return success;
};

Matthieu Dorier's avatar
Matthieu Dorier committed
104 105 106 107 108 109
// Issues a LevelDB Delete for `key`; returns whether LevelDB reported success.
bool LevelDBDataStore::erase(const ds_bulk_t &key) {
    return _dbm->Delete(leveldb::WriteOptions(), toString(key)).ok();
}

110 111 112 113 114 115 116
// Returns true when a LevelDB Get for `key` succeeds. The fetched value is
// discarded; any non-OK status yields false.
bool LevelDBDataStore::exists(const ds_bulk_t &key) {
    std::string discarded;
    return _dbm->Get(leveldb::ReadOptions(), toString(key), &discarded).ok();
}

117 118 119 120
bool LevelDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
  leveldb::Status status;
  bool success = false;

121
  //high_resolution_clock::time_point start = high_resolution_clock::now();
122 123 124 125 126 127 128 129 130 131
  data.clear();
  std::string value;
  status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
  if (status.ok()) {
    data = fromString(value);
    success = true;
  }
  else if (!status.IsNotFound()) {
    std::cerr << "LevelDBDataStore::get: LevelDB error on Get = " << status.ToString() << std::endl;
  }
132 133
//  uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
//  std::cout << "LevelDBDataStore::get time = " << elapsed << " microseconds" << std::endl;
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150

  return success;
};

bool LevelDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
  bool success = false;

  data.clear();
  ds_bulk_t value;
  if (get(key, value)) {
    data.push_back(value);
    success = true;
  }
  
  return success;
};

151
// Has no effect for this backend; the argument is ignored.
void LevelDBDataStore::set_in_memory(bool /*enable*/) {}

154 155
std::vector<ds_bulk_t> LevelDBDataStore::vlist_keys(
        const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
156 157 158 159
{
    std::vector<ds_bulk_t> keys;

    leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
160 161
    leveldb::Slice start_slice(start.data(), start.size());

162 163
    int c = 0;

164
    if (start.size() > 0) {
165 166 167 168
        it->Seek(start_slice);
        /* we treat 'start' the way RADOS treats it: excluding it from returned
         * keys. LevelDB treats start inclusively, so skip over it if we found
         * an exact match */
Shane Snyder's avatar
Shane Snyder committed
169
        if ( it->Valid() && (start.size() == it->key().size()) &&
170 171
                (memcmp(it->key().data(), start.data(), start.size()) == 0))
            it->Next();
172
    } else {
173
        it->SeekToFirst();
174 175 176
    }
    /* note: iterator initialized above, not in for loop */
    for (; it->Valid() && keys.size() < count; it->Next() ) {
177 178
        ds_bulk_t k(it->key().size());
        memcpy(k.data(), it->key().data(), it->key().size() );
179 180 181 182 183 184
        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            keys.push_back(std::move(k));
        } else if(c < 0) {
            break;
        }
185 186 187 188
    }
    delete it;
    return keys;
}
189

190 191
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyvals(
        const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
{
    std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;

    leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
    leveldb::Slice start_slice(start.data(), start.size());

    int c = 0;

    if (start.size() > 0) {
        it->Seek(start_slice);
        /* we treat 'start' the way RADOS treats it: excluding it from returned
         * keys. LevelDB treats start inclusively, so skip over it if we found
         * an exact match */
        if ( start.size() == it->key().size() &&
                (memcmp(it->key().data(), start.data(), start.size()) == 0))
            it->Next();
    } else {
        it->SeekToFirst();
    }
    /* note: iterator initialized above, not in for loop */
    for (; it->Valid() && result.size() < count; it->Next() ) {
        ds_bulk_t k(it->key().size());
        ds_bulk_t v(it->value().size());
        memcpy(k.data(), it->key().data(), it->key().size());
        memcpy(v.data(), it->value().data(), it->value().size());

        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            result.push_back(std::make_pair(std::move(k), std::move(v)));
        } else if(c < 0) {
            break;
        }
    }
    delete it;
    return result;
}
228

229 230 231 232 233 234 235 236 237 238 239 240 241 242
// Range listing of keys is not implemented for this backend: every call
// throws SDSKV_OP_NOT_IMPL and never returns a value.
std::vector<ds_bulk_t> LevelDBDataStore::vlist_key_range(
        const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const
{
    // TODO implement this function
    throw SDSKV_OP_NOT_IMPL;
}

// Range listing of key/value pairs is not implemented for this backend: every
// call throws SDSKV_OP_NOT_IMPL and never returns a value.
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyval_range(
        const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const
{
    // TODO implement this function
    throw SDSKV_OP_NOT_IMPL;
}
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260

// Creates a REMI fileset of class "sdskv" rooted at the database's parent
// directory, registers the database directory (`_name` + "/") in it, and
// attaches metadata: "database_type", "comparison_function", "database_name"
// and, when _no_overwrite is set, an empty "no_overwrite" entry.
//
// Returns the fileset handle. The return codes of the REMI calls are not
// inspected here.
remi_fileset_t LevelDBDataStore::create_and_populate_fileset() const {
    remi_fileset_t fileset = REMI_FILESET_NULL;

    // openDatabase() accepts an empty path, so _path may be empty here;
    // indexing _path[size()-1] in that case would be out of bounds.
    // NOTE(review): an empty path is mapped to "./" (the database was opened
    // relative to the working directory) — confirm REMI accepts such a root.
    std::string local_root = _path.empty() ? std::string("./") : _path;
    if (local_root.back() != '/') {
        local_root += "/";
    }

    remi_fileset_create("sdskv", local_root.c_str(), &fileset);
    remi_fileset_register_directory(fileset, (_name + "/").c_str());
    remi_fileset_register_metadata(fileset, "database_type", "leveldb");
    remi_fileset_register_metadata(fileset, "comparison_function", _comp_fun_name.c_str());
    remi_fileset_register_metadata(fileset, "database_name", _name.c_str());
    if (_no_overwrite) {
        remi_fileset_register_metadata(fileset, "no_overwrite", "");
    }
    return fileset;
}