Example 1.1 Numerical Integration with MPI Blocking Communications

Numerical integration is chosen as the instructional example because its algorithm is trivial to parallelize; the task is narrowly confined yet computationally relevant. For this example, the integrand is the cosine function and the range of integration, from a to b, has length b-a. The work is divided evenly among p, the number of processors, so that each processor integrates over a partial range of length (b-a)/p. When the local integrations are complete, processor 0 collects the local integral sums from all processors and forms the total sum.

First, MPI_Init is invoked to initialize MPI on all participating processes. The MPI parallel paradigm used throughout this tutorial is SPMD (Single Program Multiple Data): the identical program is executed on multiple processors. The number of processors is determined by the user at run time with

my-machine% mpirun -np 4 a.out

The mpirun command runs the MPI executable a.out on 4 processors. This is the most frequently used command to launch an MPI job, but it is system dependent; consult your local system documentation for the proper way to run your MPI job. The number of processors, 4 in this example, is made available to your executable as the variable p through a call to MPI_Comm_size. To distinguish one processor from another, MPI_Comm_rank is called to query for myid, the rank number of the current process. Essentially, myid facilitates the handling of different data on different processors.

The MPI_Send and MPI_Recv pair of blocking send and receive subroutines, also referred to as “standard” send and receive, is used to pass the local integral sum from the individual processors to processor 0, which calculates the total sum. The MPI standard requires that a blocking send call block (that is, not return to the caller) until the send buffer is safe to be reused. Similarly, the standard requires that a blocking receive call block until the receive buffer actually contains the intended message. At the end, a call to MPI_Finalize permits an orderly shutdown of MPI before the program exits.

For detailed explanations of the MPI subroutines and functions, please read MPI: The Complete Reference.

Example 1.1 Fortran Code

      Program Example1_1
c#######################################################################
c#                                                                     #
c# This is an MPI example on parallel integration to demonstrate the   #
c# use of:                                                             #
c#                                                                     #
c# * MPI_Init, MPI_Comm_rank, MPI_Comm_size, MPI_Finalize              #
c# * MPI_Recv, MPI_Send                                                #
c#                                                                     #
c# Dr. Kadin Tseng                                                     #
c# Scientific Computing and Visualization                              #
c# Boston University                                                   #
c# 1998                                                                #
c#                                                                     #
c#######################################################################
      implicit none
      integer n, p, i, j, proc, ierr, master, myid, tag, comm
      real h, a, b, integral, pi, ai, my_int, integral_sum
      include "mpif.h"  ! brings in pre-defined MPI constants, ...
      integer status(MPI_STATUS_SIZE)  ! size defined in mpif.h
      data master/0/    ! processor 0 collects integral sums from other processors

      comm = MPI_COMM_WORLD      
      call MPI_Init(ierr)                         ! starts MPI
      call MPI_Comm_rank(comm, myid, ierr)      ! get current proc ID
      call MPI_Comm_size(comm, p, ierr)         ! get number of procs

      pi = acos(-1.0)   !  = 3.14159...
      a = 0.0           ! lower limit of integration
      b = pi/2.         ! upper limit of integration
      n = 500           ! number of increments within each process
      tag = 123         ! set the tag to identify this particular job
      h = (b-a)/n/p     ! length of increment

      ai = a + myid*n*h ! lower limit of integration for partition myid
      my_int = integral(ai, h, n) 
      write(*,"('Process ',i2,' has the partial sum of',f10.6)")
     &          myid,my_int

      call MPI_Send(  
     &     my_int, 1, MPI_REAL,   ! buffer, size, datatype
     &     master,     ! where to send message
     &     tag,         ! message tag
     &     comm, ierr)

      if(myid .eq. master) then      ! do following only on master ...
        integral_sum = 0.0           ! initialize integral_sum to zero
        do proc=0,p-1   ! loop on processors to collect local sum
          call MPI_Recv( 
     &       my_int, 1, MPI_REAL, 
     &       proc,     ! message source
     &       tag,         ! message tag
     &       comm, status, ierr)        ! status reports source, tag
          integral_sum = integral_sum + my_int   ! sum my_int from processors
        enddo
        print *,'The integral =',integral_sum
      endif

      call MPI_Finalize(ierr)                           ! MPI finish up ...

      end
      real function integral(ai, h, n)
      implicit none
      integer n, j
      real h, ai, aij

      integral = 0.0                ! initialize integral
      do j=0,n-1                    ! sum integrals
        aij = ai +(j+0.5)*h         ! abscissa mid-point
        integral = integral + cos(aij)*h
      enddo

      return
      end

Example 1.1 C code

#include <mpi.h>
#include <math.h>
#include <stdio.h>
/* Prototype */
float integral(float ai, float h, int n);
int main(int argc, char* argv[])
{
/*######################################################################
 #                                                                     #
 # This is an MPI example on parallel integration to demonstrate the   #
 # use of:                                                             #
 #                                                                     #
 # * MPI_Init, MPI_Comm_rank, MPI_Comm_size, MPI_Finalize              #
 # * MPI_Recv, MPI_Send                                                #
 #                                                                     #
 # Dr. Kadin Tseng                                                     #
 # Scientific Computing and Visualization                              #
 # Boston University                                                   #
 # 1998                                                                #
 #                                                                     #
 #####################################################################*/
      int n, p, myid, tag, proc, ierr;
      float h, integral_sum, a, b, ai, pi, my_int;
      int master = 0;  /* processor performing total sum */
      MPI_Comm comm;
      MPI_Status status;

      comm = MPI_COMM_WORLD;      
      ierr = MPI_Init(&argc,&argv);           /* starts MPI */
      MPI_Comm_rank(comm, &myid);           /* get current process id */
      MPI_Comm_size(comm, &p);               /* get number of processes */

      pi = acos(-1.0);  /* = 3.14159... */
      a = 0.;           /* lower limit of integration */
      b = pi*1./2.;     /* upper limit of integration */
      n = 500;          /* number of increments within each process */
      tag = 123;        /* set the tag to identify this particular job */
      h = (b-a)/n/p;    /* length of increment */

      ai = a + myid*n*h;  /* lower limit of integration for partition myid */
      my_int = integral(ai, h, n);   /* 0<=myid<=p-1 */

      printf("Process %d has the partial integral of %f\n", myid,my_int);

      MPI_Send(        /* Send my_int from myid to master */
              &my_int, 1, MPI_FLOAT, 
              master,        /* message destination */
              tag,          /* message tag */
              comm);

      if(myid == master) {
        integral_sum = 0.0;
        for (proc=0;proc<p;proc++) {   /* for-loop serializes receives */
          MPI_Recv(         /* Receive my_int from proc to master  */
                &my_int, 1, MPI_FLOAT, 
                proc,        /* message source */
                tag,         /* message tag */
                comm, &status);      /* status reports source, tag */
          integral_sum += my_int;
        }
        printf("The Integral =%f\n",integral_sum); /* sum of my_int */
      }
      MPI_Finalize();                        /* let MPI finish up ... */
}
float integral(float ai, float h, int n)
{
      int j;
      float aij, integ;

      integ = 0.0;                 /* initialize */
      for (j=0;j<n;j++) {          /* sum integrals */
        aij = ai + (j+0.5)*h;      /* mid-point */
        integ += cos(aij)*h;
      }
      return integ;
}

Discussion

A number of points can be drawn from the above code:

  1. The assignment of master may be any logical processor number within the range between 0 and p-1. The choice of “0,” however, ensures that it is also valid for single processor applications (i.e., p = 1).
  2. MPI_Send is used to send the message my_int from all processors to the master.
  3. This send is performed concurrently on all processors.
  4. For each point-to-point send, a matching receive on the master is expected.
  5. The matching receive requires the master to call the point-to-point receive MPI_Recv p times, each to receive the my_int sent by a processor.
  6. This effectively serializes an otherwise parallel procedure and is computationally less efficient.
  7. Usually, there are no problems for processors to send messages to the master and for the master to receive messages from other processors. In this example, however, there is a problematic situation: processor 0 calls MPI_Send to send a message to itself before it calls the matching MPI_Recv, and both calls are blocking. Under the strict definition of blocking communication, the send on processor 0 cannot complete until the matching receive is posted, yet the receive cannot be posted until the send returns. Neither can complete, so a deadlock arises, at least theoretically. In practice, many MPI implementations provide a system memory buffer for blocking sends, even though the MPI standard does not require one. The message is copied into that buffer, the send returns, and deadlock may not occur. Because correct behavior depends on buffering that the standard does not guarantee, this situation is considered “unsafe”.
  8. Some MPI implementations allow the user to control the memory buffer. On the IBM pSeries, the size of the memory buffer can be set via the environment variable MP_EAGER_LIMIT. Setting it to 0 provides no buffer, which enforces the strict definition of point-to-point blocking communication. The present example deadlocks on the IBM pSeries with setenv MP_EAGER_LIMIT 0. Incidentally, this is a good trick for checking whether your code is “safe”.
  9. tag is one of the parameters used to help define a message uniquely. In this application, myid is sufficient to identify a message. tag is not needed as an additional parameter to define the message uniquely. If multiple messages are sent and received by the same processor, the receiver might need the tag to distinguish among the messages to determine what to do with the message.
  10. In both the send and receive subroutine/function, there is a constant MPI_COMM_WORLD, defined in mpif.h (for Fortran) or mpi.h (for C). This is called a communicator. MPI_COMM_WORLD means that none of the processors allocated for this job is excluded from the task (i.e., send or receive) at hand. In some situations, perhaps only a restricted group of processors (e.g., those with odd or even rank numbers) takes part in a message passing operation. In that case, one would create a tailored communicator to take the place of MPI_COMM_WORLD. In all the numerical integration examples, we use only the MPI_COMM_WORLD communicator.

The shortcomings stated above will be remedied in the next example, Example 1.2.
