Commit a17bcd7c authored by Matthieu Dorier's avatar Matthieu Dorier

working hello world

parent b13b2417
......@@ -21,9 +21,10 @@ class margo_engine {
private:
margo_instance_id m_mid;
bool m_is_server;
template<typename F>
static hg_return_t rpc_callback(hg_handle_t handle) {
static void rpc_handler_ult(hg_handle_t handle) {
using G = std::remove_reference_t<F>;
const struct hg_info* info = margo_get_info(handle);
margo_instance_id mid = margo_hg_handle_get_instance(handle);
......@@ -33,17 +34,40 @@ private:
buffer input;
margo_get_input(handle, &input);
(*f)(req, input);
return HG_SUCCESS;
margo_free_input(handle, &input);
}
template<typename F>
static hg_return_t rpc_callback(hg_handle_t handle) {
int ret;
ABT_pool pool;
margo_instance_id mid;
const struct hg_info *hgi = margo_get_info(handle);
mid = margo_hg_handle_get_instance(handle);
if(mid == MARGO_INSTANCE_NULL) {
return HG_OTHER_ERROR;
}
ret = margo_lookup_mplex(mid, hgi->id, hgi->target_id, &pool);
if(ret != HG_SUCCESS) {
return HG_INVALID_PARAM;
}
ret = ABT_thread_create(pool, (void (*)(void *)) rpc_handler_ult<F>, handle, ABT_THREAD_ATTR_NULL, NULL);
if(ret != 0) {
return HG_NOMEM_ERROR;
}
return HG_SUCCESS;
}
public:
margo_engine(const std::string& addr, int mode,
bool use_progress_thread = false,
std::int32_t rpc_thread_count = 0) {
m_is_server = (mode == MARGO_SERVER_MODE);
m_mid = margo_init(addr.c_str(), mode,
use_progress_thread,
use_progress_thread ? 1 : 0,
rpc_thread_count);
// TODO throw exception if m_mid is null
}
......@@ -54,8 +78,10 @@ public:
margo_engine& operator=(const margo_engine& other) = delete;
~margo_engine() {
// TODO throw an exception if following call fails
margo_wait_for_finalize(m_mid);
if(m_is_server) {
// TODO throw an exception if following call fails
margo_wait_for_finalize(m_mid);
}
}
void finalize() {
......
......@@ -29,7 +29,7 @@ public:
callable_remote_procedure on(const endpoint& ep) const;
callable_remote_procedure operator,(const endpoint& ep) const;
callable_remote_procedure operator>>(const endpoint& ep) const;
};
}
......
......@@ -7,7 +7,7 @@ callable_remote_procedure remote_procedure::on(const endpoint& ep) const {
return callable_remote_procedure(m_id, ep);
}
callable_remote_procedure remote_procedure::operator,(const endpoint& ep) const {
callable_remote_procedure remote_procedure::operator>>(const endpoint& ep) const {
return on(ep);
}
......
......@@ -18,14 +18,14 @@ int server() {
tl::margo_engine me("bmi+tcp://127.0.0.1:1234", MARGO_SERVER_MODE);
me.define("hello1", hello);
me.define("hello2", [](const tl::request& req, const tl::buffer& input)
me.define("hello2", [&me](const tl::request& req, const tl::buffer& input)
{ std::cout << "(2) Hello World ";
for(auto c : input) std::cout << c;
std::cout << std::endl; });
std::cout << std::endl;
me.finalize(); });
std::string addr = me.self();
std::cout << "Server running at address " << addr << std::endl;
// TODO send address to client
return 0;
}
......@@ -43,10 +43,10 @@ int client() {
tl::buffer b(16,'a');
(remote_hello1, server_endpoint)(b);
(remote_hello1 >> server_endpoint)(b);
remote_hello2.on(server_endpoint)(b);
return 0;
}
......@@ -59,6 +59,7 @@ int main(int argc, char** argv) {
if(rank == 0) server();
else client();
std::cout << "rank " << rank << " finished its work" << std::endl;
MPI_Finalize();
return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment