leveldb_datastore.cc 7.23 KB
Newer Older
1 2 3
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "leveldb_datastore.h"
4
#include "fs_util.h"
5
#include "kv-config.h"
Matthieu Dorier's avatar
Matthieu Dorier committed
6
#include <cstring>
7 8
#include <chrono>
#include <iostream>
Matthieu Dorier's avatar
Matthieu Dorier committed
9
#include <sstream>
10 11 12 13

using namespace std::chrono;

// Default constructor: configures the base store with Duplicates::IGNORE and
// both boolean options off, installs no custom comparison function, and binds
// the key comparator to this instance.
LevelDBDataStore::LevelDBDataStore()
  : AbstractDataStore(Duplicates::IGNORE, false, false),
    _less(nullptr),
    _keycmp(this) {
  // No database handle yet; createDatabase() fills this in via DB::Open.
  _dbm = nullptr;
}

// Constructor forwarding the caller's duplicate policy, erase-on-get flag and
// debug flag to the base store. No custom comparison function is installed
// and the key comparator is bound to this instance.
LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
  : AbstractDataStore(duplicates, eraseOnGet, debug),
    _less(nullptr),
    _keycmp(this) {
  // No database handle yet; createDatabase() fills this in via DB::Open.
  _dbm = nullptr;
}
  
// Copies the bytes of a bulk buffer into a std::string (element-wise, from
// begin() to end()) so it can be handed to LevelDB calls.
std::string LevelDBDataStore::toString(const ds_bulk_t &bulk_val) {
  return std::string(bulk_val.begin(), bulk_val.end());
}

// Copies the characters of a std::string into a freshly built bulk buffer;
// the inverse of toString().
ds_bulk_t LevelDBDataStore::fromString(const std::string &str_val) {
  return ds_bulk_t(str_val.begin(), str_val.end());
}

// Destructor: releases the LevelDB handle. _dbm is still null if
// createDatabase() was never called, in which case the delete is a no-op.
LevelDBDataStore::~LevelDBDataStore() {
  delete _dbm;
  // Note carried over from the original code: only the Riak version of
  // LevelDB would additionally call leveldb::Env::Shutdown() here.
}

38
void LevelDBDataStore::createDatabase(const std::string& db_name, const std::string& db_path) {
39 40 41
  leveldb::Options options;
  leveldb::Status status;
  
42
  if (!db_path.empty()) {
43 44 45 46
//      std::stringstream str_db_path("mkdir -p ");
//      str_db_path << db_path;
//      system(str_db_path.str().c_str());
    mkdirs(db_path.c_str());
47
  }
48
  options.comparator = &_keycmp;
49
  options.create_if_missing = true;
50 51 52 53
  std::string fullname = db_path;
  if(!fullname.empty()) fullname += std::string("/");
  fullname += db_name;
  status = leveldb::DB::Open(options, fullname, &_dbm);
54 55 56 57 58 59 60 61 62 63
  
  if (!status.ok()) {
    // error
    std::cerr << "LevelDBDataStore::createDatabase: LevelDB error on Open = " << status.ToString() << std::endl;
  }
  assert(status.ok()); // fall over
  
  // debugging support?
};

64
// Records the caller-supplied comparison function in _less.
void LevelDBDataStore::set_comparison_function(comparator_fn less) {
  this->_less = less;
}

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
  leveldb::Status status;
  bool success = false;
  
  high_resolution_clock::time_point start = high_resolution_clock::now();
  // IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
  // redundant put simply overwrites previous value which is fine when key/value is the same.
  if (_duplicates == Duplicates::IGNORE) {
    status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data));
    if (status.ok()) {
      success = true;
    }
    else {
      std::cerr << "LevelDBDataStore::put: LevelDB error on Put = " << status.ToString() << std::endl;
    }
  }
  else if (_duplicates == Duplicates::ALLOW) {
    std::cerr << "LevelDBDataStore::put: Duplicates::ALLOW set, LevelDB does not support duplicates" << std::endl;
  }
  else {
    std::cerr << "LevelDBDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
  }
  uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
  std::cout << "LevelDBDataStore::put time = " << elapsed << " microseconds" << std::endl;

  return success;
};

