] add BenchmarkTools
using Distributed
using BenchmarkTools
using Printf
if procs() == workers()
    addprocs(4)
end
function answer_checker(answer,solution)
    if answer == solution
        "🥳 Well done! "
    else
        "It's not correct. Keep trying! 💪"
    end |> println
end
alg_1_deps_check(answer) = answer_checker(answer,"b")
alg_2_deps_check(answer) = answer_checker(answer,"d")
alg_2_complex_check(answer) = answer_checker(answer,"b") # used by the algorithm 2 complexity question below
Distributed matrix-matrix multiplication¶
Contents¶
In this notebook, we will:
- Parallelize a simple algorithm
- Study the performance of different parallelization strategies
- Implement them using Julia
Problem Statement¶
Let us consider the (dense) matrix-matrix product C=A*B.
Goals¶
We want to
- compute the product in parallel using more than one process (distributed implementation)
- study the performance of different parallelization alternatives
- implement the algorithms using Julia
Assumptions¶
- All matrices A, B, and C are initially stored in the master process
- The result will be overwritten in C
Steps¶
To develop and study the parallel implementation, we will follow these steps:
- Identify the parts of the sequential algorithm that can be parallelized
- Consider different parallelization strategies
- Discuss the (theoretical) performance of these implementations
Serial implementation¶
We start by considering the (naive) sequential algorithm:
@everywhere function matmul_seq!(C,A,B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    z = zero(eltype(C))
    for j in 1:n
        for i in 1:m
            Cij = z
            for k in 1:l
                @inbounds Cij += A[i,k]*B[k,j]
            end
            C[i,j] = Cij
        end
    end
    C
end
Run the following cell to compare the performance of our hand-written function with the built-in function mul!:
using LinearAlgebra
N = 1000
A = rand(N,N)
B = rand(N,N)
C = rand(N,N)
@btime matmul_seq!(C,A,B)
@btime mul!(C,A,B);
Where can we exploit parallelism?¶
Look at the three nested loops in the sequential implementation:
for j in 1:n
    for i in 1:m
        Cij = z
        for k in 1:l
            @inbounds Cij += A[i,k]*B[k,j]
        end
        C[i,j] = Cij
    end
end
- The loops over i and j are trivially parallelizable.
- The loop over k can also be parallelized, but it requires a reduction (see the sketch below).
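For illustration, the innermost loop is just a sum, so a single entry C[i,j] could in principle be assembled from partial sums computed on different workers and combined with a reduction. Below is a minimal sketch of this idea; the vectors a_row and b_col stand in for A[i,:] and B[:,j], and this reduction-based approach is not pursued further in this notebook.
# Conceptual sketch: @distributed with a (+) reducer splits the k range across
# the workers and sums their partial results.
l = 1000
a_row = rand(l) # stands in for A[i,:]
b_col = rand(l) # stands in for B[:,j]
C_ij = @distributed (+) for k in 1:l
    a_row[k]*b_col[k]
end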
Parallel algorithms¶
All the entries of matrix C can potentially be computed in parallel, but is computing every entry in a separate parallel task the most efficient approach on a distributed system? To find out, we will consider different parallelization strategies:
- Algorithm 1: each worker computes a single entry of C
- Algorithm 2: each worker computes a single row of C
- Algorithm 3: each worker computes a block of rows of C
Parallel algorithm 1¶
Data dependencies¶
Moving data through the network is expensive, and reducing data movement is one of the key concerns in a distributed algorithm. To this end, we determine the minimum data needed by a worker to perform its computations.
In algorithm 1, each worker computes only a single entry of the result matrix C. Which is the minimum data the worker computing entry C[i,j] needs to receive?
a) column A[:,i] and row B[j,:]
b) row A[i,:] and column B[:,j]
c) the whole matrices A and B
d) row A[i,:] and the whole matrix B
answer = "x" # replace x with a, b, c, or d
alg_1_deps_check(answer)
Implementation¶
Taking into account the data dependencies, the parallel algorithm 1 can be efficiently implemented following these steps from the worker perspective:
- The worker receives the corresponding row A[i,:] and column B[:,j] from the master process
- The worker computes the dot product of A[i,:] and B[:,j]
- The worker sends back the result of C[i,j] to the master process
A possible implementation of this algorithm in Julia is as follows:
function matmul_dist_1!(C, A, B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    z = zero(eltype(C))
    @assert nworkers() == m*n
    iw = 0
    @sync for j in 1:n
        for i in 1:m
            Ai = A[i,:]
            Bj = B[:,j]
            iw += 1
            w = workers()[iw]
            ftr = @spawnat w begin
                Cij = z
                for k in 1:l
                    @inbounds Cij += Ai[k]*Bj[k]
                end
                Cij
            end
            @async C[i,j] = fetch(ftr)
        end
    end
    C
end
You can execute the following cells to test this implementation.
using Test
N = 2
A = rand(N,N)
B = rand(N,N)
C = similar(A)
@test matmul_dist_1!(C,A,B) ≈ A*B
Performance¶
Let us study the performance of this algorithm. To this end, we will analyze whether algorithm 1 is able to achieve the optimal parallel speedup. The parallel speedup on $P$ processes is defined as
$$ S_P = \frac{T_1}{T_P}, $$
where $T_1$ denotes the runtime of the sequential algorithm on one process and $T_P$ denotes the runtime of the parallel algorithm on $P$ processes. If we run an optimal parallel algorithm on $P$ processes, we expect it to run $P$ times faster than the sequential implementation. I.e., the optimal speedup of a parallel algorithm on $P$ processes is equal to $P$:
$$ S^{*}_P = P. $$
The ratio of the actual speedup to the optimal one is called the parallel efficiency:
$$ E_P = \frac{S_P}{S^{*}_P} = \frac{T_1/T_P}{P}. $$
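In code, these definitions translate directly. Here is a minimal sketch (the helper names speedup and efficiency are ours):
# Parallel speedup and efficiency from measured runtimes, as defined above.
speedup(T1, TP) = T1 / TP
efficiency(T1, TP, P) = speedup(T1, TP) / P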
Experimental speedup¶
The following cell measures the speedup of parallel algorithm 1. Do we achieve the optimal speedup?
N = 2
A = rand(N,N)
B = rand(N,N)
C = similar(A)
T1 = @belapsed matmul_seq!(C,A,B)
C = similar(A)
TP = @belapsed matmul_dist_1!(C,A,B)
P = nworkers()
println("Speedup = ", T1/TP)
println("Optimal speedup = ", P)
println("Efficiency = ", 100*(T1/TP)/P, "%")
Communication overhead¶
Since communication is usually the main bottleneck in a distributed algorithm, we want to reduce the amount of communication per unit of computation in a worker. Let us compute the (theoretical) communication overhead for algorithm 1. This will help us understand why the speedup of this algorithm was so bad.
Remember, algorithm 1 consisted of these main steps:
- The worker receives the corresponding row A[i,:] and column B[:,j] from the master process
- The worker computes the dot product of A[i,:] and B[:,j]
- The worker sends back the result of C[i,j] to the master process
# TODO multiple choice
# TODO multiple choice
From these results we can conclude:
- The communication complexity is O(N)
- The computation complexity is O(N)
- The ratio communication over computation (the communication overhead) is O(1)
In other words, the communication cost is of the same order of magnitude as the computation cost. Since communication is orders of magnitude slower than computation in real systems, the runtime in the worker will be dominated by communication. This explains why we obtained such a poor speedup.
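As a quick sanity check of this reasoning, the sketch below tallies the values communicated and the multiply-add operations performed per task in algorithm 1 (the function name alg1_task_costs is ours; one multiply-add counts as one operation, as in the table in the Summary section below):
# Per-task cost model for algorithm 1: the worker receives A[i,:] and B[:,j]
# (N values each), sends back a single entry, and performs N multiply-adds.
function alg1_task_costs(N)
    comm = 2N + 1 # values moved to and from the worker
    comp = N      # multiply-adds for the dot product
    (comm=comm, comp=comp, ratio=comm/comp)
end
alg1_task_costs(1000) # ratio ≈ 2, i.e. O(1): communication ~ computation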
Parallel algorithm 2¶
Let us study the next algorithm to see whether we can improve the efficiency by increasing the granularity (i.e. the amount of work) of each parallel task. In parallel algorithm 2, each worker computes an entire row of C.
Data dependencies¶
In algorithm 2, which is the minimum data the worker computing row C[i,:] needs to receive?
a) column A[:,i] and row B[j,:]
b) row A[i,:] and column B[:,j]
c) the whole matrices A and B
d) row A[i,:] and the whole matrix B
answer = "x"
alg_2_deps_check(answer)
Implementation¶
These are the main steps of the implementation of algorithm 2:
- The worker receives the corresponding row A[i,:] and matrix B from the master process
- The worker computes the product of row A[i,:] times B
- The worker sends back the result of row C[i,:] to the master process
A possible implementation of this algorithm in Julia is as follows:
function matmul_dist_2!(C, A, B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    z = zero(eltype(C))
    @assert nworkers() == m
    iw = 0
    @sync for i in 1:m
        Ai = A[i,:]
        iw += 1
        w = workers()[iw]
        ftr = @spawnat w begin
            Ci = fill(z,n) # row C[i,:] has n entries
            for j in 1:n
                for k in 1:l
                    @inbounds Ci[j] += Ai[k]*B[k,j]
                end
            end
            Ci
        end
        @async C[i,:] = fetch(ftr)
    end
    C
end
Test it using the next cell:
using Test
N = 4
A = rand(N,N)
B = rand(N,N)
C = similar(A)
@test matmul_dist_2!(C,A,B) ≈ A*B
Experimental speedup¶
N = 4
A = rand(N,N)
B = rand(N,N)
C = similar(A)
T1 = @belapsed matmul_seq!(C,A,B)
C = similar(A)
TP = @belapsed matmul_dist_2!(C,A,B)
P = nworkers()
println("Speedup = ", T1/TP)
println("Optimal speedup = ", P)
println("Efficiency = ", 100*(T1/TP)/P, "%")
Complexity¶
The speedup is still far from optimal. Let us study the communication overhead for this algorithm. Remember, algorithm 2 consists of these main steps:
- The worker receives the corresponding row A[i,:] and matrix B from the master process
- The worker computes the product of row A[i,:] times B
- The worker sends back the result of row C[i,:] to the master process
a) O(N) communication and O(N^2) computation
b) O(N^2) communication and O(N^2) computation
c) O(N^3) communication and O(N^3) computation
d) O(N) communication and O(N) computation
answer = "x"
alg_2_complex_check(answer)
The communication and computation cost are still of the same order of magnitude even though we have increased the grain size.
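A quick tally of these counts per worker, under the same conventions as before (the function name alg2_task_costs is ours):
# Per-worker cost model for algorithm 2: the worker receives A[i,:] (N values)
# and the whole matrix B (N^2 values), sends back C[i,:] (N values),
# and performs N^2 multiply-adds.
function alg2_task_costs(N)
    comm = 2N + N^2
    comp = N^2
    (comm=comm, comp=comp, ratio=comm/comp)
end
alg2_task_costs(1000) # ratio ≈ 1, still O(1)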
Parallel algorithm 3¶
Let us increase the granularity of the parallel tasks even further by computing several rows of C in each worker. Each worker computes N/P consecutive rows of C, where P is the number of workers.
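As an illustration of how the rows can be partitioned, here is a minimal sketch assuming N is a multiple of P (the helper name row_partition is ours):
# Consecutive row ranges assigned to each of the P workers (N divisible by P).
row_partition(N, P) = [(1:div(N,P)) .+ (w-1)*div(N,P) for w in 1:P]
row_partition(8, 4) # returns the ranges 1:2, 3:4, 5:6, 7:8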
Data dependencies¶
# TODO multiple choice and checker
Implementation¶
These are the main steps of the implementation of algorithm 3:
- The worker receives the corresponding rows A[rows,:] and matrix B from the master process
- The worker computes the product of A[rows,:] times B
- The worker sends back the result of C[rows,:] to the master process
The implementation of this variant is left as an exercise (see below).
Communication overhead¶
Let us analyze the (theoretical) communication overhead for algorithm 3.
# TODO multiple choice and checker function
In this case, the ratio between communication and computation is O(P/N). If the matrix size N is much larger than the number of workers P, the communication overhead O(P/N) will be negligible. This opens the door to a scalable implementation.
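To see how quickly this overhead fades as N grows, the sketch below evaluates the ratio for a few problem sizes (the counts follow the summary table below; the function name alg3_overhead is ours):
# Per-worker cost model for algorithm 3:
# communication ≈ N^2 + 2N^2/P values, computation ≈ N^3/P multiply-adds.
alg3_overhead(N, P) = (N^2 + 2N^2/P) / (N^3/P) # = (P+2)/N, i.e. O(P/N)
for N in (1_000, 10_000), P in (4, 64)
    println("N = $N, P = $P: overhead ≈ ", round(alg3_overhead(N, P), sigdigits=2))
end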
Summary¶
The table below compares the three parallel algorithms.
| Algorithm | Parallelism (#workers) | Communication per worker | Computation per worker | Ratio communication/computation |
|---|---|---|---|---|
| 1 | N² | 2N + 1 | N | O(1) |
| 2 | N | 2N + N² | N² | O(1) |
| 3 | P | N² + 2N²/P | N³/P | O(P/N) |
- Matrix-matrix multiplication is trivially parallelizable (all entries in the result matrix can be computed in parallel, at least in theory)
- However, we cannot exploit all the potential parallelism in a distributed system due to communication overhead
- We need a sufficiently large grain size to obtain a near optimal speedup
Exercises¶
Implementation of algorithm 3¶
Implement algorithm 3 in the function below. For simplicity, assume that the number of rows of C is a multiple of the number of workers.
# TODO move solution to a separate notebook and remove part of the implementation here
function matmul_dist_3!(C,A,B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    @assert mod(m,nworkers()) == 0
    # Implement here
    nrows_w = div(m,nworkers())
    @sync for (i,w) in enumerate(workers())
        rows_w = (1:nrows_w) .+ (i-1)*nrows_w
        Aw = A[rows_w,:]
        ftr = @spawnat w begin
            Cw = similar(Aw,nrows_w,n)
            matmul_seq!(Cw,Aw,B)
            Cw
        end
        @async C[rows_w,:] = fetch(ftr)
    end
    C
end
Use test-driven development to implement the algorithm. Use this test:
using Test
P = nworkers()
load = 100
N = load*P
A = rand(N,N)
B = rand(N,N)
C = similar(A)
@test matmul_dist_3!(C,A,B) ≈ A*B
Measure the performance of your implementation by running the next cell. Do you get close to the optimal speedup?
P = nworkers()
load = 100
N = load*P
A = rand(N,N)
B = rand(N,N)
C = similar(A)
T1 = @belapsed matmul_seq!(C,A,B)
C = similar(A)
TP = @belapsed matmul_dist_3!(C,A,B)
println("Speedup = ", T1/TP)
println("Optimal speedup = ", P)
println("Efficiency = ", 100*(T1/TP)/P, "%")
A more practical version of algorithm 1¶
The implementation of algorithm 1 is very impractical: one needs as many workers as entries in the result matrix C. For a 1000 by 1000 matrix, one would need a supercomputer with one million processes! We can easily fix this problem by using fewer processes and spawning the computation of each entry on any of the available workers. See the following code:
function matmul_dist_1_v2!(C, A, B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    z = zero(eltype(C))
    @sync for j in 1:n
        for i in 1:m
            Ai = A[i,:]
            Bj = B[:,j]
            ftr = @spawnat :any begin
                Cij = z
                for k in 1:l
                    @inbounds Cij += Ai[k]*Bj[k]
                end
                Cij
            end
            @async C[i,j] = fetch(ftr)
        end
    end
    C
end
With this new implementation, we can multiply matrices of arbitrary size with a fixed number of workers. Test it:
using Test
N = 50
A = rand(N,N)
B = rand(N,N)
C = similar(A)
@test matmul_dist_1_v2!(C,A,B) ≈ A*B
Run the next cell to check the performance of this implementation. Note that we are still far from the optimal speedup. Why? To answer this question, compute the theoretical communication over computation ratio for this implementation and reason about the obtained result. Hint: on average, each worker handles about N^2/P of the spawned tasks in this implementation.
N = 100
A = rand(N,N)
B = rand(N,N)
C = similar(A)
P = nworkers()
T1 = @belapsed matmul_seq!(C,A,B)
C = similar(A)
TP = @belapsed matmul_dist_1_v2!(C,A,B)
println("Speedup = ", T1/TP)
println("Optimal speedup = ", P)
println("Efficiency = ", 100*(T1/TP)/P, "%")