Commit 0fabe659 authored by Matthieu Dorier's avatar Matthieu Dorier
Browse files

Merge branch 'dev-argobots-wrappers' into 'master'

Dev argobots wrappers

See merge request !4
parents 0deea5b8 46ecd386
add_executable(09_sum_provider server.cpp)
target_link_libraries(09_sum_provider thallium)
add_executable(09_sum_client client.cpp)
target_link_libraries(09_sum_client thallium)
#include <iostream>
#include <thallium/serialization/stl/string.hpp>
#include <thallium.hpp>
namespace tl = thallium;
int main(int argc, char** argv) {
if(argc != 3) {
std::cerr << "Usage: " << argv[0] << " <address> <provider_id>" << std::endl;
exit(0);
}
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
tl::remote_procedure sum = myEngine.define("sum");
tl::remote_procedure prod = myEngine.define("prod");
tl::remote_procedure hello = myEngine.define("hello").ignore_response();
tl::remote_procedure print = myEngine.define("print").ignore_response();
tl::endpoint server = myEngine.lookup(argv[1]);
uint16_t provider_id = atoi(argv[2]);
tl::provider_handle ph(server, provider_id);
int ret = sum.on(ph)(42,63);
std::cout << "(sum) Server answered " << ret << std::endl;
ret = prod.on(ph)(42,63);
std::cout << "(prod) Server answered " << ret << std::endl;
std::string name("Matthieu");
hello.on(ph)(name);
print.on(ph)(name);
return 0;
}
#include <iostream>
#include <thallium.hpp>
#include <thallium/serialization/stl/string.hpp>
namespace tl = thallium;
class my_sum_provider : public tl::provider<my_sum_provider> {
private:
void prod(const tl::request& req, int x, int y) {
std::cout << "Computing " << x << "*" << y << std::endl;
req.respond(x+y);
}
int sum(int x, int y) {
std::cout << "Computing " << x << "+" << y << std::endl;
return x+y;
}
void hello(const std::string& name) {
std::cout << "Hello, " << name << std::endl;
}
int print(const std::string& word) {
std::cout << "Printing " << word << std::endl;
return word.size();
}
public:
my_sum_provider(tl::engine& e, uint16_t provider_id=1)
: tl::provider<my_sum_provider>(e, provider_id) {
define("prod", &my_sum_provider::prod);
define("sum", &my_sum_provider::sum);
define("hello", &my_sum_provider::hello);
define("print", &my_sum_provider::print, tl::ignore_return_value());
}
~my_sum_provider() {
wait_for_finalize();
}
};
int main(int argc, char** argv) {
uint16_t provider_id = 22;
tl::engine myEngine("tcp", THALLIUM_SERVER_MODE);
std::cout << "Server running at address " << myEngine.self()
<< " with provider id " << provider_id << std::endl;
my_sum_provider myProvider(myEngine, provider_id);
return 0;
}
add_executable(10_thread main.cpp)
target_link_libraries(10_thread thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id() << std::endl;
}
int main(int argc, char** argv) {
tl::abt scope;
std::vector<tl::managed<tl::xstream>> ess;
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es = tl::xstream::create();
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < 16; i++) {
tl::managed<tl::thread> th = ess[i % ess.size()]->make_thread(hello);
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(11_task main.cpp)
target_link_libraries(11_task thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", TASK "
<< tl::task::self_id() << std::endl;
}
int main(int argc, char** argv) {
tl::abt scope;
std::vector<tl::managed<tl::xstream>> ess;
tl::xstream primary = tl::xstream::self();
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es = tl::xstream::create();
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::task>> tsks;
for(int i=0; i < 16; i++) {
tl::managed<tl::task> tsk = ess[i % ess.size()]->make_task(hello);
tsks.push_back(std::move(tsk));
}
for(auto& mtsk : tsks) {
mtsk->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(12_shared_pool main.cpp)
target_link_libraries(12_shared_pool thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id() << std::endl;
}
int main(int argc, char** argv) {
tl::abt scope;
std::vector<tl::managed<tl::xstream>> ess;
tl::managed<tl::pool> myPool = tl::pool::create(tl::pool::access::spmc);
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es
= tl::xstream::create(tl::scheduler::predef::deflt, *myPool);
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < 16; i++) {
tl::managed<tl::thread> th = ess[i % ess.size()]->make_thread(hello);
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(13_mutex main.cpp)
target_link_libraries(13_mutex thallium)
#include <iostream>
#include <unistd.h>
#include <thallium.hpp>
namespace tl = thallium;
int myCounter = 0;
void hello(tl::mutex& mtx) {
tl::xstream es = tl::xstream::self();
mtx.lock();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id()
<< ", counter = " << myCounter << std::endl;
myCounter += 1;
mtx.unlock();
}
int main(int argc, char** argv) {
tl::abt scope;
std::vector<tl::managed<tl::xstream>> ess;
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es = tl::xstream::create();
ess.push_back(std::move(es));
}
tl::mutex myMutex;
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < 16; i++) {
tl::managed<tl::thread> th
= ess[i % ess.size()]->make_thread([&myMutex]() {
hello(myMutex);
});
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(14_custom_sched main.cpp)
target_link_libraries(14_custom_sched thallium)
#include <iostream>
#include <unistd.h>
#include <deque>
#include <mutex> // to use std::lock_guard
#include <algorithm>
#include <thallium.hpp>
#define NUM_XSTREAMS 1
#define NUM_THREADS 16
namespace tl = thallium;
class my_unit;
class my_pool;
class my_sched;
class my_unit {
tl::thread m_thread;
tl::task m_task;
tl::unit_type m_type;
bool m_in_pool;
friend class my_pool;
public:
my_unit(const tl::thread& t)
: m_thread(t), m_type(tl::unit_type::thread), m_in_pool(false) {}
my_unit(const tl::task& t)
: m_task(t), m_type(tl::unit_type::task), m_in_pool(false) {}
tl::unit_type get_type() const {
return m_type;
}
const tl::thread& get_thread() const {
return m_thread;
}
const tl::task& get_task() const {
return m_task;
}
bool is_in_pool() const {
return m_in_pool;
}
~my_unit() {}
};
class my_pool {
mutable tl::mutex m_mutex;
std::deque<my_unit*> m_units;
public:
my_pool() {}
size_t get_size() const {
std::lock_guard<tl::mutex> lock(m_mutex);
return m_units.size();
}
void push(my_unit* u) {
std::lock_guard<tl::mutex> lock(m_mutex);
u->m_in_pool = true;
m_units.push_back(u);
}
my_unit* pop() {
std::lock_guard<tl::mutex> lock(m_mutex);
if(m_units.empty())
return nullptr;
my_unit* u = m_units.front();
m_units.pop_front();
u->m_in_pool = false;
return u;
}
void remove(my_unit* u) {
std::lock_guard<tl::mutex> lock(m_mutex);
auto it = std::find(m_units.begin(), m_units.end(), u);
if(it != m_units.end()) {
(*it)->m_in_pool = false;
m_units.erase(it);
}
}
~my_pool() {
std::cerr << "Pool destructor " << std::endl;
}
};
class my_scheduler : private tl::scheduler {
public:
template<typename ... Args>
my_scheduler(Args&&... args)
: tl::scheduler(std::forward<Args>(args)...) {}
void run() {
int n = num_pools();
my_unit* unit;
int target;
unsigned seed = time(NULL);
while (1) {
/* Execute one work unit from the scheduler's pool */
unit = get_pool(0).pop<my_unit>();
if(unit != nullptr) {
get_pool(0).run_unit(unit);
} else if (n > 1) {
/* Steal a work unit from other pools */
target = (n == 2) ? 1 : (rand_r(&seed) % (n-1) + 1);
unit = get_pool(target).pop<my_unit>();
if(unit != nullptr)
get_pool(target).run_unit(unit);
}
if(has_to_stop()) break;
tl::xstream::check_events(*this);
}
}
tl::pool get_migr_pool() const {
return get_pool(0);
}
~my_scheduler() {
std::cerr << "scheduler destructor "<< std::endl;
}
};
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id()
<< std::endl;
}
int main(int argc, char** argv) {
tl::abt scope;
// create pools
std::vector<tl::managed<tl::pool>> pools;
for(int i=0; i < NUM_XSTREAMS; i++) {
pools.push_back(tl::pool::create<tl::pool::access::mpmc, my_pool, my_unit>());
}
// create schedulers
std::vector<tl::managed<tl::scheduler>> scheds;
for(int i=0; i < NUM_XSTREAMS; i++) {
std::vector<tl::pool> pools_for_sched_i;
for(int j=0; j < pools.size(); j++) {
pools_for_sched_i.push_back(*pools[j+i % pools.size()]);
}
scheds.push_back(tl::scheduler::create<my_scheduler>(pools_for_sched_i.begin(), pools_for_sched_i.end()));
}
std::vector<tl::managed<tl::xstream>> ess;
for(int i=0; i < NUM_XSTREAMS; i++) {
tl::managed<tl::xstream> es = tl::xstream::create(*scheds[i]);
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < NUM_THREADS; i++) {
tl::managed<tl::thread> th
= ess[i % ess.size()]->make_thread([]() {
hello();
});
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < NUM_XSTREAMS; i++) {
ess[i]->join();
}
return 0;
}
......@@ -7,3 +7,9 @@ add_subdirectory(05_stl)
add_subdirectory(06_custom)
add_subdirectory(07_rdma)
add_subdirectory(08_async)
add_subdirectory(09_provider)
add_subdirectory(10_threads)
add_subdirectory(11_tasks)
add_subdirectory(12_shared_pool)
add_subdirectory(13_mutex)
add_subdirectory(14_custom_sched)
......@@ -2,6 +2,7 @@
#define __THALLIUM_HPP
#include <margo.h>
#include <thallium/abt.hpp>
#include <thallium/bulk_mode.hpp>
#include <thallium/bulk.hpp>
#include <thallium/engine.hpp>
......@@ -9,5 +10,22 @@
#include <thallium/remote_procedure.hpp>
#include <thallium/callable_remote_procedure.hpp>
#include <thallium/remote_bulk.hpp>
#include <thallium/provider.hpp>
#include <thallium/provider_handle.hpp>
#include <thallium/xstream.hpp>
#include <thallium/barrier.hpp>
#include <thallium/condition_variable.hpp>
#include <thallium/eventual.hpp>
#include <thallium/thread.hpp>
#include <thallium/unit_type.hpp>
#include <thallium/pool.hpp>
#include <thallium/scheduler.hpp>
#include <thallium/mutex.hpp>
#include <thallium/rwlock.hpp>
#include <thallium/exception.hpp>
#include <thallium/timer.hpp>
#include <thallium/future.hpp>
#include <thallium/xstream_barrier.hpp>
#include <thallium/self.hpp>
#endif
/*
* Copyright (c) 2017 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_ABT_HPP
#define __THALLIUM_ABT_HPP
#include <thallium/exception.hpp>
#include <thallium/abt_errors.hpp>
namespace thallium {
/**
* Exception class thrown by the abt class.
*/
class abt_exception : public exception {
public:
template<typename ... Args>
abt_exception(Args&&... args)
: exception(std::forward<Args>(args)...) {}
};
#define TL_ABT_EXCEPTION(__fun,__ret) \
abt_exception(#__fun," returned ", abt_error_get_name(__ret),\
" (", abt_error_get_description(__ret),") in ",__FILE__,":",__LINE__)
#define TL_ABT_ASSERT(__call) {\
int __ret = __call; \
if(__ret != ABT_SUCCESS) {\
throw TL_ABT_EXCEPTION(__call, __ret);\
}\
}
class abt {
public:
abt() {
initialize();
}
~abt() {
finalize();
}
/**
* @brief Initialize the Argobots execution environment.
*/
static void initialize() {
TL_ABT_ASSERT(ABT_init(0, nullptr));
}
/**
* @brief Check whether Argobots has been initialized.
*
* @return true if Argobots has been initialized.
*/
static bool initialized() {
return ABT_initialized() == ABT_TRUE;
}
/**
* @brief Finalizes Argobots.
*/
static void finalize() {
TL_ABT_ASSERT(ABT_finalize());
}
};
}
#undef TL_ABT_EXCEPTION
#undef TL_ABT_ASSERT
#endif