MPI_Send() and MPI_Recv()

Learn about using blocking message passing to organize calculations

Introduction

In this lesson, the MPI_Send() and MPI_Recv() commands are used to perform a repetitive calculation. The calculation, used as a simple example, computes the gravity anomaly due to a buried sphere of some density contrast. Details about the calculation of the gravity anomaly due to a sphere are found on this geoscience community codes webpage. Briefly, the gravity anomaly due to a buried sphere depends on the sphere's location. In this example, the sphere is located at x,y coordinate 0,0, and gravity values are calculated at points around 0,0. Since many points might be calculated, the calculation can be divided among several worker nodes, each calculating the gravity anomaly for its assigned set of points. In this case, the points are located on a profile line with varying values of the x-coordinate and with the y-coordinate equal to zero. MPI_Send() and MPI_Recv() are used because we would like the results (x, gravity anomaly) to print out in order, starting with the lowest value of x; the blocking behavior of MPI_Send() and MPI_Recv() accomplishes this.

The gravity anomaly is specified using additional parameters: the density contrast between the sphere and the surrounding rock, the depth to the center of the sphere, and the radius of the sphere. The code outputs the x-coordinate and the gravity anomaly (in mGal).
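
For reference, the formula implemented in the code below is the attraction of a point mass equal to the sphere's mass, converted from m/s^2 to mGal by a factor of 1e5:

    g(x, y) = 1e5 * G * m / (x^2 + y^2 + d^2),   where   m = (4/3) * pi * R^3 * Δρ

Here G is the gravitational constant, d is the depth to the center of the sphere, R is the sphere's radius, and Δρ is the density contrast.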


How the calculation works with MPI_Send() and MPI_Recv()

The main() code is written as a single program, multiple data (SPMD) code: the entire program executes on each available node. In this case there are two types of processors. One processor is the root or master processor, which controls the program flow. The other processors are worker processors. The worker processors get information from the master processor, do the calculations, and return the results of the calculations to the master processor, which outputs the results.

After starting up and initializing parameters, main() largely consists of an if-else statement, because the code does one set of tasks if the processor is the master node and another set of tasks if it is a worker node (a stripped-down sketch of this structure appears just before the full code listing below). If the processor is the master node, it does the following:

  • Determine how many data points are sent to each worker node. In this code, a clever method is used to load balance (to make sure each processor does about the same amount of work). A conditional expression decides how many points each worker gets, based on the total number of points, whether or not that total is evenly divisible by the number of worker nodes.
    
    min_points = num_gravity_pts/num_workers; //each worker gets at least this many points
    extra_points = num_gravity_pts%num_workers; //these points are left over
    
    number_points = (i<=extra_points) ? min_points+1 : min_points;
    
    
    The above statements ensure that all of the data points are distributed to worker nodes. Based on its rank, a worker gets either the minimum number of points plus one extra point, or just the minimum number of points. For example, with 10 points and 3 workers, min_points is 3 and extra_points is 1, so worker 1 gets 4 points while workers 2 and 3 get 3 points each.
  • Then the offset (starting point index) and number of points are sent to each worker node.
  • Once this send loop is completed, the master node enters a second for-loop to receive the results from each worker node. Since MPI_Recv() is used within a for-loop, the master node waits until the results are received from the first worker before going on to the second worker, and so on. Once the results are received from a node, the master node prints them to standard output. An additional print statement is included here for testing, to verify that the results are received from each worker node; this print statement would be commented out in an operational code.

Each worker node has the same set of tasks, defined in the else branch. These tasks are:

  • Receive the offset and number of points from the master node. Because these calls are blocking, nothing happens until these values are received.
  • Each worker node creates a 2D array called gravity_anomaly that will hold the x-coordinate values and the calculated gravity values.
  • The code then calls the function to calculate the gravity anomaly for each point (x-coordinate) in its set of points.
  • Each worker node then sends the offset, the number of points, and the array of calculated values to the master node.

This workflow is described in this flowchart:

[Flowchart: master/worker workflow for the gravity anomaly calculation]
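
Before the full gravity code, here is a stripped-down, runnable sketch of the same master/worker pattern. The payload here is one made-up integer per worker rather than gravity values; the point is the send/receive bookkeeping described above.


#include <mpi.h>
#include <stdio.h>

#define MASTER 0

/* Stripped-down master/worker pattern: the master sends each worker one
   integer, and each worker sends one integer back. Because MPI_Recv()
   blocks, the master prints the replies in rank order. */
int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == MASTER) {
        for (int i = 1; i < size; i++) {   /* hand out the work */
            int n = 10 * i;                /* made-up assignment */
            MPI_Send(&n, 1, MPI_INT, i, 1, MPI_COMM_WORLD);
        }
        for (int i = 1; i < size; i++) {   /* collect results in rank order */
            int result;
            MPI_Recv(&result, 1, MPI_INT, i, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            printf("worker %d returned %d\n", i, result);
        }
    } else {
        int n, result;
        MPI_Recv(&n, 1, MPI_INT, MASTER, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        result = n + rank;                 /* stand-in for the real calculation */
        MPI_Send(&result, 1, MPI_INT, MASTER, 2, MPI_COMM_WORLD);
    }

    MPI_Finalize();
    return 0;
}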

C Gravity code

Here is one implementation in code:


#include <mpi.h>   /* this header file provides the basic MPI definitions and types */
#include <stdio.h>
#include <math.h>
#include <stdlib.h>

#define G 6.67430e-11 // Gravitational constant
#define MASTER 0  /* root node is processor rank 0 */
#define BEGIN 1 /* message type */
#define DONE 4 /*message type*/

// Compute the gravity anomaly (in mGal) at point x,y due to a buried sphere centered at 0,0
double sphere_gravity_anomaly(double x, double y, double depth, double radius, double density) {
    double r = sqrt(x * x + y * y + depth * depth);
    double mass = (4.0 / 3.0) * M_PI * pow(radius, 3) * density;
    return (G * mass / (r * r)) * 1e5;   /* 1e5 converts m/s^2 to mGal */
}


/* calculate the gravity anomaly on a profile over 
   a buried sphere, dividing the task among processors
  */
int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    /* sphere parameters*/
    double depth = 1000.0; //meters
    double radius = 500.0; //meters
    double density = -250.0; //kg/m3
    double x,y;
    
    int num_workers = size-1;  //because there is a Master node
    int min_points,num_gravity_pts, extra_points, number_points, offset;
    int destination, source, worker_number, message_tag;
    
    /*data parameters */
    num_gravity_pts = 10; //use a small number of points to illustrate workflow
    double point_spacing = 1000.0;
    
   
   if (rank == MASTER) {
        // Master distributes work by index
        min_points = num_gravity_pts/num_workers; //each worker gets at least this many points
        extra_points = num_gravity_pts%num_workers; //these points are left over
        
        offset = 0;
        
        for (int i = 1; i < size; i++) {
            number_points = (i <= extra_points) ? min_points+1 : min_points;

            destination = i;
            worker_number = i;
            message_tag = BEGIN;

            //MPI_Send(&worker_number, 1, MPI_INT, destination, message_tag, MPI_COMM_WORLD);
            MPI_Send(&offset, 1, MPI_INT, destination, message_tag, MPI_COMM_WORLD);
            MPI_Send(&number_points, 1, MPI_INT, destination, message_tag, MPI_COMM_WORLD);

            printf("Sent to %d offset %d number of points %d\n", destination, offset, number_points);

            offset += number_points;
        }
        
        for (int i = 1; i < size; i++) {

            source = i;
            message_tag = DONE;

            MPI_Recv(&offset, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            MPI_Recv(&number_points, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

            /* now that number_points is known, size the receiving array */
            double temp[2][number_points];
            MPI_Recv(temp, 2*number_points, MPI_DOUBLE, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            printf("Sent from %d offset %d number of points %d\n", source, offset, number_points);
            for (int j = 0; j < number_points; j++) {
                printf("%lf %lf\n", temp[0][j], temp[1][j]);
            }
        }
        
        
        
   } else {
        // Worker process - calculate gravity anomaly based on index
        source = MASTER;
        message_tag = BEGIN;
        MPI_Recv(&offset,1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        MPI_Recv(&number_points,1, MPI_INT, source,message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        double gravity_anomaly[2][number_points];
        for (int i = 0; i< number_points; i++){
               x = (-num_gravity_pts/2.0 + (offset + i))*point_spacing;
               y = 0.0;
               gravity_anomaly[0][i] = x;
               gravity_anomaly[1][i] = sphere_gravity_anomaly(x, y, depth, radius, density); 
               //printf("%lf %lf\n", gravity_anomaly[0][i], gravity_anomaly[1][i]); 
        }
       
        message_tag = DONE;
        
        MPI_Send(&offset, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD);
        MPI_Send(&number_points, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD);
        MPI_Send(gravity_anomaly, 2*number_points, MPI_DOUBLE, source, message_tag, MPI_COMM_WORLD);
   
   }
  
  MPI_Finalize();
  return 0;
  
}


Compiling and running the code

To compile this C code, type on the command line:


mpicc -o gravity mpi_gravity_anomaly.c -lm

To run the compiled code with four processes, using the hardware threads on the available CPUs, type on the command line:


 mpirun --use-hwthread-cpus -np 4 ./gravity

The output of the code is printed to the terminal window. Assuming four processors are used, the output to the terminal window is:


Sent to 1 offset 0 number of points 4
Sent to 2 offset 4 number of points 3
Sent to 3 offset 7 number of points 3
Sent from 1 offset 0 number of points 4
-5000.000000 -0.033602
-4000.000000 -0.051392
-3000.000000 -0.087366
-2000.000000 -0.174733
Sent from 2 offset 4 number of points 3
-1000.000000 -0.436832
0.000000 -0.873664
1000.000000 -0.436832
Sent from 3 offset 7 number of points 3
2000.000000 -0.174733
3000.000000 -0.087366
4000.000000 -0.051392

The print statements about what is sent to and from each worker are included just to demonstrate the bookkeeping of the MPI send and receive calls. Of course, these statements can be deleted in a production code, but they are very useful for debugging the MPI commands and so are included here.

Additional Explanation

Here is the syntax for MPI_Send and MPI_Recv:

int MPI_Send (
              message,       /* actual information sent */
              length,        /* length of the message (number of elements) */
              datatype,      /* MPI datatype of the message */
              destination,   /* rank of the processor getting the message */
              tag,           /* tag helps sort messages - an int, the same as in the matching MPI_Recv() */
              MPI_Comm       /* communicator - almost always MPI_COMM_WORLD */
              )

int MPI_Recv (
              message,       /* actual information received */
              length,        /* length of the message (number of elements) */
              datatype,      /* MPI datatype of the message */
              source,        /* rank of the processor sending the message */
              tag,           /* tag helps sort messages - an int, the same as in the matching MPI_Send() */
              MPI_Comm,      /* communicator - almost always MPI_COMM_WORLD */
              MPI_STATUS_IGNORE  /* shortcut to avoid using MPI_Status, which can be used to manage communications in some codes */
              )

Note how the "message" argument contains the quantity that is passed from node to node, in this case either a number or an array of numbers. These values are passed as pointers. Scalar variables are passed by reference (with an ampersand), while an array name already acts as a pointer to its first element, so no ampersand is needed.
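
For example, a minimal sketch (destination and message_tag are assumed to be set as in the code above):

int number_points = 5;             /* scalar: pass its address with & */
MPI_Send(&number_points, 1, MPI_INT, destination, message_tag, MPI_COMM_WORLD);

double values[2][5];               /* array, assumed filled in elsewhere: the name already points to the data, no & */
MPI_Send(values, 10, MPI_DOUBLE, destination, message_tag, MPI_COMM_WORLD);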

Note these statements in the second for-loop in the master node task:


            MPI_Recv(&offset, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            MPI_Recv(&number_points, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

            double temp[2][number_points];
            MPI_Recv(temp, 2*number_points, MPI_DOUBLE, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

In this second for-loop, the master node waits to get the offset and number of points back from the worker node. At this point in the execution, the master node does not "remember" the offset and number of points it sent to each worker node, so it is safest to get this information back from the worker. Once the number of points is known, the master node creates an array of the proper dimensions to hold the message (the calculated array of values) to be received from that worker. The array message (temp) is then received by the master.
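
As an aside, MPI can also size a receive buffer without echoing the count back from the worker: MPI_Probe() waits for a pending message without receiving it, and MPI_Get_count() reports how many elements that message contains. A minimal sketch, assuming a variant of the protocol in which the worker sends only the data array with the DONE tag (not the offset and count first, as the code above does):

MPI_Status status;
int count;

/* block until a DONE message from this worker is pending, without receiving it */
MPI_Probe(source, DONE, MPI_COMM_WORLD, &status);

/* ask how many doubles the pending message contains */
MPI_Get_count(&status, MPI_DOUBLE, &count);

double temp[2][count / 2];   /* count doubles = two rows of count/2 points */
MPI_Recv(temp, count, MPI_DOUBLE, source, DONE, MPI_COMM_WORLD, MPI_STATUS_IGNORE);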