Programming large-scale parallel systems¶
Distributed computing with MPI¶
Contents¶
In this notebook, we will learn the basics of parallel computing using the Message Passing Interface (MPI) from Julia. In particular, we will learn:
- How to run parallel MPI code in Julia
- How to use basic collective communication directives
- How to use basic point-to-point communication directives
For further information on how to use MPI from Julia see https://github.com/JuliaParallel/MPI.jl
What is MPI?¶
- MPI stands for the "Message Passing Interface"
- It is a standardized library specification for communication between parallel processes in distributed memory systems.
- It has been the gold standard for distributed computing on HPC systems since the 1990s
- It is huge: the MPI 4.0 standard is more than 1,000 pages long (see https://www.mpi-forum.org/docs/mpi-4.0/mpi40-report.pdf)
- There are several implementations of this standard (Open MPI, MPICH, Intel MPI)
- The interface is defined for C and Fortran (the C++ bindings were deprecated)
- There are Julia bindings via the package MPI.jl https://github.com/JuliaParallel/MPI.jl
Hello-world example¶
] add MPI
using MPI
MPI.Init() # initialize MPI (required before any other MPI call)
comm = MPI.COMM_WORLD # built-in communicator containing all ranks
nranks = MPI.Comm_size(comm) # total number of ranks
rank = MPI.Comm_rank(comm) # id of this rank (0-based)
println("Hello, I am process $rank of $nranks processes!")
Running MPI code¶
Creating MPI processes (aka ranks)¶
- MPI processes are created with the driver program mpiexec
- mpiexec takes an application and runs it on different ranks
- The application calls MPI directives to communicate between these ranks
- In particular, the application can be Julia running your script
Execution model¶
- MPI programs are typically run with a Single Program Multiple Data (SPMD) model, as sketched below
- But the standard also supports Multiple Program Multiple Data (MPMD)
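To make the SPMD model concrete, here is a minimal sketch (a hypothetical script, not one of the notebook's examples): every rank executes the same program, and branching on the rank id gives each process a different role.

using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
if rank == 0
    println("I am rank 0: I could play the role of a coordinator")
else
    println("I am rank $rank: I do the work assigned to rank $rank")
end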
Hello world¶
Julia code typically needs to be in a file to run with MPI. Let us write our hello world to a file:
code = raw"""
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
nranks = MPI.Comm_size(comm)
rank = MPI.Comm_rank(comm)
println("Hello, I am process $rank of $nranks processes!")
"""
filename = tempname()*".jl"
write(filename,code);
Now, we can run it:
using MPI
mpiexec(cmd->run(`$cmd -np 4 julia --project=. $filename`));
Note that the function mpiexec provided by MPI.jl is a convenient way of accessing the mpiexec program that matches the MPI installation used by Julia.
MPIClusterManagers¶
- This package allows you to create Julia workers that can call MPI functions
- This is useful to combine Distributed.jl and MPI.jl
- E.g., we can run MPI code interactively (from a notebook)
- Link: https://github.com/JuliaParallel/MPIClusterManagers.jl
] add MPIClusterManagers
using MPIClusterManagers
using Distributed
if procs() == workers() # no worker processes have been added yet
    nranks = 3
    manager = MPIWorkerManager(nranks)
    addprocs(manager) # spawn nranks MPI-enabled worker processes
end
@everywhere workers() begin
    using MPI
    MPI.Init()
    comm = MPI.COMM_WORLD
    nranks = MPI.Comm_size(comm)
    rank = MPI.Comm_rank(comm)
    println("Hello, I am process $rank of $nranks processes!")
end
MPI Communicators¶
In MPI, a communicator represents a group of processes that can communicate with each other. MPI_COMM_WORLD (MPI.COMM_WORLD from Julia) is a built-in communicator that represents all processes available in the MPI program. Custom communicators can also be created to group processes based on specific requirements or logical divisions. The rank of a process is a unique identifier assigned to each process within a communicator. It allows processes to distinguish and address each other in communication operations.
Duplicating a communicator¶
It is good practice not to use the built-in communicators directly, but to work on a duplicate created with MPI.Comm_dup: different libraries using the same communicator can interfere with each other in unexpected ways. The examples below follow this pattern.
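As an illustration of both points above, the following sketch first duplicates the world communicator and then builds a custom communicator with MPI.Comm_split, a directive not used elsewhere in this notebook; the even/odd grouping is an arbitrary choice for the example.

@everywhere workers() begin
    comm = MPI.Comm_dup(MPI.COMM_WORLD) # work on a private copy of MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    color = mod(rank,2) # ranks with the same color end up in the same new communicator
    key = rank # determines the ordering of ranks inside the new communicator
    newcomm = MPI.Comm_split(comm,color,key)
    newrank = MPI.Comm_rank(newcomm)
    println("Global rank $rank is rank $newrank in group $color")
end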
Collective communication¶
MPI provides collective communication functions for communication involving multiple processes. Some usual collective directives are:
- MPI.Scatter: Distributes data from one process to all processes.
- MPI.Gather: Gathers data from all processes to a single process.
- MPI.Bcast: Broadcasts data from one process to all processes.
- MPI.Barrier: Synchronizes all processes.
See more collective directives available from Julia here: https://juliaparallel.org/MPI.jl/stable/reference/collective/
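Scatter and gather are demonstrated in detail below. As a quick taste of the broadcast directive listed above, here is a small sketch in the same style: the root fills a buffer, and MPI.Bcast! copies its value to all ranks (the value 42 is an arbitrary choice for the example).

@everywhere workers() begin
    comm = MPI.Comm_dup(MPI.COMM_WORLD)
    rank = MPI.Comm_rank(comm)
    root = 0
    buffer = Ref(0)
    if rank == root
        buffer[] = 42
        println("I am broadcasting: $(buffer[])")
    end
    MPI.Bcast!(buffer,comm;root) # after this call, all ranks hold the root's value
    println("I have received: $(buffer[])")
end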
Scatter¶
In the next cell, the root process (rank 0) fills a vector with one entry per rank and distributes it with MPI.Scatter!: every process, including the root itself, receives the entry corresponding to its rank in the buffer rcv.
@everywhere workers() begin
    comm = MPI.Comm_dup(MPI.COMM_WORLD)
    nranks = MPI.Comm_size(comm)
    rank = MPI.Comm_rank(comm)
    root = 0
    rcv = Ref(0)
    if rank == root
        snd = [10*(i+1) for i in 1:nranks]
        println("I am sending: $snd")
    else
        snd = nothing
    end
    MPI.Scatter!(snd,rcv,comm;root)
    println("I have received: $(rcv[])")
end
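When ranks should receive chunks of different sizes, the variable-size variant MPI.Scatterv! can be used instead. The following is a hedged sketch of that variant (the counts are an illustrative choice, not part of the original example): the root wraps the send data in an MPI.VBuffer that records how many entries go to each rank.

@everywhere workers() begin
    comm = MPI.Comm_dup(MPI.COMM_WORLD)
    nranks = MPI.Comm_size(comm)
    rank = MPI.Comm_rank(comm)
    root = 0
    counts = [i for i in 1:nranks] # rank i-1 receives i values (an arbitrary choice)
    rcv = zeros(Int,counts[rank+1])
    if rank == root
        snd = collect(1:sum(counts))
        println("I am sending: $snd")
        MPI.Scatterv!(MPI.VBuffer(snd,counts),rcv,comm;root)
    else
        MPI.Scatterv!(nothing,rcv,comm;root) # non-root ranks pass nothing as send buffer
    end
    println("I have received: $rcv")
end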
Gather¶
In the next cell, each process computes a value that depends on its rank and sends it via MPI.Gather. The root process (rank 0) receives a vector with the contributions of all ranks, including its own.
@everywhere workers() begin
    comm = MPI.Comm_dup(MPI.COMM_WORLD)
    nranks = MPI.Comm_size(comm)
    rank = MPI.Comm_rank(comm)
    root = 0
    snd = 10*(rank+2)
    println("I am sending $snd")
    rcv = MPI.Gather(snd,comm;root)
    if rank == root
        println("I have received: $rcv")
    end
end
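A related directive is MPI.Allgather, in which every rank (not only the root) receives the gathered vector. A minimal sketch, assuming the allocating MPI.Allgather variant provided by MPI.jl:

@everywhere workers() begin
    comm = MPI.Comm_dup(MPI.COMM_WORLD)
    rank = MPI.Comm_rank(comm)
    snd = 10*(rank+2)
    rcv = MPI.Allgather(snd,comm) # every rank gets the contributions of all ranks
    println("I have received: $rcv")
end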
Point-to-Point communication¶
MPI also provides point-to-point communication directives for arbitrary communication between processes. Point-to-point communications are two-sided: there is a sender and a receiver. Here, we will discuss these basic directives:
- MPI.Isend, and
- MPI.Irecv!,

which are non-blocking directives.
MPI also offers blocking directives with different blocking behaviors (communication modes). Blocking communication will be discussed later in the course.
Example¶
The first rank generates a message and sends it to the last rank. The last rank receives it, multiplies it by a coefficient, and sends the result back to the first rank.
@everywhere workers() begin
    comm = MPI.Comm_dup(MPI.COMM_WORLD)
    rank = MPI.Comm_rank(comm)
    nranks = MPI.Comm_size(comm)
    snder = 0
    rcver = nranks-1
    buffer = Ref(0)
    if rank == snder
        msg = 10*(rank+2)
        println("I am sending: $msg")
        buffer[] = msg
        req = MPI.Isend(buffer,comm;dest=rcver,tag=0)
        MPI.Wait(req)
        req = MPI.Irecv!(buffer,comm;source=rcver,tag=0)
        MPI.Wait(req)
        msg = buffer[]
        println("I have received: $msg")
    end
    if rank == rcver
        req = MPI.Irecv!(buffer,comm;source=snder,tag=0)
        MPI.Wait(req)
        msg = buffer[]
        println("I have received: $msg")
        coef = (rank+2)
        msg = msg*coef
        println("I am sending: $msg")
        buffer[] = msg
        req = MPI.Isend(buffer,comm;dest=snder,tag=0)
        MPI.Wait(req)
    end
end
Note that we call MPI.Wait before modifying the send buffer or using the receive buffer: non-blocking directives return immediately, and a buffer is only safe to access once the corresponding request has completed.
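When several non-blocking operations are pending, they can also be completed in one call. The sketch below assumes MPI.Waitall as provided by recent versions of MPI.jl; the cyclic exchange pattern is an illustrative addition, not part of the original example.

@everywhere workers() begin
    comm = MPI.Comm_dup(MPI.COMM_WORLD)
    rank = MPI.Comm_rank(comm)
    nranks = MPI.Comm_size(comm)
    next = mod(rank+1,nranks) # rank we send to (cyclically)
    prev = mod(rank-1,nranks) # rank we receive from
    snd_buffer = Ref(10*rank)
    rcv_buffer = Ref(0)
    reqs = MPI.Request[]
    push!(reqs,MPI.Isend(snd_buffer,comm;dest=next,tag=0))
    push!(reqs,MPI.Irecv!(rcv_buffer,comm;source=prev,tag=0))
    MPI.Waitall(reqs) # both buffers are safe to access only after this point
    println("Rank $rank received $(rcv_buffer[]) from rank $prev")
end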
License¶
This notebook is part of the course Programming Large Scale Parallel Systems at Vrije Universiteit Amsterdam and may be used under a CC BY 4.0 license.