leveldb_datastore.cc 4.99 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "leveldb_datastore.h"
#include "kv-config.h"
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <boost/filesystem.hpp>

using namespace std::chrono;

// Default constructor: configures the base store with Duplicates::IGNORE and
// both boolean options (erase-on-get, debug) switched off. No database is
// open yet, so the LevelDB handle starts out null.
LevelDBDataStore::LevelDBDataStore()
  : AbstractDataStore(Duplicates::IGNORE, false, false)
{
  _dbm = nullptr;
}

// Constructs a store with caller-selected options, which are forwarded
// unchanged to AbstractDataStore. No database is open yet, so the LevelDB
// handle starts out null.
LevelDBDataStore::LevelDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug)
  : AbstractDataStore(duplicates, eraseOnGet, debug)
{
  _dbm = nullptr;
}
  
// Copies the bytes of a bulk buffer into a std::string (the form in which
// keys and values are handed to LevelDB elsewhere in this file).
std::string LevelDBDataStore::toString(const ds_bulk_t &bulk_val) {
  return std::string(bulk_val.begin(), bulk_val.end());
}

// Copies the characters of a std::string into a freshly built bulk buffer;
// the inverse of toString().
ds_bulk_t LevelDBDataStore::fromString(const std::string &str_val) {
  return ds_bulk_t(str_val.begin(), str_val.end());
}

// Destroys the LevelDB handle. Deleting a null pointer is a no-op, so this is
// fine when createDatabase() was never called.
LevelDBDataStore::~LevelDBDataStore() {
  // leveldb::Env::Shutdown(); // Riak version only
  delete _dbm;
}

void LevelDBDataStore::createDatabase(std::string db_name) {
  leveldb::Options options;
  leveldb::Status status;
  
  // db_name assumed to include the full path (e.g. /var/data/db.dat)
  boost::filesystem::path p(db_name);
  std::string basepath = p.parent_path().string();
  if (!basepath.empty()) {
    boost::filesystem::create_directories(basepath);
  }

  options.create_if_missing = true;
  status = leveldb::DB::Open(options, db_name, &_dbm);
  
  if (!status.ok()) {
    // error
    std::cerr << "LevelDBDataStore::createDatabase: LevelDB error on Open = " << status.ToString() << std::endl;
  }
  assert(status.ok()); // fall over
  
  // debugging support?
};

bool LevelDBDataStore::put(const ds_bulk_t &key, const ds_bulk_t &data) {
  leveldb::Status status;
  bool success = false;
  
  high_resolution_clock::time_point start = high_resolution_clock::now();
  // IGNORE case deals with redundant puts (where key/value is the same). In LevelDB a
  // redundant put simply overwrites previous value which is fine when key/value is the same.
  if (_duplicates == Duplicates::IGNORE) {
    status = _dbm->Put(leveldb::WriteOptions(), toString(key), toString(data));
    if (status.ok()) {
      success = true;
    }
    else {
      std::cerr << "LevelDBDataStore::put: LevelDB error on Put = " << status.ToString() << std::endl;
    }
  }
  else if (_duplicates == Duplicates::ALLOW) {
    std::cerr << "LevelDBDataStore::put: Duplicates::ALLOW set, LevelDB does not support duplicates" << std::endl;
  }
  else {
    std::cerr << "LevelDBDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
  }
  uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
  std::cout << "LevelDBDataStore::put time = " << elapsed << " microseconds" << std::endl;

  return success;
};

Matthieu Dorier's avatar
Matthieu Dorier committed
87 88 89 90 91 92
// Issues a LevelDB Delete for `key` and returns whether the resulting status
// is ok.
bool LevelDBDataStore::erase(const ds_bulk_t &key) {
    return _dbm->Delete(leveldb::WriteOptions(), toString(key)).ok();
}

93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
bool LevelDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
  leveldb::Status status;
  bool success = false;

  high_resolution_clock::time_point start = high_resolution_clock::now();
  data.clear();
  std::string value;
  status = _dbm->Get(leveldb::ReadOptions(), toString(key), &value);
  if (status.ok()) {
    data = fromString(value);
    success = true;
  }
  else if (!status.IsNotFound()) {
    std::cerr << "LevelDBDataStore::get: LevelDB error on Get = " << status.ToString() << std::endl;
  }
  uint64_t elapsed = duration_cast<microseconds>(high_resolution_clock::now()-start).count();
  std::cout << "LevelDBDataStore::get time = " << elapsed << " microseconds" << std::endl;

  return success;
};

bool LevelDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
  bool success = false;

  data.clear();
  ds_bulk_t value;
  if (get(key, value)) {
    data.push_back(value);
    success = true;
  }
  
  return success;
};

127
// Intentionally empty: this implementation takes no action on the in-memory
// setting, so `enable` is ignored.
void LevelDBDataStore::set_in_memory(bool enable)
{
    (void)enable;
}

130
std::vector<ds_bulk_t> LevelDBDataStore::list_keys(const ds_bulk_t &start, size_t count)
131 132 133 134
{
    std::vector<ds_bulk_t> keys;

    leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
Rob Latham's avatar
Rob Latham committed
135
    size_t i=0;
136 137 138 139
    for (it->SeekToFirst(); it->Valid(); it->Next() ) {
        ds_bulk_t k(it->key().size());
        memcpy(k.data(), it->key().data(), it->key().size() );
        keys.push_back(k);
140
        if (i++ > count) break;
141 142 143 144
    }
    delete it;
    return keys;
}
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162

std::vector<std::pair<ds_bulk_t,ds_bulk_t>> LevelDBDataStore::list_keyvals(const ds_bulk_t &start_key, size_t count)
{
    std::vector<std::pair<ds_bulk_t,ds_bulk_t>> keyvals;

    leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
    size_t i=0;
    for (it->SeekToFirst(); it->Valid(); it->Next() ) {
        ds_bulk_t k(it->key().size());
        ds_bulk_t v(it->value().size());
        memcpy(k.data(), it->key().data(), it->key().size() );
        memcpy(v.data(), it->value().data(), it->value().size() );
        keyvals.push_back(std::make_pair(std::move(k), std::move(v)));
        if (i++ > count) break;
    }
    delete it;
    return keyvals;
}