Commit b1df16ac authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-margo-internal' into 'master'

Separated internal structures into a header

See merge request !30
parents f3ca56d8 48fb2647
/*
* (C) 2020 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MARGO_INTERNAL_H
#define __MARGO_INTERNAL_H
#include <assert.h>
#include <unistd.h>
#include <errno.h>
#include <abt.h>
#include <stdlib.h>
#include <margo-config.h>
#include <time.h>
#include <math.h>
#include "margo.h"
#include "margo-bulk-util.h"
#include "margo-timer.h"
#include "utlist.h"
#include "uthash.h"
/* Structure to store timing information */
struct diag_data
{
/* breadcrumb stats */
breadcrumb_stats stats;
/* origin or target */
breadcrumb_type type;
uint64_t rpc_breadcrumb; /* identifier for rpc and it's ancestors */
struct global_breadcrumb_key key;
/* used to combine rpc_breadcrumb, addr_hash and provider_id to create a unique key for HASH_ADD inside margo_breadcrumb_measure */
__uint128_t x;
/*sparkline data for breadcrumb */
double sparkline_time[100];
double sparkline_count[100];
UT_hash_handle hh; /* hash table link */
};
struct margo_handle_cache_el
{
hg_handle_t handle;
UT_hash_handle hh; /* in-use hash link */
struct margo_handle_cache_el *next; /* free list link */
};
struct margo_finalize_cb
{
const void* owner;
void(*callback)(void*);
void* uargs;
struct margo_finalize_cb* next;
};
struct margo_timer_list; /* defined in margo-timer.c */
/* Stores the name and rpc id of a registered RPC. We track this purely for
* debugging and instrumentation purposes
*/
struct margo_registered_rpc
{
hg_id_t id; /* rpc id */
uint64_t rpc_breadcrumb_fragment; /* fragment id used in rpc tracing */
char func_name[64]; /* string name of rpc */
struct margo_registered_rpc *next; /* pointer to next in list */
};
struct margo_instance
{
/* mercury/argobots state */
hg_context_t *hg_context;
hg_class_t *hg_class;
ABT_pool handler_pool;
ABT_pool progress_pool;
/* internal to margo for this particular instance */
int margo_init;
ABT_thread hg_progress_tid;
ABT_thread sparkline_data_collection_tid;
int hg_progress_shutdown_flag;
ABT_xstream progress_xstream;
int owns_progress_pool;
ABT_xstream *rpc_xstreams;
int num_handler_pool_threads;
unsigned int hg_progress_timeout_ub;
uint16_t num_registered_rpcs; /* number of registered rpc's by all providers on this instance */
/* list of rpcs registered on this instance for debugging and profiling purposes */
struct margo_registered_rpc *registered_rpcs;
/* control logic for callers waiting on margo to be finalized */
int finalize_flag;
int refcount;
ABT_mutex finalize_mutex;
ABT_cond finalize_cond;
struct margo_finalize_cb* finalize_cb;
struct margo_finalize_cb* prefinalize_cb;
/* control logic to prevent margo_finalize from destroying
the instance when some operations are pending */
unsigned pending_operations;
ABT_mutex pending_operations_mtx;
int finalize_requested;
/* control logic for shutting down */
hg_id_t shutdown_rpc_id;
int enable_remote_shutdown;
/* timer data */
struct margo_timer_list* timer_list;
/* linked list of free hg handles and a hash of in-use handles */
struct margo_handle_cache_el *free_handle_list;
struct margo_handle_cache_el *used_handle_hash;
ABT_mutex handle_cache_mtx; /* mutex protecting access to above caches */
/* optional diagnostics data tracking */
/* NOTE: technically the following fields are subject to races if they
* are updated from more than one thread at a time. We will be careful
* to only update the counters from the progress_fn,
* which will serialize access.
*/
int diag_enabled;
unsigned int profile_enabled;
uint64_t self_addr_hash;
double previous_sparkline_data_collection_time;
uint16_t sparkline_index;
struct diag_data diag_trigger_elapsed;
struct diag_data diag_progress_elapsed_zero_timeout;
struct diag_data diag_progress_elapsed_nonzero_timeout;
struct diag_data diag_progress_timeout_value;
struct diag_data diag_bulk_create_elapsed;
struct diag_data *diag_rpc;
ABT_mutex diag_rpc_mutex;
};
struct margo_request_struct {
ABT_eventual eventual;
margo_timer_t* timer;
hg_handle_t handle;
double start_time; /* timestamp of when the operation started */
uint64_t rpc_breadcrumb; /* statistics tracking identifier, if applicable */
uint64_t server_addr_hash; /* hash of globally unique string addr of margo server instance */
uint16_t provider_id; /* id of the provider servicing the request, local to the margo server instance */
};
struct margo_rpc_data
{
margo_instance_id mid;
ABT_pool pool;
void* user_data;
void (*user_free_callback)(void *);
};
struct lookup_cb_evt
{
hg_return_t hret;
hg_addr_t addr;
};
typedef struct
{
hg_handle_t handle;
} margo_forward_timeout_cb_dat;
typedef struct
{
ABT_mutex mutex;
ABT_cond cond;
char is_asleep;
} margo_thread_sleep_cb_dat;
#endif
......@@ -15,6 +15,7 @@
#include <math.h>
#include "margo.h"
#include "margo-internal.h"
#include "margo-bulk-util.h"
#include "margo-timer.h"
#include "utlist.h"
......@@ -35,28 +36,6 @@ static int g_num_margo_instances = 0; // how many margo instances exist
static ABT_mutex g_num_margo_instances_mtx = ABT_MUTEX_NULL; // mutex for above global variable
static int g_margo_abt_init = 0;
/* Structure to store timing information */
struct diag_data
{
/* breadcrumb stats */
breadcrumb_stats stats;
/* origin or target */
breadcrumb_type type;
uint64_t rpc_breadcrumb; /* identifier for rpc and it's ancestors */
struct global_breadcrumb_key key;
/* used to combine rpc_breadcrumb, addr_hash and provider_id to create a unique key for HASH_ADD inside margo_breadcrumb_measure */
__uint128_t x;
/*sparkline data for breadcrumb */
double sparkline_time[100];
double sparkline_count[100];
UT_hash_handle hh; /* hash table link */
};
/* key for Argobots thread-local storage to track RPC breadcrumbs across thread
* execution
*/
......@@ -73,120 +52,6 @@ do {\
if(__data.stats.min == 0 || (__time) < __data.stats.min) __data.stats.min = (__time); \
} while(0)
struct margo_handle_cache_el
{
hg_handle_t handle;
UT_hash_handle hh; /* in-use hash link */
struct margo_handle_cache_el *next; /* free list link */
};
struct margo_finalize_cb
{
const void* owner;
void(*callback)(void*);
void* uargs;
struct margo_finalize_cb* next;
};
struct margo_timer_list; /* defined in margo-timer.c */
/* Stores the name and rpc id of a registered RPC. We track this purely for
* debugging and instrumentation purposes
*/
struct margo_registered_rpc
{
hg_id_t id; /* rpc id */
uint64_t rpc_breadcrumb_fragment; /* fragment id used in rpc tracing */
char func_name[64]; /* string name of rpc */
struct margo_registered_rpc *next; /* pointer to next in list */
};
struct margo_instance
{
/* mercury/argobots state */
hg_context_t *hg_context;
hg_class_t *hg_class;
ABT_pool handler_pool;
ABT_pool progress_pool;
/* internal to margo for this particular instance */
int margo_init;
ABT_thread hg_progress_tid;
ABT_thread sparkline_data_collection_tid;
int hg_progress_shutdown_flag;
ABT_xstream progress_xstream;
int owns_progress_pool;
ABT_xstream *rpc_xstreams;
int num_handler_pool_threads;
unsigned int hg_progress_timeout_ub;
uint16_t num_registered_rpcs; /* number of registered rpc's by all providers on this instance */
/* list of rpcs registered on this instance for debugging and profiling purposes */
struct margo_registered_rpc *registered_rpcs;
/* control logic for callers waiting on margo to be finalized */
int finalize_flag;
int refcount;
ABT_mutex finalize_mutex;
ABT_cond finalize_cond;
struct margo_finalize_cb* finalize_cb;
struct margo_finalize_cb* prefinalize_cb;
/* control logic to prevent margo_finalize from destroying
the instance when some operations are pending */
unsigned pending_operations;
ABT_mutex pending_operations_mtx;
int finalize_requested;
/* control logic for shutting down */
hg_id_t shutdown_rpc_id;
int enable_remote_shutdown;
/* timer data */
struct margo_timer_list* timer_list;
/* linked list of free hg handles and a hash of in-use handles */
struct margo_handle_cache_el *free_handle_list;
struct margo_handle_cache_el *used_handle_hash;
ABT_mutex handle_cache_mtx; /* mutex protecting access to above caches */
/* optional diagnostics data tracking */
/* NOTE: technically the following fields are subject to races if they
* are updated from more than one thread at a time. We will be careful
* to only update the counters from the progress_fn,
* which will serialize access.
*/
int diag_enabled;
unsigned int profile_enabled;
uint64_t self_addr_hash;
double previous_sparkline_data_collection_time;
uint16_t sparkline_index;
struct diag_data diag_trigger_elapsed;
struct diag_data diag_progress_elapsed_zero_timeout;
struct diag_data diag_progress_elapsed_nonzero_timeout;
struct diag_data diag_progress_timeout_value;
struct diag_data diag_bulk_create_elapsed;
struct diag_data *diag_rpc;
ABT_mutex diag_rpc_mutex;
};
struct margo_request_struct {
ABT_eventual eventual;
margo_timer_t* timer;
hg_handle_t handle;
double start_time; /* timestamp of when the operation started */
uint64_t rpc_breadcrumb; /* statistics tracking identifier, if applicable */
uint64_t server_addr_hash; /* hash of globally unique string addr of margo server instance */
uint16_t provider_id; /* id of the provider servicing the request, local to the margo server instance */
};
struct margo_rpc_data
{
margo_instance_id mid;
ABT_pool pool;
void* user_data;
void (*user_free_callback)(void *);
};
MERCURY_GEN_PROC(margo_shutdown_out_t, ((int32_t)(ret)))
static void hg_progress_fn(void* foo);
......@@ -1033,12 +898,6 @@ hg_return_t margo_registered_disabled_response(
return HG_SUCCESS;
}
struct lookup_cb_evt
{
hg_return_t hret;
hg_addr_t addr;
};
static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
{
struct lookup_cb_evt evt;
......@@ -1218,11 +1077,6 @@ static hg_return_t margo_wait_internal(margo_request req)
return(hret);
}
typedef struct
{
hg_handle_t handle;
} margo_forward_timeout_cb_dat;
static void margo_forward_timeout_cb(void *arg)
{
margo_request req = (margo_request)arg;
......@@ -1662,13 +1516,6 @@ hg_return_t margo_bulk_itransfer(
return(hret);
}
typedef struct
{
ABT_mutex mutex;
ABT_cond cond;
char is_asleep;
} margo_thread_sleep_cb_dat;
static void margo_thread_sleep_cb(void *arg)
{
margo_thread_sleep_cb_dat *sleep_cb_dat =
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment