Commit 7d3633bb authored by Richard Zamora

Add topology API to the exerciser code to test the effect of aggregator selection on collective performance
parent 704b1a8d
@@ -13,6 +13,7 @@
* ...Each dimension of curNEls is then multiplied by each dimension of bufMult, and the process is repeated.
*
* The exerciser takes the following parameters (all default to 0 or false):
* --topohint: Whether to use topology-aware aggregator selection and pass the resulting cb_config_list MPI hint
* --metacoll: Whether to set meta data collective usage
* --derivedtype: Whether to create a second data set containing a derived type.
* --addattr: Whether to add attributes to group 1.
@@ -72,8 +73,9 @@
#include <sys/sysinfo.h>
#include <sys/resource.h>
#endif
#include "topology.h"
#define METACOLOK // Are collective functions available?
//#define METACOLOK // Are collective functions available?
//#define USING_CCIO // Using CCIO branch of HDF5?
// MPI_CHECK will display a custom error message as well as an error string
@@ -146,6 +148,8 @@ int main( int argc, char* argv[] )
{
int nprocs, rank, i, j;
int64_t agg_size;
int64_t agg_count;
const uint64_t safebuf = (uint64_t) 2*GB; // Must leave (at least) 2 GiB free
// parse command line
// defaults are 0
@@ -158,6 +162,7 @@ int main( int argc, char* argv[] )
int numDims = 0;
int useChunked = 0;
int usemem = 0;
int topohint = 0;
double memadd = 0.0;
int maxcheck_set = 0;
// defaults are 1
@@ -205,6 +210,8 @@ int main( int argc, char* argv[] )
useMetaDataCollectives = 1;
else if (strcmp(argv[i],"--nowrite") == 0)
doWriteat = 0;
else if (strcmp(argv[i],"--topohint") == 0)
topohint = 1;
else if (strcmp(argv[i],"--derivedtype") == 0)
addDerivedTypeDataset = 1;
else if (strcmp(argv[i],"--perf") == 0)
@@ -360,13 +367,48 @@ int main( int argc, char* argv[] )
MPI_Info mpiHints = MPI_INFO_NULL;
// set the parallel driver - don't do this repeatedly, as the overhead is quite large for some MPI implementations and most
// apps will just do this once
accessPropList = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(accessPropList, comm, mpiHints);
fd = H5Fcreate(testFileName, H5F_ACC_TRUNC,createPropList, accessPropList);
rc = H5Fclose(fd);
// If we want to select the aggregator list with topology in mind,
// let's create an aggregator list and pass a "hint" here.
if (topohint) {
// Call the topology API to get the collective-buffer count and size
get_cb_props( &agg_size, &agg_count, testFileName);
// Now create a fake data set that will have all ranks writing to all aggs
int64_t chunk_size = agg_size * agg_count; // Assume all to all
int64_t num_chunks = 1; // Assume single chunk
int64_t dperiod = (int64_t) (chunk_size * nprocs);
int64_t* data_size = (int64_t*) malloc(sizeof(int64_t) * num_chunks );
int64_t* offset_size = (int64_t*) malloc(sizeof(int64_t) * num_chunks );
for (i=0; i<num_chunks; i++) {
data_size[i] = (int64_t) (chunk_size);
offset_size[i] = (int64_t) (i * dperiod + rank * chunk_size);
}
char agg_array[4096]; // Room for a comma-separated list of aggregator host names
get_cb_config_list ( data_size, offset_size, num_chunks, &agg_array[0], agg_size, agg_count );
printf("Aggregator List:\n%s\n", agg_array);
MPI_Barrier(comm);
free(data_size);
free(offset_size);
// Set the MPIO cb_config_list Hint
MPI_Info_create(&mpiHints);
MPI_Info_set(mpiHints, "cb_config_list", agg_array);
// Re-open the property list to pass the hint
H5Pclose(accessPropList);
accessPropList = H5Pcreate(H5P_FILE_ACCESS);
H5Pset_fapl_mpio(accessPropList, comm, mpiHints);
}
#ifdef METACOLOK
if (useMetaDataCollectives) {
hbool_t metaDataCollWrite = true;
@@ -598,6 +640,7 @@ int main( int argc, char* argv[] )
char attr1Buf[ATTRIBUTE_SIZE] = {"This is my attribute string."};
char attributeName[NAME_LENGTH];
startTime = MPI_Wtime();
if (addAttributes) {
MPI_Barrier(comm);
startTime = MPI_Wtime();
@@ -610,8 +653,8 @@ int main( int argc, char* argv[] )
H5Aclose(attr1id);
H5Sclose(attr1DS);
}
attrTime += (MPI_Wtime()-startTime);
}
attrTime += (MPI_Wtime()-startTime);
// create prop list for parallel io transfer
xferPropList = H5Pcreate(H5P_DATASET_XFER);
@@ -769,7 +812,7 @@ int main( int argc, char* argv[] )
groupTime += (MPI_Wtime()-startTime);
if (fd < 0) {
printf("H5Fopen error - fd is %lld\n",(hid_t)fd);
printf("H5Fopen error - fd is %d\n",(hid_t)fd);
exit(1);
}
@@ -780,14 +823,14 @@ int main( int argc, char* argv[] )
dataSetTime += (MPI_Wtime()-startTime);
if (dataSet < 0) {
printf("H5Dopen error - dataSet is %lld\n",(hid_t)dataSet);
printf("H5Dopen error - dataSet is %d\n",(hid_t)dataSet);
exit(1);
}
fileDataSpace = H5Dget_space(dataSet);
if (fileDataSpace < 0) {
printf("H5Dget_space error - fileDataSpace is %lld\n",(hid_t)fileDataSpace);
printf("H5Dget_space error - fileDataSpace is %d\n",(hid_t)fileDataSpace);
exit(1);
}
dtype_id = H5Dget_type(dataSet);
@@ -963,7 +1006,7 @@ int main( int argc, char* argv[] )
getmoments(&maxOtherCloseTime[0], NUM_ITERATIONS, &MOMOtherCloseTime[0]);
getmoments(&minRawWriteBDWTH[0], NUM_ITERATIONS, &MOMRawWriteBDWTH[0]);
getmoments(&minRawReadBDWTH[0], NUM_ITERATIONS, &MOMRawReadBDWTH[0]);
#ifdef USING_CCIO
if(callCustomPerf){
getmoments(&sumTotalCustomAggTime[0], NUM_ITERATIONS, &MOMsumTotalCustomAggTime[0]);
getmoments(&sumTotalMPIFileWriteAtTime[0], NUM_ITERATIONS, &MOMsumTotalMPIFileWriteAtTime[0]);
@@ -1005,7 +1048,7 @@ int main( int argc, char* argv[] )
printf("Avg_custom %10ld minMinMPIFileWriteAtBW: %10.6f maxMaxMPIFileWriteAtBW: %10.6f minMinMPIPutBW: %10.6f maxMaxMPIPutBW: %10.6f\n", BufSizeTotalDouble, MOMminMinMPIFileWriteAtBW[AVG_ind], MOMmaxMaxMPIFileWriteAtBW[AVG_ind], MOMminMinMPIPutBW[AVG_ind], MOMmaxMaxMPIPutBW[AVG_ind]);
printf("Std_custom %10ld minMinMPIFileWriteAtBW: %10.6f maxMaxMPIFileWriteAtBW: %10.6f minMinMPIPutBW: %10.6f maxMaxMPIPutBW: %10.6f\n", BufSizeTotalDouble, MOMminMinMPIFileWriteAtBW[STD_ind], MOMmaxMaxMPIFileWriteAtBW[STD_ind], MOMminMinMPIPutBW[STD_ind], MOMmaxMaxMPIPutBW[STD_ind]);
}
#endif
printf("Min %10ld %10.6f %12.6f %10.6f %12.6f %10.6f %10.6f %10.6f %10.6f %10.6f %10.6f %10.6f\n",BufSizeTotalDouble,MOMWriteDataTime[MIN_ind],MOMRawWriteBDWTH[MIN_ind],MOMReadDataTime[MIN_ind],MOMRawReadBDWTH[MIN_ind],MOMDataSetTime[MIN_ind],MOMGroupTime[MIN_ind],MOMAttrTime[MIN_ind],MOMFopenTime[MIN_ind],MOMFcloseTime[MIN_ind],MOMFflushTime[MIN_ind],MOMOtherCloseTime[MIN_ind]);
printf("Med %10ld %10.6f %12.6f %10.6f %12.6f %10.6f %10.6f %10.6f %10.6f %10.6f %10.6f %10.6f\n",BufSizeTotalDouble,MOMWriteDataTime[MED_ind],MOMRawWriteBDWTH[MED_ind],MOMReadDataTime[MED_ind],MOMRawReadBDWTH[MED_ind],MOMDataSetTime[MED_ind],MOMGroupTime[MED_ind],MOMAttrTime[MED_ind],MOMFopenTime[MED_ind],MOMFcloseTime[MED_ind],MOMFflushTime[MED_ind],MOMOtherCloseTime[MED_ind]);
printf("Max %10ld %10.6f %12.6f %10.6f %12.6f %10.6f %10.6f %10.6f %10.6f %10.6f %10.6f %10.6f\n",BufSizeTotalDouble,MOMWriteDataTime[MAX_ind],MOMRawWriteBDWTH[MAX_ind],MOMReadDataTime[MAX_ind],MOMRawReadBDWTH[MAX_ind],MOMDataSetTime[MAX_ind],MOMGroupTime[MAX_ind],MOMAttrTime[MAX_ind],MOMFopenTime[MAX_ind],MOMFcloseTime[MAX_ind],MOMFflushTime[MAX_ind],MOMOtherCloseTime[MAX_ind]);
......
#ifdef THETA
#include <pmi.h>
#elif BGQ
#include <spi/include/kernel/location.h>
#include <spi/include/kernel/process.h>
#include <spi/include/kernel/memory.h>
#include <firmware/include/personality.h>
#include <hwi/include/bqc/nd_500_dcr.h>
#include <mpix.h>
#endif
typedef struct cost cost;
struct cost {
double cost;
int rank;
};
int64_t network_bandwidth () {
#ifdef THETA
return 1800000;
#elif BGQ
#endif
// Default Value:
return 1800000;
}
int64_t network_latency () {
#ifdef THETA
return 30;
#elif BGQ
#endif
// Default Value:
return 30;
}
void rank_to_coordinates ( int rank, int* coord ) {
#ifdef THETA
pmi_mesh_coord_t xyz;
int nid;
/* Hypothesis : PMI_rank == MPI_rank */
PMI_Get_nid(rank, &nid);
PMI_Get_meshcoord((pmi_nid_t) nid, &xyz);
coord[0] = xyz.mesh_x;
coord[1] = xyz.mesh_y;
coord[2] = xyz.mesh_z;
coord[3] = nid;
coord[4] = sched_getcpu();
#elif BGQ
MPIX_Rank2torus( rank, coord );
#endif
}
int distance_between_ranks ( int src_rank, int dest_rank ) {
int distance = 0;
#ifdef THETA
int dim = 4, d;
int src_coord[dim], dest_coord[dim];
rank_to_coordinates ( src_rank, src_coord );
rank_to_coordinates ( dest_rank, dest_coord );
for ( d = 0; d < dim; d++ ) {
if ( src_coord[d] != dest_coord[d] )
distance++;
}
#elif BGQ
int dim = 6, d, hops;
int src_coords[dim], dest_coords[dim];
MPIX_Hardware_t hw;
rank_to_coordinates ( src_rank, src_coords );
rank_to_coordinates ( dest_rank, dest_coords );
MPIX_Hardware( &hw );
for ( d = 0; d < dim; d++ ) {
hops = abs ( dest_coords[d] - src_coords[d] );
if ( hw.isTorus[d] == 1 && ( (int)hw.Size[d] - hops ) < hops )
hops = (int)hw.Size[d] - hops; // take the shorter way around the torus
distance += hops;
}
#endif
return distance;
}
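/*
 * Illustrative sketch (not called by the exerciser): the per-dimension hop
 * count above reduces to min(|dest - src|, size - |dest - src|) on a torus
 * dimension and |dest - src| on a mesh dimension. For example, going from
 * coordinate 1 to coordinate 6 on a ring of size 8 costs min(5, 3) = 3 hops.
 * The helper name below is hypothetical.
 */
static inline int torus_hops_1d ( int src, int dest, int size, int is_torus ) {
int hops = abs ( dest - src ); // direct path length
if ( is_torus && ( size - hops ) < hops )
hops = size - hops; // wrapping around is shorter
return hops;
}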
int distance_to_io_node ( int src_rank ) {
#ifdef THETA
return 1;
#elif BGQ
#endif
// Default Value:
return 1;
}
void get_cb_props( int64_t *buffer_size, int64_t *nb_aggr, char* fname ) {
int rank, nprocs;
int info_flag;
MPI_File fh;
MPI_Info mpi_file_info;
char info_value[MPI_MAX_INFO_VAL];
int mpi_code, striping_unit, striping_factor, co_ratio;
MPI_Comm_rank ( MPI_COMM_WORLD, &rank );
MPI_Comm_size ( MPI_COMM_WORLD, &nprocs );
mpi_code = MPI_File_open(MPI_COMM_WORLD, fname, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
#ifdef THETA
MPI_File_get_info(fh, &mpi_file_info);
mpi_code = MPI_Info_get(mpi_file_info, "romio_filesystem_type", MPI_MAX_INFO_VAL, info_value, &info_flag);
if (rank == 0) printf("romio_filesystem_type is :%s:\n",info_value);
// striping_unit is the size of the stripe (in bytes)
mpi_code = MPI_Info_get(mpi_file_info, "striping_unit", MPI_MAX_INFO_VAL, info_value, &info_flag);
striping_unit = atoi(info_value);
if (rank == 0) printf("striping_unit is :%s:\n",info_value);
// striping_factor is the number of stripes used for the file
mpi_code = MPI_Info_get(mpi_file_info, "striping_factor", MPI_MAX_INFO_VAL, info_value, &info_flag);
striping_factor = atoi(info_value);
if (rank == 0) printf("striping_factor is :%s:\n",info_value);
// Number of aggregators = "striping_factor" X "romio_lustre_co_ratio"
mpi_code = MPI_Info_get(mpi_file_info, "romio_lustre_co_ratio", MPI_MAX_INFO_VAL, info_value, &info_flag);
co_ratio = atoi(info_value);
if (rank == 0) printf("romio_lustre_co_ratio is :%s:\n",info_value);
*buffer_size = (int64_t) striping_unit;
*nb_aggr = (int64_t) (striping_factor * co_ratio);
#elif ROMIO
MPI_File_get_info(fh, &mpi_file_info);
mpi_code = MPI_Info_get(mpi_file_info, "cb_nodes", MPI_MAX_INFO_VAL, info_value, &info_flag);
*nb_aggr = (int64_t) atoi(info_value);
if (rank == 0) printf("cb_nodes is :%lld:\n",*nb_aggr);
mpi_code = MPI_Info_get(mpi_file_info, "cb_buffer_size", MPI_MAX_INFO_VAL, info_value, &info_flag);
*buffer_size = (int64_t) atoi(info_value);
if (rank == 0) printf("cb_buffer_size is :%lld:\n",*buffer_size);
#else
*nb_aggr = 1;
*buffer_size = 65536;
#endif
MPI_File_close( &fh );
if (rank == 0) printf("Using %lld aggregators with %lld buffer size\n",*nb_aggr,*buffer_size);
return;
}
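/*
 * Companion sketch (hypothetical helper, not part of the exerciser): on Lustre
 * the striping values that get_cb_props() reads back are normally fixed when
 * the file is first created. One way to request them through MPI-IO is shown
 * below; "striping_factor" and "striping_unit" are standard ROMIO hints, but
 * the function name and values used here are illustrative only.
 */
void create_file_with_striping ( const char* fname, int stripe_count, int stripe_size )
{
MPI_File fh;
MPI_Info info;
char value[MPI_MAX_INFO_VAL];
MPI_Info_create ( &info );
snprintf ( value, sizeof(value), "%d", stripe_count );
MPI_Info_set ( info, "striping_factor", value ); // number of stripes (OSTs) for the file
snprintf ( value, sizeof(value), "%d", stripe_size );
MPI_Info_set ( info, "striping_unit", value ); // stripe size in bytes
MPI_File_open ( MPI_COMM_WORLD, (char*) fname, MPI_MODE_CREATE | MPI_MODE_WRONLY, info, &fh );
MPI_File_close ( &fh );
MPI_Info_free ( &info );
}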
int topology_aware_list_serial ( int64_t* tally, int64_t nb_aggr, int* agg_list )
{
int i, r, agg_ind, aggr_nprocs, nprocs, latency, bandwidth, distance_to_io, distance, rank;
int agg_to_calc, aggr_comm_rank, aggr_comm_size, ranks_per_agg;
MPI_Comm_rank ( MPI_COMM_WORLD, &rank );
MPI_Comm_size ( MPI_COMM_WORLD, &nprocs );
int64_t *data_distribution;
int trim_thresh = 1;
int *world_ranks;
cost aggr_cost, min_cost;
latency = network_latency (); /* */
bandwidth = network_bandwidth ();
data_distribution = (int64_t *) malloc (nprocs * sizeof(int64_t));
world_ranks = (int *) malloc (nprocs * sizeof(int));
// Loop through the aggregators (this is the `serial` part)
for (agg_ind=0; agg_ind<nb_aggr; agg_ind++ ) {
aggr_cost.cost = 0.0;
aggr_cost.rank = rank;
min_cost.cost = 0.0;
min_cost.rank = 0;
// All-reduce the structures needed to calculate cost for sending data
MPI_Allgather ( &tally[ agg_ind ], 1, MPI_LONG_LONG, data_distribution, 1, MPI_LONG_LONG, MPI_COMM_WORLD );
MPI_Allgather ( &rank, 1, MPI_INT, world_ranks, 1, MPI_INT, MPI_COMM_WORLD );
// Now we can trim the data a bit
aggr_nprocs = nprocs;
if (0 && trim_thresh > 0) {
for (r = nprocs-1; r >= 0; r-- ) {
if (data_distribution[r] < trim_thresh) {
aggr_nprocs--;
for (i = r; i < aggr_nprocs; i++) {
data_distribution[i] = data_distribution[i+1];
world_ranks[i] = world_ranks[i+1];
}
}
}
}
// Compute the cost of aggregating data from the other ranks
for (r = 0; r < aggr_nprocs; r++ ) {
if ( (rank != world_ranks[r]) && (data_distribution[r] > 0)) {
distance = distance_between_ranks ( rank, world_ranks[r] );
//printf("agg_ind = %d r = %d distance = %d \n", agg_ind, r , distance);
aggr_cost.cost += ( distance * latency + data_distribution[r] / bandwidth );
}
}
distance_to_io = distance_to_io_node ( rank ); // Currently returns 1 (placeholder)
aggr_cost.cost += distance_to_io * latency;
// Check if this rank was already selected as an agg
if (agg_ind > 0) {
for (r = 0; r < agg_ind; r++ ) {
// Penalize the rank if it was already chosen
if ( rank == agg_list[ r ] ) aggr_cost.cost += 1000000.0;
}
}
// Determine the aggr with minimum cost
//printf("agg_ind = %d aggr_cost.rank = %d aggr_cost.cost = %f\n",agg_ind,aggr_cost.rank,aggr_cost.cost);
MPI_Allreduce ( &aggr_cost, &min_cost, 1, MPI_DOUBLE_INT, MPI_MINLOC, MPI_COMM_WORLD );
agg_list[ agg_ind ] = min_cost.rank;
//printf("agg_ind = %d min_cost.rank = %d min_cost.cost = %f\n",agg_ind,min_cost.rank,min_cost.cost);
}
free(data_distribution);
free(world_ranks);
return 0;
}
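/*
 * Minimal sketch of the election pattern used above (helper name is
 * illustrative): every rank contributes a (cost, rank) pair and MPI_MINLOC
 * returns both the smallest cost and the rank that owns it, so all ranks
 * learn the winner without a separate broadcast. The cost struct layout
 * matches MPI_DOUBLE_INT.
 */
int elect_min_cost_rank ( double my_cost, MPI_Comm comm )
{
cost local, global;
local.cost = my_cost;
MPI_Comm_rank ( comm, &local.rank );
MPI_Allreduce ( &local, &global, 1, MPI_DOUBLE_INT, MPI_MINLOC, comm );
return global.rank; // rank with the minimum cost
}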
int rank_aggr_topology_aware ( int64_t data_size, MPI_Comm aggr_comm )
{
int rank, aggr_rank, aggr_nprocs, r, distance, distance_to_io, latency, bandwidth;
int *world_ranks;
cost aggr_cost, min_cost;
int64_t *data_distribution, aggregated_data;
MPI_Comm local_comm;
MPI_Comm_dup ( aggr_comm, &local_comm );
MPI_Comm_rank ( MPI_COMM_WORLD, &rank );
MPI_Comm_rank ( local_comm, &aggr_rank );
MPI_Comm_size ( local_comm, &aggr_nprocs );
aggr_cost.cost = 0.0;
aggr_cost.rank = rank;
aggregated_data = 0;
latency = network_latency (); /* */
bandwidth = network_bandwidth ();
data_distribution = (int64_t *) malloc (aggr_nprocs * sizeof(int64_t));
world_ranks = (int *) malloc (aggr_nprocs * sizeof(int));
MPI_Allgather ( &data_size, 1, MPI_LONG_LONG, data_distribution, 1, MPI_LONG_LONG, local_comm );
MPI_Allgather ( &rank, 1, MPI_INT, world_ranks, 1, MPI_INT, local_comm );
/*
* Compute the cost of aggregating data from the other ranks
*/
for (r = 0; r < aggr_nprocs; r++ ) {
aggregated_data += data_distribution[r];
if ( aggr_rank != r || rank != world_ranks[r] ) {
distance = distance_between_ranks ( rank, world_ranks[r] );
aggr_cost.cost += ( distance * latency + data_distribution[r] / bandwidth );
}
}
/*
* Compute the cost of sending the aggregated data to the I/O node
*/
distance_to_io = distance_to_io_node ( rank ); // Currently returns 1 (placeholder)
aggr_cost.cost += ( distance_to_io * latency + aggregated_data / bandwidth );
MPI_Allreduce ( &aggr_cost, &min_cost, 1, MPI_DOUBLE_INT, MPI_MINLOC, local_comm );
MPI_Comm_free ( &local_comm );
free(data_distribution);
free(world_ranks);
return min_cost.rank;
}
int add_chunk ( int64_t datalen, int64_t offset, int64_t buffer_size, int64_t nb_aggr, int64_t* tally )
{
int64_t agg_offset = 0;
int64_t amount_add = 0;
int64_t amount_left = 0;
agg_offset = offset % (nb_aggr * buffer_size) ; // Position of the start of the chunk wrt the round
agg_offset = agg_offset / buffer_size ; // Which aggregator owns the start of this chunk
amount_add = buffer_size - (offset % buffer_size) ; // How many bytes of this chunk fit in the starting agg's buffer
if (amount_add > datalen) { // chunk ends within the same agg
amount_add = datalen;
} else {
amount_left = datalen - amount_add;
if (amount_left > 0) {
offset += amount_add;
datalen-= amount_add;
add_chunk ( datalen, offset, buffer_size, nb_aggr, tally );
}
}
tally[ agg_offset ] += amount_add;
return 0;
}
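/*
 * Worked example (sketch, not called by the exerciser): with a 100-byte
 * collective buffer and 2 aggregators, the file is divided into 200-byte
 * rounds, so aggregator 0 owns offsets [0,100) and [200,300) while
 * aggregator 1 owns [100,200) and [300,400). A 120-byte chunk starting at
 * offset 150 therefore places 50 bytes in aggregator 1 and the remaining
 * 70 bytes in aggregator 0, i.e. tally becomes {70, 50}.
 */
void add_chunk_example ( void )
{
int64_t tally[2] = {0, 0};
add_chunk ( 120, 150, 100, 2, tally );
printf ( "tally[0] = %lld tally[1] = %lld\n", (long long) tally[0], (long long) tally[1] );
}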
int get_cb_config_list ( int64_t* data_lens, int64_t* offsets, int data_len, char* hint_str, int64_t buffer_size, int64_t nb_aggr )
{
int my_aggr_id, my_aggr_rank, rank, aggr_comm_rank, total_buff_count, total_rounds_count;
int i, r, my_buff_count, my_first_buff_id, my_aggr_buff_id, resultlen, nprocs;
int64_t total_data_size, data_to_send, max_data_to_send, tmp_offset;
int64_t data_size, offset;
int64_t *data_to_send_per_aggr;
int64_t total_data_size_l;
char name[MPI_MAX_PROCESSOR_NAME];
char name_buf[MPI_MAX_PROCESSOR_NAME];
MPI_Comm aggr_comm;
MPI_Comm_rank ( MPI_COMM_WORLD, &rank );
MPI_Comm_size ( MPI_COMM_WORLD, &nprocs );
MPI_Get_processor_name( name, &resultlen );
/*
* Assumption:
* ---------------------------------------------------------------
* | agg 1 round 1 | agg 2 round 1 | agg 1 round 2 | agg 2 round 2 |
* ---------------------------------------------------------------
*/
// Tally data quantities associated with each aggregator
total_data_size_l = 0;
data_to_send_per_aggr = (int64_t *) calloc (nb_aggr, sizeof (int64_t));
for ( r = 0; r < data_len; r++ ) {
if (r == 0) {
offset = offsets[r];
data_size = data_lens[r];
}
total_data_size_l += data_lens[r];
add_chunk ( data_lens[r], offsets[r], buffer_size, nb_aggr, data_to_send_per_aggr );
}
MPI_Allreduce ( &total_data_size_l, &total_data_size, 1, MPI_LONG_LONG, MPI_SUM, MPI_COMM_WORLD);
// Generate topology-aware list of aggregators
int* agg_list = (int *) calloc (nprocs, sizeof (int));
topology_aware_list_serial( data_to_send_per_aggr, nb_aggr, agg_list );
for ( r = 0; r < nb_aggr; r++ ) {
if (rank==0) printf("agg_list[%d] = %d\n",r,agg_list[r]);
strcpy(name_buf, name);
MPI_Bcast ( name_buf, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, agg_list[r], MPI_COMM_WORLD );
if (r==0) {
sprintf(hint_str,"%s",name_buf);
} else {
strcat(hint_str,",");
strcat(hint_str,name_buf);
}
}
free(agg_list);
return 0;
}
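/*
 * Optional verification sketch (hypothetical helper): after the exerciser
 * passes the cb_config_list hint through the HDF5 file-access property list,
 * the hints the MPI-IO layer actually accepted can be echoed back with
 * MPI_File_get_info. Which hints are reported is implementation dependent.
 */
void print_cb_hints ( const char* fname, MPI_Comm comm )
{
int rank, flag;
MPI_File fh;
MPI_Info info;
char value[MPI_MAX_INFO_VAL];
MPI_Comm_rank ( comm, &rank );
MPI_File_open ( comm, (char*) fname, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh );
MPI_File_get_info ( fh, &info );
MPI_Info_get ( info, "cb_config_list", MPI_MAX_INFO_VAL, value, &flag );
if ( rank == 0 && flag ) printf ( "cb_config_list is :%s:\n", value );
MPI_Info_get ( info, "cb_nodes", MPI_MAX_INFO_VAL, value, &flag );
if ( rank == 0 && flag ) printf ( "cb_nodes is :%s:\n", value );
MPI_Info_free ( &info );
MPI_File_close ( &fh );
}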