Commit 7114e8c4 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added asynchronous functions

parent 178cfb6c
......@@ -12,8 +12,10 @@ cmake_minimum_required (VERSION 3.0)
project (sonata C CXX)
enable_testing ()
option(ENABLE_TESTS "Build tests. May require CppUnit_ROOT" OFF)
option(ENABLE_EXAMPLES "Build examples" OFF)
option(ENABLE_TESTS "Build tests. May require CppUnit_ROOT" OFF)
option(ENABLE_EXAMPLES "Build examples" OFF)
option(ENABLE_BENCHMARK "Build benchmark" OFF)
option(ENABLE_DAEMON "Build default daemon" OFF)
# add our cmake module directory to the path
set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH}
......
add_executable (sonata-daemon SonataDaemon.cpp)
target_include_directories (sonata-daemon PUBLIC $<INSTALL_INTERFACE:include>)
target_include_directories (sonata-daemon BEFORE PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>)
target_link_libraries (sonata-daemon sonata-server jsoncpp)
if (${ENABLE_DAEMON})
add_executable (sonata-daemon SonataDaemon.cpp)
target_include_directories (sonata-daemon PUBLIC $<INSTALL_INTERFACE:include>)
target_include_directories (sonata-daemon BEFORE PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>)
target_link_libraries (sonata-daemon sonata-server jsoncpp)
endif (${ENABLE_DAEMON})
add_executable (sonata-benchmark Benchmark.cpp)
#target_include_directories (sonata-benchmark PUBLIC $<INSTALL_INTERFACE:include>)
#target_include_directories (sonata-benchmark BEFORE PUBLIC
# $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>)
target_link_libraries(sonata-benchmark sonata-server sonata-client sonata-admin jsoncpp)
if (${ENABLE_BENCHMARK})
add_executable (sonata-benchmark Benchmark.cpp)
target_include_directories (sonata-benchmark PUBLIC $<INSTALL_INTERFACE:include>)
target_include_directories (sonata-benchmark BEFORE PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include>)
target_link_libraries(sonata-benchmark sonata-server sonata-client sonata-admin jsoncpp)
endif (${ENABLE_BENCHMARK})
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __SONATA_ASYNC_REQUEST_HPP
#define __SONATA_ASYNC_REQUEST_HPP
#include <memory>
#include <string>
namespace sonata {
class AsyncRequestImpl;
class Collection;
/**
* @brief AsyncRequest objects are used to keep track of
* on-going asynchronous operations.
*/
class AsyncRequest {
friend Collection;
public:
/**
* @brief Default constructor. Will create a non-valid AsyncRequest.
*/
AsyncRequest();
/**
* @brief Copy constructor.
*/
AsyncRequest(const AsyncRequest& other);
/**
* @brief Move constructor.
*/
AsyncRequest(AsyncRequest&& other);
/**
* @brief Copy-assignment operator.
*/
AsyncRequest& operator=(const AsyncRequest& other);
/**
* @brief Move-assignment operator.
*/
AsyncRequest& operator=(AsyncRequest&& other);
/**
* @brief Destructor.
*/
~AsyncRequest();
/**
* @brief Wait for the request to complete.
*/
void wait() const;
/**
* @brief Test if the request has completed, without blocking.
*/
bool completed() const;
/**
* @brief Checks if the Collection object is valid.
*/
operator bool() const;
private:
std::shared_ptr<AsyncRequestImpl> self;
AsyncRequest(const std::shared_ptr<AsyncRequestImpl>& impl);
};
}
#endif
......@@ -6,6 +6,7 @@
#ifndef __SONATA_COLLECTION_HPP
#define __SONATA_COLLECTION_HPP
#include <sonata/AsyncRequest.hpp>
#include <sonata/Database.hpp>
#include <json/json.h>
#include <thallium.hpp>
......@@ -78,7 +79,11 @@ class Collection {
*
* @return the record id of the stored document.
*/
uint64_t store(const std::string& record) const;
inline uint64_t store(const std::string& record) const {
uint64_t record_id;
store(record, &record_id);
return record_id;
}
/**
* @brief Stores a document into the collection.
......@@ -87,7 +92,11 @@ class Collection {
*
* @return the record id of the stored document.
*/
uint64_t store(const Json::Value& record) const;
inline uint64_t store(const Json::Value& record) const {
uint64_t record_id;
store(record, &record_id);
return record_id;
}
/**
* @brief Stores a document into the collection.
......@@ -96,30 +105,82 @@ class Collection {
*
* @return the record id of the stored document.
*/
uint64_t store(const char* record) const {
return store(std::string(record));
inline uint64_t store(const char* record) const {
uint64_t record_id;
store(record, &record_id);
return record_id;
}
/**
* @brief Fetches a document by its record id.
* @brief Asynchronously stores a document into the collection.
* If the pointer to the request is null, this function will be
* executed synchronously.
*
* @param record A valid JSON-formated document.
* @param id Pointer to a record id set when the request completes.
* @param req Pointer to a request to wait on.
*
* @return the record id of the stored document.
*/
void store(const std::string& record,
uint64_t* id, AsyncRequest* req = nullptr) const;
/**
* @brief Asynchronously stores a document into the collection.
* If the pointer to the request is null, this function will be
* executed synchronously.
*
* @param record A JSON document.
* @param id Pointer to a record id set when the request completes.
* @param req Pointer to a request to wait on.
*
* @return the record id of the stored document.
*/
void store(const Json::Value& record,
uint64_t* id, AsyncRequest* req = nullptr) const;
/**
* @brief Asynchronously stores a document into the collection.
* If the pointer to the request is null, this function will be
* executed synchronously.
*
* @param record A valid JSON-formated document.
* @param id Pointer to a record id set when the request completes.
* @param req Pointer to a request to wait on.
*
* @return the record id of the stored document.
*/
void store(const char* record,
uint64_t* id, AsyncRequest* req = nullptr) const {
store(std::string(record), id, req);
}
/**
* @brief Asynchronously fetches a document by its record id.
* If req is null, this function becomes synchronous.
*
* @param[in] id Record id.
* @param[out] result Resulting string.
* @param req Pointer to a request to wait on.
*/
void fetch(uint64_t id,
std::string* result) const;
std::string* result,
AsyncRequest* req = nullptr) const;
/**
* @brief Fetches a document by its record id.
* @brief Asynchronously fetches a document by its record id.
* If req is null, this function becomes synchronous.
*
* @param[in] id Record id.
* @param[out] result Resulting JSON object.
* @param req Pointer to a request to wait on.
*/
void fetch(uint64_t id,
Json::Value* result) const;
Json::Value* result,
AsyncRequest* req = nullptr) const;
/**
* @brief Filters the collection and returns the
* @brief Asynchronously filters the collection and returns the
* records that match the condition. This condition should
* be expressed as a Jx9 function returning TRUE or FALSE.
* For example the following Jx9 function selects only
......@@ -127,14 +188,18 @@ class Collection {
*
* "function($record) { return $record.x < 4; }"
*
* If req is null, this function becomes synchronous.
*
* @param filterCode A Jx9 filter code.
* @param result Resuling vector of records as strings.
* @param req Pointer to a request to wait on.
*/
void filter(const std::string& filterCode,
std::vector<std::string>* result) const;
std::vector<std::string>* result,
AsyncRequest* req = nullptr) const;
/**
* @brief Filters the collection and returns the
* @brief Asynchronously filters the collection and returns the
* records that match the condition. This condition should
* be expressed as a Jx9 function returning TRUE or FALSE.
* For example the following Jx9 function selects only
......@@ -142,61 +207,81 @@ class Collection {
*
* "function($record) { return $record.x < 4; }"
*
* If req is null, this function becomes synchronous.
*
* @param filterCode A Jx9 filter code.
* @param result Resuling JSON object containing the array of results.
* @param req Pointer to a request to wait on.
*/
void filter(const std::string& filterCode,
Json::Value* result) const;
Json::Value* result,
AsyncRequest* req = nullptr) const;
/**
* @brief Updates the content of a document with a new content.
* @brief Asynchronously updates the content of a document with a new content.
* If req is null, this function becomes synchronous.
*
* @param id Record id of the document to update.
* @param record New document.
* @param req Pointer to a request to wait on.
*/
void update(uint64_t id,
const Json::Value& record) const;
const Json::Value& record,
AsyncRequest* req = nullptr) const;
/**
* @brief Updates the content of a document with a new content.
* @brief Asynchronously updates the content of a document with a new content.
* If req is null, this function becomes synchronous.
*
* @param id Record id of the document to update.
* @param record New document.
* @param req Pointer to a request to wait on.
*/
void update(uint64_t id,
const std::string& record) const;
const std::string& record,
AsyncRequest* req = nullptr) const;
/**
* @brief Updates the content of a document with a new content.
* @brief Asynchronously updates the content of a document with a new content.
* If req is null, this function becomes synchronous.
*
* @param id Record id of the document to update.
* @param record New document.
* @param req Pointer to a request to wait on.
*/
void update(uint64_t id,
const char* record) const {
const char* record,
AsyncRequest* req = nullptr) const {
return update(id, std::string(record));
}
/**
* @brief Returns all the documents from the collection
* @brief Asynchronously returns all the documents from the collection
* as a vector of strings.
* If req is null, this function becomes synchronous.
*
* @param result All the documents from the collection.
* @param req Pointer to a request to wait on.
*/
void all(std::vector<std::string>* result) const;
void all(std::vector<std::string>* result,
AsyncRequest* req = nullptr) const;
/**
* @brief Returns all the documents from the collection
* @brief Asynchronously returns all the documents from the collection
* as a JSON object.
* If req is null, this function becomes synchronous.
*
* @param result All the documents from the collection.
* @param req Pointer to a request to wait on.
*/
void all(Json::Value* result) const;
void all(Json::Value* result,
AsyncRequest* req = nullptr) const;
/**
* @brief Returns the last record id used by the collection.
*
* @return The last record id.
* @param req Pointer to a request to wait on.
*/
uint64_t last_record_id() const;
......@@ -209,11 +294,13 @@ class Collection {
size_t size() const;
/**
* @brief Erases a document from the collection.
* @brief Asynchronously erases a document from the collection.
* If req is null, this function becomes synchronous.
*
* @param id Record id of the document to erase.
* @param req Pointer to a request to wait on.
*/
void erase(uint64_t id) const;
void erase(uint64_t id, AsyncRequest* req = nullptr) const;
private:
......
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include "sonata/Exception.hpp"
#include "sonata/AsyncRequest.hpp"
#include "AsyncRequestImpl.hpp"
namespace sonata {
AsyncRequest::AsyncRequest() = default;
AsyncRequest::AsyncRequest(const std::shared_ptr<AsyncRequestImpl>& impl)
: self(impl) {}
AsyncRequest::AsyncRequest(const AsyncRequest& other) = default;
AsyncRequest::AsyncRequest(AsyncRequest&& other) {
self = std::move(other.self);
other.self = nullptr;
}
AsyncRequest::~AsyncRequest() {
if(self && self.unique()) {
wait();
}
}
AsyncRequest& AsyncRequest::operator=(const AsyncRequest& other) {
if(this == &other || self == other.self) return *this;
if(self && self.unique()) {
wait();
}
self = other.self;
return *this;
}
AsyncRequest& AsyncRequest::operator=(AsyncRequest&& other) {
if(this == &other || self == other.self) return *this;
if(self && self.unique()) {
wait();
}
self = std::move(other.self);
other.self = nullptr;
return *this;
}
void AsyncRequest::wait() const {
if(not self) throw Exception("Invalid sonata::AsyncRequest object");
if(self->m_waited) return;
self->m_wait_callback(*self);
self->m_waited = true;
}
bool AsyncRequest::completed() const {
if(not self) throw Exception("Invalid sonata::AsyncRequest object");
return self->m_async_response.received();
}
}
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __SONATA_ASYNC_REQUEST_IMPL_H
#define __SONATA_ASYNC_REQUEST_IMPL_H
#include <functional>
#include <thallium.hpp>
namespace sonata {
namespace tl = thallium;
struct AsyncRequestImpl {
AsyncRequestImpl(tl::async_response&& async_response)
: m_async_response(std::move(async_response)) {}
tl::async_response m_async_response;
bool m_waited = false;
std::function<void(AsyncRequestImpl&)> m_wait_callback;
};
}
#endif
# list of source files
set(sonata-client-src Client.cpp Database.cpp Collection.cpp)
set(sonata-client-src Client.cpp Database.cpp Collection.cpp AsyncRequest.cpp)
set(sonata-server-src Provider.cpp Backend.cpp UnQLiteBackend.cpp UnQLiteVM.cpp)
set(sonata-admin-src Admin.cpp)
......
......@@ -7,6 +7,7 @@
#include "sonata/RequestResult.hpp"
#include "sonata/Exception.hpp"
#include "AsyncRequestImpl.hpp"
#include "ClientImpl.hpp"
#include "DatabaseImpl.hpp"
#include "CollectionImpl.hpp"
......@@ -39,139 +40,216 @@ Database Collection::database() const {
return Database(self->m_database);
}
uint64_t Collection::store(const std::string& record) const {
void Collection::store(const Json::Value& record, uint64_t* id, AsyncRequest* req) const {
store(record.toStyledString(), id, req);
}
void Collection::store(const std::string& record, uint64_t* id, AsyncRequest* req) const {
if(not self) throw Exception("Invalid sonata::Collection object");
auto& rpc = self->m_database->m_client->m_coll_store;
auto& ph = self->m_database->m_ph;
auto& db_name = self->m_database->m_name;
RequestResult<uint64_t> result = rpc.on(ph)(db_name, self->m_name, record);
if(result.success()) {
return result.value();
} else {
throw Exception(result.error());
}
return 0;
}
uint64_t Collection::store(const Json::Value& record) const {
return store(record.toStyledString());
auto async_response = rpc.on(ph).async(db_name, self->m_name, record);
auto async_request_impl = std::make_shared<AsyncRequestImpl>(std::move(async_response));
async_request_impl->m_wait_callback = [id](AsyncRequestImpl& async_request_impl) {
RequestResult<uint64_t> result = async_request_impl.m_async_response.wait();
if(result.success()) {
if(id) *id = result.value();
} else {
throw Exception(result.error());
}
};
if(req)
*req = AsyncRequest(std::move(async_request_impl));
else
AsyncRequest(std::move(async_request_impl)).wait();
}
void Collection::fetch(uint64_t id, std::string* out) const {
void Collection::fetch(uint64_t id, std::string* out, AsyncRequest* req) const {
if(not out) return;
if(not self) throw Exception("Invalid sonata::Collection object");
auto& rpc = self->m_database->m_client->m_coll_fetch;
auto& ph = self->m_database->m_ph;
auto& db_name = self->m_database->m_name;
RequestResult<std::string> result = rpc.on(ph)(db_name, self->m_name, id);
if(result.success()) {
*out = std::move(result.value());
} else {
throw Exception(result.error());
}
auto async_response = rpc.on(ph).async(db_name, self->m_name, id);
auto async_request_impl = std::make_shared<AsyncRequestImpl>(std::move(async_response));
async_request_impl->m_wait_callback = [out](AsyncRequestImpl& async_request_impl) {
RequestResult<std::string> result = async_request_impl.m_async_response.wait();
if(result.success()) {
*out = std::move(result.value());
} else {
throw Exception(result.error());
}
};
if(req)
*req = AsyncRequest(std::move(async_request_impl));
else
AsyncRequest(std::move(async_request_impl)).wait();
}
void Collection::fetch(uint64_t id, Json::Value* result) const {
if(not result) return;
std::string str;
fetch(id, &str);
std::string errors;
bool parsingSuccessful = self->m_json_reader->parse(str.c_str(),
str.c_str() + str.size(),
result,
&errors);
if(!parsingSuccessful) {
throw Exception(errors);
}
void Collection::fetch(uint64_t id, Json::Value* out, AsyncRequest* req) const {
if(not out) return;
if(not self) throw Exception("Invalid sonata::Collection object");
auto& rpc = self->m_database->m_client->m_coll_fetch;
auto& ph = self->m_database->m_ph;
auto& db_name = self->m_database->m_name;
auto async_response = rpc.on(ph).async(db_name, self->m_name, id);
auto async_request_impl = std::make_shared<AsyncRequestImpl>(std::move(async_response));
async_request_impl->m_wait_callback = [out, self=self](AsyncRequestImpl& async_request_impl) {
RequestResult<std::string> result = async_request_impl.m_async_response.wait();
if(result.success()) {
std::string errors;
bool parsingSuccessful = self->m_json_reader->parse(result.value().c_str(),
result.value().c_str() + result.value().size(),
out,
&errors);
if(!parsingSuccessful) {
throw Exception(errors);
}
} else {
throw Exception(result.error());
}
};
if(req)
*req = AsyncRequest(std::move(async_request_impl));
else
AsyncRequest(std::move(async_request_impl)).wait();
}
void Collection::filter(const std::string& filterCode, std::vector<std::string>* out) const {
void Collection::filter(const std::string& filterCode, std::vector<std::string>* out, AsyncRequest* req) const {
if(not self) throw Exception("Invalid sonata::Collection object");
auto& rpc = self->m_database->m_client->m_coll_filter;
auto& ph = self->m_database->m_ph;
auto& db_name = self->m_database->m_name;
RequestResult<std::vector<std::string>> result = rpc.on(ph)(db_name, self->m_name, filterCode);
if(result.success()) {
if(out) *out = std::move(result.value());
} else {
throw Exception(result.error());
}
}
void Collection::filter(const std::string& filterCode, Json::Value* result) const {
std::vector<std::string> array;
if(result)
filter(filterCode, &array);
auto async_response = rpc.on(ph).async(db_name, self->m_name, filterCode);
auto async_request_impl = std::make_shared<AsyncRequestImpl>(std::move(async_response));
async_request_impl->m_wait_callback = [out](AsyncRequestImpl& async_request_impl) {
RequestRes