Ioannis Nompelis
  personal pages  

  Message Passing Interface (MPI) hacks, tricks and examples

The contents of this page are for educational purposes. Feel free to use any and all of this code in your own software. Also feel free to link to this page and to send comments and questions to me.


 
Grouping MPI processes on multi-node, multi-core clusters (Fortran)
This demonstration is for the MPI-2 standard

In certain cases, software needs to operate with exclusive use of a multi-core node in a parallel environment. This can be done in several ways, some of which involve modifying how the MPI processes are launched. However, if one has access to the source code, this mode of operation can be made a run-time option, without relying on changes to the MPI run-time environment itself. This, among other reasons, is why I wrote this piece of software. I first did this in C, but I think it will be most useful if it is made available in Fortran. Note that the Fortran version uses a very naive way of collecting hostnames: rank 0 receives every name and compares them pairwise. There is a leaner and faster way to do it (used in the C version further down), but this one is more intuitive.

Use as you please, but feel free to include my e-mail address in your comments in case there are questions, augmentations or fixes. This works for me; it better work for you too.

The basic code is split into four files: the driver main.f, the module with the MPI variables mpi_struct.F, the source that does the actual work mpi.F, and a Makefile. The source in mpi.F has several routines. One of them is included only so that the code can be run in an instructive way on a desktop: it fakes the hostname to pretend that we are launching several processes over several nodes, when in fact we are executing the software on a single host.

Here is the output on my desktop:

mach29% 
mach29% make 
mpif90 -c -Mextend  -Minform=inform -fpic -Mlarge_arrays -Mbounds -Ktrap=fp mpi_struct.F 
mpif90 -c -Mextend  -Minform=inform -fpic -Mlarge_arrays -Mbounds -Ktrap=fp mpi.F 
mpif90 -c -Mextend  -Minform=inform -fpic -Mlarge_arrays -Mbounds -Ktrap=fp main.f
mpif90 mpi.o main.o 
mach29% 
mach29% 
mach29% mpirun -np 10 a.out
      Changed             0  to host0  after faking
      Changed             1  to host0  after faking
      Changed             2  to host0  after faking
      Changed             3  to host1  after faking
      Changed             4  to host1  after faking
      Changed             5  to host1  after faking
      Changed             6  to host2  after faking
      Changed             7  to host2  after faking
      Changed             8  to host2  after faking
      Changed             9  to host2  after faking
 NEWCOM            0            2 host0
 NEWCOM            1            0 host0
 NEWCOM            2            1 host0
 NEWCOM            3            0 host1
 NEWCOM            4            1 host1
 NEWCOM            5            2 host1
 NEWCOM            6            2 host2
 NEWCOM            7            3 host2
 NEWCOM            8            0 host2
 NEWCOM            9            1 host2
mach29% 
mach29% 
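
The rule behind the "Changed ... to hostN" lines above is plain integer arithmetic on the rank. Here is a minimal sketch of it in C, with no MPI calls, so that it can be tried anywhere. The function names fake_host_index and fake_host_name are hypothetical helpers of mine for illustration; they are not part of the files described above.

```c
#include <stdio.h>

/* Hypothetical helper (not in the original sources): index of the fake
 * host assigned to rank `id` when `nproc` processes are cut into three
 * groups with integer division. */
int fake_host_index(int id, int nproc)
{
    if (id < nproc / 3)     return 0;   /* first third  -> host0 */
    if (id < 2 * nproc / 3) return 1;   /* second third -> host1 */
    return 2;                           /* everything else -> host2 */
}

/* Write the fake hostname ("host0", "host1" or "host2") into buf. */
void fake_host_name(int id, int nproc, char *buf, size_t len)
{
    snprintf(buf, len, "host%d", fake_host_index(id, nproc));
}
```

With 10 processes the integer divisions give the thresholds 3 and 6, so ranks 0-2 land on host0, ranks 3-5 on host1 and ranks 6-9 on host2, which is the split seen in the output above.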

Here are the contents of the module (mpi_struct.F):


cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
c **** Module to store the MPI variables
c      Ioannis Nompelis  Created:       20120124
c      Ioannis Nompelis  Last modified: 20140206
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
      Module mpivars
      Implicit None
      Include 'mpif.h'

c---- generic communication variables
      Integer ier,nproc,id,icomw,mpiinfo
      Integer mpistat(MPI_STATUS_SIZE)
      Character*(MPI_MAX_PROCESSOR_NAME) hname
      Integer ihlen

c---- intra-node communication variables
      Integer nproc_node,id_node,icom_node

c---- cross-node communication variables
      Integer nproc_net,id_net,icom_net

      End module

Here are the contents of the main code (mpi.F):


cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
c **** Subroutine to start up MPI procedures
c      Ioannis Nompelis  Created:       20120124
c      Ioannis Nompelis  Last modified: 20130613
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
       Subroutine inustd_MPI_Start()

       Use mpivars
       Implicit None


       call MPI_INIT(ier)

       icomw = MPI_COMM_WORLD

       call MPI_COMM_RANK(icomw,id,ier)
       call MPI_COMM_SIZE(icomw,nproc,ier)
       call MPI_INFO_CREATE(mpiinfo, ier)
       call MPI_Get_processor_name( hname, ihlen, ier)
C      print*,'     Processor ',id,' ',trim(hname),' started'


c---- we will pretend that we are on a system of several nodes with
c---- several processors (MPI processes) per node
c---- (this is useful for testing on one's workstation - one hostname)
       call inustd_MPI_FakeHosts()

       return
       end

cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
c **** Subroutine to fictitiously create nodes by corrupting hostnames
c      Ioannis Nompelis  Created:       20140206
c      Ioannis Nompelis  Last modified: 20140206
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
       Subroutine inustd_MPI_FakeHosts()

       Use mpivars
       Implicit None
       Integer n

c --- we are just going to corrupt the name...
       if(id .ge. 2*nproc/3) hname = 'host2'
       if(id .lt. 2*nproc/3) hname = 'host1'
       if(id .lt.   nproc/3) hname = 'host0'


       call system('usleep 100000')
       do n = 0,nproc-1
       if(id.eq.n) print*,'     Changed ',id,' to ',trim(hname),'  after faking'
       call MPI_BARRIER(icomw,ier)
       enddo

       return
       end

cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
c **** Subroutine to group processors by hostname (per node)
c **** Creates intra-node communicator and node-head communicator
c      Ioannis Nompelis  Created:       20140206
c      Ioannis Nompelis  Last modified: 20140206
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc
       Subroutine inustd_MPI_GroupByNode()

       Use mpivars
       Implicit None
       Type hname_t
          Character*(MPI_MAX_PROCESSOR_NAME) :: str
       End Type
       Type(hname_t),allocatable,dimension(:) :: hnames
       Integer,allocatable,dimension(:) :: imsk,ilist
       Integer iclr,n,itag,m,l,i1,i2,ikey


c --- locate MPI processes on the same hostname and "colour" them
       if(id.eq.0) then
          allocate(hnames(0:nproc-1),ilist(0:nproc-1))

          hnames(0) %str = hname
          do n = 1,nproc-1
             itag = n
             call MPI_RECV(hnames(n)%str,MPI_MAX_PROCESSOR_NAME,MPI_CHARACTER,
     &                     n,itag,icomw,mpistat,ier)
          enddo
       else
          itag = id
          call MPI_SEND(hname,MPI_MAX_PROCESSOR_NAME,MPI_CHARACTER,
     &                  0,itag,icomw,ier)
       endif
       call MPI_BARRIER(icomw,ier)

       iclr = 0
       if(id.eq.0) then
          ilist(:) = -1
          ilist(0) = 0
          do n = 1,nproc-1
             do m = 0,n
                if(ilist(n).eq.-1) then
                   itag = 0                        ! assume it is the same
                   do l = 1,MPI_MAX_PROCESSOR_NAME
                      i1 = iachar( hnames(n)%str(l:l) )
                      i2 = iachar( hnames(m)%str(l:l) )
                      if(i1.ne.i2) itag = 1        ! it is not the same
                   enddo
                   if(itag.eq.0) ilist(n) = m
                endif
             enddo
          enddo

       endif

c --- send every process its "colour" (first rank found on its host)
       if(id.eq.0) then
          do n = 1,nproc-1
             itag = n
             call MPI_SEND(ilist(n),1,MPI_INTEGER,n,itag,icomw,ier)
          enddo
       else
          itag = id
          call MPI_RECV(iclr,1,MPI_INTEGER,0,itag,icomw,mpistat,ier)
       endif

       if(id.eq.0) then
          deallocate(hnames,ilist)
       endif

       call MPI_BARRIER(icomw,ier)


c --- create intra-node communicator and get information
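       iclr = iclr + 1              ! call requires non-negative number
c --- the key must be set: it orders the ranks within each new
c --- communicator; left unset, node ranks come out in arbitrary order
       ikey = id
       call MPI_COMM_SPLIT(icomw, iclr, ikey, icom_node, ier)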
       call MPI_COMM_RANK(icom_node,id_node,ier)
       call MPI_COMM_SIZE(icom_node,nproc_node,ier)

c --- print on the screen what we have done
       do n = 0,nproc-1
          CALL SYSTEM('usleep 100000')
          IF(id.eq.n) PRINT*,'NEWCOM',id,id_node,trim(hname)
       enddo

       return
       end

In the end you are left with the original MPI global communicator and ranks (processor IDs) intact, but you have an additional communicator on each node. Each of these communicators is local to a node: it connects only the MPI processes that report the same hostname. Keep in mind that this does not imply shared memory access! (Although this can be achieved through the kernel and the filesystem.) One more communicator must still be built, connecting only the processes that are the "master process" on their node; the Fortran routine above does not build it, although the module reserves icom_net, id_net and nproc_net for it. Ranks in each communicator are unique only within that communicator.

As a result of this maneuver, you can now communicate in any way you like within and outside of a node in your own software.
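
To make the effect of the colour and key arguments of MPI_COMM_SPLIT concrete, here is a serial sketch in C that emulates the numbering without calling MPI. It relies on the rule in the MPI standard that processes with the same colour end up in the same new communicator, ordered by key, with ties broken by their rank in the old communicator. The names split_rank and split_size are hypothetical helpers of mine, not MPI functions.

```c
/* Serial emulation (no MPI calls) of the rank that process p receives
 * from a communicator split: count the processes with the same colour
 * that come before p when ordering by key, then by old rank. */
int split_rank(const int *colour, const int *key, int nproc, int p)
{
    int rank = 0;
    for (int q = 0; q < nproc; ++q) {
        if (q == p || colour[q] != colour[p]) continue;
        /* q precedes p on a smaller key, or on an equal key and lower old rank */
        if (key[q] < key[p] || (key[q] == key[p] && q < p)) ++rank;
    }
    return rank;
}

/* Size of the new communicator that process p ends up in. */
int split_size(const int *colour, int nproc, int p)
{
    int size = 0;
    for (int q = 0; q < nproc; ++q)
        if (colour[q] == colour[p]) ++size;
    return size;
}
```

For the 10-process example the colours are 1,1,1,4,4,4,7,7,7,7. If every key is equal, the node ranks simply follow the global rank order (0,1,2 on each of the first two hosts and 0,1,2,3 on the third); different keys reorder the ranks within a node but never move a process to another node.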

IN 2014/02/10
 

 
Grouping MPI processes on multi-node, multi-core clusters (C)
This demonstration is for the MPI-2 standard

The following piece of code does the same as the Fortran example above, but it is in C and uses the leaner method of grouping hosts.


#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mpi.h>


typedef char hname_t [MPI_MAX_PROCESSOR_NAME];

struct my_MPI_vars {
   int ier;
   int npew,id;
   MPI_Group grw;
   MPI_Comm comw;
   MPI_Info info;
   char hname[MPI_MAX_PROCESSOR_NAME];
   int hlen;
   hname_t *hosts;

   int imh, npeh,idh, npen,idn;
   MPI_Group grh,grn;
   MPI_Comm comh,comn;
};


/*
 * Routine to corrupt MPI hostnames for testing purposes
 */

void inMPI_CorruptHosts( struct my_MPI_vars *mpivars )
{
   int ilen = (int) MPI_MAX_PROCESSOR_NAME;

   memset(mpivars->hname, '\0', (size_t) ilen);
   if(mpivars->id < 18) sprintf(mpivars->hname,"host3");
   if(mpivars->id <  8) sprintf(mpivars->hname,"host2");
   if(mpivars->id <  4) sprintf(mpivars->hname,"host1");
   if(mpivars->id <  2) sprintf(mpivars->hname,"host0");
}


/*
 * Routine to start the MPI
 */

int inMPI_Startup( struct my_MPI_vars *mpivars )
{
   char *FUNC = "inMPI_Startup";
   int n,iclr,ikey;
   char hdata[MPI_MAX_PROCESSOR_NAME];


   // start MPI and parent communicator, etc
   mpivars->comw = MPI_COMM_WORLD;

   // argc/argv are not available here; MPI-2 allows passing NULL for both
   mpivars->ier = MPI_Init(NULL, NULL);
   mpivars->ier = MPI_Comm_size(mpivars->comw, &(mpivars->npew));
   mpivars->ier = MPI_Comm_rank(mpivars->comw, &(mpivars->id));
   mpivars->ier = MPI_Info_create(&(mpivars->info));
   mpivars->ier = MPI_Get_processor_name(mpivars->hname, &(mpivars->hlen));

   mpivars->ier = MPI_Comm_group(mpivars->comw,&(mpivars->grw));

   // pad the remaining part of the name because we do not know what MPI
   // did to get it (tests show it corrupts the whole buffer!)
   for(n=mpivars->hlen;n<MPI_MAX_PROCESSOR_NAME;++n) mpivars->hname[n] = '\0';

   if(mpivars->id == 0) {
      fprintf(stdout," i [%s]  Started %d MPI processes \n",FUNC,mpivars->npew);
   }


   // determine host-master processes
//inMPI_CorruptHosts( mpivars ); // HACK FAKE hostnames
   for(n=0;n<mpivars->npew;++n) {
      int ilen,ipcs,ipcr;

      ilen = (int) MPI_MAX_PROCESSOR_NAME;
      strncpy(hdata,mpivars->hname, (size_t) ilen);
      (void) MPI_Bcast(hdata,ilen,MPI_CHAR,n,mpivars->comw);

      if(mpivars->id == n) {
            ipcs = mpivars->id;
      } else {
         if(strcmp(hdata,mpivars->hname) == 0) {
            ipcs = mpivars->id;
         } else {
            ipcs = mpivars->npew;
         }
      }

      (void) MPI_Allreduce(&ipcs,&ipcr,1,MPI_INT,MPI_MIN,mpivars->comw);
      if(strcmp(hdata,mpivars->hname) == 0) {
         mpivars->imh = ipcr;
      }
   }


  /*
   * create intra-node communicators
   */
   ikey = 0;
   iclr = mpivars->imh + 1;
   mpivars->ier = MPI_Comm_split(mpivars->comw, iclr, ikey, &(mpivars->comh));
   mpivars->ier = MPI_Comm_size(mpivars->comh, &(mpivars->npeh));
   mpivars->ier = MPI_Comm_rank(mpivars->comh, &(mpivars->idh));
   mpivars->ier = MPI_Comm_group(mpivars->comh,&(mpivars->grh));

   for(n=0;n<mpivars->npew;++n) {
   // usleep(10000);
      if(mpivars->idh == 0 && mpivars->id == n) {
         printf("id=%d master of group imh=%d with nprocs=%d \n",
               mpivars->id,mpivars->imh,mpivars->npeh);
      }
   }

  /*
   * create cross-node communicator for node-master processes
   * (we will create a different communicator for all other processes too
   * that should never be used for anything useful)
   */
   ikey = 0;
   if(mpivars->idh == 0) { iclr = 1; } else { iclr = 2; }
   mpivars->ier = MPI_Comm_split(mpivars->comw, iclr, ikey, &(mpivars->comn));
   mpivars->ier = MPI_Comm_size(mpivars->comn, &(mpivars->npen));
   mpivars->ier = MPI_Comm_rank(mpivars->comn, &(mpivars->idn));
   mpivars->ier = MPI_Comm_group(mpivars->comn,&(mpivars->grn));

   for(n=0;n<mpivars->npew;++n) {
   // usleep(10000);
      if(mpivars->idh == 0 && mpivars->id == n) {
         printf("Global id=%d master of group %d (with nprocs=%d) num-of-masters %d \n",
               mpivars->id,mpivars->imh,mpivars->npeh,mpivars->npen);
      }
   }

   return(0);
}




int inMPI_DumbTest( struct my_MPI_vars *mpivars )
{
   char *FUNC = "inMPI_DumbTest";
   int i,j;


   for(int n=0; n<100000; ++n) {
      i = n + mpivars->id;

      (void) MPI_Allreduce(&i,&j,1,MPI_INT,MPI_MIN,mpivars->comw);
   }

   return(0);
}




int main() {


   struct my_MPI_vars mpivars;


   (void) inMPI_Startup(&mpivars);




   (void) MPI_Finalize();

   return(0);
}
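
The host-master search in inMPI_Startup can be hard to follow because every step is a collective call. Below is a serial sketch in C of the same logic, assuming all hostnames are available in one array: round n plays the role of the broadcast from rank n, and the inner loops play the role of the minimum reduction and of storing the result on the matching ranks. The names find_node_masters, count_node_masters and NAME_LEN are hypothetical and exist only for this illustration.

```c
#include <string.h>

#define NAME_LEN 64   /* hypothetical stand-in for MPI_MAX_PROCESSOR_NAME */

/* For every rank p, store in master[p] the lowest rank on the same host. */
void find_node_masters(char hosts[][NAME_LEN], int nproc, int *master)
{
    for (int n = 0; n < nproc; ++n) {       /* round n: rank n "broadcasts" */
        int lowest = nproc;                 /* neutral value for the minimum */
        for (int p = 0; p < nproc; ++p)     /* emulates the MIN reduction */
            if (strcmp(hosts[p], hosts[n]) == 0 && p < lowest) lowest = p;
        for (int p = 0; p < nproc; ++p)     /* ranks on that host keep it */
            if (strcmp(hosts[p], hosts[n]) == 0) master[p] = lowest;
    }
}

/* A rank is a node master when it is the lowest rank on its own host. */
int count_node_masters(const int *master, int nproc)
{
    int count = 0;
    for (int p = 0; p < nproc; ++p)
        if (master[p] == p) ++count;
    return count;
}
```

The value stored for each rank corresponds to imh in the listing, and the count corresponds to the size of the cross-node communicator of node masters. Note that the MPI version performs one broadcast and one reduction per process, so the number of collective calls grows linearly with the number of processes.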

IN 2014/07/30
 

 
Stress-testing an MPI environment

It is important to know how an MPI installation is performing before wasting CPU cycles on actual simulations with a poorly performing installation. Although there are many ways one can (and should) test an MPI installation, it really boils down to being able to send arbitrarily large messages between MPI processes robustly and efficiently.

By observing the timings of sends and receives one can infer a lot about how the system is performing. For example, with modern OpenMPI versions that rely on the OpenFabrics layer, processes on the same node communicate through shared memory, whereas the interconnect is used only for node-to-node message passing. Note that buffering may become an issue when a node is running close to its physical memory limit, but I do not have any evidence of this in this demonstration.

Here is a simple MPI code in C that passes a contiguous chunk of memory from processor to processor. The outer loop runs over receiving processes and, within it, an inner loop runs over sending processes. At the end of each send-receive across a pair of processes, all processes sync with a barrier.


#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>   // usleep

#include <mpi.h>

typedef char hname_t [MPI_MAX_PROCESSOR_NAME];

struct my_MPI_vars {
   int ier;
   int nproc,id;
   MPI_Group igrw;
   MPI_Comm icomw;
   MPI_Info info;
   char hname[MPI_MAX_PROCESSOR_NAME];
   int nlen;
   hname_t *hosts;
};



/*
 * Routine to start the MPI
 */

int inStartupMPI( struct my_MPI_vars *mpivars )
{
   int n;


   // start MPI and parent communicator, etc
   mpivars->icomw = MPI_COMM_WORLD;

   // argc/argv are not available here; MPI-2 allows passing NULL for both
   mpivars->ier = MPI_Init(NULL, NULL);
   mpivars->ier = MPI_Comm_size(mpivars->icomw, &(mpivars->nproc));
   mpivars->ier = MPI_Comm_rank(mpivars->icomw, &(mpivars->id));
   mpivars->ier = MPI_Info_create(&(mpivars->info));
   mpivars->ier = MPI_Get_processor_name(mpivars->hname, &(mpivars->nlen));

   mpivars->ier = MPI_Comm_group(mpivars->icomw,&(mpivars->igrw));

   // pad the remaining part of the name because we do not know what MPI
   // did to get it (tests show it corrupts the whole buffer!)
   for(n=mpivars->nlen;n<MPI_MAX_PROCESSOR_NAME;++n) {
      mpivars->hname[n] = '\0';
   }


   if(mpivars->id == 0) {
      fprintf(stdout," i [inStartupMPI]  Started %d MPI processes \n",mpivars->nproc);
   }

   return(0);
}


void stress_MPI_DoCrap(struct my_MPI_vars *mpivars) {
   double *rec,*sen,*tim;
   int isize = 50000000;
   int i,j,itag,n;
   double t1,t2;


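   rec = malloc((size_t) isize*sizeof(double));
   sen = malloc((size_t) isize*sizeof(double));
   tim = malloc((size_t) mpivars->nproc*sizeof(double));
   if(rec == NULL || sen == NULL || tim == NULL) {
      // two 400 MB buffers per process may not fit; stop all processes
      fprintf(stderr," e [stress_MPI_DoCrap]  Could not allocate buffers \n");
      MPI_Abort(mpivars->icomw, 1);
   }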

   for(n=0;n<isize;++n) sen[n] = 999.9;
   for(n=0;n<mpivars->nproc;++n) tim[n]= 0.0;

   for(i=0;i<mpivars->nproc;++i) {
      if(mpivars->id == 0) printf("Working on processor %d \n",i);

      for(j=0;j<mpivars->nproc;++j) {

         itag = i*mpivars->nproc + j;

         if(mpivars->id == i) {
            if(mpivars->id != j) {
               MPI_Status mpistat;

               t1 = MPI_Wtime();
               printf("RECEIVING FROM  %d  %d \n",j,itag);
/*
          sleep(1);
     if(j > 2 && i > 2)
*/
               MPI_Recv(rec,isize,MPI_DOUBLE,j,itag,mpivars->icomw,&mpistat);
               t2 = MPI_Wtime();
               tim[j] = t2 - t1;
               printf("  Communication: %d %lf \n",mpivars->id,t2-t1);
            }
         } else {
            if(mpivars->id == j) {
               printf(" SENDING TO %d %d \n",i,itag);
/*
     if(j > 2 && i > 2)
*/
               MPI_Send(sen,isize,MPI_DOUBLE,i,itag,mpivars->icomw);
            }
         }

         MPI_Barrier(mpivars->icomw);
         usleep(10000);
      }

      MPI_Barrier(mpivars->icomw);
      usleep(10000);
   }

   if(mpivars->id == 0) {
      FILE *fp = fopen("FIRST","w");
      for(j=0;j<mpivars->nproc;++j) {
      fprintf(fp," %d %lf \n",j,tim[j]);
      }
      fclose(fp);
   }
   if(mpivars->id == mpivars->nproc-1) {
      FILE *fp = fopen("LAST","w");
      for(j=0;j<mpivars->nproc;++j) {
      fprintf(fp," %d %lf \n",j,tim[j]);
      }
      fclose(fp);
   }
   // every process writes its own timings to a separate file
   {
      FILE *fp;
      char name[256];   // arbitrary number
      sprintf(name,"DATA_%.4d",mpivars->id);
      fp = fopen(name,"w");
      fprintf(fp,"# variables = p time \n");
      fprintf(fp,"# zone T=\"proc %d\"\n",mpivars->id);
      for(j=0;j<mpivars->nproc;++j) {
         fprintf(fp," %d %lf \n",j,tim[j]);
      }
      fclose(fp);
   }

   free(sen); free(rec); free(tim);

}


int main() {
   struct my_MPI_vars mpivars;

   inStartupMPI(&mpivars);
   stress_MPI_DoCrap(&mpivars);
   (void) MPI_Finalize();
   return(0);
}
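
The order in which messages are exchanged in the code above can be summarized without any MPI calls. The following C sketch shows which role a given rank plays in step (i, j), where i is the receiver and j the sender, together with the tag used for that step. The names step_role_of, step_tag and total_messages are hypothetical helpers for illustration only.

```c
/* Role of a process in one step of the stress-test schedule. */
enum step_role { STEP_IDLE, STEP_RECV, STEP_SEND };

/* Role of rank `id` in step (i, j): rank i receives from rank j.
 * Nothing is exchanged on the diagonal (i == j). */
enum step_role step_role_of(int id, int i, int j)
{
    if (i == j)  return STEP_IDLE;
    if (id == i) return STEP_RECV;
    if (id == j) return STEP_SEND;
    return STEP_IDLE;   /* everyone else just waits at the barrier */
}

/* Message tag for step (i, j); distinct for every ordered pair. */
int step_tag(int i, int j, int nproc)
{
    return i * nproc + j;
}

/* Total number of messages sent over the whole schedule. */
int total_messages(int nproc)
{
    int count = 0;
    for (int i = 0; i < nproc; ++i)
        for (int j = 0; j < nproc; ++j)
            if (i != j) ++count;
    return count;   /* equals nproc * (nproc - 1) */
}
```

Only one pair of processes is communicating at any time, so the test measures point-to-point performance for each pair in isolation rather than the behaviour under contention. One thing to keep in mind: the MPI standard only guarantees that tags up to 32767 are valid, so with this tag formula a run with more than 181 processes could exceed that guaranteed minimum on some installations.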

Here is the same code in Fortran


      Module mpivars
      Implicit None
      Include 'mpif.h'

      Integer ier,nproc,id,icomw,mpiinfo
      Integer mpistat(MPI_STATUS_SIZE)

      End module


      Subroutine MPI_Start()

      Use mpivars
      Implicit None


      call MPI_INIT(ier)

      icomw = MPI_COMM_WORLD

      call MPI_COMM_RANK(icomw,id,ier)
      call MPI_COMM_SIZE(icomw,nproc,ier)
      call MPI_INFO_CREATE(mpiinfo, ier)

      print*,'     Processor ',id,'started'

      return
      end


      Subroutine MPI_DoCrap()

      Use mpivars
      Implicit None
      Real*8,allocatable,dimension(:) :: rec,sen,tim
      Integer,parameter :: isize =  50*1000*1000
C     Integer,parameter :: isize =  50
      Integer i,j,itag
      Real*8 t1,t2
      Character*20 :: fname
      Character*3 :: fnext


      allocate(rec(isize),sen(isize), tim(0:nproc-1), stat=i)
      if(i.ne.0) STOP
      sen(:) = 999.0d0
      tim(0:nproc-1)= 0.0d0

      do i = 0,nproc-1
         if(id.eq.0) PRINT*,'Working on processor',i

         do j = 0,nproc-1

            itag = i*nproc + j

            if(id.eq.i) then
               if(id.ne.j) then
                  t1 = MPI_WTIME()
                  PRINT*,'    RECEIVING FROM',j,itag
                  call MPI_RECV(rec,isize,MPI_DOUBLE_PRECISION,j,itag,
     &                          icomw,mpistat,ier)
                  t2 = MPI_WTIME()
                  tim(j)= t2 - t1
                  WRITE(*,*) 'Communication:',id,t2-t1,'sec'
               endif
            else
               if(id.eq.j) then
                  PRINT*,'    SENDING TO',i,itag
                  call MPI_SEND(sen,isize,MPI_DOUBLE_PRECISION,i,itag,
     &                          icomw,ier)
               endif
            endif

            call MPI_BARRIER(icomw,ier)
         enddo

         call MPI_BARRIER(icomw,ier)
      enddo

      if(id.eq.0) then
      open(unit=20,file='FIRST',status='unknown')
      do j = 0,nproc-1
      write(20,*) j,tim(j)
      enddo
      close(20)
      endif

      if(id.eq.nproc-1) then
      open(unit=20,file='LAST',status='unknown')
      do j = 0,nproc-1
      write(20,*) j,tim(j)
      enddo
      close(20)
      endif

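c --- zero-padded process number (up to 999), so that the file name
c --- contains no blanks, e.g. DATA_007
      write(unit=fnext,fmt=100) id
 100  Format(i3.3)
      fname = 'DATA_'//fnext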
      open(unit=20,file=fname,status='unknown')
      write(20,*) '# variables = x y'
      write(20,*) '# zone T="',id,'"'
      do j = 0,nproc-1
      write(20,*) j,tim(j)
      enddo
      close(20)

      deallocate(tim,sen,rec)

      return
      end


      program main

      call MPI_Start()
      call MPI_DoCrap()

      call MPI_FINALIZE(ier)
      end

This code keeps track of the timing of each receive (as measured by the receiving process) and writes three kinds of files: (a) "FIRST", with the timings of the first process (rank 0), (b) "LAST", with the timings of the last process (rank nproc-1), and (c) one "DATA" file per process with that process's timings.
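
The timings in those files are easier to compare across machines when converted to a bandwidth. Here is a minimal sketch in C, assuming 8-byte doubles and 1 GB = 1.0e9 bytes; the function names message_bytes and bandwidth_gb_per_s are hypothetical and not part of the codes above.

```c
#include <stddef.h>

/* Size of one message in bytes, as a double to avoid integer overflow. */
double message_bytes(size_t count, size_t elem_size)
{
    return (double) count * (double) elem_size;
}

/* Effective bandwidth in GB/s (1 GB = 1.0e9 bytes) for one receive.
 * Returns 0 when the time is not positive, e.g. for the entry of a
 * process with itself, which is never measured and stays at zero. */
double bandwidth_gb_per_s(size_t count, size_t elem_size, double seconds)
{
    if (seconds <= 0.0) return 0.0;
    return message_bytes(count, elem_size) / seconds / 1.0e9;
}
```

For the message used here (50 million doubles, or 400 MB), a receive that takes 0.5 seconds corresponds to 0.8 GB/s. Keep in mind that the measured time also includes any waiting for the sender to start, so the number is an effective rate rather than a pure wire speed.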

IN 2014/02/21
 


Copyright © 2004-2017 Ioannis Nompelis

Last updated: (20.12.2017) by I.N.