In this lesson, the MPI_Send() and MPI_Recv() commands are used to perform a repetitive calculation. The example calculation is the gravity anomaly due to a buried sphere of some density contrast; details about this calculation are found on the geoscience community codes webpage. Briefly, the gravity anomaly due to a buried sphere depends on the sphere's location. In this example, the sphere is centered at x,y coordinate 0,0, and gravity values are calculated at points around 0,0. Since many points might be calculated, the work can be divided among several worker nodes, each calculating the gravity anomaly for its assigned set of points. In this case, the points lie on a profile line with varying values of the x-coordinate and with the y-coordinate equal to zero. MPI_Send() and MPI_Recv() are used because we would like the results (x, gravity anomaly) to print out in order, starting with the lowest value of x; the message blocking provided by MPI_Send() and MPI_Recv() accomplishes this task.
The gravity anomaly is calculated using three additional parameters: the density contrast between the sphere and the surrounding rock, the depth to the center of the sphere, and the radius of the sphere. The code outputs the x-coordinate and the gravity anomaly (in mgal).
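For reference, here is the formula that the sphere_gravity_anomaly() function in the code below implements. The sphere's excess mass and the magnitude of its attraction (converted from m/s^2 to mgal) are

$$ m = \frac{4}{3}\pi R^{3}\,\Delta\rho, \qquad g(x,y) = \frac{G\,m}{x^{2} + y^{2} + z^{2}} \times 10^{5}, $$

where R is the sphere's radius, Δρ is the density contrast, and z is the depth to the sphere's center. As a quick check with the lesson's values (R = 500 m, Δρ = -250 kg/m³, z = 1000 m) at x = y = 0: m ≈ -1.309 × 10¹¹ kg, so g ≈ -0.874 mgal, which matches the program output shown later.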
The main() code is written as a single program, multiple data (SPMD) code. The entire code executes on each available node, and there are two types of processors. One processor is the root or master processor, which controls the program flow. The other processors are worker processors: each gets information from the master processor, does its calculations, and returns the results to the master processor, which outputs them.
After starting up and initializing parameters, main() largely consists of an if-else statement, because the code does one set of tasks if the processor is the master node and another set if it is a worker node. If the processor is the master node, it first divides the data points among the workers, sends each worker its starting offset and its number of points, and then receives and prints each worker's results in rank order. The division of points uses these statements:
min_points = num_gravity_pts/num_workers;   // each worker gets at least this many points
extra_points = num_gravity_pts%num_workers; // these points are left over
number_points = (i <= extra_points) ? min_points+1 : min_points;
The above statements ensure that all of the data points are distributed to worker nodes. Based on its assigned rank, a worker gets either the minimum number of points plus one extra point, or just the minimum number of points, as the example below shows.
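For example, with the lesson's values of 10 gravity points and 3 workers (the -np 4 run shown later), the arithmetic works out as:

min_points = 10 / 3;    /* = 3 (integer division) */
extra_points = 10 % 3;  /* = 1 point left over */
/* worker 1: 3 + 1 = 4 points, offset 0 */
/* worker 2: 3 points, offset 4 */
/* worker 3: 3 points, offset 7 */

These are exactly the offsets and point counts reported in the sample output below.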
Each worker node has the same set of tasks, defined in the else branch: receive its offset and number of points from the master, calculate the gravity anomaly at each of its assigned points, and send the offset, number of points, and results back to the master.
This workflow is described in this flowchart:
Here is one implementation in code:
#include <mpi.h>   /* provides the basic MPI definitions and types */
#include <stdio.h>
#include <math.h>
#include <stdlib.h>

#define G 6.67430e-11  /* gravitational constant */
#define MASTER 0       /* root node is processor rank 0 */
#define BEGIN 1        /* message type */
#define DONE 4         /* message type */

/* Compute the gravity anomaly (in mgal) at point x,y due to a
   buried sphere centered at 0,0 */
double sphere_gravity_anomaly(double x, double y, double depth, double radius, double density) {
    double r = sqrt(x * x + y * y + depth * depth);
    double mass = (4.0 / 3.0) * M_PI * pow(radius, 3) * density;
    return (G * mass / (r * r)) * 1e5;  /* convert m/s^2 to mgal */
}

/* Calculate the gravity anomaly on a profile over a buried sphere,
   dividing the task among processors */
int main(int argc, char *argv[]) {
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* sphere parameters */
    double depth = 1000.0;    /* meters */
    double radius = 500.0;    /* meters */
    double density = -250.0;  /* kg/m3 */
    double x, y;

    int num_workers = size - 1;  /* because there is a master node */
    int min_points, num_gravity_pts, extra_points, number_points, offset;
    int destination, source, message_tag;

    /* data parameters */
    num_gravity_pts = 10;  /* use a small number of points to illustrate the workflow */
    double point_spacing = 1000.0;

    if (rank == MASTER) {
        /* Master distributes work by index */
        min_points = num_gravity_pts / num_workers;    /* each worker gets at least this many points */
        extra_points = num_gravity_pts % num_workers;  /* these points are left over */
        offset = 0;
        for (int i = 1; i < size; i++) {
            number_points = (i <= extra_points) ? min_points + 1 : min_points;
            destination = i;
            message_tag = BEGIN;
            MPI_Send(&offset, 1, MPI_INT, destination, message_tag, MPI_COMM_WORLD);
            MPI_Send(&number_points, 1, MPI_INT, destination, message_tag, MPI_COMM_WORLD);
            printf("Sent to %d offset %d number of points %d\n", destination, offset, number_points);
            offset += number_points;
        }
        for (int i = 1; i < size; i++) {
            source = i;
            message_tag = DONE;
            MPI_Recv(&offset, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            MPI_Recv(&number_points, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            double temp[2][number_points];
            MPI_Recv(temp, 2 * number_points, MPI_DOUBLE, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            printf("Sent from %d offset %d number of points %d\n", source, offset, number_points);
            for (int j = 0; j < number_points; j++) {
                printf("%lf %lf\n", temp[0][j], temp[1][j]);
            }
        }
    } else {
        /* Worker process - calculate gravity anomaly based on index */
        source = MASTER;
        message_tag = BEGIN;
        MPI_Recv(&offset, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        MPI_Recv(&number_points, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        double gravity_anomaly[2][number_points];
        for (int i = 0; i < number_points; i++) {
            x = (-num_gravity_pts / 2.0 + (offset + i)) * point_spacing;
            y = 0.0;
            gravity_anomaly[0][i] = x;
            gravity_anomaly[1][i] = sphere_gravity_anomaly(x, y, depth, radius, density);
            // printf("%lf %lf\n", gravity_anomaly[0][i], gravity_anomaly[1][i]);
        }
        message_tag = DONE;
        MPI_Send(&offset, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD);
        MPI_Send(&number_points, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD);
        MPI_Send(gravity_anomaly, 2 * number_points, MPI_DOUBLE, source, message_tag, MPI_COMM_WORLD);
    }
    MPI_Finalize();
    return 0;
}
To compile this C code, type on the command line:
mpicc -o gravity mpi_gravity_anomaly.c -lm
To run the compiled code with four processes, counting hardware threads as available CPU slots, type on the command line:
mpirun --use-hwthread-cpus -np 4 gravity
The output of the code is printed to the terminal window. Assuming four processes are used (one master and three workers), the output to the terminal window is:
Sent to 1 offset 0 number of points 4
Sent to 2 offset 4 number of points 3
Sent to 3 offset 7 number of points 3
Sent from 1 offset 0 number of points 4
-5000.000000 -0.033602
-4000.000000 -0.051392
-3000.000000 -0.087366
-2000.000000 -0.174733
Sent from 2 offset 4 number of points 3
-1000.000000 -0.436832
0.000000 -0.873664
1000.000000 -0.436832
Sent from 3 offset 7 number of points 3
2000.000000 -0.174733
3000.000000 -0.087366
4000.000000 -0.051392
The print statements reporting what is sent to and from each worker are included just to demonstrate the bookkeeping done with the MPI send and receive calls. Of course, these statements can be deleted in a production code, but they are very useful in debugging the MPI commands and so are included here.
Here is the syntax for MPI_Send and MPI_Recv:

int MPI_Send (
    message,     /* actual information sent */
    length,      /* length of the message */
    datatype,    /* MPI datatype of the message */
    destination, /* rank of the processor getting the message */
    tag,         /* an int that helps sort messages; must match the tag in MPI_Recv */
    MPI_Comm     /* the communicator, almost always MPI_COMM_WORLD */
)

int MPI_Recv (
    message,          /* actual information received */
    length,           /* length of the message */
    datatype,         /* MPI datatype of the message */
    source,           /* rank of the processor sending the message */
    tag,              /* an int that helps sort messages; must match the tag in MPI_Send */
    MPI_Comm,         /* the communicator, almost always MPI_COMM_WORLD */
    MPI_STATUS_IGNORE /* shortcut to avoid using an MPI_Status, which can be used to manage communications in some codes */
)
Note how the "message" contains the quantity that is passed from node to node, in this case either a number or an array of numbers. These values are passed as pointers. The number variables are passed by reference (ampersand). MPI_Send() and MPI_Recv() assume the array is passed by reference, so no ampersand is needed (and would cause an error if used).
Finally, note these statements in the second for-loop of the master node's task:

MPI_Recv(&offset, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&number_points, 1, MPI_INT, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
double temp[2][number_points];
MPI_Recv(temp, 2*number_points, MPI_DOUBLE, source, message_tag, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

The master must receive number_points before it can declare the variable-length array temp that holds the worker's results. Because MPI preserves the order of messages sent between a given pair of processes with the same tag, the two integers are guaranteed to arrive before the data array, so the buffer is always sized correctly.
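As an aside, the MPI_Status object mentioned above offers another way to size the receive buffer: the master can probe the incoming message and ask how many doubles it contains, rather than receiving number_points explicitly first. A minimal sketch of that pattern, which is not used in this lesson's code:

MPI_Status status;
int count;
MPI_Probe(source, DONE, MPI_COMM_WORLD, &status); /* wait until the message arrives */
MPI_Get_count(&status, MPI_DOUBLE, &count);       /* count would equal 2*number_points */
double temp[2][count / 2];
MPI_Recv(temp, count, MPI_DOUBLE, source, DONE, MPI_COMM_WORLD, MPI_STATUS_IGNORE);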