Commit e64a72ff authored by Matthieu Dorier's avatar Matthieu Dorier

mostly done with argobots wrappers

parent 066acc19
add_executable(10_thread main.cpp)
target_link_libraries(10_thread thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id() << std::endl;
}
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
std::vector<tl::managed<tl::xstream>> ess;
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es = tl::xstream::create();
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < 16; i++) {
tl::managed<tl::thread> th = ess[i % ess.size()]->make_thread(hello);
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(11_task main.cpp)
target_link_libraries(11_task thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", TASK "
<< tl::task::self_id() << std::endl;
}
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
std::vector<tl::managed<tl::xstream>> ess;
tl::xstream primary = tl::xstream::self();
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es = tl::xstream::create();
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::task>> tsks;
for(int i=0; i < 16; i++) {
tl::managed<tl::task> tsk = ess[i % ess.size()]->make_task(hello);
tsks.push_back(std::move(tsk));
}
for(auto& mtsk : tsks) {
mtsk->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(12_shared_pool main.cpp)
target_link_libraries(12_shared_pool thallium)
#include <iostream>
#include <thallium.hpp>
namespace tl = thallium;
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id() << std::endl;
}
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
std::vector<tl::managed<tl::xstream>> ess;
tl::managed<tl::pool> myPool = tl::pool::create(tl::pool::access::spmc);
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es
= tl::xstream::create(tl::scheduler::predef::deflt, *myPool);
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < 16; i++) {
tl::managed<tl::thread> th = ess[i % ess.size()]->make_thread(hello);
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(13_mutex main.cpp)
target_link_libraries(13_mutex thallium)
#include <iostream>
#include <unistd.h>
#include <thallium.hpp>
namespace tl = thallium;
int myCounter = 0;
void hello(tl::mutex& mtx) {
tl::xstream es = tl::xstream::self();
mtx.lock();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id()
<< ", counter = " << myCounter << std::endl;
myCounter += 1;
mtx.unlock();
}
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
std::vector<tl::managed<tl::xstream>> ess;
for(int i=0; i < 4; i++) {
tl::managed<tl::xstream> es = tl::xstream::create();
ess.push_back(std::move(es));
}
tl::mutex myMutex;
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < 16; i++) {
tl::managed<tl::thread> th
= ess[i % ess.size()]->make_thread([&myMutex]() {
hello(myMutex);
});
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < 4; i++) {
ess[i]->join();
}
return 0;
}
add_executable(14_custom_sched main.cpp)
target_link_libraries(14_custom_sched thallium)
#include <iostream>
#include <unistd.h>
#include <deque>
#include <mutex> // to use std::lock_guard
#include <algorithm>
#include <thallium.hpp>
#define NUM_XSTREAMS 1
#define NUM_THREADS 16
namespace tl = thallium;
class my_unit;
class my_pool;
class my_sched;
class my_unit {
tl::thread m_thread;
tl::task m_task;
tl::pool::unit_type m_type;
bool m_in_pool;
friend class my_pool;
public:
my_unit(const tl::thread& t)
: m_thread(t), m_type(tl::pool::unit_type::thread), m_in_pool(false) {}
my_unit(const tl::task& t)
: m_task(t), m_type(tl::pool::unit_type::task), m_in_pool(false) {}
tl::pool::unit_type get_type() const {
return m_type;
}
const tl::thread& get_thread() const {
return m_thread;
}
const tl::task& get_task() const {
return m_task;
}
bool is_in_pool() const {
return m_in_pool;
}
~my_unit() {}
};
class my_pool {
mutable tl::mutex m_mutex;
std::deque<my_unit*> m_units;
public:
my_pool() {}
size_t get_size() const {
std::lock_guard<tl::mutex> lock(m_mutex);
return m_units.size();
}
void push(my_unit* u) {
std::lock_guard<tl::mutex> lock(m_mutex);
u->m_in_pool = true;
m_units.push_back(u);
}
my_unit* pop() {
std::lock_guard<tl::mutex> lock(m_mutex);
if(m_units.empty())
return nullptr;
my_unit* u = m_units.front();
m_units.pop_front();
u->m_in_pool = false;
return u;
}
void remove(my_unit* u) {
std::lock_guard<tl::mutex> lock(m_mutex);
auto it = std::find(m_units.begin(), m_units.end(), u);
if(it != m_units.end()) {
(*it)->m_in_pool = false;
m_units.erase(it);
}
}
~my_pool() {
std::cerr << "Pool destructor " << std::endl;
}
};
class my_scheduler : private tl::scheduler {
public:
template<typename ... Args>
my_scheduler(Args&&... args)
: tl::scheduler(std::forward<Args>(args)...) {}
void run() {
int n = num_pools();
my_unit* unit;
int target;
unsigned seed = time(NULL);
while (1) {
/* Execute one work unit from the scheduler's pool */
unit = get_pool(0).pop<my_unit>();
if(unit != nullptr) {
get_pool(0).run_unit(unit);
} else if (n > 1) {
/* Steal a work unit from other pools */
target = (n == 2) ? 1 : (rand_r(&seed) % (n-1) + 1);
unit = get_pool(target).pop<my_unit>();
if(unit != nullptr)
get_pool(target).run_unit(unit);
}
if(has_to_stop()) break;
tl::xstream::check_events(*this);
}
}
tl::pool get_migr_pool() const {
return get_pool(0);
}
~my_scheduler() {
std::cerr << "scheduler destructor "<< std::endl;
}
};
void hello() {
tl::xstream es = tl::xstream::self();
std::cout << "Hello World from ES "
<< es.get_rank() << ", ULT "
<< tl::thread::self_id()
<< std::endl;
}
int main(int argc, char** argv) {
tl::engine myEngine("tcp", THALLIUM_CLIENT_MODE);
// create pools
std::vector<tl::managed<tl::pool>> pools;
for(int i=0; i < NUM_XSTREAMS; i++) {
pools.push_back(tl::pool::create<tl::pool::access::mpmc, my_pool, my_unit>());
}
// create schedulers
std::vector<tl::managed<tl::scheduler>> scheds;
for(int i=0; i < NUM_XSTREAMS; i++) {
std::vector<tl::pool> pools_for_sched_i;
for(int j=0; j < pools.size(); j++) {
pools_for_sched_i.push_back(*pools[j+i % pools.size()]);
}
scheds.push_back(tl::scheduler::create<my_scheduler>(pools_for_sched_i.begin(), pools_for_sched_i.end()));
}
std::vector<tl::managed<tl::xstream>> ess;
for(int i=0; i < NUM_XSTREAMS; i++) {
tl::managed<tl::xstream> es = tl::xstream::create(*scheds[i]);
ess.push_back(std::move(es));
}
std::vector<tl::managed<tl::thread>> ths;
for(int i=0; i < NUM_THREADS; i++) {
tl::managed<tl::thread> th
= ess[i % ess.size()]->make_thread([]() {
hello();
});
ths.push_back(std::move(th));
}
for(auto& mth : ths) {
mth->join();
}
for(int i=0; i < NUM_XSTREAMS; i++) {
ess[i]->join();
}
return 0;
}
......@@ -8,3 +8,8 @@ add_subdirectory(06_custom)
add_subdirectory(07_rdma)
add_subdirectory(08_async)
add_subdirectory(09_provider)
add_subdirectory(10_threads)
add_subdirectory(11_tasks)
add_subdirectory(12_shared_pool)
add_subdirectory(13_mutex)
add_subdirectory(14_custom_sched)
......@@ -11,5 +11,16 @@
#include <thallium/remote_bulk.hpp>
#include <thallium/provider.hpp>
#include <thallium/provider_handle.hpp>
#include <thallium/xstream.hpp>
#include <thallium/barrier.hpp>
#include <thallium/condition_variable.hpp>
#include <thallium/eventual.hpp>
#include <thallium/thread.hpp>
#include <thallium/pool.hpp>
#include <thallium/scheduler.hpp>
#include <thallium/mutex.hpp>
#include <thallium/rwlock.hpp>
#include <thallium/exception.hpp>
#include <thallium/timer.hpp>
#endif
#ifndef __THALLIUM_ABT_ERRORS_HPP
#define __THALLIUM_ABT_ERRORS_HPP
namespace thallium {
/**
* @brief For internal use. Converts an error code
* returned by an Argobots function into a string name.
*
* @param err Error code
*
* @return Name of the error.
*/
const char* abt_error_get_name(int err);
/**
* @brief For internal use. Converts an error code
* returned by an Argobots function into a string description.
*
* @param err Error code
*
* @return Description of the error.
*/
const char* abt_error_get_description(int err);
}
#endif
/*
* Copyright (c) 2017 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_BARRIER_HPP
#define __THALLIUM_BARRIER_HPP
#include <abt.h>
#include <thallium/abt_errors.hpp>
#include <thallium/exception.hpp>
namespace thallium {
/**
* Exception class thrown by the barrier class.
*/
class barrier_exception : public exception {
public:
template<typename ... Args>
barrier_exception(Args&&... args)
: exception(std::forward<Args>(args)...) {}
};
#define TL_BARRIER_EXCEPTION(__fun,__ret) \
barrier_exception(#__fun," returned ", abt_error_get_name(__ret),\
" (", abt_error_get_description(__ret),") in ",__FILE__,":",__LINE__);
#define TL_BARRIER_ASSERT(__call) {\
int __ret = __call; \
if(__ret != ABT_SUCCESS) {\
throw TL_BARRIER_EXCEPTION(__call, __ret);\
}\
}
/**
* @brief Wrapper for Argobots' ABT_barrier.
*/
class barrier {
ABT_barrier m_barrier;
public:
/**
* @brief Native handle type (ABT_barrier)
*/
typedef ABT_barrier native_handle_type;
/**
* @brief Constructor.
*
* @param num_waiters Number of waiters.
*/
explicit barrier(uint32_t num_waiters) {
TL_BARRIER_ASSERT(ABT_barrier_create(num_waiters, &m_barrier));
}
/**
* @brief Copy constructor is deleted.
*/
barrier(const barrier& other) = delete;
/**
* @brief Copy assignment operator is deleted.
*/
barrier& operator=(const barrier& other) = delete;
/**
* @brief Move assignment operator.
*
* If the right and left operands are different,
* this method will free the left operand's resource (if
* necessary), and assign it the right operand's resource.
* The right operand will be invalidated.
*/
barrier& operator=(barrier&& other) {
if(this == &other) return *this;
if(m_barrier != ABT_BARRIER_NULL) {
TL_BARRIER_ASSERT(ABT_barrier_free(&m_barrier));
}
m_barrier = other.m_barrier;
other.m_barrier = ABT_BARRIER_NULL;
return *this;
}
/**
* @brief Move constructor. The right operand
* will be invalidated.
*
* @param other barrier object to move from.
*/
barrier(barrier&& other)
: m_barrier(other.m_barrier) {
other.m_barrier = ABT_BARRIER_NULL;
}
/**
* @brief Destructor.
*/
~barrier() {
if(m_barrier != ABT_BARRIER_NULL)
ABT_barrier_free(&m_barrier);
}
/**
* @brief Reinitializes the barrier for a given
* number of waiters.
*
* @param num_waiters Number of waiters.
*/
void reinit(uint32_t num_waiters) {
if(m_barrier == ABT_BARRIER_NULL) {
TL_BARRIER_ASSERT(ABT_barrier_create(num_waiters, &m_barrier));
} else {
TL_BARRIER_ASSERT(ABT_barrier_reinit(m_barrier, num_waiters));
}
}
/**
* @brief Waits on the barrier.
*/
void wait() {
TL_BARRIER_ASSERT(ABT_barrier_wait(m_barrier));
}
/**
* @brief Get the number of waiters that the barrier
* is expecting (passed to the constructor or to reinit).
*
* @return The number of waiters.
*/
uint32_t get_num_waiters() const {
uint32_t n;
TL_BARRIER_ASSERT(ABT_barrier_get_num_waiters(m_barrier, &n));
return n;
}
/**
* @brief Get the underlying ABT_barrier handle.
*
* @return the underlying ABT_barrier handle.
*/
native_handle_type native_handle() const noexcept {
return m_barrier;
}
};
}
#undef TL_BARRIER_EXCEPTION
#undef TL_BARRIER_ASSERT
#endif /* end of include guard */
/*
* Copyright (c) 2017 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_CONDITION_VARIABLE_HPP
#define __THALLIUM_CONDITION_VARIABLE_HPP
#include <abt.h>
#include <mutex>
#include <thallium/mutex.hpp>
#include <thallium/exception.hpp>
namespace thallium {
/**
* Exception class thrown by the condition_variable class.
*/
class condition_variable_exception : public exception {
public:
template<typename ... Args>
condition_variable_exception(Args&&... args)
: exception(std::forward<Args>(args)...) {}
};
#define TL_CV_EXCEPTION(__fun,__ret) \
condition_variable_exception(#__fun," returned ", abt_error_get_name(__ret),\
" (", abt_error_get_description(__ret),") in ",__FILE__,":",__LINE__);
#define TL_CV_ASSERT(__call) {\
int __ret = __call; \
if(__ret != ABT_SUCCESS) {\
throw TL_CV_EXCEPTION(__call, __ret);\
}\
}
/**
* @brief Wrapper for Argobots' ABT_cond.
*/
class condition_variable {
ABT_cond m_cond;
public:
/**
* @brief Native handle type.
*/
typedef ABT_cond native_handle_type;
/**
* @brief Returns the underlying ABT_cond handle.
*
* @return the underlying ABT_cond handle.
*/
native_handle_type native_handle() const noexcept {
return m_cond;
}
/**
* @brief Constructor.
*/
condition_variable() {
TL_CV_ASSERT(ABT_cond_create(&m_cond));
}
/**
* @brief Destructor.
*/
~condition_variable() {
if(m_cond != ABT_COND_NULL)
ABT_cond_free(&m_cond);
}
/**
* @brief Copy constructor is deleted.
*/
condition_variable(const condition_variable&) = delete;
/**
* @brief Copy assignment operator is deleted.
*/
condition_variable& operator=(const condition_variable&) = delete;
/**
* @brief Move assignment operator. If the left and right
* operands are different, this will move the right
* condition_variable's resources to the left condition_variable,
* leaving the right one invalid.
*/
condition_variable& operator=(condition_variable&& other) {
if(this == &other) return *this;
if(m_cond != ABT_COND_NULL) {
TL_CV_ASSERT(ABT_cond_free(&m_cond));
}
m_cond = other.m_cond;
other.m_cond = ABT_COND_NULL;
return *this;
}
/**
* @brief Move constructor. This function will invalidate
* the passed condition_variable.
*/
condition_variable(condition_variable&& other)
: m_cond(other.m_cond) {
other.m_cond = ABT_COND_NULL;
}
/**
* @brief Wait on a condition variable.
*
* @param lock Mutex to lock when the condition is satisfied.
*/
void wait(std::unique_lock<mutex>& lock) {
TL_CV_ASSERT(ABT_cond_wait(m_cond, lock.mutex()->native_handle()));
}