Commit 18ed7257 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added support for direct JSON serialization

parent 4ebed7a7
......@@ -125,6 +125,19 @@ class Backend {
const std::string& coll_name,
const std::string& record) = 0;
/**
* @brief Stores a record into the collection.
*
* @param coll_name Name of the collection.
* @param record Record to store.
*
* @return a RequestResult<uint64_t> instance
* containing the record id if successful.
*/
virtual RequestResult<uint64_t> storeJson(
const std::string& coll_name,
const Json::Value& record) = 0;
/**
* @brief Fetches a particular record by its id.
*
......@@ -139,7 +152,20 @@ class Backend {
uint64_t record_id) = 0;
/**
* @brief Returns an array of records matching a give
* @brief Fetches a particular record by its id.
*
* @param coll_name Name of the collection.
* @param record_id Record id.
*
* @return a RequestResult<Json::Value> instance.
* containing the content of the record if successful.
*/
virtual RequestResult<Json::Value> fetchJson(
const std::string& coll_name,
uint64_t record_id) = 0;
/**
* @brief Returns an array of records matching a given
* Jx9 filter. The filter should be expressed as a string
* containing a function. For example:
*
......@@ -155,6 +181,23 @@ class Backend {
const std::string& coll_name,
const std::string& filter_code) = 0;
/**
* @brief Returns an array of records matching a given
* Jx9 filter. The filter should be expressed as a string
* containing a function. For example:
*
* "function($user) { return $user.age > 30; }"
*
* @param coll_name Name of the collection.
* @param filter_code Code of the Jx9 function.
*
* @return a RequestResult<Json::Value>
* instance containing the result of the request.
*/
virtual RequestResult<Json::Value> filterJson(
const std::string& coll_name,
const std::string& filter_code) = 0;
/**
* @brief Updates an existing record with the new content.
*
......@@ -170,6 +213,21 @@ class Backend {
uint64_t record_id,
const std::string& new_content) = 0;
/**
* @brief Updates an existing record with the new content.
*
* @param coll_name Name of the collection.
* @param record_id Record to update.
* @param new_content New content of the record.
*
* @return a RequestResult<bool> instance indicating
* whether the update was successful.
*/
virtual RequestResult<bool> updateJson(
const std::string& coll_name,
uint64_t record_id,
const Json::Value& new_content) = 0;
/**
* @brief Returns all the records in the collection.
*
......@@ -181,6 +239,17 @@ class Backend {
virtual RequestResult<std::vector<std::string>> all(
const std::string& coll_name) = 0;
/**
* @brief Returns all the records in the collection.
*
* @param coll_name Name of the collection.
*
* @return a RequestResult<Json::Value> instance
* containing all the records as strings, if successful.
*/
virtual RequestResult<Json::Value> allJson(
const std::string& coll_name) = 0;
/**
* @brief Returns the last record id stored in the collection.
*
......
......@@ -7,9 +7,14 @@
#define __SONATA_JSON_VALUE_SERIALIZE_HPP
#include <json/json.h>
#include <thallium/serialization/stl/tuple.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/stl/string.hpp>
namespace Json { // needed for template deduction to work
template<typename A>
void save(A& ar, const Json::Value& val) {
void save(A& ar, Json::Value const& val) {
ar & (char)val.type();
switch(val.type()) {
case Json::nullValue:
......@@ -22,7 +27,7 @@ void save(A& ar, const Json::Value& val) {
break;
case Json::realValue:
ar & val.asDouble();
break
break;
case Json::stringValue:
ar & val.asString();
break;
......@@ -37,7 +42,7 @@ void save(A& ar, const Json::Value& val) {
break;
case Json::objectValue:
ar & val.size();
for(auto it = val.begin(); i != val.end(); it++) {
for(auto it = val.begin(); it != val.end(); it++) {
ar & it.name();
ar & *it;
}
......@@ -73,7 +78,7 @@ void load(A& ar, Json::Value& val) {
ar & v;
val = v;
}
break
break;
case Json::stringValue:
{
std::string v;
......@@ -115,4 +120,6 @@ void load(A& ar, Json::Value& val) {
}
}
}
#endif
spack:
specs:
- mochi-thallium
- cppunit
- cmake
- jsoncpp
- tclap
- spdlog
- mpi
- mochi-thallium@master+cereal
concretization: together
......@@ -26,10 +26,15 @@ class ClientImpl {
tl::remote_procedure m_drop_collection;
tl::remote_procedure m_execute_on_database;
tl::remote_procedure m_coll_store;
tl::remote_procedure m_coll_store_json;
tl::remote_procedure m_coll_fetch;
tl::remote_procedure m_coll_fetch_json;
tl::remote_procedure m_coll_filter;
tl::remote_procedure m_coll_filter_json;
tl::remote_procedure m_coll_update;
tl::remote_procedure m_coll_update_json;
tl::remote_procedure m_coll_all;
tl::remote_procedure m_coll_all_json;
tl::remote_procedure m_coll_last_id;
tl::remote_procedure m_coll_size;
tl::remote_procedure m_coll_erase;
......@@ -42,10 +47,15 @@ class ClientImpl {
, m_drop_collection( engine.define("sonata_drop_collection") )
, m_execute_on_database( engine.define("sonata_exec_on_database") )
, m_coll_store( engine.define("sonata_store") )
, m_coll_store_json( engine.define("sonata_store_json") )
, m_coll_fetch( engine.define("sonata_fetch") )
, m_coll_fetch_json( engine.define("sonata_fetch_json") )
, m_coll_filter( engine.define("sonata_filter") )
, m_coll_filter_json( engine.define("sonata_filter_json") )
, m_coll_update( engine.define("sonata_update") )
, m_coll_update_json( engine.define("sonata_update_json") )
, m_coll_all( engine.define("sonata_all") )
, m_coll_all_json( engine.define("sonata_all_json") )
, m_coll_last_id( engine.define("sonata_last_id") )
, m_coll_size( engine.define("sonata_size") )
, m_coll_erase( engine.define("sonata_erase") )
......
......@@ -6,6 +6,7 @@
#include "sonata/Collection.hpp"
#include "sonata/RequestResult.hpp"
#include "sonata/Exception.hpp"
#include "sonata/JsonSerialize.hpp"
#include "AsyncRequestImpl.hpp"
#include "ClientImpl.hpp"
......@@ -41,7 +42,24 @@ Database Collection::database() const {
}
void Collection::store(const Json::Value& record, uint64_t* id, AsyncRequest* req) const {
store(record.toStyledString(), id, req);
if(not self) throw Exception("Invalid sonata::Collection object");
auto& rpc = self->m_database->m_client->m_coll_store_json;
auto& ph = self->m_database->m_ph;
auto& db_name = self->m_database->m_name;
auto async_response = rpc.on(ph).async(db_name, self->m_name, record);
auto async_request_impl = std::make_shared<AsyncRequestImpl>(std::move(async_response));
async_request_impl->m_wait_callback = [id](AsyncRequestImpl& async_request_impl) {
RequestResult<uint64_t> result = async_request_impl.m_async_response.wait();
if(result.success()) {
if(id) *id = result.value();
} else {
throw Exception(result.error());
}
};
if(req)
*req = AsyncRequest(std::move(async_request_impl));
else
AsyncRequest(std::move(async_request_impl)).wait();
}
void Collection::store(const std::string& record, uint64_t* id, AsyncRequest* req) const {
......@@ -90,22 +108,15 @@ void Collection::fetch(uint64_t id, std::string* out, AsyncRequest* req) const {
void Collection::fetch(uint64_t id, Json::Value* out, AsyncRequest* req) const {
if(not out) return;
if(not self) throw Exception("Invalid sonata::Collection object");
auto& rpc = self->m_database->m_client->m_coll_fetch;
auto& rpc = self->m_database->m_client->m_coll_fetch_json;
auto& ph = self->m_database->m_ph;
auto& db_name = self->m_database->m_name;
auto async_response = rpc.on(ph).async(db_name, self->m_name, id);
auto async_request_impl = std::make_shared<AsyncRequestImpl>(std::move(async_response));
async_request_impl->m_wait_callback = [out, self=self](AsyncRequestImpl& async_request_impl) {
RequestResult<std::string> result = async_request_impl.m_async_response.wait();
RequestResult<Json::Value> result = async_request_impl.m_async_response.wait();
if(result.success()) {
std::string errors;
bool parsingSuccessful = self->m_json_reader->parse(result.value().c_str(),
result.value().c_str() + result.value().size(),
out,
&errors);
if(!parsingSuccessful) {
throw Exception(errors);
}
*out = std::move(result.value());
} else {
throw Exception(result.error());
}
......@@ -139,29 +150,15 @@ void Collection::filter(const std::string& filterCode, std::vector<std::string>*
void Collection::filter(const std::string& filterCode, Json::Value* out, AsyncRequest* req) const {
if(not self) throw Exception("Invalid sonata::Collection object");
auto& rpc = self->m_database->m_client->m_coll_filter;
auto& rpc = self->m_database->m_client->m_coll_filter_json;
auto& ph = self->m_database->m_ph;
auto& db_name = self->m_database->m_name;
auto async_response = rpc.on(ph).async(db_name, self->m_name, filterCode);
auto async_request_impl = std::make_shared<AsyncRequestImpl>(std::move(async_response));
async_request_impl->m_wait_callback = [out, self=self](AsyncRequestImpl& async_request_impl) {
RequestResult<std::vector<std::string>> result = async_request_impl.m_async_response.wait();
RequestResult<Json::Value> result = async_request_impl.m_async_response.wait();
if(result.success()) {
if(!out) return;
Json::Value tmp_array;
for(unsigned i=0; i < result.value().size(); i++) {
std::string errors;
Json::Value tmp;
bool parsingSuccessful = self->m_json_reader->parse(result.value()[i].c_str(),
result.value()[i].c_str() + result.value()[i].size(),
&tmp,
&errors);
if(!parsingSuccessful) {
throw Exception(errors);
}
tmp_array[i] = std::move(tmp);
}
*out = std::move(tmp_array);
if(out) *out = std::move(result.value());
} else {
throw Exception(result.error());
}
......@@ -192,7 +189,22 @@ void Collection::update(uint64_t id, const std::string& record, AsyncRequest* re
}
void Collection::update(uint64_t id, const Json::Value& record, AsyncRequest* req) const {
update(id, record.toStyledString(), req);
if(not self) throw Exception("Invalid sonata::Collection object");
auto& rpc = self->m_database->m_client->m_coll_update_json;
auto& ph = self->m_database->m_ph;
auto& db_name = self->m_database->m_name;
auto async_response = rpc.on(ph).async(db_name, self->m_name, id, record);
auto async_request_impl = std::make_shared<AsyncRequestImpl>(std::move(async_response));
async_request_impl->m_wait_callback = [](AsyncRequestImpl& async_request_impl) {
RequestResult<bool> result = async_request_impl.m_async_response.wait();
if(!result.success()) {
throw Exception(result.error());
}
};
if(req)
*req = AsyncRequest(std::move(async_request_impl));
else
AsyncRequest(std::move(async_request_impl)).wait();
}
void Collection::all(std::vector<std::string>* out, AsyncRequest* req) const {
......@@ -219,29 +231,15 @@ void Collection::all(std::vector<std::string>* out, AsyncRequest* req) const {
void Collection::all(Json::Value* out, AsyncRequest* req) const {
if(not self) throw Exception("Invalid sonata::Collection object");
auto& rpc = self->m_database->m_client->m_coll_all;
auto& rpc = self->m_database->m_client->m_coll_all_json;
auto& ph = self->m_database->m_ph;
auto& db_name = self->m_database->m_name;
auto async_response = rpc.on(ph).async(db_name, self->m_name);
auto async_request_impl = std::make_shared<AsyncRequestImpl>(std::move(async_response));
async_request_impl->m_wait_callback = [out, self=self](AsyncRequestImpl& async_request_impl) {
RequestResult<std::vector<std::string>> result = async_request_impl.m_async_response.wait();
RequestResult<Json::Value> result = async_request_impl.m_async_response.wait();
if(result.success()) {
if(!out) return;
Json::Value tmp_array;
for(unsigned i=0; i < result.value().size(); i++) {
std::string errors;
Json::Value tmp;
bool parsingSuccessful = self->m_json_reader->parse(result.value()[i].c_str(),
result.value()[i].c_str() + result.value()[i].size(),
&tmp,
&errors);
if(!parsingSuccessful) {
throw Exception(errors);
}
tmp_array[i] = std::move(tmp);
}
*out = std::move(tmp_array);
if(out) *out = std::move(result.value());
} else {
throw Exception(result.error());
}
......
......@@ -7,6 +7,7 @@
#define __SONATA_PROVIDER_IMPL_H
#include "sonata/Backend.hpp"
#include "sonata/JsonSerialize.hpp"
#include <thallium.hpp>
#include <thallium/serialization/stl/unordered_set.hpp>
......@@ -44,10 +45,15 @@ class ProviderImpl : public tl::provider<ProviderImpl> {
tl::remote_procedure m_open_collection;
tl::remote_procedure m_drop_collection;
tl::remote_procedure m_coll_store;
tl::remote_procedure m_coll_store_json;
tl::remote_procedure m_coll_fetch;
tl::remote_procedure m_coll_fetch_json;
tl::remote_procedure m_coll_filter;
tl::remote_procedure m_coll_filter_json;
tl::remote_procedure m_coll_update;
tl::remote_procedure m_coll_update_json;
tl::remote_procedure m_coll_all;
tl::remote_procedure m_coll_all_json;
tl::remote_procedure m_coll_last_id;
tl::remote_procedure m_coll_size;
tl::remote_procedure m_coll_erase;
......@@ -67,10 +73,15 @@ class ProviderImpl : public tl::provider<ProviderImpl> {
, m_open_collection( define("sonata_open_collection", &ProviderImpl::openCollection, pool))
, m_drop_collection( define("sonata_drop_collection", &ProviderImpl::dropCollection, pool))
, m_coll_store( define("sonata_store", &ProviderImpl::store, pool))
, m_coll_store_json( define("sonata_store_json", &ProviderImpl::storeJson, pool))
, m_coll_fetch( define("sonata_fetch", &ProviderImpl::fetch, pool))
, m_coll_fetch_json( define("sonata_fetch_json", &ProviderImpl::fetchJson, pool))
, m_coll_filter( define("sonata_filter", &ProviderImpl::filter, pool))
, m_coll_filter_json( define("sonata_filter_json", &ProviderImpl::filterJson, pool))
, m_coll_update( define("sonata_update", &ProviderImpl::update, pool))
, m_coll_update_json( define("sonata_update_json", &ProviderImpl::updateJson, pool))
, m_coll_all( define("sonata_all", &ProviderImpl::all, pool))
, m_coll_all_json( define("sonata_all_json", &ProviderImpl::allJson, pool))
, m_coll_last_id( define("sonata_last_id", &ProviderImpl::lastID, pool))
, m_coll_size( define("sonata_size", &ProviderImpl::size, pool))
, m_coll_erase( define("sonata_erase", &ProviderImpl::erase, pool))
......@@ -90,10 +101,15 @@ class ProviderImpl : public tl::provider<ProviderImpl> {
m_open_collection.deregister();
m_drop_collection.deregister();
m_coll_store.deregister();
m_coll_store_json.deregister();
m_coll_fetch.deregister();
m_coll_fetch_json.deregister();
m_coll_filter.deregister();
m_coll_filter_json.deregister();
m_coll_update.deregister();
m_coll_update_json.deregister();
m_coll_all.deregister();
m_coll_all_json.deregister();
m_coll_last_id.deregister();
m_coll_size.deregister();
m_coll_erase.deregister();
......@@ -417,6 +433,27 @@ class ProviderImpl : public tl::provider<ProviderImpl> {
spdlog::trace("[provider:{}] Record successfully stored (id = {})", id(), result.value());
}
void storeJson(const tl::request& req,
const std::string& db_name,
const std::string& coll_name,
const Json::Value& record) {
spdlog::trace("[provider:{}] Received store request", id(), db_name);
spdlog::trace("[provider:{}] => database = {}", id(), db_name);
spdlog::trace("[provider:{}] => collection = {}", id(), coll_name);
RequestResult<uint64_t> result;
auto it = m_backends.find(db_name);
if(it == m_backends.end()) {
result.success() = false;
result.error() = "Database "s + db_name + " not found";
req.respond(result);
spdlog::error("[provider:{}] Database {} not found", id(), db_name);
return;
}
result = it->second->storeJson(coll_name, record);
req.respond(result);
spdlog::trace("[provider:{}] Record successfully stored (id = {})", id(), result.value());
}
void fetch(const tl::request& req,
const std::string& db_name,
const std::string& coll_name,
......@@ -439,6 +476,28 @@ class ProviderImpl : public tl::provider<ProviderImpl> {
spdlog::trace("[provider:{}] Record {} successfully fetched", id(), record_id);
}
void fetchJson(const tl::request& req,
const std::string& db_name,
const std::string& coll_name,
uint64_t record_id) {
spdlog::trace("[provider:{}] Received fetch request", id());
spdlog::trace("[provider:{}] => database = {}", id(), db_name);
spdlog::trace("[provider:{}] => collection = {}", id(), coll_name);
spdlog::trace("[provider:{}] => record id = {}", id(), record_id);
RequestResult<Json::Value> result;
auto it = m_backends.find(db_name);
if(it == m_backends.end()) {
result.success() = false;
result.error() = "Database "s + db_name + " not found";
req.respond(result);
spdlog::error("[provider:{}] Database {} not found", id(), db_name);
return;
}
result = it->second->fetchJson(coll_name, record_id);
req.respond(result);
spdlog::trace("[provider:{}] Record {} successfully fetched", id(), record_id);
}
void filter(const tl::request& req,
const std::string& db_name,
const std::string& coll_name,
......@@ -460,6 +519,27 @@ class ProviderImpl : public tl::provider<ProviderImpl> {
spdlog::trace("[provider:{}] Filter successfully executed", id());
}
void filterJson(const tl::request& req,
const std::string& db_name,
const std::string& coll_name,
const std::string& filter_code) {
spdlog::trace("[provider:{}] Received filter request", id());
spdlog::trace("[provider:{}] => database = {}", id(), db_name);
spdlog::trace("[provider:{}] => collection = {}", id(), coll_name);
RequestResult<Json::Value> result;
auto it = m_backends.find(db_name);
if(it == m_backends.end()) {
result.success() = false;
result.error() = "Database "s + db_name + " not found";
req.respond(result);
spdlog::error("[provider:{}] Database {} not found", id(), db_name);
return;
}
result = it->second->filterJson(coll_name, filter_code);
req.respond(result);
spdlog::trace("[provider:{}] Filter successfully executed", id());
}
void update(const tl::request& req,
const std::string& db_name,
const std::string& coll_name,
......@@ -483,6 +563,29 @@ class ProviderImpl : public tl::provider<ProviderImpl> {
spdlog::trace("[provider:{}] Update successfully applied to record {}", id(), record_id);
}
void updateJson(const tl::request& req,
const std::string& db_name,
const std::string& coll_name,
uint64_t record_id,
const Json::Value& new_content) {
spdlog::trace("[provider:{}] Received update request", id());
spdlog::trace("[provider:{}] => database = {}", id(), db_name);
spdlog::trace("[provider:{}] => collection = {}", id(), coll_name);
spdlog::trace("[provider:{}] => record id = {}", id(), record_id);
RequestResult<bool> result;
auto it = m_backends.find(db_name);
if(it == m_backends.end()) {
result.success() = false;
result.error() = "Database "s + db_name + " not found";
req.respond(result);
spdlog::error("[provider:{}] Database {} not found", id(), db_name);
return;
}
result = it->second->updateJson(coll_name, record_id, new_content);
req.respond(result);
spdlog::trace("[provider:{}] Update successfully applied to record {}", id(), record_id);
}
void all(const tl::request& req,
const std::string& db_name,
const std::string& coll_name) {
......@@ -503,6 +606,26 @@ class ProviderImpl : public tl::provider<ProviderImpl> {
spdlog::trace("[provider:{}] Successfully returned the full collection {}", id(), coll_name);
}
void allJson(const tl::request& req,
const std::string& db_name,
const std::string& coll_name) {
spdlog::trace("[provider:{}] Received all request", id());
spdlog::trace("[provider:{}] => database = {}", id(), db_name);
spdlog::trace("[provider:{}] => collection = {}", id(), coll_name);
RequestResult<Json::Value> result;
auto it = m_backends.find(db_name);
if(it == m_backends.end()) {
result.success() = false;
result.error() = "Database "s + db_name + " not found";
req.respond(result);
spdlog::error("[provider:{}] Database {} not found", id(), db_name);
return;
}
result = it->second->allJson(coll_name);
req.respond(result);
spdlog::trace("[provider:{}] Successfully returned the full collection {}", id(), coll_name);
}
void lastID(const tl::request& req,
const std::string& db_name,
const std::string& coll_name) {
......
......@@ -182,6 +182,45 @@ class UnQLiteBackend : public Backend {
return result;
}
virtual RequestResult<uint64_t> storeJson(
const std::string& coll_name,
const Json::Value& record) override {
std::ostringstream ss;
ss << "$input = " << record.toStyledString() << ";"
<<
"if(!db_exists($collection)) {"
"$ret = false;"
"$err = \"Collection does not exist\";"
"} else {"
"$ret = db_store($collection,$input);"
"if(!$ret) {"
"$err = db_errlog();"
"} else {"
"$id = $input.__id;"
"}"
"}";
RequestResult<uint64_t> result;
try {
std::unique_lock<tl::mutex> lock;
if(!m_unqlite_is_threadsafe)
lock = std::unique_lock<tl::mutex>(m_mutex);
UnQLiteVM vm(m_db, ss.str().c_str(), this);
vm.set("collection", coll_name);
vm.execute();
result.success() = vm.get<bool>("ret");
if(!result.success()) {
result.error() = vm.get<std::string>("err");
} else {
result.value() = vm.get<uint64_t>("id");
}
unqlite_commit(m_db);
} catch(const Exception& e) {
result.success() = false;
result.error() = e.what();
}
return result;
}
virtual RequestResult<std::string> fetch(
const std::string& coll_name,
uint64_t record_id) override {
......@@ -222,6 +261,44 @@ class UnQLiteBackend : public Backend {
return result;
}
virtual RequestResult<Json::Value> fetchJson(
const std::string& coll_name,
uint64_t record_id) override {
constexpr static const char* script =
"if(!db_exists($collection)) {"
"$ret = false;"
"$err = \"Collection does not exist\";"
"} else {"
"$output = db_fetch_by_id($collection,$id);"
"if($output == NULL) {"
"$ret = false;"
"$err = \"Record does not exist\";"
"} else {"
"$ret = true;"
"}"
"}";