Commit b13b2417 authored by Matthieu Dorier's avatar Matthieu Dorier

added buffers

parent 334b849b
......@@ -68,7 +68,7 @@ public:
}
template<typename ... T>
auto operator()(T&& ... t) const {
auto operator()(const T& ... t) const {
// TODO throw an exception if handle is null
// buffer input;
// BufferOutputArchive arch(input);
......@@ -77,12 +77,18 @@ public:
//auto input = std::tie(t...);
margo_forward(m_handle, nullptr);//&input);
/*
Buffer output;
*/
// Buffer output;
//margo_get_output(m_handle, &output);
return true;//Pack(std::move(output));
}
auto operator()(const buffer& buf) const {
margo_forward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)));
return true;
}
};
}
......
......@@ -30,7 +30,9 @@ private:
void* data = margo_registered_data(mid, info->id);
auto f = function_cast<G>(data);
request req(handle);
(*f)(req);
buffer input;
margo_get_input(handle, &input);
(*f)(req, input);
return HG_SUCCESS;
}
......@@ -77,6 +79,7 @@ public:
} // namespace thallium
#include <thallium/remote_procedure.hpp>
#include <thallium/serialization.hpp>
namespace thallium {
......@@ -84,8 +87,8 @@ template<typename F>
remote_procedure margo_engine::define(const std::string& name) {
// TODO throw an exception if the following call fails
hg_id_t id = margo_register_name(m_mid, name.c_str(),
nullptr, //proc_vector<char>,
nullptr, //proc_vector<char>,
serialize_buffer,
serialize_buffer,
nullptr);
margo_registered_disable_response(m_mid, id, HG_TRUE);
return remote_procedure(id);
......@@ -95,8 +98,8 @@ template<typename F>
remote_procedure margo_engine::define(const std::string& name, F&& fun) {
// TODO throw an exception if the following call fails
hg_id_t id = margo_register_name(m_mid, name.c_str(),
nullptr,//serialize<Args...>,
nullptr, //proc_vector<char>,
serialize_buffer,
serialize_buffer,
rpc_callback<decltype(fun)>);
margo_register_data(m_mid, id, void_cast(&fun), nullptr);
margo_registered_disable_response(m_mid, id, HG_TRUE);
......
......@@ -9,41 +9,7 @@ namespace thallium {
namespace proc {
//using namespace std;
template <>
hg_return_t process_type<buffer>(hg_proc_t proc, void* data, std::size_t& size) {
buffer *vec = static_cast<buffer*>(data);
std::size_t num_T = 0;
size = 0;
hg_return_t hret;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
num_T = vec->size();
hret = hg_proc_memcpy(proc, &num_T, sizeof(num_T));
if (hret != HG_SUCCESS) return hret;
size += sizeof(num_T);
if (num_T > 0)
hret = hg_proc_memcpy(proc, vec->data(), num_T);
if (hret != HG_SUCCESS) return hret;
size += num_T;
break;
case HG_DECODE:
hret = hg_proc_memcpy(proc, &num_T, sizeof(num_T));
if (hret != HG_SUCCESS) return hret;
size += sizeof(num_T);
if (num_T > 0) {
vec->resize(num_T);
hret = hg_proc_memcpy(proc, vec->data(), num_T);
if (hret != HG_SUCCESS) return hret;
size += num_T;
}
break;
case HG_FREE:
return HG_SUCCESS;
}
return HG_SUCCESS;
}
hg_return_t process_buffer(hg_proc_t proc, void* data, std::size_t& size);
} // namespace proc
......
#ifndef __THALLIUM_SERIALIZATION_HPP
#define __THALLIUM_SERIALIZATION_HPP
#include <cstdint>
#include <mercury_proc.h>
namespace thallium {
......@@ -32,10 +33,7 @@ hg_return_t serialize(hg_proc_t proc, void* data) {
return proc::process_type<std::tuple<T...>>(proc, data, size);
}
hg_return_t serialize_buffer(hg_proc_t proc, void* data) {
std::size_t size;
return proc::process_type<buffer>(proc,data, size);
}
hg_return_t serialize_buffer(hg_proc_t proc, void* data);
} // namespace thallium
......
......@@ -4,10 +4,12 @@
#
# list of source files
set(thallium-src endpoint.cpp
set(thallium-src serialization.cpp
endpoint.cpp
margo_engine.cpp
remote_procedure.cpp
callable_remote_procedure.cpp)
callable_remote_procedure.cpp
proc_buffer.cpp)
# load package helper for generating cmake CONFIG packages
include (CMakePackageConfigHelpers)
......
#include <thallium/proc_buffer.hpp>
namespace thallium {
namespace proc {
hg_return_t process_buffer(hg_proc_t proc, void* data, std::size_t& size) {
buffer *vec = static_cast<buffer*>(data);
std::size_t num_T = 0;
size = 0;
hg_return_t hret;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
num_T = vec->size();
hret = hg_proc_memcpy(proc, &num_T, sizeof(num_T));
if (hret != HG_SUCCESS) return hret;
size += sizeof(num_T);
if (num_T > 0)
hret = hg_proc_memcpy(proc, vec->data(), num_T);
if (hret != HG_SUCCESS) return hret;
size += num_T;
break;
case HG_DECODE:
hret = hg_proc_memcpy(proc, &num_T, sizeof(num_T));
if (hret != HG_SUCCESS) return hret;
size += sizeof(num_T);
if (num_T > 0) {
vec->resize(num_T);
hret = hg_proc_memcpy(proc, vec->data(), num_T);
if (hret != HG_SUCCESS) return hret;
size += num_T;
}
break;
case HG_FREE:
return HG_SUCCESS;
}
return HG_SUCCESS;
}
} // namespace proc
} // namespace thallium
#include <thallium/serialization.hpp>
namespace thallium {
hg_return_t serialize_buffer(hg_proc_t proc, void* data) {
std::size_t size;
return proc::process_buffer(proc,data, size);
}
} // namespace thallium
......@@ -8,15 +8,20 @@
namespace tl = thallium;
void hello(const tl::request& req) {
std::cout << "(1) Hello World" << std::endl;
void hello(const tl::request& req, const tl::buffer& input) {
std::cout << "(1) Hello World ";
for(auto c : input) std::cout << c;
std::cout << std::endl;
}
int server() {
tl::margo_engine me("bmi+tcp://127.0.0.1:1234", MARGO_SERVER_MODE);
me.define("hello1", hello);
me.define("hello2", [](const tl::request& req) { std::cout << "(2) Hello World" << std::endl; });
me.define("hello2", [](const tl::request& req, const tl::buffer& input)
{ std::cout << "(2) Hello World ";
for(auto c : input) std::cout << c;
std::cout << std::endl; });
std::string addr = me.self();
std::cout << "Server running at address " << addr << std::endl;
......@@ -29,16 +34,18 @@ int client() {
tl::margo_engine me("bmi+tcp", MARGO_CLIENT_MODE);
auto remote_hello1 = me.define<decltype(hello)>("hello1");
auto remote_hello2 = me.define<void(const tl::request& req)>("hello2");
auto remote_hello2 = me.define<void(const tl::request&, const tl::buffer&)>("hello2");
std::string server_addr = "bmi+tcp://127.0.0.1:1234";
sleep(1);
auto server_endpoint = me.lookup(server_addr);
std::cout << "Lookup done for endpoint " << (std::string)server_endpoint << std::endl;
tl::buffer b(16,'a');
(remote_hello1, server_endpoint)();
(remote_hello1, server_endpoint)(b);
remote_hello2.on(server_endpoint)();
remote_hello2.on(server_endpoint)(b);
return 0;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment