Message Passing Interface (MPI) is a standardized and portable library specification for communication between parallel processes in distributed memory systems. Julia offers a convenient way to work with MPI for creating efficient parallel and distributed applications. In this tutorial, you will learn how to use MPI from Julia to perform parallel computing tasks.
When you run an MPI-enabled Julia script, MPI takes care of spawning multiple instances of the Julia executable, each acting as a separate process. These workers can communicate with each other using MPI communication functions. This enables parallel processing and distributed computation. Here's a summary of how it works:
MPI Spawns Processes: The mpiexec command launches multiple instances of the Julia executable, creating separate worker processes. For example, launching with -np 4 spawns 4 Julia workers.
Worker Communication: These workers can communicate with each other using MPI communication functions, allowing them to exchange data and coordinate actions.
Parallel Tasks: The workers execute parallel tasks simultaneously, working on different parts of the computation to potentially speed up the process.
To use MPI in Julia, you'll need the MPI.jl package, and if you intend to run MPI programs in a Jupyter Notebook, you'll also need the MPIClusterManagers package. These packages provide the necessary bindings to the MPI library and cluster management capabilities. To install the packages, start a Julia session and run the following commands:
using Pkg
Pkg.add("MPI")
Pkg.add("MPIClusterManagers")
MPI.jl is the Julia interface to MPI. Note that it is not an MPI library by itself; it is just a thin wrapper between MPI and Julia. To use this interface, you need an actual MPI library installed on your system, such as OpenMPI or MPICH. By default, Julia downloads and installs an MPI library for you, but it is also possible to use an MPI library already available on your system. This is useful, e.g., when running on HPC clusters. See the documentation of MPI.jl for further details.
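For instance, switching to a system-provided MPI library is done through the MPIPreferences package. The following is a minimal sketch (run once in your project environment; the exact workflow is described in the MPI.jl documentation):
using Pkg
Pkg.add("MPIPreferences")
using MPIPreferences
# Detect the MPI library available on the system and record it in LocalPreferences.toml
MPIPreferences.use_system_binary()
After this, restart Julia so that MPI.jl picks up the new preference.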
Let's start by creating a simple MPI program that prints a message along with the rank of each worker.
Create a new Julia script, for example, mpi_hello_world.jl:
using MPI
# Initialize MPI
MPI.Init()
# Get the default communicator (MPI_COMM_WORLD) for all processes
comm = MPI.COMM_WORLD
# Get the number of processes in this communicator
nranks = MPI.Comm_size(comm)
# Get the rank of the current process within the communicator
rank = MPI.Comm_rank(comm)
# Print a message with the rank of the current process
println("Hello, I am process $rank of $nranks processes!")
# Finalize MPI
MPI.Finalize()
In MPI, a communicator is a context in which a group of processes can communicate with each other. MPI_COMM_WORLD is one of the standard MPI communicators; it represents all processes in the MPI program. Custom communicators can also be created to group processes based on specific requirements or logical divisions.
The rank of a process is a unique identifier assigned to each process within a communicator. It allows processes to distinguish and address each other in communication operations.
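As an illustration of a custom communicator, here is a minimal sketch (not needed for the rest of the tutorial; save it as, e.g., comm_split.jl and launch it with mpiexec as explained below) that uses MPI.Comm_split to divide MPI_COMM_WORLD into two groups, one with the even ranks and one with the odd ranks:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
# The color selects the group: ranks with the same color end up in the same sub-communicator
color = rank % 2
# The key (here the original rank) determines the ordering of ranks in the new communicator
newcomm = MPI.Comm_split(comm, color, rank)
newrank = MPI.Comm_rank(newcomm)
println("Global rank $rank has rank $newrank in sub-communicator $color")
MPI.Finalize()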
To run MPI applications in parallel, you need a launcher like mpiexec. MPI codes written in Julia are no exception to this rule. From the system terminal, you can run
$ mpiexec -np 4 julia mpi_hello_world.jl
In this command, -np 4 specifies the desired number of processes.
However, this will probably not work, since the mpiexec binary needs to match the MPI library Julia is using. You can find the path of the mpiexec binary you need with these commands
julia> using MPI
julia> MPI.mpiexec_path
and then try again
$ /path/to/my/mpiexec -np 4 julia mpi_hello_world.jl
with your particular path.
However, this is not very convenient. Don't worry if you could not make it work! A more elegant way to run MPI code is from the Julia REPL directly, by using these commands:
julia> using MPI
julia> mpiexec(cmd->run(`$cmd -np 4 julia mpi_hello_world.jl`))
Now, you should see output from 4 ranks.
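The lines can appear in any order, since all ranks run concurrently, but the output should look something like this:
Hello, I am process 2 of 4 processes!
Hello, I am process 0 of 4 processes!
Hello, I am process 3 of 4 processes!
Hello, I am process 1 of 4 processes!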
If you want to run your MPI code from a Jupyter Notebook, you can do so using the MPIClusterManagers package.
using MPIClusterManagers
# Distributed package is needed for addprocs()
using Distributed
# Create a manager that will spawn 4 MPI worker processes
manager = MPIWorkerManager(4)
# Start the workers and connect them to the current Julia session
addprocs(manager)
Then, wrap your MPI code in an @mpi_do block to execute it on the cluster workers:
@mpi_do manager begin
    using MPI
    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    println("Hello from process $rank")
end
MPI is automatically initialized and finalized within the @mpi_do block.
Once you are done, you can remove the worker processes:
rmprocs(workers())
MPI provides point-to-point communication via the blocking send and receive functions MPI.send and MPI.recv, or their non-blocking counterparts MPI.Isend and MPI.Irecv!. These functions allow individual processes to exchange data with each other.
Let's demonstrate how to send and receive with an example:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
# Send and receive messages using blocking MPI.send and MPI.recv
if rank == 0
    data = "Hello from process $rank !"
    MPI.send(data, comm, dest=1)
elseif rank == 1
    received_data = MPI.recv(comm, source=0)
    println("Process $rank received: $received_data")
end
MPI.Finalize()
In this example, process 0 sends a message using MPI.send, and process 1 receives it using MPI.recv.
To demonstrate asynchronous communication, let's modify the example using MPI.Isend and MPI.Irecv!:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
# Asynchronous communication using MPI.Isend and MPI.Irecv!
if rank == 0
    # MPI.Isend works on buffers of plain data, so we send the raw bytes of the string
    data = Vector{UInt8}("Hello from process $rank !")
    request = MPI.Isend(data, comm, dest=1)
    # Other computation can happen here
    MPI.Wait(request)
elseif rank == 1
    received_data = zeros(UInt8, 50) # Preallocate a receive buffer (larger than the message is fine)
    request = MPI.Irecv!(received_data, comm, source=0)
    # Other computation can happen here
    MPI.Wait(request)
    # Drop the unused trailing bytes of the buffer before converting back to a string
    println("Process $rank received: $(String(received_data[1:findlast(!iszero, received_data)]))")
end
MPI.Finalize()
In this example, process 0 uses MPI.Isend to send the message asynchronously. This function returns immediately, allowing the sender process to continue its execution, while the actual transfer of data happens in the background. Similarly, MPI.Irecv! returns immediately, allowing the receiver process to continue executing.
Use MPI.Wait to make sure the communication has finished before reading or overwriting the send and receive buffers.
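To show how non-blocking calls let you overlap communication with computation, here is a minimal sketch (not part of the example above; assuming it is saved as, e.g., ring.jl and launched with mpiexec) in which each rank sends an array to the next rank and receives one from the previous rank while doing some local work:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
dest = mod(rank + 1, nranks)   # rank we send to
src  = mod(rank - 1, nranks)   # rank we receive from
sendbuf = fill(Float64(rank), 4)
recvbuf = Vector{Float64}(undef, 4)
# Post the receive and the send without blocking
rreq = MPI.Irecv!(recvbuf, comm, source=src)
sreq = MPI.Isend(sendbuf, comm, dest=dest)
# Do some local work while the messages are in flight
local_work = sum(abs2, sendbuf)
# Wait for both operations before using the buffers
MPI.Wait(sreq)
MPI.Wait(rreq)
println("Rank $rank received data from rank $src (local work = $local_work)")
MPI.Finalize()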
MPI provides collective communication functions for communication involving multiple processes. Let's explore some of these functions:
Let's illustrate the use of MPI.Gather and MPI.Scatter with an example:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
# The root process generates random data: 2 values per process
if rank == 0
    data = rand(2 * nranks)
    # A UBuffer describes how the data is divided: chunks of 2 values, one chunk per process
    sendbuf = MPI.UBuffer(data, 2)
else
    sendbuf = nothing # Only the root process provides data to scatter
end
# Scatter the data: each process receives its 2 values in local_data
local_data = Vector{Float64}(undef, 2)
MPI.Scatter!(sendbuf, local_data, comm, root=0)
# Compute the local average
local_average = sum(local_data) / length(local_data)
# Gather the local averages at the root process (1 value per process)
gathered_averages = rank == 0 ? Vector{Float64}(undef, nranks) : nothing
recvbuf = rank == 0 ? MPI.UBuffer(gathered_averages, 1) : nothing
MPI.Gather!(Ref(local_average), recvbuf, comm, root=0)
if rank == 0
    # Compute the global average of the sub-averages
    global_average = sum(gathered_averages) / nranks
    println("Global average: $global_average")
end
MPI.Finalize()
In this example, the root process generates random data and then distributes it to all processes using MPI.Scatter!. Each process calculates the average of its local chunk, and the local averages are then collected on the root process with MPI.Gather!. Finally, the root process computes the global average of the sub-averages and prints it.
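The same result can also be obtained with a single reduction instead of an explicit gather. The following is a minimal sketch (an alternative for comparison, not part of the example above) that uses MPI.Allreduce to sum the local averages across all processes:
using MPI
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
# Each process computes a local average of its own data
local_data = rand(2)
local_average = sum(local_data) / length(local_data)
# Sum the local averages across all processes; every process gets the result
total = MPI.Allreduce(local_average, +, comm)
global_average = total / nranks
if rank == 0
    println("Global average: $global_average")
end
MPI.Finalize()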