/* Copyright (c) 2011-2017 Graph500 Steering Committee
   All rights reserved.
   Developed by: Anton Korzh anton@korzh.us
                 Graph500 Steering Committee
                 http://www.graph500.org
   New code under University of Illinois/NCSA Open Source License
   see license.txt or https://opensource.org/licenses/NCSA
*/

// AML: active messages library v 1.0
// MPI-3 passive transport
// transparent message aggregation greatly increases message rate on lossy interconnects
// shared memory optimization used
// Implementation basic v1.0

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <pthread.h>
#ifdef __APPLE__
#define SYSCTL_CORE_COUNT "machdep.cpu.core_count"
#include <sys/types.h>
#include <sys/sysctl.h>
#include <mach/thread_policy.h>
#include <mach/thread_act.h>
// code borrowed from http://yyshen.github.io/2015/01/18/binding_threads_to_cores_osx.html
typedef struct cpu_set {
	uint32_t count;
} cpu_set_t;

static inline void CPU_ZERO(cpu_set_t *cs) { cs->count = 0; }
static inline void CPU_SET(int num, cpu_set_t *cs) { cs->count |= (1 << num); }
static inline int CPU_ISSET(int num, cpu_set_t *cs) { return (cs->count & (1 << num)); }

int sched_getaffinity(pid_t pid, size_t cpu_size, cpu_set_t *cpu_set) {
	int32_t core_count = 0;
	size_t len = sizeof(core_count);
	int ret = sysctlbyname(SYSCTL_CORE_COUNT, &core_count, &len, 0, 0);
	if (ret) {
		printf("error while getting core count %d\n", ret);
		return -1;
	}
	cpu_set->count = 0;
	for (int i = 0; i < core_count; i++)
		cpu_set->count |= (1 << i);
	return 0;
}

int pthread_setaffinity_np(pthread_t thread, size_t cpu_size, cpu_set_t *cpu_set) {
	thread_port_t mach_thread;
	int core = 0;
	for (core = 0; core < 8 * cpu_size; core++)
		if (CPU_ISSET(core, cpu_set)) break;
	thread_affinity_policy_data_t policy = { core };
	mach_thread = pthread_mach_thread_np(thread);
	thread_policy_set(mach_thread, THREAD_AFFINITY_POLICY, (thread_policy_t)&policy, 1);
	return 0;
}
#else
#include <sched.h>
#endif

#ifdef __clang__
#define inline static inline
#endif

#include <mpi.h>
#include <unistd.h>

#define MAXGROUPS 65536 //max number of groups (the processes on one node form a group)
#define AGGR (1024*32) //aggregation buffer size per destination in bytes: internode
#define AGGR_intra (1024*32) //aggregation buffer size per destination in bytes: intranode
#define NRECV 4 //number of preposted recvs: internode
#define NRECV_intra 4 //number of preposted recvs: intranode
#define NSEND 4 //number of available sends: internode
#define NSEND_intra 4 //number of available sends: intranode
#define SOATTR __attribute__((visibility("default")))

#define SENDSOURCE(node) ( sendbuf+(AGGR*nbuf[node]) )
#define SENDSOURCE_intra(node) ( sendbuf_intra+(AGGR_intra*nbuf_intra[node]) )

#define ushort unsigned short

static int myproc,num_procs;
static int mygroup,num_groups;
static int mylocal,group_size;
#ifndef PROCS_PER_NODE_NOT_POWER_OF_TWO
static int loggroup,groupmask;
#define PROC_FROM_GROUPLOCAL(g,l) ((l)+((g)<<loggroup))
#define GROUP_FROM_PROC(p) ((p) >> loggroup)
#define LOCAL_FROM_PROC(p) ((p) & groupmask)
#else
#define PROC_FROM_GROUPLOCAL(g,l) ((g)*group_size+(l))
#define GROUP_FROM_PROC(p) ((p)/group_size)
#define LOCAL_FROM_PROC(p) ((p)%group_size)
#endif
volatile static int ack=0;
volatile static int inbarrier=0;

static void (*aml_handlers[256])(int,void *,int); //pointers to user-provided AM handlers

//internode comm (process number X from each group)
//intranode comm (all cores of one node group)
MPI_Comm comm, comm_intra;

// MPI stuff for sends
static char *sendbuf; //coalescing buffers; most of the memory allocated by AML lives here
static int *sendsize; //buffer occupancy in bytes
static ushort *acks; //aggregated acks
static ushort *nbuf; //actual buffer number for each group/localcore
static ushort activebuf[NSEND]; //number of the buffer used by each in-flight transfer (0..NSEND{_intra}-1)
static MPI_Request rqsend[NSEND];
// MPI stuff for recvs
static char recvbuf[AGGR*NRECV];
static MPI_Request rqrecv[NRECV];
unsigned long long nbytes_sent,nbytes_rcvd;

static char *sendbuf_intra;
static int *sendsize_intra;
static ushort *acks_intra;
static ushort *nbuf_intra;
static ushort activebuf_intra[NSEND_intra];
static MPI_Request rqsend_intra[NSEND_intra];
static char recvbuf_intra[AGGR_intra*NRECV_intra];
static MPI_Request rqrecv_intra[NRECV_intra];
volatile static int ack_intra=0;

inline void aml_send_intra(void *srcaddr, int type, int length, int local, int from);
void aml_finalize(void);
void aml_barrier(void);

SOATTR void aml_register_handler(void(*f)(int,void*,int),int n) {
	aml_barrier();
	aml_handlers[n]=f;
	aml_barrier();
}

struct __attribute__((__packed__)) hdr { //header of an internode message
	ushort sz;
	char hndl;
	char routing;
};
//process internode messages
static void process(int fromgroup, int length, char* message) {
	int i = 0;
	int from = PROC_FROM_GROUPLOCAL(fromgroup,mylocal);
	while ( i < length ) {
		void* m = message+i;
		struct hdr *h = m;
		int hsz = h->sz;
		int hndl = h->hndl;
		int destlocal = LOCAL_FROM_PROC(h->routing);
		if (destlocal == mylocal)
			aml_handlers[hndl](from,m+sizeof(struct hdr),hsz);
		else //destined for another core of this node: forward over shared memory
			aml_send_intra(m+sizeof(struct hdr),hndl,hsz,destlocal,from);
		i += hsz + sizeof(struct hdr);
	}
}

struct __attribute__((__packed__)) hdri { //header of an intranode message
	ushort routing;
	ushort sz;
	char hndl;
};
//process intranode messages
static void process_intra(int fromlocal, int length, char* message) {
	int i = 0;
	while ( i < length ) {
		void* m = message+i;
		struct hdri *h = m;
		int hsz = h->sz;
		int hndl = h->hndl;
		aml_handlers[hndl](PROC_FROM_GROUPLOCAL((int)(h->routing),fromlocal),m+sizeof(struct hdri),hsz);
		i += sizeof(struct hdri) + hsz;
	}
}

//poll for intranode messages
inline void aml_poll_intra(void) {
	int flag, from, length, index;
	MPI_Status status;
	MPI_Testany( NRECV_intra, rqrecv_intra, &index, &flag, &status );
	if ( flag ) {
		MPI_Get_count( &status, MPI_CHAR, &length );
		ack_intra -= status.MPI_TAG; //the tag carries the number of our messages acked by the sender
		if (length > 0) { //no confirmation & processing for ack-only messages
			from = status.MPI_SOURCE;
			if (inbarrier)
				MPI_Send(NULL, 0, MPI_CHAR, from, 1, comm_intra); //ack now
			else
				acks_intra[from]++; //normally we have delayed acks
			process_intra( from, length, recvbuf_intra+AGGR_intra*index );
		}
		MPI_Start( rqrecv_intra+index );
	}
}

//poll for internode messages
static void aml_poll(void) {
	int flag, from, length, index;
	MPI_Status status;

	aml_poll_intra();
	MPI_Testany( NRECV, rqrecv, &index, &flag, &status );
	if ( flag ) {
		MPI_Get_count( &status, MPI_CHAR, &length );
		ack -= status.MPI_TAG; //the tag carries the number of our messages acked by the sender
		nbytes_rcvd += length;
		if (length > 0) { //no confirmation & processing for ack-only messages
			from = status.MPI_SOURCE;
			if (inbarrier)
				MPI_Send(NULL, 0, MPI_CHAR, from, 1, comm); //ack now
			else
				acks[from]++; //normally we have delayed acks
			process( from, length, recvbuf+AGGR*index );
		}
		MPI_Start( rqrecv+index );
	}
}

//flush internode buffer to destination node
inline void flush_buffer( int node ) {
	MPI_Status stsend;
	int flag = 0, index, tmp;
	if (sendsize[node] == 0 && acks[node] == 0) return;
	while (!flag) { //wait for a free send request, polling in the meantime
		aml_poll();
		MPI_Testany( NSEND, rqsend, &index, &flag, &stsend );
	}
	MPI_Isend( SENDSOURCE(node), sendsize[node], MPI_CHAR, node, acks[node], comm, rqsend+index );
	nbytes_sent += sendsize[node];
	if (sendsize[node] > 0) ack++;
	sendsize[node] = 0;
	acks[node] = 0;
	tmp = activebuf[index]; activebuf[index] = nbuf[node]; nbuf[node] = tmp; //swap buffers
}
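/* Illustrative sketch, not part of the original library: how one message is
   framed inside an internode aggregation buffer.  aml_send (below) reserves
   sizeof(struct hdr) bytes ahead of each payload, and process() (above)
   walks the buffer using the same layout.  The function and its guard macro
   AML_FRAMING_DEMO are hypothetical names added for illustration only. */
#ifdef AML_FRAMING_DEMO
static int aml_framing_demo(char *buf, int used, const void *payload,
		ushort len, char handler, char destlocal) {
	struct hdr *h = (struct hdr *)(buf + used); //header precedes the payload
	h->sz = len;            //payload size; read back as h->sz in process()
	h->hndl = handler;      //index into aml_handlers[]
	h->routing = destlocal; //local rank of the final destination within its group
	memcpy(buf + used + sizeof(struct hdr), payload, len);
	return used + sizeof(struct hdr) + len; //new buffer occupancy in bytes
}
#endif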
//flush intranode buffer; NB: node is the local number of the PE in the group
inline void flush_buffer_intra( int node ) {
	MPI_Status stsend;
	int flag = 0, index, tmp;
	if (sendsize_intra[node] == 0 && acks_intra[node] == 0) return;
	while (!flag) { //wait for a free send request, polling in the meantime
		aml_poll_intra();
		MPI_Testany( NSEND_intra, rqsend_intra, &index, &flag, &stsend );
	}
	MPI_Isend( SENDSOURCE_intra(node), sendsize_intra[node], MPI_CHAR, node, acks_intra[node], comm_intra, rqsend_intra+index );
	if (sendsize_intra[node] > 0) ack_intra++;
	sendsize_intra[node] = 0;
	acks_intra[node] = 0;
	tmp = activebuf_intra[index]; activebuf_intra[index] = nbuf_intra[node]; nbuf_intra[node] = tmp; //swap buffers
}

inline void aml_send_intra(void *src, int type, int length, int local, int from) {
	//send to _another_ process in the same group
	int nmax = AGGR_intra - sendsize_intra[local] - sizeof(struct hdri);
	if ( nmax < length )
		flush_buffer_intra(local);
	char* dst = SENDSOURCE_intra(local)+sendsize_intra[local];
	struct hdri *h = (void*)dst;
	h->routing = GROUP_FROM_PROC(from); //group of the original sender, so the handler sees the true source rank
	h->sz = length;
	h->hndl = type;
	sendsize_intra[local] += length+sizeof(struct hdri);
	memcpy(dst+sizeof(struct hdri),src,length);
}

SOATTR void aml_send(void *src, int type, int length, int node ) {
	if ( node == myproc )
		return aml_handlers[type](myproc,src,length);

	int group = GROUP_FROM_PROC(node);
	int local = LOCAL_FROM_PROC(node);

	//send to another process in my group
	if ( group == mygroup )
		return aml_send_intra(src,type,length,local,myproc);

	//send to another group
	int nmax = AGGR - sendsize[group] - sizeof(struct hdr);
	if ( nmax < length )
		flush_buffer(group);
	char* dst = SENDSOURCE(group)+sendsize[group];
	struct hdr *h = (void*)dst;
	h->routing = local; //local rank of the final destination inside its group
	h->hndl = type;
	h->sz = length;
	sendsize[group] += length+sizeof(struct hdr);
	memcpy(dst+sizeof(struct hdr),src,length);
}

int stringCmp( const void *a, const void *b ) { return strcmp(a,b); }

// Should be called by the user instead of MPI_Init()
SOATTR int aml_init( int *argc, char ***argv ) {
	int r, i, j, tmpmax;
	r = MPI_Init(argc, argv);
	if ( r != MPI_SUCCESS ) return r;
	MPI_Comm_size( MPI_COMM_WORLD, &num_procs );
	MPI_Comm_rank( MPI_COMM_WORLD, &myproc );

	//split communicator: ranks on the same host get the same color
	char host_name[MPI_MAX_PROCESSOR_NAME];
	char (*host_names)[MPI_MAX_PROCESSOR_NAME];
	int namelen, bytes, n, color;
	MPI_Get_processor_name(host_name,&namelen);
	bytes = num_procs * sizeof(char[MPI_MAX_PROCESSOR_NAME]);
	host_names = (char (*)[MPI_MAX_PROCESSOR_NAME]) malloc(bytes);
	strcpy(host_names[myproc], host_name);
	for (n=0; n<num_procs; n++)
		MPI_Bcast(host_names[n], MPI_MAX_PROCESSOR_NAME, MPI_CHAR, n, MPI_COMM_WORLD);
	qsort(host_names, num_procs, MPI_MAX_PROCESSOR_NAME, stringCmp);
	color = 0;
	for (n=0; n<num_procs; n++) {
		if (n>0 && strcmp(host_names[n-1], host_names[n])) color++;
		if (strcmp(host_name, host_names[n]) == 0) break;
	}
	free(host_names);
	MPI_Comm_split(MPI_COMM_WORLD, color, myproc, &comm_intra);

	//find intranode numbers and make the internode communicator
	MPI_Comm_size( comm_intra, &group_size );
	MPI_Comm_rank( comm_intra, &mylocal );

	MPI_Comm_split(MPI_COMM_WORLD, mylocal, myproc, &comm);
	MPI_Comm_size( comm, &num_groups );
	MPI_Comm_rank( comm, &mygroup );

	//first nonblocking barriers are blocking, so we call them now
	MPI_Request hndl;
	MPI_Ibarrier(comm,&hndl);
	MPI_Wait(&hndl,MPI_STATUS_IGNORE);
	MPI_Ibarrier(comm_intra,&hndl);
	MPI_Wait(&hndl,MPI_STATUS_IGNORE);

#ifndef PROCS_PER_NODE_NOT_POWER_OF_TWO
	groupmask = group_size-1;
	if ((group_size&groupmask)) {
		printf("AML: Fatal: non-power-of-2 group size unsupported. Define macro PROCS_PER_NODE_NOT_POWER_OF_TWO to override\n");
		return -1;
	}
	for (loggroup = 0; loggroup < group_size; loggroup++)
		if ((1 << loggroup) == group_size) break;
#endif
	if (myproc != PROC_FROM_GROUPLOCAL(mygroup,mylocal)) {
		printf("AML: Fatal: Strange group rank assignment scheme.\n");
		return -1;
	}

	//pin this process to core number mylocal of its node
	cpu_set_t cpuset;
	CPU_ZERO(&cpuset);
	CPU_SET(mylocal,&cpuset); //FIXME: would this work well enough on all architectures?
	pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);

#ifdef DEBUGSTATS
	if (myproc==0) printf("AML: multicore, num_groups %d group_size %d\n",num_groups,group_size);
#ifdef PROCS_PER_NODE_NOT_POWER_OF_TWO
	if (myproc==0) printf("AML: multicore, PROCS_PER_NODE_NOT_POWER_OF_TWO defined\n");
#else
	if (myproc==0) printf("AML: multicore, loggroup=%d groupmask=%d\n",loggroup,groupmask);
#endif
	if (myproc==0) printf("NRECV=%d NRECVi=%d NSEND=%d NSENDi=%d AGGR=%dK AGGRi=%dK\n",NRECV,NRECV_intra,NSEND,NSEND_intra,AGGR>>10,AGGR_intra>>10);
#endif
	if (num_groups > MAXGROUPS) {
		if (myproc==0) printf("AML: v1.0 reference: unsupported num_groups > MAXGROUPS=%d\n",MAXGROUPS);
		exit(-1);
	}
	fflush(NULL);

	//init preposted recvs: NRECV internode
	for (i=0; i<NRECV; i++) {
		//the source text breaks off mid-loop; this body is reconstructed to match the MPI_Start() reposts in aml_poll()
		MPI_Recv_init( recvbuf+AGGR*i, AGGR, MPI_CHAR, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, rqrecv+i );
		MPI_Start( rqrecv+i );
	}
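	/* Minimal usage sketch (illustrative only; the handler number, payload,
	   and target_rank below are hypothetical).  Callers such as the Graph500
	   reference BFS drive AML roughly like this:

		static void visit(int from, void *data, int sz) {
			// unpack sz bytes at data; a handler may itself call aml_send()
		}

		int main(int argc, char **argv) {
			if (aml_init(&argc, &argv)) return 1; // instead of MPI_Init()
			aml_register_handler(visit, 1);
			long v = 42;
			aml_send(&v, 1, sizeof(v), target_rank);
			aml_barrier();  // returns once all sent messages are processed
			aml_finalize(); // instead of MPI_Finalize()
			return 0;
		}
	*/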