// Copyright (c) 2017, Los Alamos National Security, LLC.
// All rights reserved.
#include "berkeleydb_datastore.h"
#include "fs_util.h"
#include "kv-config.h"

#include <chrono>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>

using namespace std::chrono;

/**
 * Default constructor.
 *
 * Configures the base class with Duplicates::IGNORE and with its two boolean
 * options (eraseOnGet, debug) disabled. No BerkeleyDB handle is created here;
 * that happens in openDatabase().
 */
BerkeleyDBDataStore::BerkeleyDBDataStore() :
  AbstractDataStore(Duplicates::IGNORE, false, false) {
  _dbm = nullptr;
  _dbenv = nullptr;
  // The destructor unconditionally deletes _wrapper, so it has to be null
  // until openDatabase() allocates it.
  _wrapper = nullptr;
  _in_memory = false;
}

/**
 * Constructor forwarding the duplicate policy, erase-on-get flag and debug
 * flag to AbstractDataStore.
 *
 * No BerkeleyDB handle is created here; that happens in openDatabase().
 *
 * @param duplicates  policy applied to keys that are put more than once
 * @param eraseOnGet  forwarded to the base class
 * @param debug       forwarded to the base class
 */
BerkeleyDBDataStore::BerkeleyDBDataStore(Duplicates duplicates, bool eraseOnGet, bool debug) :
  AbstractDataStore(duplicates, eraseOnGet, debug) {
  _dbm = nullptr;
  _dbenv = nullptr;
  // The destructor unconditionally deletes _wrapper, so it has to be null
  // until openDatabase() allocates it.
  _wrapper = nullptr;
  _in_memory = false;
}
  
/**
 * Destroys the database wrapper, then the environment it was created in.
 */
BerkeleyDBDataStore::~BerkeleyDBDataStore()
{
  // _dbm is intentionally not deleted: openDatabase() points it at the Db
  // object stored inside _wrapper, so deleting the wrapper covers it.
  delete _wrapper;

  delete _dbenv;
}

33
bool BerkeleyDBDataStore::openDatabase(const std::string& db_name, const std::string& db_path) {
34 35
  int status = 0;

36 37
  _name = db_name;
  _path = db_path;
38 39 40 41 42
  std::string fullpath = db_path;
  if(fullpath[fullpath.size()-1] != '/') {
    fullpath += "/";
  }
  fullpath += db_name;
43

44 45
  if(!fullpath.empty()) {
    mkdirs(fullpath.c_str());
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
  }

  // initialize the environment
  uint32_t flags = 0;
  if (_in_memory) {
    // not sure if we want all of these for in_memory
    flags =
      DB_CREATE      | // Create the environment if it does not exist
      DB_PRIVATE     |
      DB_RECOVER     | // Run normal recovery.
      DB_INIT_LOCK   | // Initialize the locking subsystem
      DB_INIT_LOG    | // Initialize the logging subsystem
      DB_INIT_TXN    | // Initialize the transactional subsystem. This
      DB_THREAD      | // Cause the environment to be free-threaded
      DB_AUTO_COMMIT |
      DB_INIT_MPOOL;   // Initialize the memory pool (in-memory cache)
  }
  else {
    flags =
      DB_CREATE      | // Create the environment if it does not exist
      DB_PRIVATE     |
      DB_RECOVER     | // Run normal recovery.
      DB_INIT_LOCK   | // Initialize the locking subsystem
      DB_INIT_LOG    | // Initialize the logging subsystem
      DB_INIT_TXN    | // Initialize the transactional subsystem. This
      DB_THREAD      | // Cause the environment to be free-threaded
      DB_AUTO_COMMIT |
      DB_INIT_MPOOL;   // Initialize the memory pool (in-memory cache)
  }

  try {
    // create and open the environment
    uint32_t flag = DB_CXX_NO_EXCEPTIONS;
    int scratch_size = 1; // 1GB cache
    _dbenv = new DbEnv(flag);
    _dbenv->set_error_stream(&std::cerr);
    _dbenv->set_cachesize(scratch_size, 0, 0);
    if (_in_memory) {
      _dbenv->log_set_config(DB_LOG_IN_MEMORY, 1);
      _dbenv->set_lg_bsize(scratch_size * 1024 * 1024 * 1024); // in GB
      _dbenv->open(NULL, flags, 0);
    }
    else {
      _dbenv->set_lk_detect(DB_LOCK_MINWRITE);
90
      _dbenv->open(fullpath.c_str(), flags, 0644);
91
    }
92 93
    _dbenv->set_flags(DB_TXN_WRITE_NOSYNC,1);
    _dbenv->set_flags(DB_TXN_NOSYNC,1);
94 95 96 97 98 99 100 101
  }
  catch (DbException &e) {
    std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on environment open = " 
	      << e.what() << std::endl;
    status = 1; // failure
  }
  
  if (status == 0) {
102 103
    _wrapper = new DbWrapper(_dbenv, DB_CXX_NO_EXCEPTIONS);
    _dbm = &(_wrapper->_db);
104 105 106 107

    if (_duplicates == Duplicates::ALLOW) {
      _dbm->set_flags(DB_DUP); // Allow duplicate keys
    }
108 109

    _dbm->set_bt_compare(&(BerkeleyDBDataStore::compkeys));
110 111 112 113 114 115
  
    uint32_t flags = DB_CREATE | DB_AUTO_COMMIT | DB_THREAD; // Allow database creation
    if (_in_memory) {
      status = _dbm->open(NULL, // txn pointer
			  NULL, // NULL for in-memory DB
			  NULL, // logical DB name
116
			  DB_BTREE, // DB type (e.g. BTREE, HASH)
117 118 119 120 121 122 123 124 125
			  flags,
			  0);
      if (status == 0) {
	DbMpoolFile *mpf = _dbm->get_mpf();
	mpf->set_flags(DB_MPOOL_NOFILE, 1);
      }
    }
    else {
      status = _dbm->open(NULL, // txn pointer
126
			  db_name.c_str(), // file name
127
			  NULL, // logical DB name
128
			  DB_BTREE, // DB type (e.g. BTREE, HASH)
129 130 131 132 133
			  flags,
			  0);
    }
    if (status != 0) { // is this the right test for error?
      std::cerr << "BerkeleyDBDataStore::createDatabase: BerkeleyDB error on DB open" << std::endl;
134
      std::cerr << "(database name = " << db_name << ", database path = " << db_path << ")" << std::endl;
135
      std::cerr << "status = " << status << std::endl;
136 137
    }
  }
138
  return (status == 0);
139 140
};

141 142
void BerkeleyDBDataStore::set_comparison_function(const std::string& name, comparator_fn less) {
    _comp_fun_name = name;
143
    _wrapper->_less = less;
144 145
}

146
bool BerkeleyDBDataStore::put(const void* key, size_t ksize, const void* val, size_t vsize) {
147 148
  int status = 0;
  bool success = false;
149 150

  if(_no_overwrite) {
151
    if(exists(key, ksize)) return false;
152 153
  }

154 155 156 157 158
  // IGNORE case deals with redundant puts (where key/value is the same). In BerkeleyDB a
  // redundant may overwrite previous value which is fine when key/value is the same.
  // ALLOW case deals with actual duplicates (where key is the same but value is different).
  // This option might be used when eraseOnGet is set (e.g. ParSplice hotpoint use case).
  if (_duplicates == Duplicates::IGNORE || _duplicates == Duplicates::ALLOW) {
159 160
    Dbt db_key((void*)key, ksize);
    Dbt db_data((void*)val, vsize);
161 162
    db_key.set_flags(DB_DBT_USERMEM);
    db_data.set_flags(DB_DBT_USERMEM);
163
    status = _dbm->put(NULL, &db_key, &db_data, 0);
164
    if (status == 0 || 
165
    (_duplicates == Duplicates::IGNORE && status == DB_KEYEXIST)) {
166 167 168 169 170 171 172 173 174 175 176 177 178
      success = true;
    }
    else {
      std::cerr << "BerkeleyDBDataStore::put: BerkeleyDB error on put = " << status << std::endl;
    }
  }
  else {
    std::cerr << "BerkeleyDBDataStore::put: Unexpected Duplicates option = " << int32_t(_duplicates) << std::endl;
  }

  return success;
};

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
bool BerkeleyDBDataStore::put_multi(size_t num_items,
        const void* const* keys,
        const size_t* ksizes,
        const void* const* values,
        const size_t* vsizes)
{
    size_t sk = 0;
    size_t sv = 0;
    for(unsigned i = 0; i < num_items; i++) {
        sk += ksizes[i];
        sv += vsizes[i];
    }
    sk *= 2;
    sv *= 2;
    if(sk % 4 != 0) sk += (4 - (sk % 4));
    if(sv % 4 != 0) sv += (4 - (sv % 4));


    std::vector<char> kbuffer(sk);
    std::vector<char> vbuffer(sv);

    Dbt mkey, mdata;

    mkey.set_ulen(kbuffer.size());
    mkey.set_data(kbuffer.data());

    mdata.set_ulen(vbuffer.size());
    mdata.set_data(vbuffer.data());

    DbMultipleDataBuilder keybuilder(mkey);
    DbMultipleDataBuilder databuilder(mdata);

    for(size_t i = 0; i < num_items; i++) {
        keybuilder.append((void*)keys[i], ksizes[i]);
        databuilder.append((void*)values[i], vsizes[i]);
    }

    int status = _dbm->put(NULL, &mkey, &mdata, DB_MULTIPLE);
    return status == 0;
}

bool BerkeleyDBDataStore::exists(const void* key, size_t size) const {
    Dbt db_key((void*)key, size);
    db_key.set_flags(DB_DBT_USERMEM);
223 224 225 226
    int status = _dbm->exists(NULL, &db_key, 0);
    return status == 0;
}

Matthieu Dorier's avatar
Matthieu Dorier committed
227 228 229 230
bool BerkeleyDBDataStore::erase(const ds_bulk_t &key) {
    Dbt db_key((void*)key.data(), key.size());
    int status = _dbm->del(NULL, &db_key, 0);
    return status == 0;
Matthieu Dorier's avatar
Matthieu Dorier committed
231 232 233 234 235
}

/**
 * Calls Db::sync on the underlying database handle. The status code returned
 * by BerkeleyDB is not reported to the caller.
 */
void BerkeleyDBDataStore::sync()
{
    (void)_dbm->sync(0);
}
Matthieu Dorier's avatar
Matthieu Dorier committed
236

237 238 239 240 241 242 243 244
// In the case where Duplicates::ALLOW, this will return the first
// value found using key.
bool BerkeleyDBDataStore::get(const ds_bulk_t &key, ds_bulk_t &data) {
  int status = 0;
  bool success = false;

  data.clear();

245
  Dbt db_key((void*)&(key[0]), uint32_t(key.size()));
246
  db_key.set_ulen(uint32_t(key.size()));
247 248
  Dbt db_data;
  db_key.set_flags(DB_DBT_USERMEM);
249
  db_data.set_flags(DB_DBT_MALLOC);
250 251
  status = _dbm->get(NULL, &db_key, &db_data, 0);

252
  if (status != DB_NOTFOUND && status != DB_KEYEMPTY) {
253
    data.resize(db_data.get_size(), 0);
254 255 256
    if(db_data.get_size() > 0) {
        memcpy(&(data[0]), db_data.get_data(), db_data.get_size());
    }
257 258 259 260
    free(db_data.get_data());
    success = true;
  }
  else {
261
    //std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on Get = " << status << std::endl;
262 263 264 265 266 267
  }
  
  if (success && _eraseOnGet) {
    status = _dbm->del(NULL, &db_key, 0);
    if (status != 0) {
      success = false;
268
      //std::cerr << "BerkeleyDBDataStore::get: BerkeleyDB error on delete (eraseOnGet) = " << status << std::endl;
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289
    }
  }

  return success;
};

// TODO: To return more than 1 value (when Duplicates::ALLOW), this code should
// use the c_get interface.
bool BerkeleyDBDataStore::get(const ds_bulk_t &key, std::vector<ds_bulk_t> &data) {
  bool success = false;

  data.clear();
  ds_bulk_t value;
  if (get(key, value)) {
    data.push_back(value);
    success = true;
  }
  
  return success;
};

290
void BerkeleyDBDataStore::set_in_memory(bool enable) {
291 292 293
  _in_memory = enable;
};

294 295
std::vector<ds_bulk_t> BerkeleyDBDataStore::vlist_keys(
        const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
296 297 298 299
{
    std::vector<ds_bulk_t> keys;
    Dbc * cursorp;
    Dbt key, data;
300
    int ret;
301
    _dbm->cursor(NULL, &cursorp, 0);
Rob Latham's avatar
Rob Latham committed
302

303 304
    /* 'start' is like RADOS: not inclusive  */
    if (start.size()) {
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
	    key.set_size(start.size());
	    key.set_data((void *)start.data());
	    ret = cursorp->get(&key, &data, DB_SET_RANGE);
	    if (ret != 0) {
	        cursorp->close();
	        return keys;
	    }
    } else {
        ret = cursorp->get(&key, &data, DB_FIRST);
        if (ret != 0) {
            cursorp->close();
            return keys;
        }
    }

	ds_bulk_t k((char*)key.get_data(), ((char*)key.get_data())+key.get_size());
321 322
	/* SET_RANGE will return the smallest key greater than or equal to the
	 * requested key, but we want strictly greater than */
323 324 325 326 327 328
    int c = 0;
	if (k != start) {
        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            keys.push_back(std::move(k));
        }
329
    }
330 331 332 333 334 335 336 337 338
    while (keys.size() < count && c >= 0) {
	    ret = cursorp->get(&key, &data, DB_NEXT);
	    if (ret !=0 ) break;
        
        ds_bulk_t k((char*)key.get_data(), ((char*)key.get_data())+key.get_size());
        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            keys.push_back(std::move(k));
        }
339 340 341 342
    }
    cursorp->close();
    return keys;
}
343

344 345
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BerkeleyDBDataStore::vlist_keyvals(
        const ds_bulk_t &start, size_t count, const ds_bulk_t &prefix) const
346
{
347
    std::vector<std::pair<ds_bulk_t,ds_bulk_t>> result;
348 349
    Dbc * cursorp;
    Dbt key, data;
350
    int ret;
351 352
    _dbm->cursor(NULL, &cursorp, 0);

353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
    /* 'start' is like RADOS: not inclusive  */
    if (start.size()) {
	    key.set_size(start.size());
	    key.set_data((void *)start.data());
	    ret = cursorp->get(&key, &data, DB_SET_RANGE);
	    if (ret != 0) {
	        cursorp->close();
	        return result;
	    }
    } else {
        ret = cursorp->get(&key, &data, DB_FIRST);
        if (ret != 0) {
            cursorp->close();
            return result;
        }
    }

	ds_bulk_t k((char*)key.get_data(), ((char*)key.get_data())+key.get_size());
    ds_bulk_t v((char*)data.get_data(), ((char*)data.get_data())+data.get_size());
372

373 374 375 376 377 378 379 380 381 382 383 384 385 386 387
	/* SET_RANGE will return the smallest key greater than or equal to the
	 * requested key, but we want strictly greater than */
    int c = 0;
	if (k != start) {
        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            result.push_back(std::make_pair(std::move(k),std::move(v)));
        }
    }
    while (result.size() < count && c >= 0) {
	    ret = cursorp->get(&key, &data, DB_NEXT);
	    if (ret !=0 ) break;
        
        ds_bulk_t k((char*)key.get_data(), ((char*)key.get_data())+key.get_size());
        ds_bulk_t v((char*)data.get_data(), ((char*)data.get_data())+data.get_size());
388

389 390 391 392
        c = std::memcmp(prefix.data(), k.data(), prefix.size());
        if(c == 0) {
            result.push_back(std::make_pair(std::move(k), std::move(v)));
        }
393 394
    }
    cursorp->close();
395
    return result;
396
}
397

398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413
/**
 * Range listing of keys; not implemented for the BerkeleyDB backend.
 *
 * Always throws SDSKV_OP_NOT_IMPL; none of the arguments are used.
 */
std::vector<ds_bulk_t> BerkeleyDBDataStore::vlist_key_range(
        const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const
{
    // TODO implement this function
    throw SDSKV_OP_NOT_IMPL;
}

/**
 * Range listing of key/value pairs; not implemented for the BerkeleyDB
 * backend.
 *
 * Always throws SDSKV_OP_NOT_IMPL; none of the arguments are used.
 */
std::vector<std::pair<ds_bulk_t,ds_bulk_t>> BerkeleyDBDataStore::vlist_keyval_range(
        const ds_bulk_t &lower_bound, const ds_bulk_t &upper_bound, size_t max_keys) const
{
    // TODO implement this function
    throw SDSKV_OP_NOT_IMPL;
}

414 415 416 417 418 419 420 421 422 423 424 425 426 427
/**
 * B-tree key comparison callback (installed with set_bt_compare in
 * openDatabase).
 *
 * If the DbWrapper that embeds `db` carries a custom comparator, that
 * comparator decides. Otherwise keys are ordered bytewise, and when one key
 * is a prefix of the other the shorter key sorts first.
 *
 * @param db    database handle; must be the _db member of a DbWrapper
 * @param dbt1  first key
 * @param dbt2  second key
 * @param locp  unused
 * @return negative, zero or positive as dbt1 sorts before, equal to or after dbt2
 */
int BerkeleyDBDataStore::compkeys(Db *db, const Dbt *dbt1, const Dbt *dbt2, size_t *locp)
{
    // Step back from the embedded Db to the DbWrapper object that contains it.
    char* db_addr = (char*)db;
    DbWrapper* owner = (DbWrapper*)(db_addr - offsetof(BerkeleyDBDataStore::DbWrapper, _db));

    if(owner->_less) {
        return (owner->_less)(dbt1->get_data(), dbt1->get_size(),
                dbt2->get_data(), dbt2->get_size());
    }

    const size_t lhs_size = dbt1->get_size();
    const size_t rhs_size = dbt2->get_size();
    const size_t common = (lhs_size < rhs_size) ? lhs_size : rhs_size;

    const int c = std::memcmp(dbt1->get_data(), dbt2->get_data(), common);
    if(c != 0) {
        return c;
    }
    // Equal over the common length: the shorter key sorts first.
    return (lhs_size < rhs_size) ? -1 : ((lhs_size > rhs_size) ? 1 : 0);
}
428

429
#ifdef USE_REMI
430 431 432 433 434 435 436
/**
 * Builds a REMI fileset describing this database.
 *
 * The fileset is created in class "sdskv" with the database path (with a
 * trailing '/') as its root, the "<name>/" directory is registered, and the
 * database type, comparison function name and database name are recorded as
 * metadata. An empty-valued "no_overwrite" entry is added when that mode is
 * enabled.
 *
 * @return the populated fileset handle. Return codes of the remi_* calls are
 *         not checked here.
 */
remi_fileset_t BerkeleyDBDataStore::create_and_populate_fileset() const {
    remi_fileset_t fileset = REMI_FILESET_NULL;
    std::string local_root = _path;
    // The emptiness check keeps us from indexing position size()-1 of an
    // empty path.
    if(!local_root.empty() && local_root[local_root.size()-1] != '/')
        local_root += "/";
    remi_fileset_create("sdskv", local_root.c_str(), &fileset);
    remi_fileset_register_directory(fileset, (_name+"/").c_str());
    //remi_fileset_register_file(fileset, "log.0000000001");
    remi_fileset_register_metadata(fileset, "database_type", "berkeleydb");
    remi_fileset_register_metadata(fileset, "comparison_function", _comp_fun_name.c_str());
    remi_fileset_register_metadata(fileset, "database_name", _name.c_str());
    if(_no_overwrite) {
        remi_fileset_register_metadata(fileset, "no_overwrite", "");
    }
    return fileset;
}
447
#endif