...
 
Commits (79)
# master
* Added copy() function in serialization archives. This function
acts like a read() for input archive, and like a write() for
output archives. This allows writing a single serialize() function
instead of load()/store() functions in many situations.
* Added operator() to input and output archives to enable a syntax
similar to that of the cereal library.
......@@ -48,14 +48,25 @@ if (NOT CMAKE_BUILD_TYPE)
endif ()
set (CMAKE_PREFIX_PATH "" CACHE STRING "External dependencies path")
set (BUILD_SHARED_LIBS "OFF" CACHE BOOL "Build a shared library")
set (ENABLE_CEREAL "OFF" CACHE BOOL "Enable cereal serialization")
include_directories (${CMAKE_BINARY_DIR}/include)
# packages we depend on
if (ENABLE_CEREAL)
find_package (cereal CONFIG REQUIRED)
endif (ENABLE_CEREAL)
include (xpkg-import)
find_package (mercury CONFIG REQUIRED)
xpkg_import_module (margo REQUIRED margo)
add_subdirectory (src)
add_subdirectory (test)
add_subdirectory (examples)
if (ENABLE_TESTS)
add_subdirectory (test)
endif (ENABLE_TESTS)
if (ENABLE_EXAMPLES)
add_subdirectory (examples)
endif (ENABLE_EXAMPLES)
configure_file (include/thallium/config.hpp.in ${CMAKE_BINARY_DIR}/include/thallium/config.hpp)
......@@ -9,7 +9,7 @@ int main(int argc, char** argv) {
exit(0);
}
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure hello = myEngine.define("hello").ignore_response();
tl::remote_procedure hello = myEngine.define("hello").disable_response();
tl::endpoint server = myEngine.lookup(argv[1]);
hello.on(server)();
......
......@@ -10,8 +10,8 @@ void hello(const tl::request& req) {
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << (std::string)myEngine.self() << std::endl;
myEngine.define("hello", hello).ignore_response();
std::cout << "Server running at address " << myEngine.self() << std::endl;
myEngine.define("hello", hello).disable_response();
return 0;
}
......
......@@ -11,7 +11,7 @@ void sum(const tl::request& req, int x, int y) {
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << (std::string)myEngine.self() << std::endl;
std::cout << "Server running at address " << myEngine.self() << std::endl;
myEngine.define("sum", sum);
return 0;
......
......@@ -6,7 +6,7 @@ namespace tl = thallium;
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << (std::string)myEngine.self() << std::endl;
std::cout << "Server running at address " << myEngine.self() << std::endl;
std::function<void(const tl::request&, int, int)> sum =
[](const tl::request& req, int x, int y) {
......
......@@ -6,7 +6,7 @@ namespace tl = thallium;
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << (std::string)myEngine.self() << std::endl;
std::cout << "Server running at address " << myEngine.self() << std::endl;
std::function<void(const tl::request&, int, int)> sum =
[&myEngine](const tl::request& req, int x, int y) {
......@@ -17,6 +17,8 @@ int main(int argc, char** argv) {
myEngine.define("sum", sum);
myEngine.push_finalize_callback([]() { std::cout << "Finalization was called" << std::endl; });
return 0;
}
......@@ -10,7 +10,7 @@ int main(int argc, char** argv) {
exit(0);
}
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure hello = myEngine.define("hello").ignore_response();
tl::remote_procedure hello = myEngine.define("hello").disable_response();
tl::endpoint server = myEngine.lookup(argv[1]);
std::string name = "Matthieu";
hello.on(server)(name);
......
......@@ -12,8 +12,8 @@ void hello(const tl::request& req, const std::string& name) {
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << (std::string)myEngine.self() << std::endl;
myEngine.define("hello", hello).ignore_response();
std::cout << "Server running at address " << myEngine.self() << std::endl;
myEngine.define("hello", hello).disable_response();
return 0;
}
......
......@@ -7,7 +7,7 @@ namespace tl = thallium;
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << (std::string)myEngine.self() << std::endl;
std::cout << "Server running at address " << myEngine.self() << std::endl;
std::function<void(const tl::request&, const point&, const point&)> dot_product =
[&myEngine](const tl::request& req, const point& p, const point& q) {
......
......@@ -9,7 +9,7 @@ int main(int argc, char** argv) {
exit(0);
}
tl::engine myEngine("tcp", MARGO_CLIENT_MODE);
tl::remote_procedure remote_do_rdma = myEngine.define("do_rdma").ignore_response();
tl::remote_procedure remote_do_rdma = myEngine.define("do_rdma");
tl::endpoint server_endpoint = myEngine.lookup(argv[1]);
std::string buffer = "Matthieu";
......
......@@ -7,7 +7,7 @@ namespace tl = thallium;
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << (std::string)myEngine.self() << std::endl;
std::cout << "Server running at address " << myEngine.self() << std::endl;
std::function<void(const tl::request&, tl::bulk&)> f =
[&myEngine](const tl::request& req, tl::bulk& b) {
......@@ -21,7 +21,8 @@ int main(int argc, char** argv) {
std::cout << "Server received bulk: ";
for(auto c : v) std::cout << c;
std::cout << std::endl;
req.respond(1);
};
myEngine.define("do_rdma",f).ignore_response();
myEngine.define("do_rdma",f);
}
add_executable(08_async_sum_server server.cpp)
target_link_libraries(08_async_sum_server thallium)
add_executable(08_async_sum_client client.cpp)
target_link_libraries(08_async_sum_client thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
int main(int argc, char** argv) {
if(argc != 2) {
std::cerr << "Usage: " << argv[0] << " <address>" << std::endl;
exit(0);
}
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure sum = myEngine.define("sum");
tl::endpoint server = myEngine.lookup(argv[1]);
auto response = sum.on(server).async(42,63);
int ret = response.wait();
std::cout << "Server answered " << ret << std::endl;
return 0;
}
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void sum(const tl::request& req, int x, int y) {
std::cout << "Computing " << x << "+" << y << std::endl;
req.respond(x+y);
}
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << myEngine.self() << std::endl;
myEngine.define("sum", sum);
return 0;
}
add_executable(09_sum_provider server.cpp)
target_link_libraries(09_sum_provider thallium)
add_executable(09_sum_client client.cpp)
target_link_libraries(09_sum_client thallium)
#include <iostream>
#include <thallium/serialization/stl/string.hpp>
#include <thallium.hpp>
namespace tl = thallium;
int main(int argc, char** argv) {
if(argc != 3) {
std::cerr << "Usage: " << argv[0] << " <address> <provider_id>" << std::endl;
exit(0);
}
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure sum = myEngine.define("sum");
tl::remote_procedure prod = myEngine.define("prod");
tl::remote_procedure hello = myEngine.define("hello").disable_response();
tl::remote_procedure print = myEngine.define("print").disable_response();
tl::endpoint server = myEngine.lookup(argv[1]);
uint16_t provider_id = atoi(argv[2]);
tl::provider_handle ph(server, provider_id);
int ret = sum.on(ph)(42,63);
std::cout << "(sum) Server answered " << ret << std::endl;
ret = prod.on(ph)(42,63);
std::cout << "(prod) Server answered " << ret << std::endl;
std::string name("Matthieu");
hello.on(ph)(name);
std::cout << "Done sending hello RPC, no response expected" << std::endl;
print.on(ph)(name);
std::cout << "Done sending print RPC, no response expected" << std::endl;
return 0;
}
#include <iostream>
#include <thallium.hpp>
#include <thallium/serialization/stl/string.hpp>
namespace tl = thallium;
class my_sum_provider : public tl::provider<my_sum_provider> {
private:
void prod(const tl::request& req, int x, int y) {
std::cout << "Computing " << x << "*" << y << std::endl;
req.respond(x+y);
}
int sum(int x, int y) const {
std::cout << "Computing " << x << "+" << y << std::endl;
return x+y;
}
void hello(const std::string& name) {
std::cout << "Hello, " << name << std::endl;
}
int print(const std::string& word) {
std::cout << "Printing " << word << std::endl;
return word.size();
}
public:
my_sum_provider(tl::engine& e, uint16_t provider_id=1)
: tl::provider<my_sum_provider>(e, provider_id) {
define("prod", &my_sum_provider::prod);
define("sum", &my_sum_provider::sum);
define("hello", &my_sum_provider::hello).disable_response();
define("print", &my_sum_provider::print, tl::ignore_return_value());
}
~my_sum_provider() {
wait_for_finalize();
}
};
int main(int argc, char** argv) {
uint16_t provider_id = 22;
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << myEngine.self()
<< " with provider id " << provider_id << std::endl;
my_sum_provider myProvider(myEngine, provider_id);
return 0;
}
add_executable(10_thread main.cpp)
target_link_libraries(10_thread thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id() << std::endl;
}
int main(int argc, char** argv) {
tl::abt scope;
std::vector<tl::managed<tl::xstream>> ess;
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es = tl::xstream::create();
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < 16; i++) {
tl::managed<tl::thread> th = ess[i % ess.size()]->make_thread(hello);
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(11_task main.cpp)
target_link_libraries(11_task thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", TASK "
<< tl::task::self_id() << std::endl;
}
int main(int argc, char** argv) {
tl::abt scope;
std::vector<tl::managed<tl::xstream>> ess;
tl::xstream primary = tl::xstream::self();
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es = tl::xstream::create();
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::task>> tsks;
for(int i=0; i < 16; i++) {
tl::managed<tl::task> tsk = ess[i % ess.size()]->make_task(hello);
tsks.push_back(std::move(tsk));
}
for(auto& mtsk : tsks) {
mtsk->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(12_shared_pool main.cpp)
target_link_libraries(12_shared_pool thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id() << std::endl;
}
int main(int argc, char** argv) {
tl::abt scope;
std::vector<tl::managed<tl::xstream>> ess;
tl::managed<tl::pool> myPool = tl::pool::create(tl::pool::access::spmc);
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es
= tl::xstream::create(tl::scheduler::predef::deflt, *myPool);
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < 16; i++) {
tl::managed<tl::thread> th = ess[i % ess.size()]->make_thread(hello);
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(13_mutex main.cpp)
target_link_libraries(13_mutex thallium)
#include <iostream>
#include <unistd.h>
#include <thallium.hpp>
namespace tl = thallium;
int myCounter = 0;
void hello(tl::mutex& mtx) {
tl::xstream es = tl::xstream::self();
mtx.lock();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id()
<< ", counter = " << myCounter << std::endl;
myCounter += 1;
mtx.unlock();
}
int main(int argc, char** argv) {
tl::abt scope;
std::vector<tl::managed<tl::xstream>> ess;
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es = tl::xstream::create();
ess.push_back(std::move(es));
}
tl::mutex myMutex;
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < 16; i++) {
tl::managed<tl::thread> th
= ess[i % ess.size()]->make_thread([&myMutex]() {
hello(myMutex);
});
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(14_custom_sched main.cpp)
target_link_libraries(14_custom_sched thallium)
#include <iostream>
#include <unistd.h>
#include <deque>
#include <mutex> // to use std::lock_guard
#include <algorithm>
#include <thallium.hpp>
#define NUM_XSTREAMS 1
#define NUM_THREADS 16
namespace tl = thallium;
class my_unit;
class my_pool;
class my_sched;
class my_unit {
tl::thread m_thread;
tl::task m_task;
tl::unit_type m_type;
bool m_in_pool;
friend class my_pool;
public:
my_unit(const tl::thread& t)
: m_thread(t), m_type(tl::unit_type::thread), m_in_pool(false) {}
my_unit(const tl::task& t)
: m_task(t), m_type(tl::unit_type::task), m_in_pool(false) {}
tl::unit_type get_type() const {
return m_type;
}
const tl::thread& get_thread() const {
return m_thread;
}
const tl::task& get_task() const {
return m_task;
}
bool is_in_pool() const {
return m_in_pool;
}
~my_unit() {}
};
class my_pool {
mutable tl::mutex m_mutex;
std::deque<my_unit*> m_units;
public:
my_pool() {}
size_t get_size() const {
std::lock_guard<tl::mutex> lock(m_mutex);
return m_units.size();
}
void push(my_unit* u) {
std::lock_guard<tl::mutex> lock(m_mutex);
u->m_in_pool = true;
m_units.push_back(u);
}
my_unit* pop() {
std::lock_guard<tl::mutex> lock(m_mutex);
if(m_units.empty())
return nullptr;
my_unit* u = m_units.front();
m_units.pop_front();
u->m_in_pool = false;
return u;
}
void remove(my_unit* u) {
std::lock_guard<tl::mutex> lock(m_mutex);
auto it = std::find(m_units.begin(), m_units.end(), u);
if(it != m_units.end()) {
(*it)->m_in_pool = false;
m_units.erase(it);
}
}
~my_pool() {
std::cerr << "Pool destructor " << std::endl;
}
};
class my_scheduler : private tl::scheduler {
public:
template<typename ... Args>
my_scheduler(Args&&... args)
: tl::scheduler(std::forward<Args>(args)...) {}
void run() {
int n = num_pools();
my_unit* unit;
int target;
unsigned seed = time(NULL);
while (1) {
/* Execute one work unit from the scheduler's pool */
unit = get_pool(0).pop<my_unit>();
if(unit != nullptr) {
get_pool(0).run_unit(unit);
} else if (n > 1) {
/* Steal a work unit from other pools */
target = (n == 2) ? 1 : (rand_r(&seed) % (n-1) + 1);
unit = get_pool(target).pop<my_unit>();
if(unit != nullptr)
get_pool(target).run_unit(unit);
}
if(has_to_stop()) break;
tl::xstream::check_events(*this);
}
}
tl::pool get_migr_pool() const {
return get_pool(0);
}
~my_scheduler() {
std::cerr << "scheduler destructor "<< std::endl;
}
};
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id()
<< std::endl;
}
int main(int argc, char** argv) {
tl::abt scope;
// create pools
std::vector<tl::managed<tl::pool>> pools;
for(int i=0; i < NUM_XSTREAMS; i++) {
pools.push_back(tl::pool::create<tl::pool::access::mpmc, my_pool, my_unit>());
}
// create schedulers
std::vector<tl::managed<tl::scheduler>> scheds;
for(int i=0; i < NUM_XSTREAMS; i++) {
std::vector<tl::pool> pools_for_sched_i;
for(int j=0; j < pools.size(); j++) {
pools_for_sched_i.push_back(*pools[j+i % pools.size()]);
}
scheds.push_back(tl::scheduler::create<my_scheduler>(pools_for_sched_i.begin(), pools_for_sched_i.end()));
}
std::vector<tl::managed<tl::xstream>> ess;
for(int i=0; i < NUM_XSTREAMS; i++) {
tl::managed<tl::xstream> es = tl::xstream::create(*scheds[i]);
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < NUM_THREADS; i++) {
tl::managed<tl::thread> th
= ess[i % ess.size()]->make_thread([]() {
hello();
});
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < NUM_XSTREAMS; i++) {
ess[i]->join();
}
return 0;
}
add_executable(15_sum_server server.cpp)
target_link_libraries(15_sum_server thallium)
add_executable(15_sum_client client.cpp)
target_link_libraries(15_sum_client thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
int main(int argc, char** argv) {
if(argc != 2) {
std::cerr << "Usage: " << argv[0] << " <address>" << std::endl;
exit(0);
}
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure sum = myEngine.define("sum");
tl::endpoint server = myEngine.lookup(argv[1]);
tl::provider_handle ph(server, 1);
int ret = sum.on(ph)(42,63);
std::cout << "Server answered " << ret << std::endl;
return 0;
}
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void sum(const tl::request& req, int x, int y) {
std::cout << "Computing " << x << "+" << y << std::endl;
req.respond(x+y);
}
int main(int argc, char** argv) {
tl::abt scope;
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::vector<tl::managed<tl::xstream>> ess;
tl::managed<tl::pool> myPool = tl::pool::create(tl::pool::access::spmc);
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es
= tl::xstream::create(tl::scheduler::predef::deflt, *myPool);
ess.push_back(std::move(es));
}
std::cout << "Server running at address " << myEngine.self() << std::endl;
myEngine.define("sum", sum, 1, *myPool);
myEngine.wait_for_finalize();
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
......@@ -6,3 +6,11 @@ add_subdirectory(04_stop)
add_subdirectory(05_stl)
add_subdirectory(06_custom)
add_subdirectory(07_rdma)
add_subdirectory(08_async)
add_subdirectory(09_provider)
add_subdirectory(10_threads)
add_subdirectory(11_tasks)
add_subdirectory(12_shared_pool)
add_subdirectory(13_mutex)
add_subdirectory(14_custom_sched)
add_subdirectory(15_rpc_pool)
......@@ -2,12 +2,33 @@
#define __THALLIUM_HPP
#include <margo.h>
#include <thallium/config.hpp>
#include <thallium/abt.hpp>
#include <thallium/anonymous.hpp>
#include <thallium/bulk_mode.hpp>
#include <thallium/bulk.hpp>
#include <thallium/timeout.hpp>
#include <thallium/engine.hpp>
#include <thallium/endpoint.hpp>
#include <thallium/remote_procedure.hpp>
#include <thallium/callable_remote_procedure.hpp>
#include <thallium/remote_bulk.hpp>
#include <thallium/provider.hpp>
#include <thallium/provider_handle.hpp>
#include <thallium/xstream.hpp>
#include <thallium/barrier.hpp>
#include <thallium/condition_variable.hpp>
#include <thallium/eventual.hpp>
#include <thallium/thread.hpp>
#include <thallium/unit_type.hpp>
#include <thallium/pool.hpp>
#include <thallium/scheduler.hpp>
#include <thallium/mutex.hpp>
#include <thallium/rwlock.hpp>
#include <thallium/exception.hpp>
#include <thallium/timer.hpp>
#include <thallium/future.hpp>
#include <thallium/xstream_barrier.hpp>
#include <thallium/self.hpp>
#endif
/*
* Copyright (c) 2017 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_ABT_HPP
#define __THALLIUM_ABT_HPP
#include <thallium/exception.hpp>
#include <thallium/abt_errors.hpp>
namespace thallium {
/**
* Exception class thrown by the abt class.
*/
class abt_exception : public exception {
public:
template<typename ... Args>
abt_exception(Args&&... args)
: exception(std::forward<Args>(args)...) {}
};
#define TL_ABT_EXCEPTION(__fun,__ret) \
abt_exception(#__fun," returned ", abt_error_get_name(__ret),\
" (", abt_error_get_description(__ret),") in ",__FILE__,":",__LINE__)
#define TL_ABT_ASSERT(__call) {\
int __ret = __call; \
if(__ret != ABT_SUCCESS) {\
throw TL_ABT_EXCEPTION(__call, __ret);\
}\
}
class abt {
public:
abt() {
initialize();
}
~abt() {
finalize();
}
/**
* @brief Initialize the Argobots execution environment.
*/
static void initialize() {
TL_ABT_ASSERT(ABT_init(0, nullptr));
}
/**
* @brief Check whether Argobots has been initialized.
*
* @return true if Argobots has been initialized.
*/
static bool initialized() {
return ABT_initialized() == ABT_TRUE;
}
/**
* @brief Finalizes Argobots.
*/
static void finalize() {
TL_ABT_ASSERT(ABT_finalize());
}
};
}
#undef TL_ABT_EXCEPTION
#undef TL_ABT_ASSERT
#endif
/*
* Copyright (c) 2017 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_ABT_ERRORS_HPP
#define __THALLIUM_ABT_ERRORS_HPP
namespace thallium {
/**
* @brief For internal use. Converts an error code
* returned by an Argobots function into a string name.
*
* @param err Error code
*
* @return Name of the error.
*/
const char* abt_error_get_name(int err);
/**
* @brief For internal use. Converts an error code
* returned by an Argobots function into a string description.
*
* @param err Error code
*
* @return Description of the error.
*/
const char* abt_error_get_description(int err);
}
#endif
/*
* Copyright (c) 2017 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_ANONYMOUS_HPP
#define __THALLIUM_ANONYMOUS_HPP
namespace thallium {
struct anonymous {};
}
#endif
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_ASYNC_RESPONSE_HPP
#define __THALLIUM_ASYNC_RESPONSE_HPP
#include <thallium/margo_exception.hpp>
#include <thallium/buffer.hpp>
#include <thallium/packed_response.hpp>
namespace thallium {
class callable_remote_procedure;
/**
* @brief async_response objects are created by sending an
* RPC in a non-blocking way. They can be used to wait for
* the actual response.
*/
class async_response {
friend class callable_remote_procedure;
private:
margo_request m_request;
engine* m_engine;
hg_handle_t m_handle;
bool m_ignore_response;
/**
* @brief Constructor. Made private since async_response
* objects are created by callable_remote_procedure only.
*
* @param req Margo request to wait on.
* @param e Engine associated with the RPC.
* @param c callable_remote_procedure that created the async_response.
* @param ignore_resp whether response should be ignored.
*/
async_response(margo_request req, engine& e, hg_handle_t handle, bool ignore_resp)
: m_request(req), m_engine(&e), m_handle(handle), m_ignore_response(ignore_resp) {
margo_ref_incr(handle);
}
public:
/**
* @brief Copy constructor is deleted.
*/
async_response(const async_response& other) = delete;
/**
* @brief Move-constructor.
*
* @param other async_response to move from.
*/
async_response(async_response&& other)
: m_request(other.m_request)
, m_engine(other.m_engine)
, m_handle(other.m_handle)
, m_ignore_response(other.m_ignore_response) {
other.m_request = MARGO_REQUEST_NULL;
other.m_engine = nullptr;
other.m_handle = HG_HANDLE_NULL;
}
/**
* @brief Copy-assignment operator is deleted.
*/
async_response& operator=(const async_response& other) = delete;
/**
* @brief Move-assignment operator is deleted.
*/
async_response& operator=(async_response&& other) = delete;
/**
* @brief Destructor.
*/
~async_response() {
if(m_handle != HG_HANDLE_NULL)
margo_destroy(m_handle);
}
/**
* @brief Waits for the async_response to be ready and returns
* a packed_response when the response has been received.
*
* @return a packed_response containing the response.
*/
packed_response wait();
/**
* @brief Tests without blocking if the response has been received.
*
* @return true if the response has been received, false otherwise.
*/
bool received() const {
int ret;
int flag;
ret = margo_test(m_request, &flag);
MARGO_ASSERT((hg_return_t)ret, margo_test);
return flag;
}
};
}
#endif
/*
* Copyright (c) 2017 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_BARRIER_HPP
#define __THALLIUM_BARRIER_HPP
#include <abt.h>
#include <thallium/abt_errors.hpp>
#include <thallium/exception.hpp>
namespace thallium {
/**
* Exception class thrown by the barrier class.
*/
class barrier_exception : public exception {
public:
template<typename ... Args>
barrier_exception(Args&&... args)
: exception(std::forward<Args>(args)...) {}
};
#define TL_BARRIER_EXCEPTION(__fun,__ret) \
barrier_exception(#__fun," returned ", abt_error_get_name(__ret),\
" (", abt_error_get_description(__ret),") in ",__FILE__,":",__LINE__);
#define TL_BARRIER_ASSERT(__call) {\
int __ret = __call; \
if(__ret != ABT_SUCCESS) {\
throw TL_BARRIER_EXCEPTION(__call, __ret);\
}\
}
/**
* @brief Wrapper for Argobots' ABT_barrier.
*/
class barrier {
ABT_barrier m_barrier;
public:
/**
* @brief Native handle type (ABT_barrier)
*/
typedef ABT_barrier native_handle_type;
/**
* @brief Constructor.
*
* @param num_waiters Number of waiters.
*/
explicit barrier(uint32_t num_waiters) {
TL_BARRIER_ASSERT(ABT_barrier_create(num_waiters, &m_barrier));
}
/**
* @brief Copy constructor is deleted.
*/
barrier(const barrier& other) = delete;
/**
* @brief Copy assignment operator is deleted.
*/
barrier& operator=(const barrier& other) = delete;
/**
* @brief Move assignment operator.
*
* If the right and left operands are different,
* this method will free the left operand's resource (if
* necessary), and assign it the right operand's resource.
* The right operand will be invalidated.
*/
barrier& operator=(barrier&& other) {
if(this == &other) return *this;
if(m_barrier != ABT_BARRIER_NULL) {
TL_BARRIER_ASSERT(ABT_barrier_free(&m_barrier));
}
m_barrier = other.m_barrier;
other.m_barrier = ABT_BARRIER_NULL;
return *this;
}
/**
* @brief Move constructor. The right operand
* will be invalidated.
*
* @param other barrier object to move from.
*/
barrier(barrier&& other)
: m_barrier(other.m_barrier) {
other.m_barrier = ABT_BARRIER_NULL;
}
/**
* @brief Destructor.
*/
~barrier() {
if(m_barrier != ABT_BARRIER_NULL)
ABT_barrier_free(&m_barrier);
}
/**
* @brief Reinitializes the barrier for a given
* number of waiters.
*
* @param num_waiters Number of waiters.
*/
void reinit(uint32_t num_waiters) {
if(m_barrier == ABT_BARRIER_NULL) {
TL_BARRIER_ASSERT(ABT_barrier_create(num_waiters, &m_barrier));
} else {
TL_BARRIER_ASSERT(ABT_barrier_reinit(m_barrier, num_waiters));
}
}
/**
* @brief Waits on the barrier.
*/
void wait() {
TL_BARRIER_ASSERT(ABT_barrier_wait(m_barrier));
}
/**
* @brief Get the number of waiters that the barrier
* is expecting (passed to the constructor or to reinit).
*
* @return The number of waiters.
*/
uint32_t get_num_waiters() const {
uint32_t n;
TL_BARRIER_ASSERT(ABT_barrier_get_num_waiters(m_barrier, &n));
return n;
}
/**
* @brief Get the underlying ABT_barrier handle.
*
* @return the underlying ABT_barrier handle.
*/
native_handle_type native_handle() const noexcept {
return m_barrier;
}
};
}
#undef TL_BARRIER_EXCEPTION
#undef TL_BARRIER_ASSERT
#endif /* end of include guard */
......@@ -6,15 +6,120 @@
#ifndef __THALLIUM_BUFFER_HPP
#define __THALLIUM_BUFFER_HPP
#include <vector>
#include <stdlib.h>
#include <string.h>
namespace thallium {
/**
* @brief buffer object defined as std::vector<char>.
*/
using buffer = std::vector<char>;
class buffer {
char* m_data = nullptr;
size_t m_size = 0;
size_t m_capacity = 0;
public:
buffer() = default;
buffer(size_t initialSize) {
resize(initialSize);
}
~buffer() {
if(m_data != nullptr)
free(m_data);
}
buffer(const buffer& other) {
if(other.m_data != nullptr) {
m_data = static_cast<char*>(malloc(other.m_size));
m_size = other.m_size;
m_capacity = other.m_size;