Commit 97291013 authored by Matthieu Dorier's avatar Matthieu Dorier

added exception when margo calls fail

parent fd5f43b6
......@@ -11,6 +11,7 @@
#include <vector>
#include <margo.h>
#include <thallium/endpoint.hpp>
#include <thallium/margo_exception.hpp>
namespace thallium {
......@@ -141,7 +142,8 @@ public:
*/
bulk(const bulk& other)
: m_engine(other.m_engine), m_bulk(other.m_bulk), m_is_local(other.m_is_local) {
margo_bulk_ref_incr(m_bulk);
hg_return_t ret = margo_bulk_ref_incr(m_bulk);
MARGO_ASSERT(ret, margo_bulk_ref_incr);
}
/**
......@@ -158,13 +160,15 @@ public:
bulk& operator=(const bulk& other) {
if(this == &other) return *this;
if(m_bulk != HG_BULK_NULL) {
margo_bulk_free(m_bulk);
hg_return_t ret = margo_bulk_free(m_bulk);
MARGO_ASSERT(ret, margo_bulk_free);
}
m_bulk = other.m_bulk;
m_engine = other.m_engine;
m_is_local = other.m_is_local;
if(m_bulk != HG_BULK_NULL) {
margo_bulk_ref_incr(m_bulk);
hg_return_t ret = margo_bulk_ref_incr(m_bulk);
MARGO_ASSERT(ret, margo_bulk_ref_incr);
}
return *this;
}
......@@ -175,7 +179,8 @@ public:
bulk& operator=(bulk&& other) {
if(this == &other) return *this;
if(m_bulk != HG_BULK_NULL) {
margo_bulk_free(m_bulk);
hg_return_t ret = margo_bulk_free(m_bulk);
MARGO_ASSERT(ret, margo_bulk_free);
}
m_engine = other.m_engine;
m_bulk = other.m_bulk;
......@@ -187,9 +192,10 @@ public:
/**
* @brief Destructor.
*/
~bulk() {
~bulk() throw(margo_exception) {
if(m_bulk != HG_BULK_NULL) {
margo_bulk_free(m_bulk);
hg_return_t ret = margo_bulk_free(m_bulk);
MARGO_ASSERT(ret, margo_bulk_free);
}
}
......@@ -276,8 +282,8 @@ public:
} else {
hg_size_t s = margo_bulk_get_serialize_size(m_bulk, HG_TRUE);
std::vector<char> buf(s);
margo_bulk_serialize(&buf[0], s, HG_TRUE, m_bulk);
// XXX check return values
hg_return_t ret = margo_bulk_serialize(&buf[0], s, HG_TRUE, m_bulk);
MARGO_ASSERT(ret, margo_bulk_serialize);
ar & buf;
}
}
......@@ -308,8 +314,8 @@ void bulk::load(A& ar) {
ar & buf;
if(buf.size() > 0) {
m_engine = &(ar.get_engine());
margo_bulk_deserialize(m_engine->m_mid, &m_bulk, &buf[0], buf.size());
// XXX check return value
hg_return_t ret = margo_bulk_deserialize(m_engine->m_mid, &m_bulk, &buf[0], buf.size());
MARGO_ASSERT(ret, margo_bulk_deserialize);
m_is_local = false;
}
}
......
......@@ -15,6 +15,7 @@
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/stl/vector.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
#include <thallium/margo_exception.hpp>
namespace thallium {
......@@ -57,11 +58,15 @@ private:
* @return a packed_response object from which the returned value can be deserialized.
*/
packed_response forward(const buffer& buf) const {
margo_forward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)));
hg_return_t ret;
ret = margo_forward(m_handle, const_cast<void*>(static_cast<const void*>(&buf)));
MARGO_ASSERT(ret, margo_forward);
buffer output;
if(m_ignore_response) return packed_response(std::move(output), *m_engine);
margo_get_output(m_handle, &output);
margo_free_output(m_handle, &output); // won't do anything on a buffer type
ret = margo_get_output(m_handle, &output);
MARGO_ASSERT(ret, margo_get_output);
ret = margo_free_output(m_handle, &output); // won't do anything on a buffer type
MARGO_ASSERT(ret, margo_free_output);
return packed_response(std::move(output), *m_engine);
}
......@@ -71,12 +76,15 @@ public:
* @brief Copy-constructor.
*/
callable_remote_procedure(const callable_remote_procedure& other) {
if(m_handle != HG_HANDLE_NULL) {
margo_destroy(m_handle);
hg_return_t ret;
if(m_handle != HG_HANDLE_NULL) {
ret = margo_destroy(m_handle);
MARGO_ASSERT(ret, margo_destroy);
}
m_handle = other.m_handle;
if(m_handle != HG_HANDLE_NULL) {
margo_ref_incr(m_handle);
ret = margo_ref_incr(m_handle);
MARGO_ASSERT(ret, margo_ref_incr);
}
}
......@@ -85,7 +93,8 @@ public:
*/
callable_remote_procedure(callable_remote_procedure&& other) {
if(m_handle != HG_HANDLE_NULL) {
margo_destroy(m_handle);
hg_return_t ret = margo_destroy(m_handle);
MARGO_ASSERT(ret, margo_destroy);
}
m_handle = other.m_handle;
other.m_handle = HG_HANDLE_NULL;
......@@ -95,12 +104,15 @@ public:
* @brief Copy-assignment operator.
*/
callable_remote_procedure& operator=(const callable_remote_procedure& other) {
hg_return_t ret;
if(&other == this) return *this;
if(m_handle != HG_HANDLE_NULL) {
margo_destroy(m_handle);
ret = margo_destroy(m_handle);
MARGO_ASSERT(ret, margo_destroy);
}
m_handle = other.m_handle;
margo_ref_incr(m_handle);
ret = margo_ref_incr(m_handle);
MARGO_ASSERT(ret, margo_ref_incr);
return *this;
}
......@@ -111,7 +123,8 @@ public:
callable_remote_procedure& operator=(callable_remote_procedure&& other) {
if(&other == this) return *this;
if(m_handle != HG_HANDLE_NULL) {
margo_destroy(m_handle);
hg_return_t ret = margo_destroy(m_handle);
MARGO_ASSERT(ret, margo_destroy);
}
m_handle = other.m_handle;
other.m_handle = HG_HANDLE_NULL;
......@@ -121,9 +134,10 @@ public:
/**
* @brief Destructor.
*/
~callable_remote_procedure() {
~callable_remote_procedure() throw(margo_exception) {
if(m_handle != HG_HANDLE_NULL) {
margo_destroy(m_handle);
hg_return_t ret = margo_destroy(m_handle);
MARGO_ASSERT(ret, margo_destroy);
}
}
......
......@@ -9,6 +9,7 @@
#include <cstdint>
#include <string>
#include <margo.h>
#include <thallium/margo_exception.hpp>
namespace thallium {
......@@ -77,7 +78,7 @@ public:
/**
* @brief Destructor.
*/
~endpoint();
~endpoint() throw(margo_exception);
/**
* @brief Creates a string representation of the endpoint's address.
......
......@@ -87,9 +87,12 @@ private:
auto f = function_cast<G>(cb_data->m_function);
request req(*(cb_data->m_engine), handle, disable_response);
buffer input;
margo_get_input(handle, &input);
hg_return_t ret;
ret = margo_get_input(handle, &input);
MARGO_ASSERT(ret, margo_get_input);
(*f)(req, input);
margo_free_input(handle, &input);
ret = margo_free_input(handle, &input);
MARGO_ASSERT(ret, margo_free_input);
}
/**
......@@ -143,7 +146,7 @@ public:
m_mid = margo_init(addr.c_str(), mode,
use_progress_thread ? 1 : 0,
rpc_thread_count);
// TODO throw exception if m_mid is null
// XXX throw an exception if m_mid not initialized
m_owns_mid = true;
}
......@@ -177,9 +180,8 @@ public:
/**
* @brief Destructor.
*/
~engine() {
~engine() throw(margo_exception) {
if(m_is_server && m_owns_mid) {
// TODO an exception if following call fails
margo_wait_for_finalize(m_mid);
}
}
......@@ -189,7 +191,6 @@ public:
* @brief Finalize the engine. Can be called by any thread.
*/
void finalize() {
// TODO an exception if the following call fails
margo_finalize(m_mid);
}
......@@ -278,7 +279,6 @@ namespace thallium {
template<typename ... Args>
remote_procedure engine::define(const std::string& name,
const std::function<void(const request&, Args...)>& fun) {
// TODO throw an exception if the following call fails
hg_id_t id = margo_register_name(m_mid, name.c_str(),
process_buffer,
......@@ -301,8 +301,9 @@ remote_procedure engine::define(const std::string& name,
cb_data->m_engine = this;
cb_data->m_function = void_cast(&m_rpcs[id]);
margo_register_data(m_mid, id, (void*)cb_data, free_rpc_callback_data);
hg_return_t ret = margo_register_data(m_mid, id, (void*)cb_data, free_rpc_callback_data);
MARGO_ASSERT(ret, margo_register_data);
return remote_procedure(*this, id);
}
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __THALLIUM_MARGO_EXCEPTION_HPP
#define __THALLIUM_MARGO_EXCEPTION_HPP
#include <exception>
#include <stdexcept>
#include <string>
#include <sstream>
#include <margo.h>
namespace thallium {
/**
* @brief Exception class used when margo functions fail.
*/
class margo_exception : public std::exception {
private:
std::ostringstream m_content;
public:
/**
* @brief Constructor.
*
* @param function Name of the function that failed.
* @param file Name of the file where the exception was thrown.
* @param line Line in the file.
* @param message Additional message.
*/
margo_exception(const std::string& function, const std::string file, unsigned line,
const std::string& message = std::string()) {
m_content << "[" << file << ":" << line << "][" << function << "] "
<< message;
}
/**
* @brief Return the error message.
*
* @return error message associated with the exception.
*/
virtual const char* what() const throw() {
return m_content.str().c_str();
}
};
std::string translate_margo_error_code(hg_return_t ret);
#define MARGO_THROW(__fun__,__msg__) do {\
throw margo_exception(#__fun__,__FILE__,__LINE__,__msg__); \
} while(0);
#define MARGO_ASSERT(__ret__, __fun__) do {\
if(__ret__ != HG_SUCCESS) { \
std::stringstream msg; \
msg << "Function returned "; \
msg << translate_margo_error_code(__ret__); \
throw margo_exception(#__fun__, __FILE__, __LINE__, msg.str());\
}\
} while(0);
}
#endif
......@@ -7,6 +7,7 @@
#define __THALLIUM_REQUEST_HPP
#include <margo.h>
#include <thallium/margo_exception.hpp>
#include <thallium/serialization/serialize.hpp>
#include <thallium/serialization/buffer_output_archive.hpp>
......@@ -50,7 +51,8 @@ public:
*/
request(const request& other)
: m_engine(other.m_engine), m_handle(other.m_handle), m_disable_response(other.m_disable_response) {
margo_ref_incr(m_handle);
hg_return_t ret = margo_ref_incr(m_handle);
MARGO_ASSERT(ret, margo_ref_incr);
}
/**
......@@ -66,11 +68,14 @@ public:
*/
request& operator=(const request& other) {
if(m_handle == other.m_handle) return *this;
margo_destroy(m_handle);
hg_return_t ret;
ret = margo_destroy(m_handle);
MARGO_ASSERT(ret, margo_destroy);
m_engine = other.m_engine;
m_handle = other.m_handle;
m_disable_response = other.m_disable_response;
margo_ref_incr(m_handle);
ret = margo_ref_incr(m_handle);
MARGO_ASSERT(ret, margo_ref_incr);
return *this;
}
......@@ -90,8 +95,9 @@ public:
/**
* @brief Destructor.
*/
~request() {
margo_destroy(m_handle);
~request() throw(margo_exception) {
hg_return_t ret = margo_destroy(m_handle);
MARGO_ASSERT(ret, margo_destroy);
}
/**
......@@ -109,7 +115,8 @@ public:
buffer b;
buffer_output_archive arch(b, *m_engine);
serialize_many(arch, std::forward<T>(t)...);
margo_respond(m_handle, &b);
hg_return_t ret = margo_respond(m_handle, &b);
MARGO_ASSERT(ret, margo_respond);
}
}
......
......@@ -11,6 +11,7 @@ set(thallium-src bulk.cpp
request.cpp
remote_bulk.cpp
callable_remote_procedure.cpp
margo_exception.cpp
proc_buffer.cpp)
# load package helper for generating cmake CONFIG packages
......
......@@ -4,6 +4,7 @@
* See COPYRIGHT in top-level directory.
*/
#include <thallium/margo_exception.hpp>
#include <thallium/callable_remote_procedure.hpp>
#include <thallium/endpoint.hpp>
#include <thallium/engine.hpp>
......@@ -13,8 +14,8 @@ namespace thallium {
callable_remote_procedure::callable_remote_procedure(engine& e, hg_id_t id, const endpoint& ep, bool ignore_resp)
: m_engine(&e) {
m_ignore_response = ignore_resp;
// TODO throw exception if this call fails
margo_create(ep.m_engine->m_mid, ep.m_addr, id, &m_handle);
hg_return_t ret = margo_create(ep.m_engine->m_mid, ep.m_addr, id, &m_handle);
MARGO_ASSERT(ret, margo_create);
}
}
......@@ -5,26 +5,31 @@
*/
#include <thallium/endpoint.hpp>
#include <thallium/engine.hpp>
#include <thallium/margo_exception.hpp>
namespace thallium {
endpoint::endpoint(const endpoint& other)
: m_engine(other.m_engine) {
if(other.m_addr != HG_ADDR_NULL) {
margo_addr_dup(m_engine->m_mid, other.m_addr, &m_addr);
hg_return_t ret = margo_addr_dup(m_engine->m_mid, other.m_addr, &m_addr);
MARGO_ASSERT(ret, margo_addr_dup);
} else {
m_addr = HG_ADDR_NULL;
}
}
endpoint& endpoint::operator=(const endpoint& other) {
hg_return_t ret;
if(&other == this) return *this;
if(m_addr != HG_ADDR_NULL) {
margo_addr_free(m_engine->m_mid, m_addr);
ret = margo_addr_free(m_engine->m_mid, m_addr);
MARGO_ASSERT(ret, margo_addr_free);
}
m_engine = other.m_engine;
if(other.m_addr != HG_ADDR_NULL) {
margo_addr_dup(m_engine->m_mid, other.m_addr, &m_addr);
ret = margo_addr_dup(m_engine->m_mid, other.m_addr, &m_addr);
MARGO_ASSERT(ret, margo_addr_dup);
} else {
m_addr = HG_ADDR_NULL;
}
......@@ -34,7 +39,8 @@ endpoint& endpoint::operator=(const endpoint& other) {
endpoint& endpoint::operator=(endpoint&& other) {
if(&other == this) return *this;
if(m_addr != HG_ADDR_NULL) {
margo_addr_free(m_engine->m_mid, m_addr);
hg_return_t ret = margo_addr_free(m_engine->m_mid, m_addr);
MARGO_ASSERT(ret, margo_addr_free);
}
m_engine = other.m_engine;
m_addr = other.m_addr;
......@@ -42,9 +48,10 @@ endpoint& endpoint::operator=(endpoint&& other) {
return *this;
}
endpoint::~endpoint() {
endpoint::~endpoint() throw(margo_exception) {
if(m_addr != HG_ADDR_NULL) {
margo_addr_free(m_engine->m_mid, m_addr);
hg_return_t ret = margo_addr_free(m_engine->m_mid, m_addr);
MARGO_ASSERT(ret, margo_addr_free);
}
}
......@@ -53,12 +60,14 @@ endpoint::operator std::string() const {
if(m_addr == HG_ADDR_NULL) return std::string();
hg_size_t size;
margo_addr_to_string(m_engine->m_mid, NULL, &size, m_addr);
hg_return_t ret = margo_addr_to_string(m_engine->m_mid, NULL, &size, m_addr);
MARGO_ASSERT(ret, margo_addr_to_string);
std::string result(size+1,' ');
size += 1;
margo_addr_to_string(m_engine->m_mid, &result[0], &size, m_addr);
ret = margo_addr_to_string(m_engine->m_mid, &result[0], &size, m_addr);
MARGO_ASSERT(ret, margo_addr_to_string);
return result;
}
......
......@@ -15,20 +15,19 @@ namespace thallium {
endpoint engine::lookup(const std::string& address) const {
hg_addr_t addr;
margo_addr_lookup(m_mid, address.c_str(), &addr);
// TODO throw exception if address is not known
hg_return_t ret = margo_addr_lookup(m_mid, address.c_str(), &addr);
MARGO_ASSERT(ret, margo_addr_lookup);
return endpoint(const_cast<engine&>(*this), addr);
}
endpoint engine::self() const {
hg_addr_t self_addr;
margo_addr_self(m_mid, &self_addr);
// TODO throw an exception if this call fails
hg_return_t ret = margo_addr_self(m_mid, &self_addr);
MARGO_ASSERT(ret, margo_addr_self);
return endpoint(const_cast<engine&>(*this), self_addr);
}
remote_procedure engine::define(const std::string& name) {
// TODO throw an exception if the following call fails
hg_id_t id = margo_register_name(m_mid, name.c_str(),
process_buffer,
process_buffer,
......@@ -46,7 +45,7 @@ bulk engine::expose(const std::vector<std::pair<void*,size_t>>& segments, bulk_m
buf_sizes[i] = segments[i].second;
}
hg_return_t ret = margo_bulk_create(m_mid, count, &buf_ptrs[0], &buf_sizes[0], (hg_uint32_t)flag, &handle);
// TODO throw an exception if ret != HG_SUCCESS
MARGO_ASSERT(ret, margo_bulk_create);
return bulk(*this, handle, true);
}
......
/*
* (C) 2017 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <margo.h>
#include <thallium/margo_exception.hpp>
namespace thallium {
std::string translate_margo_error_code(hg_return_t ret) {
switch(ret){
case HG_SUCCESS: /*!< operation succeeded */
return "HG_SUCCESS";
case HG_NA_ERROR: /*!< error in NA layer */
return "HG_NA_ERROR";
case HG_TIMEOUT: /*!< reached timeout */
return "HG_TIMEOUT";
case HG_INVALID_PARAM: /*!< invalid parameter */
return "HG_INVALID_PARAM";
case HG_SIZE_ERROR: /*!< size error */
return "HG_SIZE_ERROR";
case HG_NOMEM_ERROR: /*!< no memory error */
return "HG_NOMEM_ERROR";
case HG_PROTOCOL_ERROR: /*!< protocol does not match */
return "HG_PROTOCOL_ERROR";
case HG_NO_MATCH: /*!< no function match */
return "HG_NO_MATCH";
case HG_CHECKSUM_ERROR: /*!< checksum error */
return "HG_CHECKSUM_ERROR";
case HG_CANCELED: /*!< operation was canceled */
return "HG_CANCELED";
case HG_OTHER_ERROR: /*!< error from mercury_util or external to mercury */
return "HG_OTHER_ERROR";
}
return "Unknown error";
}
}
......@@ -24,6 +24,7 @@ std::size_t remote_bulk::operator>>(const bulk::bulk_segment& dest) const {
hg_return_t ret = margo_bulk_transfer(mid, op,
origin_addr, origin_handle, origin_offset,
local_handle, local_offset, size);
MARGO_ASSERT(ret, margo_bulk_transfer);
return size;
}
......@@ -44,6 +45,7 @@ std::size_t remote_bulk::operator<<(const bulk::bulk_segment& src) const {
hg_return_t ret = margo_bulk_transfer(mid, op,
origin_addr, origin_handle, origin_offset,
local_handle, local_offset, size);
MARGO_ASSERT(ret, margo_bulk_transfer);
return size;
}
......
......@@ -7,7 +7,8 @@ namespace thallium {
endpoint request::get_endpoint() const {
const struct hg_info* info = margo_get_info(m_handle);
hg_addr_t addr;
margo_addr_dup(m_engine->m_mid, info->addr, &addr);
hg_return_t ret = margo_addr_dup(m_engine->m_mid, info->addr, &addr);
MARGO_ASSERT(ret, margo_addr_dup);
return endpoint(*m_engine, addr);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment