Commit db9687a7 authored by Rob Latham's avatar Rob Latham
Browse files

implement list_keys "starting key" feature

Honors "starting key" in BerkelyDB and LevelDB backends.  Note that
we are following RADOS semantics for starting key and excluding it from
returned list of keys.

closes #9
parent 7a8ac8f8
...@@ -99,7 +99,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) { ...@@ -99,7 +99,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
status = _dbm->open(NULL, // txn pointer status = _dbm->open(NULL, // txn pointer
NULL, // NULL for in-memory DB NULL, // NULL for in-memory DB
NULL, // logical DB name NULL, // logical DB name
DB_HASH, // DB type (e.g. BTREE, HASH) DB_BTREE, // DB type (e.g. BTREE, HASH)
flags, flags,
0); 0);
if (status == 0) { if (status == 0) {
...@@ -111,7 +111,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) { ...@@ -111,7 +111,7 @@ void BerkeleyDBDataStore::createDatabase(std::string db_name) {
status = _dbm->open(NULL, // txn pointer status = _dbm->open(NULL, // txn pointer
dbname.c_str(), // file name dbname.c_str(), // file name
NULL, // logical DB name NULL, // logical DB name
DB_HASH, // DB type (e.g. BTREE, HASH) DB_BTREE, // DB type (e.g. BTREE, HASH)
flags, flags,
0); 0);
} }
...@@ -219,15 +219,32 @@ std::vector<ds_bulk_t> BerkeleyDBDataStore::list_keys(const ds_bulk_t &start, si ...@@ -219,15 +219,32 @@ std::vector<ds_bulk_t> BerkeleyDBDataStore::list_keys(const ds_bulk_t &start, si
std::vector<ds_bulk_t> keys; std::vector<ds_bulk_t> keys;
Dbc * cursorp; Dbc * cursorp;
Dbt key, data; Dbt key, data;
int ret;
_dbm->cursor(NULL, &cursorp, 0); _dbm->cursor(NULL, &cursorp, 0);
for (size_t i=0; i< count; i++) {
int ret = cursorp->get(&key, &data, DB_NEXT);
if (ret !=0 ) break;
ds_bulk_t k(key.get_size() ); /* 'start' is like RADOS: not inclusive */
memcpy(k.data(), key.get_data(), key.get_size() ); if (start.size()) {
/* I hope this is a deep copy! */ key.set_size(start.size());
keys.push_back(std::move(k)); key.set_data((void *)start.data());
ret = cursorp->get(&key, &data, DB_SET_RANGE);
if (ret != 0) {
cursorp->close();
return keys;
}
ds_bulk_t k(key.get_size() );
memcpy(k.data(), key.get_data(), key.get_size() );
/* SET_RANGE will return the smallest key greater than or equal to the
* requested key, but we want strictly greater than */
if (k != start)
keys.push_back(std::move(k));
}
while (keys.size() < count) {
ret = cursorp->get(&key, &data, DB_NEXT);
if (ret !=0 ) break;
ds_bulk_t k(key.get_size() );
memcpy(k.data(), key.get_data(), key.get_size() );
keys.push_back(std::move(k));
} }
cursorp->close(); cursorp->close();
return keys; return keys;
......
...@@ -132,12 +132,24 @@ std::vector<ds_bulk_t> LevelDBDataStore::list_keys(const ds_bulk_t &start, size_ ...@@ -132,12 +132,24 @@ std::vector<ds_bulk_t> LevelDBDataStore::list_keys(const ds_bulk_t &start, size_
std::vector<ds_bulk_t> keys; std::vector<ds_bulk_t> keys;
leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions()); leveldb::Iterator *it = _dbm->NewIterator(leveldb::ReadOptions());
size_t i=0; leveldb::Slice start_slice(start.data(), start.size());
for (it->SeekToFirst(); it->Valid(); it->Next() ) {
if (start.size() > 0) {
it->Seek(start_slice);
/* we treat 'start' the way RADOS treats it: excluding it from returned
* keys. LevelDB treats start inclusively, so skip over it if we found
* an exact match */
if ( start.size() == it->key().size() &&
(memcmp(it->key().data(), start.data(), start.size()) == 0))
it->Next();
} else {
it->SeekToFirst();
}
/* note: iterator initialized above, not in for loop */
for (; it->Valid() && keys.size() < count; it->Next() ) {
ds_bulk_t k(it->key().size()); ds_bulk_t k(it->key().size());
memcpy(k.data(), it->key().data(), it->key().size() ); memcpy(k.data(), it->key().data(), it->key().size() );
keys.push_back(k); keys.push_back(std::move(k));
if (i++ > count) break;
} }
delete it; delete it;
return keys; return keys;
......
...@@ -357,8 +357,9 @@ static hg_return_t list_handler(hg_handle_t handle) ...@@ -357,8 +357,9 @@ static hg_return_t list_handler(hg_handle_t handle)
list_in_t list_in; list_in_t list_in;
list_out_t list_out; list_out_t list_out;
std::vector<char> start{};
margo_get_input(handle, &list_in); margo_get_input(handle, &list_in);
ds_bulk_t start(list_in.list_in.start_key,
list_in.list_in.start_key + list_in.list_in.start_ksize);
auto keys = datastore->list_keys(start, list_in.list_in.max_keys); auto keys = datastore->list_keys(start, list_in.list_in.max_keys);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment