Commit bee21832 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

added support for custom, Argobots based mutexes

parent 15a56c4e
# list of source files
set(sonata-client-src Client.cpp Database.cpp Collection.cpp AsyncRequest.cpp)
set(sonata-server-src Provider.cpp Backend.cpp UnQLiteBackend.cpp UnQLiteVM.cpp)
set(sonata-server-src Provider.cpp Backend.cpp UnQLiteBackend.cpp UnQLiteVM.cpp UnQLiteMutex.cpp)
set(sonata-admin-src Admin.cpp)
# load package helper for generating cmake CONFIG packages
......
......@@ -5,7 +5,15 @@
*/
#include "sonata/Backend.hpp"
// TODO once these PR land, just #include <jx9.h>
// https://github.com/symisc/unqlite/pull/104
// https://github.com/symisc/unqlite/pull/102
#define JX9_LIB_CONFIG_USER_MUTEX 3
#define JX9_LIB_CONFIG_THREAD_LEVEL_MULTI 5
extern "C" int jx9_lib_config(int nConfigOp, ...);
#include "UnQLiteVM.hpp"
#include "UnQLiteMutex.hpp"
#include <spdlog/spdlog.h>
#include <json/json.h>
......@@ -26,9 +34,6 @@ class UnQLiteBackend : public Backend {
UnQLiteBackend() {
m_unqlite_is_threadsafe = unqlite_lib_is_threadsafe();
if(m_unqlite_is_threadsafe) {
unqlite_lib_config(UNQLITE_LIB_CONFIG_THREAD_LEVEL_MULTI);
}
}
UnQLiteBackend(UnQLiteBackend&&) = delete;
......@@ -104,7 +109,6 @@ class UnQLiteBackend : public Backend {
return result;
}
virtual RequestResult<bool> dropCollection(
const std::string& coll_name) override {
constexpr static const char* script =
......@@ -525,6 +529,18 @@ std::unique_ptr<Backend> UnQLiteBackend::create(const Json::Value& config) {
throw Exception("Database file "s + db_path + " already exists");
}
}
// Setup Unqlite so it can be multithreaded
static bool unqlite_lib_is_initialized = false;
if(not unqlite_lib_is_initialized) {
jx9_lib_config(JX9_LIB_CONFIG_THREAD_LEVEL_MULTI);
jx9_lib_config(JX9_LIB_CONFIG_USER_MUTEX,
ExportUnqliteArgobotsMutexMethods());
unqlite_lib_config(UNQLITE_LIB_CONFIG_THREAD_LEVEL_MULTI);
unqlite_lib_config(UNQLITE_LIB_CONFIG_USER_MUTEX,
ExportUnqliteArgobotsMutexMethods());
unqlite_lib_is_initialized = true;
}
// Open the Unqlite database
unqlite* pDB;
spdlog::trace("[unqlite] Creating UnQLite database");
int ret;
......
#include <abt.h>
#include <unqlite.h>
#include <cstdlib>
#include <iostream>
#define SXMUTEX_TYPE_FAST 1
#define SXMUTEX_TYPE_RECURSIVE 2
#define SXMUTEX_TYPE_STATIC_1 3
#define SXMUTEX_TYPE_STATIC_2 4
#define SXMUTEX_TYPE_STATIC_3 5
#define SXMUTEX_TYPE_STATIC_4 6
#define SXMUTEX_TYPE_STATIC_5 7
#define SXMUTEX_TYPE_STATIC_6 8
struct ABTSyMutex
{
ABT_mutex sMutex;
unsigned int nType;
};
static ABTSyMutex aStaticMutexes[] = {
{ABT_MUTEX_NULL, SXMUTEX_TYPE_STATIC_1},
{ABT_MUTEX_NULL, SXMUTEX_TYPE_STATIC_2},
{ABT_MUTEX_NULL, SXMUTEX_TYPE_STATIC_3},
{ABT_MUTEX_NULL, SXMUTEX_TYPE_STATIC_4},
{ABT_MUTEX_NULL, SXMUTEX_TYPE_STATIC_5},
{ABT_MUTEX_NULL, SXMUTEX_TYPE_STATIC_6}
};
static int ABTMutexGlobalInit(void) {
std::cout << "ABTMutexGlobalInit" << std::endl;
for(unsigned i=0; i < 6; i++) {
ABT_mutex_create(&aStaticMutexes[i].sMutex);
}
return 0;
}
static void ABTMutexGlobalRelease(void) {
std::cout << "ABTMutexGlobalRelease" << std::endl;
for(unsigned i=0; i < 6; i++) {
ABT_mutex_free(&aStaticMutexes[i].sMutex);
}
}
static SyMutex* ABTMutexNew(int nType)
{
std::cout << "ABTMutexNew" << std::endl;
if(nType == SXMUTEX_TYPE_FAST || nType == SXMUTEX_TYPE_RECURSIVE) {
ABTSyMutex *pMutex = (ABTSyMutex *)malloc(sizeof(ABTSyMutex));
pMutex->nType = nType;
if(!pMutex){
return nullptr;
}
if(nType == SXMUTEX_TYPE_RECURSIVE) {
ABT_mutex_attr attr;
ABT_mutex_attr_create(&attr);
ABT_mutex_attr_set_recursive(attr, ABT_TRUE);
ABT_mutex_create_with_attr(attr, &(pMutex->sMutex));
ABT_mutex_attr_free(&attr);
} else {
ABT_mutex_create(&(pMutex->sMutex));
}
return reinterpret_cast<SyMutex*>(pMutex);
} else {
ABTSyMutex *pMutex = nullptr;
/* Use a pre-allocated static mutex */
if(nType > SXMUTEX_TYPE_STATIC_6) {
nType = SXMUTEX_TYPE_STATIC_6;
}
pMutex = &aStaticMutexes[nType - 3];
return reinterpret_cast<SyMutex*>(pMutex);
}
}
static void ABTMutexRelease(SyMutex *arg)
{
auto pMutex = reinterpret_cast<ABTSyMutex*>(arg);
if(pMutex->nType == SXMUTEX_TYPE_FAST || pMutex->nType == SXMUTEX_TYPE_RECURSIVE) {
ABT_mutex_free(&pMutex->sMutex);
free(pMutex);
}
}
static void ABTMutexEnter(SyMutex *arg)
{
auto pMutex = reinterpret_cast<ABTSyMutex*>(arg);
ABT_mutex_lock(pMutex->sMutex);
}
static int ABTMutexTryEnter(SyMutex *arg)
{
auto pMutex = reinterpret_cast<ABTSyMutex*>(arg);
int ret = ABT_mutex_trylock(pMutex->sMutex);
return ret;
}
static void ABTMutexLeave(SyMutex *arg)
{
auto pMutex = reinterpret_cast<ABTSyMutex*>(arg);
ABT_mutex_unlock(pMutex->sMutex);
}
static const SyMutexMethods sABTMutexMethods = {
ABTMutexGlobalInit,
ABTMutexGlobalRelease,
ABTMutexNew,
ABTMutexRelease,
ABTMutexEnter,
ABTMutexTryEnter,
ABTMutexLeave
};
const SyMutexMethods* ExportUnqliteArgobotsMutexMethods(void)
{
return &sABTMutexMethods;
}
#ifndef __UNQLITE_MUTEX_HPP
#define __UNQLITE_MUTEX_HPP
const SyMutexMethods* ExportUnqliteArgobotsMutexMethods(void);
#endif
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment