Commit ece5745b authored by Sridutt Bhalachandra

[Feature] MPI_NRM library supports aggregation

Added aggregation support to the MPI_NRM library. It uses the modified
downstream API to transmit the compute time and total time of a phase.

Phases whose total time is smaller than the NRM damper value are
aggregated rather than transmitted individually. The _aggregation
counter tracks how many phases were combined before the accumulated
total time exceeded the damper value.
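
For example (illustrative numbers), with a damper of 0.01s a run of
four consecutive ~3ms phases yields a single transmission: the first
three phases only increment _aggregation, and the fourth, whose
accumulated time (~12ms) exceeds the damper, is sent with
_aggregation = 3.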

See Issue #2
parent 2970011f
@@ -45,14 +45,6 @@ in `containers.py` at the moment.
The need to manually update will be fixed in the future.
`export OperationMode=0`
to not transmit application context information, or
`export OperationMode=1`
to transmit application context information.
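For example, to run with context transmission enabled (assuming an `mpirun`-style launcher and a placeholder binary `./app`, neither of which is prescribed by this repository):
`export OperationMode=1`
`mpirun -n 4 ./app`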
## Additional Info
Report bugs to Sridutt Bhalachandra (<sriduttb@anl.gov>)
/* Filename: mpi_nrm.cpp
*
* Description: This Message Passing Interface (MPI) library allows
* application of runtime policies for energy efficiency through the MPI
* standard profiling interface (PMPI).
*
* The current implementation passes phase contextual information (compute and
* barrier times) to the Argo Node Resource Manager (NRM). Using this
* contextual information, the NRM invokes power policies to improve the
* energy efficiency of the node.
*
* Written by Sridutt Bhalachandra, sriduttb@anl.gov
*/
@@ -27,7 +27,7 @@ void handle_signal(int sig_num)
/************************
* Set up the MPI NRM Interface
***********************/
extern "C" void MPI_energy_init()
extern "C" void MPI_nrm_init()
{
signal(SIGTERM, handle_signal);
signal(SIGINT, handle_signal);
@@ -36,41 +36,84 @@ extern "C" void MPI_energy_init()
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if (getenv ("NRM_TRANSMIT"))
if(getenv("NRM_TRANSMIT"))
{
transmit = atoi (getenv ("NRM_TRANSMIT"));
_transmit = atoi(getenv("NRM_TRANSMIT"));
}
else
if(getenv("NRM_DAMPER"))
{
// Don't transmit application context information
transmit = 0;
_damper = atof(getenv("NRM_DAMPER"));
}
// Initialize context to communicate with Argo Node Resource Manager(NRM)
// TODO: Change hard coded uuid
// TODO: Change hard coded application uuid
nrm_init(&ctxt, "mpi_nrm");
return;
}
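// Usage sketch (assumption, not part of this commit): both tunables are read
// from the environment when the MPI_Init wrapper runs, so a launch could look
// like
//   NRM_TRANSMIT=1 NRM_DAMPER=0.05 mpirun -n 4 ./app
// where mpirun and ./app are placeholders for the actual launcher and binary.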
extern "C" void MPI_energy_fini()
extern "C" void MPI_nrm_fini()
{
// Cleanup NRM context
nrm_fini(&ctxt);
}
/************************
* Prints the transmission statistics for an application
***********************/
extern "C" void MPI_nrm_print_stats(void)
{
printf("Stats: CPU %u Damper %lf DamperAggreations %u\n", cpu, _damper,
_damperAggregationCount);
return;
}
/************************
* Send appropriate phase context to NRM
***********************/
extern "C" void transmit_to_nrm(int cpu, uint64_t *startCompute, uint64_t
endCompute, uint64_t startBarrier, uint64_t endBarrier)
{
uint64_t computeTime, barrierTime, totalPhaseTime;
// Time spent in computation, barrier and total
computeTime = endCompute - *startCompute;
barrierTime = endBarrier - startBarrier;
totalPhaseTime = computeTime + barrierTime;
// Aggregate phases smaller than the damper value set
if(totalPhaseTime < _damper)
{
// Track that the value transmitted later is an aggregation of
// smaller phases
_aggregation++;
_damperAggregationCount++;
return;
}
// Send context to NRM
nrm_send_phase_context(&ctxt, cpu, _aggregation, computeTime, totalPhaseTime);
// Reset
_aggregation = 0;
*startCompute = return_current_time();
return;
}
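// Note (observation, not part of this commit): startCompute is reset only
// after a transmission, so computeTime accumulates across aggregated phases
// and totalPhaseTime covers the whole aggregated span. The comparison against
// _damper assumes return_current_time() returns values in the same unit as
// NRM_DAMPER (documented as seconds in the header).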
int MPI_Init(int *argc, char ***argv)
{
startCompute = return_current_time();
int ret_value = PMPI_Init(argc, argv);
MPI_energy_init();
MPI_nrm_init();
return ret_value;
}
int MPI_Finalize(void)
{
MPI_energy_fini();
MPI_nrm_fini();
return PMPI_Finalize();
}
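// Note on interposition (standard PMPI behavior, not specific to this
// commit): because the wrappers above define MPI_Init/MPI_Finalize and
// forward to the PMPI_* entry points, linking this library ahead of the MPI
// implementation (or preloading it, e.g. via LD_PRELOAD, depending on the
// build setup) routes an unmodified application's MPI calls through these
// instrumented versions.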
@@ -116,12 +159,10 @@ int MPI_Barrier(MPI_Comm comm)
uint64_t endBarrier = return_current_time();
if(transmit)
if(_transmit)
{
nrm_send_phase_context(&ctxt, cpu, startCompute, endCompute, startBarrier,
endBarrier);
transmit_to_nrm(cpu, &startCompute, endCompute, startBarrier, endBarrier);
}
startCompute = return_current_time();
return ret_value;
}
@@ -183,12 +224,10 @@ int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
uint64_t endBarrier = return_current_time();
if(transmit)
if(_transmit)
{
nrm_send_phase_context(&ctxt, cpu, startCompute, endCompute, startBarrier,
endBarrier);
transmit_to_nrm(cpu, &startCompute, endCompute, startBarrier, endBarrier);
}
startCompute = return_current_time();
return ret_value;
}
@@ -18,11 +18,18 @@
#include "downstream_api.h"
// Set to a non-zero value via the NRM_TRANSMIT environment variable to transmit to NRM
static int transmit;
//static int skip = 0; // Skips transmit if 1 - User-annotation
static unsigned int _transmit = 0;
// Phases shorter than this will be aggregated - set via the NRM_DAMPER
// environment variable (in seconds)
static double _damper = 0.01;
int cpu, rank;
// used to measure the length of a computation phase
// Book-keeping and Statistics
static unsigned int _aggregation = 0;
static unsigned int _damperAggregationCount = 0;
unsigned int cpu;
int rank;
// used to measure the computation time during a phase
uint64_t startCompute, endCompute;
struct nrm_context ctxt;