Programming large-scale parallel systems¶
Intro to MPI (collectives)¶
Contents¶
In this notebook, we will continue learning the basics of parallel computing using the Message Passing Interface (MPI) from Julia. In particular, we will learn:
- How to use basic collective communication directives
- How to use MPI communicators
Collective communication¶
MPI provides collective communication functions for communication involving multiple processes. Some commonly used collective functions are:
- MPI_Barrier: Synchronize all processes
- MPI_Bcast: Send the same data from one process to all processes
- MPI_Gather: Gather data from all processes to one process
- MPI_Scatter: Scatter data from one process to all processes
- MPI_Reduce: Reduce data from all processes to a single one
- MPI_Scan: Scan (or prefix) reduction
- MPI_Allgather: Like MPI_Gather, but all processes receive the result
- MPI_Allreduce: Like MPI_Reduce, but all processes receive the result
- MPI_Alltoall: Exchange data from all to all processes
We will discuss some of them in greater detail in this notebook.
Why collective primitives?¶
Point-to-point communication primitives provide all the building blocks needed in parallel programs and could be used to implement the collective functions described above. Then, why does MPI provide collective communication directives? There are several reasons:
- Ease of use: It is handy for users to have these functions readily available instead of having to implement them.
- Performance: Library implementations typically use efficient algorithms, such as reduction trees (see the sketch after this list).
- Hardware support: Hardware vendors can optimize the MPI library for their machine and even develop hardware specially designed to perform MPI operations efficiently. For instance, IBM Blue Gene was equipped with auxiliary networks for MPI collectives.
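To illustrate the performance point, the following sketch (ours, not part of the original examples) implements a sum reduction by hand on top of MPI_Send and MPI_Recv using a binary reduction tree. The helper function tree_sum and the assumption that the number of ranks is a power of two are ours; MPI_Reduce gives you this kind of algorithm (and better tuned variants) for free.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
# Hand-written binary reduction tree (assumes nranks is a power of two).
# At each step, half of the remaining ranks send their partial sum to a
# partner and drop out, until rank 0 holds the final result.
function tree_sum(value, rank, nranks, comm)
    acc = value
    step = 1
    while step < nranks
        if rank % (2*step) == 0
            # Receive a partial sum from the partner and accumulate it
            buf = [0]
            MPI.Recv!(buf, comm; source=rank+step, tag=0)
            acc += buf[1]
        elseif rank % (2*step) == step
            # Send the partial sum to the partner and stop participating
            MPI.Send([acc], comm; dest=rank-step, tag=0)
            return acc
        end
        step *= 2
    end
    return acc
end
result = tree_sum(rank+1, rank, nranks, comm)
if rank == 0
    println("[rank 0] The tree reduction gives $result (expected $(sum(1:nranks)))")
end
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);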
Semantics of collective operations¶
- Completeness: All the collective communication directives above are complete operations. Thus, it is safe to use and reset the buffers once the function returns.
- Standard mode: Collective directives are in standard mode only, like MPI_Send. Assuming that they block is erroneous, and assuming that they do not block is also erroneous.
- Synchronization: Completion of a call does not guarantee that other processes have completed the operation. A collective operation may or may not have the effect of synchronizing all processes; the only exception is MPI_Barrier, of course.
MPI_Barrier¶
This function is used to synchronize a group of processes. All processes block until all of them have reached the barrier. It is often invoked at the end of a loop body to make sure all processes have finished the current iteration before moving on to the next one. We will see an example in another notebook, when studying the traveling salesperson problem (TSP).
In Julia:
MPI.Barrier(comm)
In C:
int MPI_Barrier(MPI_Comm comm)
Example¶
In this example, the ranks sleep for a random amount of time and then call the barrier. Since we use a barrier, it is guaranteed that the "Done!" messages are printed only after all processes have printed "I woke up". Try also to comment out the call to MPI.Barrier. You will see that the messages can then be printed in any order.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
sleep(rand(1:3))
println("[rank $rank] I woke up")
MPI.Barrier(comm)
println("[rank $rank] Done!")
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);
MPI_Reduce¶
Combines the values provided by the different processes according to a given reduction operation. The result is received by a single process (called the root process).
In Julia:
MPI.Reduce!(sendbuf, recvbuf, op, comm; root)
In C:
int MPI_Reduce(const void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, int root,
MPI_Comm comm)
Example¶
Each process generates a random integer. Compute the sum of all these integers. Receive the result on rank 0.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
value = rand(1:4)
root = 0
sndbuf = [value]
if rank == root
rcvbuf = [0]
else
# This can be anything, as only the root process
# will receive data
rcvbuf = nothing
end
println("[rank $rank] I am contributing with $sndbuf")
MPI.Reduce!(sndbuf, rcvbuf, +, comm; root)
println("[rank $rank] The sum is $rcvbuf")
end
run(`$(mpiexec()) -np 3 julia --project=. -e $code`);
Reducing multiple values¶
If more than one item is provided in the send buffers, MPI_Reduce does the reduction element by element.
Example¶
Each process generates 2 random integers. Take a look at the result of the reduction. Note that the values have been reduced "element by element".
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
sndbuf = rand(1:4,2)
root = 0
if rank == root
rcvbuf = [0,0]
else
rcvbuf = nothing
end
println("[rank $rank] I am contributing with $sndbuf")
MPI.Reduce!(sndbuf, rcvbuf, +, comm; root)
println("[rank $rank] The sum is $rcvbuf")
end
run(`$(mpiexec()) -np 3 julia --project=. -e $code`);
MPI_Allreduce¶
This is a variant of MPI_Reduce in which all processes get the result.
Example¶
This is the same example as above, but all processes get the result.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
sndbuf = rand(1:4,2)
# All ranks allocate a receive buffer
rcvbuf = [0,0]
println("[rank $rank] I am contributing with $sndbuf")
MPI.Allreduce!(sndbuf, rcvbuf, +, comm)
println("[rank $rank] The sum is $rcvbuf")
end
run(`$(mpiexec()) -np 3 julia --project=. -e $code`);
MPI_Gather¶
Each rank sends a message to the root rank (the root rank also sends a message to itself). The root rank receives all these values in a buffer (e.g. a vector). This function assumes that the amount of data sent from each rank is the same. The root process can be any process and it is rank 0 by default in Julia.
In Julia:
MPI.Gather!(sendbuf, recvbuf, comm; root=0)
In C:
int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, int root,
MPI_Comm comm)
Example¶
Each process sends an integer to rank 0.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
msg = 10*(rank+2)
root = 0
sndbuf = [msg]
if rank == root
nranks = MPI.Comm_size(comm)
rcvbuf = zeros(Int,nranks)
else
# This can be anything, as only the root process
# will receive data
rcvbuf = nothing
end
println("[rank $rank] I am sending $sndbuf")
MPI.Gather!(sndbuf, rcvbuf, comm; root)
println("[rank $rank] I received $rcvbuf")
end
run(`$(mpiexec()) -np 3 julia --project=. -e $code`);
MPI_Allgather¶
This is a variant of MPI_Gather in which all processes get the result.
Example¶
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
msg = 10*(rank+2)
sndbuf = [msg]
nranks = MPI.Comm_size(comm)
rcvbuf = zeros(Int,nranks)
println("[rank $rank] I am sending $sndbuf")
MPI.Allgather!(sndbuf, rcvbuf, comm)
# All ranks receive the result
println("[rank $rank] I received $rcvbuf")
end
run(`$(mpiexec()) -np 3 julia --project=. -e $code`);
MPI_Gatherv¶
This function is similar to MPI_Gather, but it is used when a different amount of data is sent from each process. There is also MPI_Allgatherv, for which all processes receive the result (a sketch follows the example below).
Example¶
Each process sends rank+1 integers to rank 0.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
sndbuf = fill(10*rank,rank+1)
root = 0
if rank == root
nranks = MPI.Comm_size(comm)
# Define the amount of data to be received
# from each rank
counts = [ rank+1 for rank in 0:(nranks-1)]
# Total amount of data to be received
ndata = sum(counts)
data = zeros(Int,ndata)
rcvbuf = MPI.VBuffer(data,counts)
else
# This can be anything, as only the root process
# will receive data
rcvbuf = nothing
end
println("[rank $rank] I am sending $sndbuf")
MPI.Gatherv!(sndbuf, rcvbuf, comm; root)
if rank == root
println("[rank $rank] I received $data")
end
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);
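MPI_Allgatherv is not used elsewhere in this notebook, but a minimal sketch follows from the example above. We assume here that MPI.jl exposes it as MPI.Allgatherv!(sendbuf, recvbuf, comm); since all ranks receive the result, every rank (not only a root) needs to know the counts and allocate its own VBuffer.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
sndbuf = fill(10*rank, rank+1)
# Every rank receives the result, so every rank needs the counts
# and its own receive buffer
counts = [ r+1 for r in 0:(nranks-1) ]
data = zeros(Int, sum(counts))
rcvbuf = MPI.VBuffer(data, counts)
println("[rank $rank] I am sending $sndbuf")
MPI.Allgatherv!(sndbuf, rcvbuf, comm)
println("[rank $rank] I received $data")
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);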
How to get the amount of data to be received?¶
Remember that MPI_Probe allows you to query the amount of data to be received with MPI_Recv. Unfortunately, this mechanism does not work for MPI collectives. A workaround is to first communicate the amount of data to be gathered, allocate the receive buffer accordingly, and then communicate the actual message.
Example¶
Each process sends a random number of integers to rank 0.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
# Get number of ints to send
count = rand(1:4)
# Generate data to send
sndbuf = fill(rank,count)
# Communicate counts
root = 0
if rank == root
nranks = MPI.Comm_size(comm)
counts = zeros(Int,nranks)
else
counts = nothing
end
MPI.Gather!([count], counts, comm; root)
# Allocate receive buffer
if rank == root
ndata = sum(counts)
data = zeros(Int,ndata)
rcvbuf = MPI.VBuffer(data,counts)
else
rcvbuf = nothing
end
# Communicate data
println("[rank $rank] I am sending $sndbuf")
MPI.Gatherv!(sndbuf, rcvbuf, comm; root)
if rank == root
println("[rank $rank] I received $data")
end
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);
MPI_Scatter¶
This is the inverse operation of MPI_Gather. The root rank sends a distinct message to each rank.
In Julia:
MPI.Scatter!(sendbuf, recvbuf, comm; root=0)
In C:
int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype, int root,
MPI_Comm comm)
Example¶
Rank 0 sends a unique integer to each rank.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
nranks = MPI.Comm_size(comm)
rank = MPI.Comm_rank(comm)
root = 0
rcv = Ref(0)
if rank == root
snd = [10*(i+1) for i in 1:nranks]
println("[rank $rank] I am sending: $snd")
else
# Only the root rank sends data
# so this can be anything
snd = nothing
end
MPI.Scatter!(snd,rcv,comm;root)
println("[rank $rank] I have received: $(rcv[])")
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);
MPI_Bcast¶
Similar to MPI_Scatter, but we send the same message to all processes.
In Julia:
MPI.Bcast!(buf, comm; root)
In C:
int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype,
int root, MPI_Comm comm)
Example¶
Rank 0 sends the same integer to all processes.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
nranks = MPI.Comm_size(comm)
rank = MPI.Comm_rank(comm)
root = 0
buffer = Ref(0)
if rank == root
buffer[] = 20
println("[rank $rank] I am sending: $(buffer[])")
end
MPI.Bcast!(buffer,comm;root)
println("[rank $rank] I have received: $(buffer[])")
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);
Communicators¶
A key concept in MPI programming is the so-called communicator. A communicator object has two main purposes:
- To provide an isolated communication context
- To define a group of processes
Communication context¶
When you write an MPI program, it is very likely that you will use libraries that also use MPI to send messages. Ideally, these libraries should not interfere with the application's messages. Using tags to isolate the messages sent by your application does not solve the problem. MPI communicators fix this problem as they provide an isolated communication context. For instance, MPI_Send and MPI_Recv specify a communicator, and MPI_Recv can only receive messages sent within the same communicator. The same is also true for collective communication directives. If two libraries use different communicators, their messages will never interfere. In particular, it is recommended never to use the default communicator, MPI_COMM_WORLD, directly when working with other libraries. A new isolated communicator can be created with MPI_Comm_dup.
Process groups¶
On the other hand, imagine that we want to use an MPI communication directive like MPI_Gather, but we only want a subset of the processes to participate in the operation. So far, we have always used the default communicator MPI_COMM_WORLD, which represents all processes. Thus, by using this communicator, we are including all processes in the operation. We can create other communicators that contain only a subset of the processes. To this end, we can use the function MPI_Comm_split.
MPI_Comm_dup¶
Duplicate a given communicator, creating a new one with a new communication context.
In Julia:
newcomm = MPI.Comm_dup(comm)
In C:
int MPI_Comm_dup(MPI_Comm comm, MPI_Comm *newcomm)
Example¶
Send value 2 from rank 2 to rank 3. Then send value 1, also from rank 2 to rank 3. Note that we are using two different communicators, one for each send/receive pair, in such a way that we first receive value 1 and then value 2, even though this was not the order in which the data was sent. Try also to define comm2 = comm and see how the result changes.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
# We create a new communicator
comm2 = MPI.Comm_dup(comm)
rank = MPI.Comm_rank(comm)
if rank == 2
sndbuf = [2]
MPI.Send(sndbuf, comm2; dest=3, tag=0)
sndbuf = [1]
MPI.Send(sndbuf, comm; dest=3, tag=0)
end
if rank == 3
rcvbuf = zeros(Int,1)
MPI.Recv!(rcvbuf, comm; source=2, tag=0)
@show rcvbuf
MPI.Recv!(rcvbuf, comm2; source=2, tag=0)
@show rcvbuf
end
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);
MPI_Comm_split¶
Split a given communicator.
In Julia:
newcomm = MPI.Comm_split(comm, color, key)
In C:
int MPI_Comm_split(MPI_Comm comm, int color, int key,
MPI_Comm *newcomm)
There are two key parameters:
- color: all processes with the same color will be grouped in the same communicator.
- key: the processes will be ranked in the new communicator according to key, breaking ties with the rank in the old communicator (see the sketch below).
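As a small sketch of the effect of key (this example is ours, not part of the original notebook): all ranks below use the same color, so they all end up in one new communicator, but the key decreases with the old rank and therefore reverses the ordering.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
# Same color for everybody: a single group containing all ranks.
# The key decreases with the old rank, so the new ranks are reversed.
color = 0
key = nranks - rank
newcomm = MPI.Comm_split(comm, color, key)
newrank = MPI.Comm_rank(newcomm)
println("Hello, I am rank $rank in world and rank $newrank in the new communicator.")
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);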
Example¶
Create two groups of processes: one for the processes with an even rank id, and another for the rest. Note that the rank id is a function of the given communicator.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
color = mod(rank,2)
key = 0
newcomm = MPI.Comm_split(comm, color, key)
newrank = MPI.Comm_rank(newcomm)
println("Hello, I am rank $rank in world and rank $newrank in group $color.")
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);
Example¶
Each process with an odd rank id generates a random integer. Sum all these integers and receive the result on the first process with an odd rank id. Note that we are using a new communicator to perform the reduction.
code = quote
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
color = mod(rank,2)
key = 0
newcomm = MPI.Comm_split(comm, color, key)
# Only odd rank ids will execute this
if color == 1
newrank = MPI.Comm_rank(newcomm)
newroot = 0
sndbuf = [rand(1:4)]
if newrank == newroot
rcvbuf = [0]
else
rcvbuf = nothing
end
println("[rank $rank, newrank $newrank] I am contributing with $sndbuf")
MPI.Reduce!(sndbuf, rcvbuf, +, newcomm;root=newroot)
println("[rank $rank, newrank $newrank] The sum is $rcvbuf")
end
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);
License¶
This notebook is part of the course Programming Large Scale Parallel Systems at Vrije Universiteit Amsterdam and may be used under a CC BY 4.0 license.