] add BenchmarkTools
using Distributed
using BenchmarkTools
using Printf
# Add worker processes only if none have been added yet
if procs() == workers()
    addprocs(4)
end
function answer_checker(answer, solution)
    if answer == solution
        "🥳 Well done! "
    else
        "It's not correct. Keep trying! 💪"
    end |> println
end
alg_1_deps_check(answer) = answer_checker(answer,"b")
alg_1_comm_overhead_check(answer) = answer_checker(answer, "c")
alg_1_comp_check(answer) = answer_checker(answer, "a")
alg_2_complex_check(answer) = answer_checker(answer, "b")
alg_2_deps_check(answer) = answer_checker(answer,"d")
alg_3_deps_check(answer) = answer_checker(answer, "c")
alg_3_complex_check(answer) = answer_checker(answer, "b")
In this notebook, we will:
- implement a sequential and several distributed versions of the (dense) matrix-matrix product,
- measure the parallel speedup and efficiency of the distributed versions,
- analyze the communication overhead of each parallelization strategy.
Let us consider the (dense) matrix-matrix product C=A*B.
We want to compute this product in parallel using several processes. A, B, and C are initially stored in the master process. To develop and study the parallel implementation, we proceed step by step, starting from the sequential algorithm and then considering parallel variants of increasing task granularity.
We start by considering the (naive) sequential algorithm:
@everywhere function matmul_seq!(C, A, B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    z = zero(eltype(C))
    for j in 1:n
        for i in 1:m
            Cij = z
            for k in 1:l
                @inbounds Cij += A[i,k]*B[k,j]
            end
            C[i,j] = Cij
        end
    end
    C
end
Run the following cell to compare the performance of our hand-written function against the built-in function mul!.
using LinearAlgebra
N = 1000
A = rand(N,N)
B = rand(N,N)
C = rand(N,N)
@btime matmul_seq!(C,A,B)
@btime mul!(C,A,B);
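The timings above access A, B, and C as global variables. If you want to exclude the cost of accessing untyped globals from the measurement, BenchmarkTools also lets you interpolate the arguments with $ (this is optional; the qualitative comparison is the same):
# Optional variant: interpolate the globals with $ so that only the call itself is timed.
@btime matmul_seq!($C,$A,$B)
@btime mul!($C,$A,$B);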
Look at the three nested loops in the sequential implementation:
for j in 1:n
    for i in 1:m
        Cij = z
        for k in 1:l
            @inbounds Cij += A[i,k]*B[k,j]
        end
        C[i,j] = Cij
    end
end
- The loops over i and j are trivially parallelizable.
- The loop over k can be parallelized, but it requires a reduction.

All the entries of matrix C can potentially be computed in parallel, but is computing all of them in parallel the most efficient solution on a distributed system? To find out, we will consider parallelization strategies of increasing granularity: one entry of C per worker (algorithm 1), one row of C per worker (algorithm 2), and a block of rows of C per worker (algorithm 3).
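As a minimal illustration of the reduction over k (this snippet is not part of the algorithms studied below; `@distributed` is used here only for the illustration), a single entry of C can be computed as a sum of partial products distributed over the workers:
# Illustration only: parallelizing the k loop for one entry C[i,j]
# requires reducing the partial products with a sum.
let A = rand(4,4), B = rand(4,4), i = 1, j = 1
    Cij = @distributed (+) for k in 1:size(A,2)
        A[i,k]*B[k,j]
    end
    Cij ≈ (A*B)[i,j] # should evaluate to true
end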
Moving data through the network is expensive, and reducing data movement is one of the key points in a distributed algorithm. To this end, we determine the minimum data needed by a worker to perform its computations.
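To get a feeling for this cost, the following rough experiment (not part of the original notebook; timings are indicative only) compares a local reduction with the same reduction spawned on a worker, where the remote timing also includes shipping the array over the network:
# Rough illustration: the remote call must send the array x to the worker,
# which can easily dwarf the actual computation.
let x = rand(10^6)
    w = workers()[1]
    # warm-up calls to exclude compilation time from the timings
    sum(x)
    fetch(@spawnat w sum(x))
    t_local  = @elapsed sum(x)
    t_remote = @elapsed fetch(@spawnat w sum(x)) # includes sending x to the worker
    println("local = ", t_local, " s, remote (incl. communication) = ", t_remote, " s")
end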
In algorithm 1, each worker computes a single entry C[i,j] of the result matrix C. Which is the minimum data the worker needs to compute this entry?
a) column A[:,i] and row B[j,:]
b) row A[i,:] and column B[:,j]
c) the whole matrices A and B
d) row A[i,:] and the whole matrix B
answer = "x" # replace x with a, b, c, or d
alg_1_deps_check(answer)
Taking into account these data dependencies, parallel algorithm 1 can be implemented with the following steps, from the worker perspective: receive the corresponding row A[i,:] and column B[:,j] from the master, compute the dot product of the two vectors locally, and send the resulting value C[i,j] back to the master.
A possible implementation of this algorithm in Julia is as follows:
function matmul_dist_1!(C, A, B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    z = zero(eltype(C))
    @assert nworkers() == m*n
    iw = 0
    @sync for j in 1:n
        for i in 1:m
            Ai = A[i,:]
            Bj = B[:,j]
            iw += 1
            w = workers()[iw]
            ftr = @spawnat w begin
                Cij = z
                for k in 1:l
                    @inbounds Cij += Ai[k]*Bj[k]
                end
                Cij
            end
            @async C[i,j] = fetch(ftr)
        end
    end
    C
end
You can execute the following cells to test this implementation.
using Test
N = 2
A = rand(N,N)
B = rand(N,N)
C = similar(A)
@test matmul_dist_1!(C,A,B) ≈ A*B
Let us study the performance of this algorithm. To this end, we will analyze whether algorithm 1 is able to achieve the optimal parallel speedup. The parallel speedup on $P$ processes is defined as

$$ S_P = \frac{T_1}{T_P}, $$

where $T_1$ denotes the runtime of the sequential algorithm on one process and $T_P$ denotes the runtime of the parallel algorithm on $P$ processes. If we run an optimal parallel algorithm on $P$ processes, we expect it to run $P$ times faster than the sequential implementation. I.e., the optimal speedup of a parallel algorithm on $P$ processes is equal to $P$:

$$ S^{*}_P = P. $$

The ratio of the actual speedup over the optimal one is called the parallel efficiency:

$$ E_P = \frac{S_P}{S^{*}_P} = \frac{T_1/T_P}{P}. $$

The following cell measures the speedup of parallel algorithm 1. Do we achieve the optimal speedup?
N = 2
A = rand(N,N)
B = rand(N,N)
C = similar(A)
T1 = @belapsed matmul_seq!(C,A,B)
C = similar(A)
TP = @belapsed matmul_dist_1!(C,A,B)
P = nworkers()
println("Speedup = ", T1/TP)
println("Optimal speedup = ", P)
println("Efficiency = ", 100*(T1/TP)/P, "%")
Since communication is usually the main bottleneck in a distributed algorithm, we want to reduce the amount of communication per unit of computation in a worker. Let us compute the (theoretical) communication overhead for algorithm 1. This will help us understand why the speedup of this algorithm was so bad.
Remember, algorithm 1 consists of these main steps: the master sends row A[i,:] and column B[:,j] to a worker, the worker computes the dot product of these two vectors, and the worker sends the resulting value back to the master. How many values are communicated (sent plus received) per worker?
a) 3N
b) 2N + 2
c) 2N + 1
d) N² + 1
answer = "x" # replace x with a, b, c, or d
alg_1_comm_overhead_check(answer)
Which is the (asymptotic) computation cost per worker in algorithm 1, as a function of the matrix size N?
a) O(N)
b) O(N²)
c) O(N³)
answer = "x" # replace x with a, b, or c
alg_1_comp_check(answer)
From these results we can conclude that both the communication and the computation cost per worker are O(N); their ratio is O(1). In other words, the communication cost is of the same order of magnitude as the computation cost. Since communication is orders of magnitude slower than computation in real systems, the runtime in the worker will be dominated by communication. This explains why we obtained such a poor speedup.
Let us study the next algorithm to see if we can improve the efficiency by increasing the granularity (i.e., the amount of work) of each parallel task. In parallel algorithm 2, each worker computes an entire row of C. Which is the minimum data needed by a worker to compute row C[i,:]?
a) column A[:,i] and row B[j,:]
b) row A[i,:] and column B[:,j]
c) the whole matrices A and B
d) row A[i,:] and the whole matrix B
answer = "x" # replace x with a, b, c, or d
alg_2_deps_check(answer)
These are the main steps of the implementation of algorithm 2: the master sends row A[i,:] and the whole matrix B to a worker, the worker computes row C[i,:] locally, and sends it back to the master.
A possible implementation of this algorithm in Julia is as follows:
function matmul_dist_2!(C, A, B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    z = zero(eltype(C))
    @assert nworkers() == m
    iw = 0
    @sync for i in 1:m
        Ai = A[i,:]
        iw += 1
        w = workers()[iw]
        ftr = @spawnat w begin
            Ci = fill(z,n)
            for j in 1:n
                for k in 1:l
                    @inbounds Ci[j] += Ai[k]*B[k,j]
                end
            end
            Ci
        end
        @async C[i,:] = fetch(ftr)
    end
    C
end
Test it using the next cell:
using Test
N = 4
A = rand(N,N)
B = rand(N,N)
C = similar(A)
@test matmul_dist_2!(C,A,B) ≈ A*B
N = 4
A = rand(N,N)
B = rand(N,N)
C = similar(A)
T1 = @belapsed matmul_seq!(C,A,B)
C = similar(A)
TP = @belapsed matmul_dist_2!(C,A,B)
P = nworkers()
println("Speedup = ", T1/TP)
println("Optimal speedup = ", P)
println("Efficiency = ", 100*(T1/TP)/P, "%")
The speedup is still far from the optimal one. Let us study the communication overhead for this algorithm. Remember, algorithm 2 consists of these main steps: the master sends row A[i,:] and the whole matrix B to a worker, the worker computes row C[i,:], and sends it back. Which are the (asymptotic) communication and computation costs per worker?
a) O(N) communication and O(N²) computation
b) O(N²) communication and O(N²) computation
c) O(N³) communication and O(N³) computation
d) O(N) communication and O(N) computation
answer = "x" # replace x with a, b, c, or d
alg_2_complex_check(answer)
The communication and computation costs are still of the same order of magnitude, even though we have increased the grain size.
Let us increase the granularity of the parallel tasks even further by computing several rows of C in each worker. In parallel algorithm 3, each worker computes N/P consecutive rows of C, where P is the number of workers. Which is the minimum data needed by a worker to compute its block of rows C[rows,:]?
a) A[rows,:] and B[:,rows]
b) the whole matrix A and B[:,rows]
c) A[rows,:] and the whole matrix B
d) the whole matrices A and B
answer = "x" # replace x with a, b, c, or d
alg_3_deps_check(answer)
These are the main steps of the implementation of algorithm 3: the master sends the block of rows A[rows,:] and the whole matrix B to a worker, the worker computes the corresponding rows C[rows,:] locally, and sends them back to the master.
The implementation of this variant is left as an exercise (see below); a sketch of the row partitioning is shown right after this paragraph.
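As a starting point, the row partitioning used by algorithm 3 can be sketched as follows (an illustration only, assuming the number of rows is a multiple of the number of workers; it is not a full solution of the exercise):
# Illustration only: split m rows into consecutive blocks, one block per worker.
let m = 8, P = nworkers()
    nrows_w = div(m, P)
    for (iw, w) in enumerate(workers())
        rows = (1 + (iw - 1)*nrows_w):(iw*nrows_w)
        println("worker ", w, " would compute rows ", rows, " of C")
    end
end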
Let us analyze the (theoretical) communication overhead for algorithm 3. Which are the (asymptotic) communication and computation costs per worker?
a) O(N²) communication and O(N³) computation
b) O(N²) communication and O(N³/P) computation
c) O(N²) communication and O(N²) computation
d) O(N²/P) communication and O(N³/P) computation
answer = "x" # replace x with a, b, c, or d
alg_3_complex_check(answer)
In this case, the ratio between communication and computation is O(P/N). If the matrix size N is much larger than the number of workers P, the communication overhead O(P/N) becomes negligible. For instance, with N = 1000 and P = 4 the ratio is of the order of 0.004. This opens the door to a scalable implementation.
The table below compares the three parallel algorithms.
| Algorithm | Parallelism (#workers) | Communication per worker | Computation per worker | Ratio communication/computation |
|---|---|---|---|---|
| 1 | N² | 2N + 1 | N | O(1) |
| 2 | N | 2N + N² | N² | O(1) |
| 3 | P | N² + 2N²/P | N³/P | O(P/N) |
Implement algorithm 3 in the function below. For simplicity, assume that the number of rows of C is a multiple of the number of workers.
function matmul_dist_3!(C, A, B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    @assert mod(m, nworkers()) == 0
    # Implement here
    C
end
Use test-driven development to implement the algorithm. Use this test:
using Test
P = nworkers()
load = 100
N = load*P
A = rand(N,N)
B = rand(N,N)
C = similar(A)
@test matmul_dist_3!(C,A,B) ≈ A*B
Measure the performance of your implementation by running the next cell. Do you get close to the optimal speedup?
P = nworkers()
load = 100
N = load*P
A = rand(N,N)
B = rand(N,N)
C = similar(A)
T1 = @belapsed matmul_seq!(C,A,B)
C = similar(A)
TP = @belapsed matmul_dist_3!(C,A,B)
println("Speedup = ", T1/TP)
println("Optimal speedup = ", P)
println("Efficiency = ", 100*(T1/TP)/P, "%")
The implementation of algorithm 1 is very impractical: one needs as many workers as entries in the result matrix C. For a 1000 × 1000 matrix, one would need a supercomputer with one million processes! We can easily fix this problem by using fewer processes and spawning the computation of each entry on any of the available processes. See the following code:
function matmul_dist_1_v2!(C, A, B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    z = zero(eltype(C))
    @sync for j in 1:n
        for i in 1:m
            Ai = A[i,:]
            Bj = B[:,j]
            ftr = @spawnat :any begin
                Cij = z
                for k in 1:l
                    @inbounds Cij += Ai[k]*Bj[k]
                end
                Cij
            end
            @async C[i,j] = fetch(ftr)
        end
    end
    C
end
With this new implementation, we can multiply matrices of arbitrary size with a fixed number of workers. Test it:
using Test
N = 50
A = rand(N,N)
B = rand(N,N)
C = similar(A)
@test matmul_dist_1_v2!(C,A,B) ≈ A*B
Run the next cell to check the performance of this implementation. Note that we are still far away from the optimal speedup. Why? To answer this question, compute the theoretical communication over computation ratio for this implementation and reason about the obtained result. Hint: the number of tasks spawned on each worker in this implementation is N²/P on average.
N = 100
A = rand(N,N)
B = rand(N,N)
C = similar(A)
P = nworkers()
T1 = @belapsed matmul_seq!(C,A,B)
C = similar(A)
TP = @belapsed matmul_dist_1_v2!(C,A,B)
println("Speedup = ", T1/TP)
println("Optimal speedup = ", P)
println("Efficiency = ", 100*(T1/TP)/P, "%")