Matthieu Dorier's avatar
Matthieu Dorier committed
96 97 98 99 100 101
// Deletes `key` from the database. Returns whether LevelDB reported an OK
// status for the delete.
bool LevelDBDataStore::erase(const ds_bulk_t &key) {
    return _dbm->Delete(leveldb::WriteOptions(), toString(key)).ok();
}

102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
bool LevelDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
  leveldb::Status status;
  bool success = false;

  high_resolution_clock::time_point start = high_resolution_clock::now();
  data.clear();
  std::string value;
  status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
  if (status.ok()) {
    data = fromString(value);
    success = true;
  }
  else if (!status.IsNotFound()) {
    std::cerr << "LevelDBDataStore::get: LevelDB error on Get = " << status.ToString() << std::endl;
  }
  uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
  std::cout << "LevelDBDataStore::get time = " << elapsed << " microseconds" << std::endl;

  return success;
};

bool LevelDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
  bool success = false;

  data.clear();
  ds_bulk_t value;
  if (get(key, value)) {
    data.push_back(value);
    success = true;
  }
  
  return success;
};

136
// Intentionally a no-op in this backend: the flag is accepted and ignored.
void LevelDBDataStore::set_in_memory(bool enable)
{
    (void)enable;
}

139
std::vector<ds_bulk_t> LevelDBDataStore::vlist_keys(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix)
140 141 142 143
{
    std::vector<ds_bulk_t> keys;

    leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
144 145
    leveldb::Slice start_slice(start.data(), start.size());

146 147
    int c = 0;

148
    if (start.size() > 0) {
149 150 151 152 153 154 155
        it->Seek(start_slice);
        /* we treat 'start' the way RADOS treats it: excluding it from returned
         * keys. LevelDB treats start inclusively, so skip over it if we found
         * an exact match */
        if ( start.size() == it->key().size() &&
                (memcmp(it->key().data(), start.data(), start.size()) == 0))
            it->Next();
156
    } else {
157
        it->SeekToFirst();
158 159 160
    }
    /* note: iterator initialized above, not in for loop */
    for (; it->Valid() && keys.size() < count; it->Next() ) {
161 162
        ds_bulk_t k(it->key().size());
        memcpy(k.data(), it->key().data(), it->key().size() );
163 164 165 166 167 168
        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            keys.push_back(std::move(k));
        } else if(c < 0) {
            break;
        }
169 170 171 172
    }
    delete it;
    return keys;
}
173

174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::vlist_keyvals(const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix)
{
    std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;

    leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
    leveldb::Slice start_slice(start.data(), start.size());

    int c = 0;

    if (start.size() > 0) {
        it->Seek(start_slice);
        /* we treat 'start' the way RADOS treats it: excluding it from returned
         * keys. LevelDB treats start inclusively, so skip over it if we found
         * an exact match */
        if ( start.size() == it->key().size() &&
                (memcmp(it->key().data(), start.data(), start.size()) == 0))
            it->Next();
    } else {
        it->SeekToFirst();
    }
    /* note: iterator initialized above, not in for loop */
    for (; it->Valid() && result.size() < count; it->Next() ) {
        ds_bulk_t k(it->key().size());
        ds_bulk_t v(it->value().size());
        memcpy(k.data(), it->key().data(), it->key().size());
        memcpy(v.data(), it->value().data(), it->value().size());

        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            result.push_back(std::make_pair(std::move(k), std::move(v)));
        } else if(c < 0) {
            break;
        }
    }
    delete it;
    return result;
}
/*
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
{
    std::vector<std::pair<ds_bulk_t,ds_bulk_t>> keyvals;

    leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
    size_t i=0;
    for (it->SeekToFirst(); it->Valid(); it->Next() ) {
        ds_bulk_t k(it->key().size());
        ds_bulk_t v(it->value().size());
        memcpy(k.data(), it->key().data(), it->key().size() );
        memcpy(v.data(), it->value().data(), it->value().size() );
        keyvals.push_back(std::make_pair(std::move(k), std::move(v)));
        if (i++ > count) break;
    }
    delete it;
    return keyvals;
}
228
*/