CPSC 441, Fall 2018
About MPI


MPI is simply a specification of a set of functions, and there are several implementations of the specification. We will be using one called OpenMPI. It is suitable for distributed processing on a network of workstations. When you run an MPI program under OpenMPI, you can ask it to run copies of the program on several different computers. Once a copy of the program has been started by OpenMPI on each computer, those running processes can use MPI to communicate with processes running other copies of the same program.

The web site https://www.open-mpi.org/ has a lot of information about MPI and OpenMPI, including version 1.10 documentation, which is the version we are using. However, this handout should have everything that you need to know for our OpenMPI labs.


Using OpenMPI: One-time Setup

OpenMPI runs most easily in an environment where you have the same shared home directory on all of the computers that will be used, and where you have set up passwordless login to those computers using ssh. For us, that means that you need to use OpenMPI in your netXX account. Before you use OpenMPI, you should have that account set up for passwordless login to other lab computers, and all the computers that you want to use should already be in your known_hosts file. You might have already set this up for the distributed Mandelbrot program in Lab 7. If not, you should do it before working with OpenMPI. (The instructions for passwordless login were at the very beginning of Lab 1. A known_hosts file containing all the cslab and csfac computers can be found in /classes/cs441/known_hosts; you can simply copy it into the .ssh directory in your netXX account.)
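For example, assuming your netXX account already has a .ssh directory in your home directory, the copy can be made with a command along these lines:

          cp  /classes/cs441/known_hosts  ~/.ssh/known_hosts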


Using OpenMPI: Compiling and Running Programs

We will use MPI programs written in C. (The alternatives would be Fortran or C++.) The command for compiling an MPI program written in C is mpicc. It is used in the same way as the usual Linux C compiler, gcc. For example, to compile an MPI program in a file named hello_mpi.c, use:

         mpicc -o hello_mpi hello_mpi.c

The -o option specifies the name of the compiled program. As usual, if you leave out the -o option, the compiled program will be named a.out. (For a C++ program, you would use mpiCC instead of mpicc.) It might be necessary to add some libraries to the command. In particular, if math functions from math.h are used, you need to add "-lm" to the end of the command. For example,

         mpicc  -o primes1  primes1.c  -lm

Once you have a compiled MPI program, you can execute it using the mpirun command. We will always use mpirun with a "hostfile" that lists the available computers and the maximum number of processes that can be started on each computer. The command line for mpirun specifies the hostfile, the number of copies of the program to run, and the name of the program. For example, to run 10 copies of the compiled MPI program hello_mpi, with a hostfile named mpihosts, use:

            mpirun -hostfile mpihosts -n 10 hello_mpi
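The hostfile itself is just a plain text file with one computer per line; in OpenMPI, the number of processes allowed on a computer is given with "slots=". As a sketch (the host names and slot counts here are only placeholders for whatever lab machines you actually use), an mpihosts file might look like:

            csfac1 slots=4
            csfac2 slots=4
            csfac3 slots=4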

It's possible to add command-line arguments for the program. When you do this, each copy of the program gets a copy of the command-line arguments. The program can access them in the usual way, using the parameters argc and argv to the main() routine, but you should only do this after calling MPI_Init(&argc,&argv), since MPI does some processing of its own on the command line. For example, if you want to run a multiprocessing program named primes1 that can take an integer as a command-line argument, you can run it with a command like:

          mpirun -hostfile mpihosts -n 16 primes1 200000
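As a rough sketch of how the program might read that argument (the variable names here are made up for illustration), note that MPI_Init is called before argv is examined:

     #include <stdlib.h>   // for atoi
     #include <mpi.h>

     int main( int argc, char *argv[] ) {
         MPI_Init( &argc, &argv );      // MPI processes and removes its own arguments here.
         int limit = 100000;            // Default, used when no argument is given.
         if (argc > 1)
             limit = atoi( argv[1] );   // e.g. 200000 in the command shown above.
         // ... do the work, using limit ...
         MPI_Finalize();
         return 0;
     }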

If you only want to run MPI on a subset of the computers listed in the hostfile, you can list the computers that you want to use with the "-H" option. The argument is a comma-separated list of computer names, with no spaces. For example,

          mpirun -hostfile mpihosts -H csfac1,csfac2,csfac3 -n 12 primes1

MPI Functions

We will use only a small subset of all the available MPI functions in this course. However, this subset has enough power to write a full range of message-passing, parallel-processing programs.

The four most basic commands are MPI_Init, MPI_Finalize, MPI_Comm_size and MPI_Comm_rank. These commands are not covered in detail here, but they are illustrated in the sample program hello_mpi.c, and they are used in basically the same way in almost every program. Note that MPI_Comm_size is used to find out how many processes are running in the MPI job. Each of those processes is given a number, called its rank. MPI_Comm_rank returns the rank of the process that calls it. Thus, this function returns a different value in each process. All of the processes are running the same program, but they can do different things because they each have a different rank.
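For reference, here is a bare-bones sketch of the usual pattern for those four functions (this is only a sketch, not the actual contents of hello_mpi.c):

     #include <stdio.h>
     #include <mpi.h>

     int main( int argc, char *argv[] ) {
         int process_count, my_rank;
         MPI_Init( &argc, &argv );                         // Set up MPI; must be called first.
         MPI_Comm_size( MPI_COMM_WORLD, &process_count );  // Number of processes in the job.
         MPI_Comm_rank( MPI_COMM_WORLD, &my_rank );        // This process's own rank.
         printf( "Hello from process %d of %d\n", my_rank, process_count );
         MPI_Finalize();                                   // Shut down MPI; must be called last.
         return 0;
     }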

Most MPI functions return an error code as the return value. However, the default behavior is for the MPI system itself to abort the program when an error occurs, so that you never see the error code. We will not use error codes in any of our programs. (It's possible to change the default behavior so that errors are reported instead of causing program termination, but we won't worry about that.)

The rest of this page contains information about other MPI functions that you should know about. There are two main classes of function: those related to point-to-point communication between two processes (MPI_Send, MPI_Recv, and MPI_Iprobe) and those related to collective communications that involve all processes (MPI_Barrier, MPI_Bcast, MPI_Reduce, MPI_Gather, MPI_Allgather, and MPI_Scatter). In addition, there are two miscellaneous functions, MPI_Wtime and MPI_Abort.

MPI_Send
Prototype:
     int MPI_Send(
           void*        message,          // Pointer to data to send
           int          count,            // Number of data values to send
           MPI_Datatype datatype,         // Type of data (e.g. MPI_INT)
           int          destination_rank, // Rank of process to receive message
           int          tag,              // Identifies message type
           MPI_Comm     comm              // Use MPI_COMM_WORLD
         )
Explanation: Sends a message from the process that executes this command to another process. The receiver is specified by giving the destination_rank. (Recall that each process has a number between zero and process_count minus one.) The message consists of one or more items, all of which must be of the same type. The type of data is specified in the datatype parameter, which must be a constant such as MPI_INT, MPI_DOUBLE, or MPI_CHAR. (You will probably use either MPI_INT or MPI_DOUBLE.) The data type specified should match the actual type of the message parameter.

The actual data is referenced by the message parameter, which must be a pointer to the start of the data. If the data is a single value stored in a variable named data, for example, pass the address of the variable, &data. If the data consists of several values in an array named data_array, just pass the array name (with no "&" operator) -- remember that an array in C/C++ is already a pointer.

The tag is a non-negative integer that you make up and use in your program to identify each different type of message that you might want to send. The tag allows the recipient to differentiate between different types of messages, so they can be processed differently. You can use any value you like. If you just have one type of message, the value of tag is irrelevant.

On the system that we are using, MPI_Send just drops the message into an outgoing message queue and returns immediately. The message is actually delivered by a separate process. On other systems, however, MPI_Send might block and fail to return until the message is actually sent. (This is unfortunate because it means that a program that works on one system might fail on another. MPI has many more specialized send functions that give more control.)

The last parameter to MPI_Send is of type MPI_Comm. This is a "communicator," which identifies some particular defined group of processes. MPI_COMM_WORLD is an MPI constant that represents the group that contains every process in the virtual machine. We will not work with other groups, so you should always specify MPI_COMM_WORLD as the value of a parameter of type MPI_Comm. Almost every MPI function has such a parameter; I will not mention them again.

The return value of the function is an error code, which we will always ignore, as discussed above.

Example(s):
     int count;
     double results[5];
        .
        .  // Compute values for count and for the results array
        .
     MPI_Send( &count, 1, MPI_INT, 0, 1, MPI_COMM_WORLD );
           // send 1 int to process 0 with identification tag 1
     MPI_Send( results, 5, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD );
           // Now send 5 doubles to process 0 with ID tag 2 --
           // the different tag indicates a different type of message.

MPI_Recv
Prototype:
     int MPI_Recv(
           void*        message,     // Points to location in memory where
                                     //     received message is to be stored.
           int          count,       // MAX number of data values to accept 
            MPI_Datatype datatype,    // Type of data (e.g. MPI_INT)
           int          source_rank, // Rank of process to receive from
                                     //    (Use MPI_ANY_SOURCE to accept
                                     //     from any sender)
           int          tag,         // Type of message to receive
                                     //    (Use MPI_ANY_TAG to accept any type)
           MPI_Comm     comm,        // Use MPI_COMM_WORLD
           MPI_Status*  status       // To receive info about the message
         )
Explanation: Receive a message sent by another process using MPI_Send (or certain other send functions that aren't covered here). This function will block until a message arrives. When it returns, the message that was received has been placed in the memory specified by the message parameter. This parameter will ordinarily be specified as an array name or as &data where data is a simple variable.

The count specifies the maximum number of data values that can be accepted. It should be 1 for a simple variable; for an array, it can be as large as the array size. The actual number of values received can be fewer than count. Usually, you know how many values you will get. If not, there is a function, MPI_Get_count, that can be used to find out the actual number received (see the example below).

If source_rank is MPI_ANY_SOURCE, this function will accept a message from any other process. If source_rank is a process number, then it will only accept a message from that process. (If there are messages waiting from other processes, they will be ignored -- they are not errors; they will just wait in the incoming message queue, and they can be read later with other calls to MPI_Recv.) If tag is MPI_ANY_TAG, then any message will be accepted from the specified source. Otherwise, only a message with a matching tag will be accepted. (A message's tag is specified by the sending process in the MPI_Send command; see above.)

The status parameter is a struct that is filled with information about the message when it is received. Usually, you will declare a variable named status of type MPI_Status and pass its address, &status, to the function. After the function returns, status.MPI_SOURCE contains the number of the process that sent the message, and status.MPI_TAG contains the message's tag. You might need this information if you used MPI_ANY_SOURCE or MPI_ANY_TAG.

Example(s):
     int count_received;
     double results_received[5];
     int stuff[100];
     MPI_Status status;
     MPI_Recv( &count_received, 1, MPI_INT, 17, 1, MPI_COMM_WORLD, &status );
           // Receive 1 int from process 17 in a message with tag 1
     MPI_Recv( results_received, 5, MPI_DOUBLE, 17, 2, MPI_COMM_WORLD, &status );
           // Receive 5 doubles from process 17 in a message with tag 2
     MPI_Recv( stuff, 100, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status );
           // Receive up to 100 ints from any source in a message with any tag;
           // status.MPI_SOURCE is the message source and status.MPI_TAG is its tag
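     // If the actual number of ints received by the previous call is needed,
     // MPI_Get_count can extract it from the status (a sketch; actual_count
     // is just an illustrative variable name):
     int actual_count;
     MPI_Get_count( &status, MPI_INT, &actual_count );
           // actual_count is the number of ints actually stored in stuff (at most 100)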

MPI_Iprobe
Prototype:
     int MPI_Iprobe(
           int          source_rank, // Rank of process to receive from
                                     //    (Use MPI_ANY_SOURCE to accept
                                     //     from any sender)
           int          tag,         // Type of message to receive
                                     //    (Use MPI_ANY_TAG to accept any type)
           MPI_Comm     comm,        // Use MPI_COMM_WORLD
           int*         flag,        // Tells if a message is available
           MPI_Status*  status       // To receive info about the message
         )
Explanation: This function allows you to check whether a message is available, without blocking to wait for that message. The function returns immediately (which is what the "I" in the name stands for). If a message that matches source_rank and tag is available, the value of flag is set to true. In this case, you can call MPI_Recv to receive the message, and you can be sure that it won't block. If no such message is available at the current time, the value of flag is set to false. If a message is available, the status parameter is filled with information about the message in exactly the same way that it would be in the MPI_Recv function. The actual flag parameter in the function call will almost certainly be of the form &test, where test is a variable of type int.
Example(s):
    MPI_Status status;
    int flag;
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &status );
    if (flag) {
        // Receive message with MPI_Recv and do something with it.
    }
    else {
        // Do other work.
    }
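One plausible way to fill in the first branch (this assumes, just for the sketch, that the expected messages carry at most 100 ints) is to receive exactly the message that the probe detected, using the source and tag reported in status:

         int data[100];
         MPI_Recv( data, 100, MPI_INT, status.MPI_SOURCE, status.MPI_TAG,
                   MPI_COMM_WORLD, &status );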

MPI_Barrier
Prototype:
     int MPI_Barrier(
           MPI_Comm   comm    // Use MPI_COMM_WORLD
         )
Explanation: This is the simplest collective communication function. In a collective communication, every process must call the function. Otherwise, an error occurs. For example, the program might hang, waiting forever for one of the processes to call the function.

When a process calls MPI_Barrier, it will simply wait until every other process has also called the same function. At that time, all the processes start running again. The point of this function is simply to synchronize the processes by bringing them all to a certain point in the program before allowing them to continue. In practice, you will probably not use this function, since the other collective communication functions do the same sort of synchronization implicitly.
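Example(s): (a hypothetical but typical use: synchronize all the processes before timing the main computation; MPI_Wtime is described below)
     MPI_Barrier( MPI_COMM_WORLD );    // Wait here until every process arrives.
     double start_time = MPI_Wtime();  // Now all the processes start timing together.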


MPI_Bcast
Prototype:
     int MPI_Bcast(
           void*        message,   // Pointer to location of data
           int          count,     // Number of data values to broadcast
           MPI_Datatype datatype,  // Type of data (e.g. MPI_INT)
           int          root,      // Process that originally has the data
           MPI_Comm     comm       // Use MPI_COMM_WORLD
         )
           
Explanation: Broadcasts a message from one "root" process to every other process. Every process -- both the sender and all the receivers -- must call this function, or an error occurs. The rank of the process that originally has the data is specified in the root parameter. In this process, the message parameter specifies the data to be sent (as in an MPI_Send command). In every other process, the message parameter specifies where the incoming message is to be stored. Note that although all processes call the same function, the meaning is different for the sender than it is for the receivers.

As usual, the first parameter will ordinarily be either the name of an array or the address (&data) of a simple variable. The datatype parameter should match the actual type of the message parameter. The count should be the actual number of data values to be broadcast -- 1 for a simple variable or anything up to the array size for an array.

Example(s):
     int initialdata[20];
     if (my_rank == 0) {
         // Fill the initialdata array with some data.
         // Only process 0 does this.
     }
     MPI_Bcast( initialdata, 20, MPI_INT, 0, MPI_COMM_WORLD );
        // Now, every process has the same data in its initialdata array.

MPI_Reduce
Prototype:
     int MPI_Reduce(
           void*        value,      // Input value from this process
           void*        answer,     // Result -- on root process only
           int          count,      // Number of values -- usually 1
           MPI_Datatype datatype,   // Type of data (e.g. MPI_INT)         
           MPI_Op       operation,  // What to do (e.g. MPI_SUM)           
           int          root,       // Process that receives the answer    
           MPI_Comm     comm        // Use MPI_COMM_WORLD                  
         )
Explanation: This function is used when each process has a data value, and it is necessary to combine the values from the different processes in some way, such as adding them all up or finding the minimum. Every process must call this function. Each process passes its own input value as the first parameter. The answer is computed and is placed in the answer parameter on only one process. The process that is to receive the final answer is called the root and is specified in the root parameter. For the other processes, the value of the answer parameter after the function returns is undefined (but they still have to specify the parameter). As usual, the first and second parameters can be either array names or addresses of simple variables.

The datatype parameter specifies the type of data to be operated on. The operation parameter specifies how the individual values are to be combined. Possible operations include: MPI_SUM, MPI_PROD, MPI_MIN, MPI_MAX, and others.

The count parameter specifies the number of values on each process. If count is greater than 1, then the first two parameters should be arrays, and the operation is done on each array location separately.

Note that every process must call this function, or an error occurs.

Example(s):
     double my_total;      // value computed by each process
     double overall_total; // total of values from all the processes
        .
        .  // Compute a value for my_total
        .
     MPI_Reduce( &my_total, &overall_total, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD );
     if ( my_rank == 0 ) {
           // Only process 0 knows the overall answer!
        printf("The final result is %f\n", overall_total);
     }

MPI_Gather
Prototype:
     int MPI_Gather(
           void*        sendbuffer, // Contains data values to send
           int          sendcount,  // Number of values sent from each process --
                                    //    explanation below assumes sendcount is 1
           MPI_Datatype sendtype,   // Type of data (e.g. MPI_INT)         
           void*        recvbuffer, // Collected data stored here -- on root only
           int          recvcount,  // Use same value as sendcount
           MPI_Datatype recvtype,   // Use same value as sendtype        
           int          root,       // Process that receives the answer    
           MPI_Comm     comm        // Use MPI_COMM_WORLD                  
         )
Explanation: This function is used when each process has a data value, and it is necessary to collect all the data values from the various processes and combine them into an array on one process. The process where the data is gathered is called the root and is specified by the root parameter. Before this function is executed, each process -- including the root -- has one data value, which is stored in sendbuffer. The sendbuffer will usually be specified as the address of a simple variable. The recvbuffer must be specified by each process, but it is only used in the root process. It should be an array whose length is at least equal to the total number of processes. After the function returns, the array will contain the data from all the processes, with the value from process 0 in position 0, the value from process 1 in position 1, and so on.

(If sendcount is greater than 1, then sendbuffer should be an array of size sendcount. All the values from the arrays on all the processes are then collected and stored in the recvbuffer, which must be large enough to hold all the data.)

Note that every process must call this function, or an error occurs.

Example(s):
     int my_data;    // Data value computed by this process
     int all_data[process_count];  // Data from all processes
        .
        . Generate a value for my_data
        .
     MPI_Gather( &my_data, 1, MPI_INT, all_data, 1, MPI_INT, 0, MPI_COMM_WORLD );
        // Now process 0 has all the data in its all_data array.

MPI_Allgather
Prototype:
     int MPI_Allgather(
           void*        sendbuffer, // Contains data values to send
           int          sendcount,  // Number of values sent from each process --
                                    //    explanation below assumes sendcount is 1
           MPI_Datatype sendtype,   // Type of data (e.g. MPI_INT)         
            void*        recvbuffer, // Collected data stored here -- on every process
           int          recvcount,  // Use same value as sendcount
           MPI_Datatype recvtype,   // Use same value as sendtype        
           MPI_Comm     comm        // Use MPI_COMM_WORLD                  
         )
Explanation: Does the same thing as MPI_Gather, except that the combined data is distributed to all processes. That is, after MPI_Allgather executes, the recvbuffer on every process contains the complete set of data from all the processes.
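As a sketch (using the same made-up variable names as in the MPI_Gather example above), that example could be rewritten to use MPI_Allgather like this:

Example(s):
     int my_data;    // Data value computed by this process
     int all_data[process_count];  // Data from all processes
        .
        .  // Generate a value for my_data
        .
     MPI_Allgather( &my_data, 1, MPI_INT, all_data, 1, MPI_INT, MPI_COMM_WORLD );
        // Now EVERY process has all the data in its own all_data array.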

MPI_Scatter
Prototype:
     int MPI_Scatter(
           void*        sendbuffer, // An array of data values to send -- on root only
           int          sendcount,  // Number of values sent to each process --
                                    //    explanation below assumes sendcount is 1
           MPI_Datatype sendtype,   // Type of data (e.g. MPI_INT)         
           void*        recvbuffer, // Location to receive data value on each process
           int          recvcount,  // Use same value as sendcount
           MPI_Datatype recvtype,   // Use same value as sendtype        
            int          root,       // Process that originally has the data
           MPI_Comm     comm        // Use MPI_COMM_WORLD                  
         )
Explanation: This function is an inverse to MPI_Gather. It takes an array of data values on the root process and distributes one value from the array to each process. The size of the array should be equal to the number of processes. Note that the sendcount parameter is the number of data values to send to each process, which I am assuming is 1. (If it is more than one, then a block of data values is sent to each process.)
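As a sketch (the variable names are made up for illustration), here is roughly how the MPI_Gather example above could be reversed with MPI_Scatter:

Example(s):
     int full_data[process_count];  // One value for each process -- only root's array is used
     int my_value;                  // Receives this process's value
     if (my_rank == 0) {
         // Fill the full_data array with process_count values.
         // Only process 0 does this.
     }
     MPI_Scatter( full_data, 1, MPI_INT, &my_value, 1, MPI_INT, 0, MPI_COMM_WORLD );
        // Now each process has one value from process 0's full_data array in my_value.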

MPI_Wtime
Prototype:
     double MPI_Wtime()        // No parameters
Explanation: This function allows you to time activities in a single process. It's not a communication function -- it just works on the single process where it is called. It gives the number of seconds that have elapsed since some reference time in the past. Note that it gives elapsed time, not compute time. (The elapsed time is called the "wall time," hence the "W" in the name of the function.)
Example(s):
     double start_time = MPI_Wtime();
       .
       .  // do some work
       .
     double elapsed_time = MPI_Wtime() - start_time;

MPI_Abort
Prototype:
     int MPI_Abort(
           MPI_Comm  comm,       // Use MPI_COMM_WORLD
           int       error_code  // Use 1
         )
Explanation: If you encounter an error or some other condition that makes you want to shut down your entire program, you can call MPI_Abort to do so. It is not sufficient to call the usual exit() function, since that will only abort the one process on which it is executed, and it will do so without issuing the required call to MPI_Finalize. MPI_Abort, on the other hand, will properly shut down all the processes. The error_code is returned to the environment just as the parameter of exit() would be. In practice, you will probably not need to use this function.
Example(s):
     MPI_Abort( MPI_COMM_WORLD, 1 );


David Eck