mpi4py
MPI, which stands for Message Passing Interface, is a common standard
for parallel programming, implemented by libraries such as openmpi.
The package mpi4py builds on top of MPI and lets arbitrary Python
objects be passed between different processes. These packages are not
part of the Sage distribution. Install openmpi using your
distribution’s package manager, then install mpi4py using
sage: !pip install mpi4py
MPI works as follows: you start a group of MPI processes, and all of the processes run the same code. Each process has a rank, a number that identifies it. The following pseudocode indicates the general format of MPI programs.
....
if my rank is n:
    do some computation ...
    send some stuff to the process of rank j
    receive some data from the process of rank k
else if my rank is n+1:
    ....
Each process looks up what it is supposed to do (specified by its
rank), and processes can send data to and receive data from each
other. Let’s give an example. Create a script with the following code
in a file mpi_1.py:
from mpi4py import MPI
comm = MPI.COMM_WORLD
print("hello world")
print(f"my rank is: {comm.rank}")
To run it, use the following command (from the command line in your Sage directory):
mpirun -np 5 ./sage -python mpi_1.py
The command mpirun -np 5 starts 5 copies of a program under MPI. In
this case we have 5 copies of Sage in pure Python mode running the
script mpi_1.py. The result should be 5 “hello world” lines plus 5
distinct ranks.
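The rank-based dispatch described above can be tried without MPI. The following is a minimal sketch in plain Python: the function task_for and its messages are hypothetical names chosen for illustration, and the loop stands in for the 5 processes that mpirun would start.

```python
def task_for(rank, size):
    """Describe the work that a process with this rank would pick up."""
    if rank == 0:
        return f"rank 0 of {size}: coordinate the others"
    return f"rank {rank} of {size}: compute a share of the work"


# Under MPI every process would call task_for(comm.rank, comm.size) once.
# Here a loop over the ranks shows all branches inside a single process.
size = 5
for rank in range(size):
    print(task_for(rank, size))
```

In a real MPI run there is no loop: each of the 5 processes executes the same script and only the value of its rank differs.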
The two most important MPI operations are sending and receiving.
Consider the following example, which you should put in a script mpi_2.py:
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
v = numpy.array([rank] * 5, dtype=float)
comm.send(v, dest=(rank+1) % size)
data = comm.recv(source=(rank-1) % size)
print(f"my rank is: {rank}")
print("I received this:")
print(data)
Running the same command as above with mpi_1.py replaced by mpi_2.py
will produce 5 outputs. Each process creates an array and passes it
to the next process, and the last process passes its array to the
first. Note that comm.size is the total number of MPI processes, and
MPI.COMM_WORLD is the communicator that contains all of them.
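The ring pattern rests on the two modular expressions used for dest and source. As a small illustration in plain Python (ring_neighbors is a hypothetical helper, not part of mpi4py), the neighbours of every rank can be listed like this:

```python
def ring_neighbors(rank, size):
    """Return (dest, source) for a process in a ring of `size` processes."""
    dest = (rank + 1) % size    # the last rank wraps around to rank 0
    source = (rank - 1) % size  # Python's % is non-negative here, so rank 0 gets size - 1
    return dest, source


size = 5
for rank in range(size):
    dest, source = ring_neighbors(rank, size)
    print(f"rank {rank} sends to {dest} and receives from {source}")
```

Because Python’s % operator returns a non-negative result for a positive modulus, (rank-1) % size gives size - 1 for rank 0 and no special case is needed.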
There are some subtleties regarding MPI to be aware of. Small sends
are buffered: if a process sends a small object, it is stored by
openmpi, the sending process continues its execution, and the object
is delivered whenever the destination executes a receive. However, if
an object is large, the sending process will hang until its
destination executes a corresponding receive. In fact, the above code
can hang if [rank]*5 is replaced by [rank]*500, because every process
then waits inside send and none of them reaches its recv (the exact
size at which this happens depends on the MPI implementation and its
configuration). It would be better to do
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size
v = numpy.array([rank] * 500, dtype=float)
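# Rank 0 starts the chain by sending first.
if comm.rank == 0:
    comm.send(v, dest=(rank+1) % size)
# Every other rank receives first and only then sends.
if comm.rank > 0:
    data = comm.recv(source=(rank-1) % size)
    comm.send(v, dest=(rank+1) % size)
# Rank 0 finally receives from the last rank, closing the ring.
if comm.rank == 0:
    data = comm.recv(source=size - 1)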
print(f"my rank is: {rank}")
print("I received this:")
print(data)
Now process 0 sends its data to process 1 and then waits to receive
from process size - 1. Meanwhile, process 1 first receives the data
from process 0 and then sends its own data to process 2, and so on
around the ring. This will not lock even if the array transmitted is
huge.
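The ordering argument can be checked with a toy model that does not use MPI at all. In the sketch below, threads stand in for processes, and blocking_send and blocking_recv are hypothetical helpers that model a send which is never buffered: it returns only once the receiver has taken the object. This is an assumption made to mimic the large-message case, not a description of how openmpi is implemented.

```python
import queue
import threading

SIZE = 4
inbox = [queue.Queue() for _ in range(SIZE)]  # inbox[r] holds messages for rank r
received = [None] * SIZE


def blocking_send(obj, dest):
    """Model an unbuffered send: return only after the receiver took obj."""
    inbox[dest].put(obj)
    inbox[dest].join()  # waits until the receiver has called task_done()


def blocking_recv(rank):
    """Take the next message addressed to `rank` and release its sender."""
    obj = inbox[rank].get()
    inbox[rank].task_done()
    return obj


def staggered(rank):
    """Rank 0 sends first; every other rank receives before it sends."""
    v = [rank] * 3
    if rank == 0:
        blocking_send(v, (rank + 1) % SIZE)
        received[rank] = blocking_recv(rank)
    else:
        received[rank] = blocking_recv(rank)
        blocking_send(v, (rank + 1) % SIZE)


threads = [threading.Thread(target=staggered, args=(r,), daemon=True)
           for r in range(SIZE)]
for t in threads:
    t.start()
for t in threads:
    t.join(timeout=5)

print(received)  # [[3, 3, 3], [0, 0, 0], [1, 1, 1], [2, 2, 2]]
```

If every thread called blocking_send before blocking_recv instead, each one would wait forever for a receiver that never arrives, which is the deadlock described above.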
A common idiom is to have one process, usually the one with rank 0, act as a leader. That process sends data out to the other processes, computes on the results, and decides how much further computation should proceed. Consider the following code:
from mpi4py import MPI
import numpy
sendbuf = []
root = 0
comm = MPI.COMM_WORLD
if comm.rank == 0:
    m = numpy.random.randn(comm.size, comm.size)
    print(m)
    sendbuf = m
v = comm.scatter(sendbuf, root)
print("I got this array:")
print(v)
The scatter command takes a list with one item per process and hands
item i to the process of rank i. Here the root process creates a
matrix (which is viewed as a list of rows) and then scatters it to
everybody, so each process gets one row of root’s sendbuf. Each
process prints the row it got. Note that the scatter command is
executed by everyone, but when root executes it, it acts as a send and
a receive (root gets one row from itself), while for everyone else it
is just a receive.
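What scatter does to the data can be modelled in a few lines of plain Python. This is only a sketch of the data movement, with no communication involved; scatter_model is a hypothetical name, and it assumes one item per process, which is what the comm.size by comm.size matrix in the example provides.

```python
def scatter_model(sendbuf, size):
    """Return a list whose entry r is the item that rank r would receive."""
    if len(sendbuf) != size:
        raise ValueError("this model expects exactly one item per process")
    return [sendbuf[r] for r in range(size)]


size = 3
matrix = [[1.0, 2.0, 3.0],
          [4.0, 5.0, 6.0],
          [7.0, 8.0, 9.0]]  # viewed as a list of rows

for rank, row in enumerate(scatter_model(matrix, size)):
    print(f"rank {rank} got {row}")
```

The root itself appears in the result at index 0, which corresponds to root receiving one row from itself.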
There is a complementary gather command that collects results from
all the processes into a list. The next example uses scatter and
gather together. Now the root process scatters the rows of a matrix,
each process squares the elements of the row it receives, and the root
process then gathers the rows into a new matrix.
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
sendbuf = []
root = 0
if comm.rank == 0:
    m = numpy.array(range(comm.size * comm.size), dtype=float)
    m.shape = (comm.size, comm.size)
    print(m)
    sendbuf = m
v = comm.scatter(sendbuf, root)
print("I got this array:")
print(v)
v = v*v
recvbuf = comm.gather(v, root)
if comm.rank == 0:
    print(numpy.array(recvbuf))
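To see what result to expect from the script, the scatter, square, gather round trip can be traced by hand in plain Python. The sketch below assumes 3 processes and uses nested lists in place of numpy arrays; it models only the data flow, not the communication.

```python
size = 3  # assumed number of processes for this trace

# The matrix that root builds: the numbers 0 .. size*size - 1, row by row.
m = [[float(size * i + j) for j in range(size)] for i in range(size)]

# scatter: rank r receives row r.
rows = [m[r] for r in range(size)]

# Each rank squares the elements of its own row.
squared = [[x * x for x in row] for row in rows]

# gather: root collects the rows back, ordered by rank.
recvbuf = list(squared)

print(recvbuf)
# [[0.0, 1.0, 4.0], [9.0, 16.0, 25.0], [36.0, 49.0, 64.0]]
```

The gathered list is ordered by rank, so the rows come back in the same order in which they were scattered.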
There is also a broadcast command, bcast, that sends a single object
to every process. Consider the following small extension. This is the
same as before, but now at the end, the root process sends everyone
the string “done”, which is printed out.
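# Continues the previous script: numpy, sendbuf and root are defined as above.
v = MPI.COMM_WORLD.scatter(sendbuf, root)
print("I got this array:")
print(v)
v = v*v
recvbuf = MPI.COMM_WORLD.gather(v, root)
if MPI.COMM_WORLD.rank == 0:
    print(numpy.array(recvbuf))
if MPI.COMM_WORLD.rank == 0:
    sendbuf = "done"
# On the other ranks sendbuf is ignored; bcast returns root's object everywhere.
recvbuf = MPI.COMM_WORLD.bcast(sendbuf, root)
print(recvbuf)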
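One detail of the snippet above is easy to miss: only the root sets sendbuf to “done”, while the other ranks still pass their old sendbuf to bcast. A minimal model in plain Python (bcast_model is a hypothetical name, and no MPI is involved) shows why this is harmless:

```python
def bcast_model(sendbuf_per_rank, root):
    """Return what each rank gets back: always the root's object."""
    size = len(sendbuf_per_rank)
    return [sendbuf_per_rank[root] for _ in range(size)]


# Rank 0 (the root) holds "done"; the other ranks still hold an empty list.
sendbufs = ["done", [], [], []]
print(bcast_model(sendbufs, root=0))  # ['done', 'done', 'done', 'done']
```

Whatever the non-root ranks pass in is discarded, so every process prints the same string.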