In [ ]:
function ex1(a)
j = 1
m = a[j]
for (i,ai) in enumerate(a)
if m < ai
m = ai
j = i
end
end
(m,j)
end
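A quick usage check for the solution above: ex1 returns the maximum element of a together with its index.
In [ ]:
ex1([3, 1, 4, 1, 5, 9, 2]) # expected result: (9, 6)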
Julia Basics: Exercise 2
In [ ]:
ex2(f,g) = x -> f(x) + g(x)
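A quick usage check: ex2 returns a new function that evaluates f and g at the same point and sums the results.
In [ ]:
h = ex2(sin, cos)
h(0.0) # expected result: 1.0, since sin(0) + cos(0) == 1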
Julia Basics: Exercise 3
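The solution below calls mandel(x, y, max_iters), which is defined earlier in the notebook. For reference, a typical escape-time implementation looks like the following (an assumption, not necessarily the exact definition used in the course):
In [ ]:
# Assumed helper: number of iterations before the orbit of x + y*im escapes,
# capped at max_iters (points that never escape get max_iters).
function mandel(x, y, max_iters)
    c = Complex(x, y)
    z = zero(c)
    for n in 1:max_iters
        z = z^2 + c
        if abs2(z) > 4
            return n
        end
    end
    return max_iters
end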
In [ ]:
function compute_values(n,max_iters)
x = LinRange(-1.7,0.7,n)
y = LinRange(-1.2,1.2,n)
values = zeros(Int,n,n)
for j in 1:n
for i in 1:n
values[i,j] = mandel(x[i],y[j],max_iters)
end
end
x,y,values
end
x,y,values = compute_values(1000,10)
using GLMakie
heatmap(x,y,values)
Matrix Multiplication: Exercise 1
In [ ]:
function matmul_dist_3!(C,A,B)
m = size(C,1)
n = size(C,2)
l = size(A,2)
@assert size(A,1) == m
@assert size(B,2) == n
@assert size(B,1) == l
@assert mod(m,nworkers()) == 0
# Partition the rows of C in contiguous blocks, one block per worker
nrows_w = div(m,nworkers())
@sync for (i,w) in enumerate(workers())
rows_w = (1:nrows_w) .+ (i-1)*nrows_w
Aw = A[rows_w,:]
ftr = @spawnat w begin
Cw = similar(Aw,nrows_w,n)
matmul_seq!(Cw,Aw,B)
Cw
end
@async C[rows_w,:] = fetch(ftr)
end
C
end
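A hedged usage sketch for the cell above. It assumes a sequential kernel matmul_seq! is defined on every process, as it is earlier in the notebook; the stand-in below is only a plausible definition (and would overwrite the notebook's own one).
In [ ]:
using Distributed
nprocs() == 1 && addprocs(4)
# Assumed stand-in for the sequential kernel used by matmul_dist_3!
@everywhere function matmul_seq!(C,A,B)
    m = size(C,1); n = size(C,2); l = size(A,2)
    fill!(C, zero(eltype(C)))
    for j in 1:n, k in 1:l, i in 1:m
        C[i,j] += A[i,k]*B[k,j]
    end
    C
end
N = 8*nworkers() # keep mod(N, nworkers()) == 0
A = rand(N,N); B = rand(N,N); C = similar(A)
matmul_dist_3!(C,A,B)
@assert C ≈ A*B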
Jacobi Method: Exercise 1
In [ ]:
@mpi_do manager begin
using MPI
comm = MPI.Comm_dup(MPI.COMM_WORLD)
nw = MPI.Comm_size(comm)
iw = MPI.Comm_rank(comm)+1
function jacobi_mpi(n,niters)
if mod(n,nw) != 0
println("n must be a multiple of nw")
MPI.Abort(comm,1)
end
n_own = div(n,nw)
u = zeros(n_own+2)
u[1] = -1
u[end] = 1
u_new = copy(u)
for t in 1:niters
reqs_snd = MPI.Request[]
reqs_rcv = MPI.Request[]
if iw != 1
neig_rank = (iw-1)-1
req = MPI.Isend(view(u,2:2),comm,dest=neig_rank,tag=0)
push!(reqs_snd,req)
req = MPI.Irecv!(view(u,1:1),comm,source=neig_rank,tag=0)
push!(reqs_rcv,req)
end
if iw != nw
neig_rank = (iw+1)-1
s = n_own+1
r = n_own+2
req = MPI.Isend(view(u,s:s),comm,dest=neig_rank,tag=0)
push!(reqs_snd,req)
req = MPI.Irecv!(view(u,r:r),comm,source=neig_rank,tag=0)
push!(reqs_rcv,req)
end
for i in 3:n_own
u_new[i] = 0.5*(u[i-1]+u[i+1])
end
MPI.Waitall(reqs_rcv)
for i in (2,n_own+1)
u_new[i] = 0.5*(u[i-1]+u[i+1])
end
MPI.Waitall(reqs_snd)
u, u_new = u_new, u
end
@show u
end
niters = 100
load = 4
n = load*nw
jacobi_mpi(n,niters)
end
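The cell above assumes a cluster manager named manager, set up earlier in the notebook. A typical setup, assuming MPIClusterManagers.jl is used to drive @mpi_do, looks like this:
In [ ]:
using MPIClusterManagers, Distributed
# Launch 3 MPI ranks as Julia workers; @mpi_do then runs code on all of them.
manager = MPIWorkerManager(3)
addprocs(manager)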
Exercise: Ring communication - MPI
In [ ]:
using MPI
using Test
MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
id = rank + 1
root = 0
size = MPI.Comm_size(comm)
dst = mod(rank + 1, size)
src = mod(rank - 1, size)
send_buf = id
recv_buf = 1
if rank == root
# Root (id 1): send own id asynchronously, then wait for the final product
sreq = MPI.isend(send_buf, comm; dest=dst, tag=0)
recv_buf = MPI.recv(comm; source=src, tag=0)
MPI.Wait(sreq)
@show recv_buf == factorial(size)
@test recv_buf == factorial(size)
else
# Other procs: receive, multiply by own id, and forward asynchronously
recv_buf = MPI.recv(comm; source=src, tag=0)
send_buf = recv_buf * id
sreq = MPI.isend(send_buf, comm; dest=dst, tag=0)
MPI.Wait(sreq)
end
MPI.Finalize()
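This cell is a complete MPI program. Assuming it is saved to a file (here the hypothetical name ring_mpi.jl), one way to launch it from Julia is with MPI.jl's mpiexec helper:
In [ ]:
using MPI
mpiexec() do cmd
    # cmd is the MPI launcher command; ring_mpi.jl is a hypothetical filename
    run(`$cmd -np 3 julia --project ring_mpi.jl`)
end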
Exercise: Ring communication - Distributed.jl
In [ ]:
using Distributed
using Test
np = 4
add_n = np - nprocs()
add_n > 0 && addprocs(add_n)
worker_ids = workers()
@assert nprocs() == np
# Initialize id channel
id_chnl = RemoteChannel(()->Channel{Int}(1))
put!(id_chnl, 1)
# Initialize data channel
job_chnl = RemoteChannel(()->Channel{Int}(1))
put!(job_chnl, 1)
@sync for w in workers()
@spawnat w begin
pos = findfirst(worker_ids .== w) + 1
dst = mod(pos, np) + 1
src = mod(pos-2, np) + 1
while true
pred = fetch(id_chnl)
if pred == src
take!(id_chnl)
value = take!(job_chnl)
put!(job_chnl, value * pos)
put!(id_chnl, pos)
break
end
end
end
end
res = take!(job_chnl)
@show res
@test res == factorial(np)
rmprocs(workers())
New ring example
In [ ]:
@everywhere workers() begin
using MPI
MPI.Init()
comm = MPI.Comm_dup(MPI.COMM_WORLD)
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
buffer = Ref(0)
if rank == 0
msg = rand(1:10)
buffer[] = msg
println("msg = $(buffer[])")
MPI.Send(buffer,comm;dest=rank+1,tag=0)
MPI.Recv!(buffer,comm;source=nranks-1,tag=0)
println("msg = $(buffer[])")
else
dest = (rank != nranks-1) ? rank+1 : 0
MPI.Recv!(buffer,comm;source=rank-1,tag=0)
buffer[] += 1
println("msg = $(buffer[])")
MPI.Send(buffer,comm;dest,tag=0)
end
end
In [ ]:
ftrs = Vector{Future}(undef,nprocs())
for p in 1:nprocs()
ftrs[p] = @spawnat p RemoteChannel(()->Channel{Int}(1))
end
for p in 1:nprocs()
@spawnat p begin
chnl_snd = fetch(ftrs[p])
source = (p != 1) ? p-1 : nprocs()
chnl_rcv = fetch(ftrs[source])
if p == 1
msg = rand(1:10)
@show msg
put!(chnl_snd,msg)
msg = take!(chnl_rcv)
@show msg
else
msg = take!(chnl_rcv)
msg += 1
@show msg
put!(chnl_snd,msg)
end
end
end
In [ ]:
@everywhere function work(msg)
msg += 1
println("msg = $msg")
if myid() == nprocs()
@spawnat 1 println("msg = $msg")
else
next = myid() + 1
@spawnat next work(msg)
end
end
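A hedged kickoff for the recursive ring above, assuming the message starts at process 1 with value 0 and travels once through all processes:
In [ ]:
work(0) # prints msg = 1 on process 1, msg = 2 on process 2, ..., and the final value again on process 1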
In [ ]:
function ge_par_block!(B,n,m,comm)
nranks = MPI.Comm_size(comm)
rank = MPI.Comm_rank(comm)
# Init buffers
T = eltype(B)
if rank == 0
buffer_root = Vector{T}(undef,n*m)
buffer_root[:] = transpose(B)[:]
else
buffer_root = Vector{T}(undef,0)
end
nw = div(n,nranks)
buffer = Vector{T}(undef,nw*m)
# Send local matrices to workers
MPI.Scatter!(buffer_root,buffer,comm;root=0)
Bw = Matrix{T}(undef,nw,m)
transpose(Bw)[:] = buffer
MPI.Barrier(comm)
# Time the computation on this rank
t = @elapsed ge_block_worker!(Bw,n,comm)
# Gather results
buffer[:] = transpose(Bw)[:]
MPI.Gather!(buffer,buffer_root,comm;root=0)
Tf = typeof(t)
if rank == 0
transpose(B)[:] = buffer_root[:]
ts = Vector{Tf}(undef,nranks)
else
ts = Vector{Tf}(undef,0)
end
MPI.Gather!([t],ts,comm;root=0)
B, ts
end
function ge_block_worker!(Bw,n,comm)
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
nw,m = size(Bw)
B_k = similar(Bw,m)
lb = nw*rank + 1
ub = nw*(rank+1)
@inbounds for k in 1:n
# If I have row k
if k in lb:ub
myk = k - lb + 1
# Update just row k
#@show Bw[myk,k]
for t in (k+1):m
Bw[myk,t] = Bw[myk,t]/Bw[myk,k]
end
Bw[myk,k] = 1
# Send k to other procs
B_k .= view(Bw,myk,:)
MPI.Bcast!(B_k,comm;root=rank)
else
myk = 0
owner = div(k-1,nw)
MPI.Bcast!(B_k,comm;root=owner)
end
# Do nothing if I only have rows < k
if k <= ub
for i in (myk+1):nw
for j in (k+1):m
Bw[i,j] = Bw[i,j] - Bw[i,k]*B_k[j]
end
Bw[i,k] = 0
end
end
end
Bw
end
Cyclic partitioning:
In [ ]:
function ge_par_cyclic!(B,n,m,comm)
nranks = MPI.Comm_size(comm)
rank = MPI.Comm_rank(comm)
nw = div(n,nranks)
# Init buffers
T = eltype(B)
if rank == 0
buffer_root = Vector{T}(undef,n*m)
# Fill buffer cyclically: global row j ends up on rank mod(j-1,nranks)
for i in 1:n
ub = i * m
lb = ub - m + 1
j = mod(i-1,nw)*nranks + div(i-1,nw) +1
buffer_root[lb:ub] = transpose(B)[:,j]
end
else
buffer_root = Vector{T}(undef,0)
end
buffer = Vector{T}(undef,nw*m)
# Send local matrices to workers
MPI.Scatter!(buffer_root,buffer,comm;root=0)
Bw = Matrix{T}(undef,nw,m)
transpose(Bw)[:] = buffer
MPI.Barrier(comm)
# Time the computation on this rank
t = @elapsed ge_cyclic_worker!(Bw,n,comm)
# Gather results
buffer[:] = transpose(Bw)[:]
MPI.Gather!(buffer,buffer_root,comm;root=0)
Tf = typeof(t)
if rank == 0
for i in 1:n
ub = i * m
lb = ub - m + 1
j = mod(i-1,nw)*nranks + div(i-1,nw) +1
transpose(B)[:,j] = buffer_root[lb:ub]
end
ts = Vector{Tf}(undef,nranks)
else
ts = Vector{Tf}(undef,0)
end
MPI.Gather!([t],ts,comm;root=0)
B, ts
end
function ge_cyclic_worker!(Bw,n,comm)
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
nw,m = size(Bw)
B_k = similar(Bw,m)
my_rows = [i*nranks+(rank+1) for i in 0:(nw-1)]
@inbounds for k in 1:n
# If I have row k
if k in my_rows
myk = findfirst(my_rows .== k)
# Update just row k
for t in (k+1):m
Bw[myk,t] = Bw[myk,t]/Bw[myk,k]
end
Bw[myk,k] = 1
# Send k to other procs
B_k .= view(Bw,myk,:)
MPI.Bcast!(B_k,comm;root=rank)
else
owner = mod(k-1,nranks)
MPI.Bcast!(B_k,comm;root=owner)
end
# Do nothing if I only have rows < k
if k < maximum(my_rows)
cutoff = findfirst(my_rows .> k)
for i in cutoff:length(my_rows)
for j in (k+1):m
Bw[i,j] = Bw[i,j] - Bw[i,k]*B_k[j]
end
Bw[i,k] = 0
end
end
end
Bw
end
Main function to run the code and test performance:
In [ ]:
using MPI
using Test
MPI.Init()
function tridiagonal_matrix(n)
C = zeros(n,n)
stencil = zip((-1,2,-1),(-1,0,1))
for i in 1:n
for (coeff,o) in stencil
j = i+o
if j in 1:n
C[i,j] = coeff
end
end
end
C
end
function ge_seq!(B)
n,m = size(B)
@inbounds for k in 1:n
for t in (k+1):m
B[k,t] = B[k,t]/B[k,k]
end
B[k,k] = 1
for i in (k+1):n
for j in (k+1):m
B[i,j] = B[i,j] - B[i,k]*B[k,j]
end
B[i,k] = 0
end
end
B
end
function main_check()
# Create communicator
comm = MPI.Comm_dup(MPI.COMM_WORLD)
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
n = 10*nranks
# Create matrix B; n is a multiple of nranks so the rows divide evenly
if rank == 0
A = tridiagonal_matrix(n)
b = ones(n)
B = [A b]
else
B = tridiagonal_matrix(0)
end
# call parallel method
B_block, ts_block = ge_par_block!(copy(B),n,n+1,comm)
B_cyclic, ts_cyclic = ge_par_cyclic!(copy(B),n,n+1,comm)
# test if results is equal to sequential
if rank == 0
B_seq = ge_seq!(copy(B))
@test B_seq == B_block
@test B_seq == B_cyclic
# print timings
println("No workers: $nranks| Blockwise: min $(minimum(ts_block))| Cyclic: min $(minimum(ts_cyclic))")
end
end
main_check()
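Like the MPI ring above, this code is meant to run under mpiexec. Assuming the partitioned kernels and this driver are saved together in one script (here the hypothetical name ge_exercise.jl), it can be launched with MPI.jl's mpiexec helper:
In [ ]:
using MPI
mpiexec() do cmd
    # ge_exercise.jl is a hypothetical filename containing the cells above
    run(`$cmd -np 2 julia --project ge_exercise.jl`)
end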
ASP: Exercise 1
Implementation of the parallel ASP (all-pairs shortest paths, Floyd-Warshall) algorithm using only MPI.Bcast! for communication.
In [ ]:
@everywhere function floyd_worker_bcast!(Cw,comm)
rank = MPI.Comm_rank(comm)
nranks = MPI.Comm_size(comm)
Nw,N = size(Cw)
C_k = similar(Cw,N)
lb = Nw*rank + 1
ub = Nw*(rank+1)
for k in 1:N
if k in lb:ub
myk = k - lb + 1
C_k .= view(Cw,myk,:)
MPI.Bcast!(C_k,comm;root=rank)
else
owner = div(k-1,Nw)
MPI.Bcast!(C_k,comm;root=owner)
end
for j in 1:N
for i in 1:Nw
@inbounds Cw[i,j] = min(Cw[i,j],Cw[i,k]+C_k[j])
end
end
end
Cw
end
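A hedged driver sketch for the worker above (not part of the original solution). It assumes the same MPI-enabled worker setup used earlier in the notebook: every rank builds the same small ring-graph distance matrix, keeps its block of N/nranks consecutive rows, runs floyd_worker_bcast!, and checks its rows against a serial Floyd-Warshall reference.
In [ ]:
@everywhere workers() begin
    using MPI
    MPI.Initialized() || MPI.Init()
    comm = MPI.Comm_dup(MPI.COMM_WORLD)
    rank = MPI.Comm_rank(comm)
    nranks = MPI.Comm_size(comm)
    N = 4*nranks
    inf = 1000.0
    C = fill(inf,N,N)
    for i in 1:N
        # simple ring graph with unit edge weights
        j = mod(i,N) + 1
        C[i,i] = 0.0
        C[i,j] = 1.0
        C[j,i] = 1.0
    end
    Nw = div(N,nranks)
    Cw = C[(rank*Nw+1):((rank+1)*Nw),:]
    floyd_worker_bcast!(Cw,comm)
    # serial Floyd-Warshall on the full matrix as a reference
    D = copy(C)
    for k in 1:N, i in 1:N, j in 1:N
        D[i,j] = min(D[i,j], D[i,k]+D[k,j])
    end
    @assert Cw ≈ D[(rank*Nw+1):((rank+1)*Nw),:]
    println("rank $rank: OK")
end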
Exercise: Measure search overhead
Modify the code of the serial and parallel algorithms so that the functions also return the number of nodes in the search tree that they visit. This lets you compare how many more nodes the parallel algorithm visits than the serial one, a quantity known as the search overhead. You can then use the third cell to gather some statistics about the search overhead using your altered version of the functions.
TSP: Exercise x (Measure search overhead)
This solution shows how the code can be altered to measure the search overhead:
In [ ]:
using Distributed
if procs() == workers()
addprocs(4)
end
@everywhere function visited(city,hops,path)
for i = 1:hops
if path[i] == city
return true
end
end
return false
end
In [ ]:
## TSP serial
function tsp_serial_impl(connections,hops,path,current_distance, min_path, min_distance, node_count)
num_cities = length(connections)
if hops == num_cities
if current_distance < min_distance
min_path .= path
return min_path, current_distance, node_count
end
else
current_city = path[hops]
next_hops = hops + 1
for (next_city,distance_increment) in connections[current_city]
if !visited(next_city,hops,path)
node_count += 1
path[next_hops] = next_city
next_distance = current_distance + distance_increment
if next_distance < min_distance
min_path, min_distance, node_count = tsp_serial_impl(connections,next_hops,path,next_distance,min_path,min_distance, node_count)
end
end
end
end
return min_path, min_distance, node_count
end
function tsp_serial(connections,city)
num_cities = length(connections)
path=zeros(Int,num_cities)
hops = 1
path[hops] = city
min_path = zeros(Int, num_cities)
current_distance = 0
min_distance = typemax(Int)
node_count = 1
# Count the number of nodes visited in recursive function and return
min_path, min_distance, node_count = tsp_serial_impl(connections,hops,path,current_distance, min_path, min_distance, node_count)
(;path=min_path,distance=min_distance, node_count)
end
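A small hand-checked usage sketch. The assumed input format is the one produced by sort_neighbors further below: connections[i] holds (neighbor, distance) pairs for city i. Note that this TSP variant does not add the return leg to the start city.
In [ ]:
connections = [
    [(2,1),(3,2)], # city 1: distance 1 to city 2, distance 2 to city 3
    [(1,1),(3,3)], # city 2
    [(1,2),(2,3)], # city 3
]
sol = tsp_serial(connections,1)
@show sol.path sol.distance sol.node_count # expected: path [1, 2, 3], distance 4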
In [ ]:
## TSP distributed
@everywhere function tsp_dist_impl(connections,hops,path,current_distance,min_dist_chnl, max_hops,jobs_chnl,ftr_result,node_count)
num_cities = length(connections)
if hops == num_cities
min_distance = fetch(min_dist_chnl)
if current_distance < min_distance
take!(min_dist_chnl)
if ftr_result !== nothing
@spawnat 1 begin
result = fetch(ftr_result)
result.path .= path
result.min_distance_ref[] = current_distance
end |> wait
end
put!(min_dist_chnl, current_distance)
end
elseif hops <= max_hops
current_city = path[hops]
next_hops = hops + 1
for (next_city,distance_increment) in connections[current_city]
if !visited(next_city,hops,path)
path[next_hops] = next_city
node_count += 1
next_distance = current_distance + distance_increment
min_distance = fetch(min_dist_chnl)
if next_distance < min_distance
node_count = tsp_dist_impl(connections,next_hops,path,next_distance,min_dist_chnl,max_hops,jobs_chnl,ftr_result,node_count)
end
end
end
else
if jobs_chnl !== nothing
path_copy = copy(path)
put!(jobs_chnl,(;hops,path=path_copy,current_distance))
end
end
return node_count
end
function tsp_dist(connections,city)
max_hops = 2
num_cities = length(connections)
path=zeros(Int,num_cities)
result_path=zeros(Int, num_cities)
hops = 1
path[hops] = city
current_distance = 0
min_distance = typemax(Int)
node_count = 1
jobs_chnl = RemoteChannel(()->Channel{Any}(10))
min_dist_chnl = RemoteChannel(()->Channel{Int}(1))
put!(min_dist_chnl, min_distance)
ftr_result = @spawnat 1 (;path=result_path,min_distance_ref=Ref(min_distance))
# Add another future to store number of visited nodes
ftr_node_count = @spawnat 1 node_count_ref = Ref(node_count)
@async begin
ncount = 0
ncount += tsp_dist_impl(connections,hops,path,current_distance,min_dist_chnl,max_hops,jobs_chnl,nothing, ncount)
# Update node counter
node_count_ref = fetch(ftr_node_count)
node_count_ref[] += ncount
for w in workers()
put!(jobs_chnl,nothing)
end
end
@sync for w in workers()
@spawnat w begin
path = zeros(Int, num_cities)
max_hops = typemax(Int)
while true
job = take!(jobs_chnl)
if job === nothing
break
end
hops = job.hops
path = job.path
current_distance = job.current_distance
ncount = 0
min_distance = fetch(min_dist_chnl)
if current_distance < min_distance
ncount += tsp_dist_impl(connections,hops,path,current_distance,min_dist_chnl,max_hops,nothing,ftr_result, ncount)
# Update node counter on process 1 (wait so the increment lands before the final fetch)
@spawnat 1 begin
node_count_ref = fetch(ftr_node_count)
node_count_ref[] += ncount
end |> wait
end
end
end
end
result = fetch(ftr_result)
# Fetch number of visited nodes for return
node_count_ref = fetch(ftr_node_count)
(;path = result.path, distance = result.min_distance_ref[], node_count=node_count_ref[])
end
In [ ]:
using Distributed
using Plots
using Statistics
# Generate random matrices
function rand_symmetric_distance_table(n)
threshold = 0.2
mincost = 3
maxcost = 10
infinity = 10000*maxcost
C = fill(infinity,n,n)
for j in 1:n
for i in 1:j
if rand() > threshold
C[i,j] = rand(mincost:maxcost)
C[j,i] = C[i,j]
end
end
C[j,j] = 0
end
C,infinity
end
# Sort to get ascending distance weights
function sort_neighbors(C)
n = size(C,1)
map(1:n) do i
Ci = C[i,:]
cities = sortperm(Ci)
distances = Ci[cities]
collect(zip(cities,distances))[2:end]
end
end
n_cities = [4, 6, 8, 10]
n_rep = 10
city = 1
node_count_serial = zeros(Union{Missing,Int64},length(n_cities), n_rep)
node_count_dist = zeros(Union{Missing,Int64},length(n_cities), n_rep)
for (i, n) in enumerate(n_cities)
@show n
for j in 1:n_rep
# Generate random connections matrix
C, inf = rand_symmetric_distance_table(n)
C = sort_neighbors(C)
# Run serial algorithm
path, distance, ncount_serial = tsp_serial(C, city)
# Check if graph is connected
if distance >= inf
println("The input graph size $n, it $j is not connected")
node_count_serial[i,j] = missing
node_count_dist[i,j] = missing
else
path, distance, ncount_dist = tsp_dist(C, city)
node_count_serial[i,j] = ncount_serial
node_count_dist[i,j] = ncount_dist
end
end
end
# Calculate average search overhead (in %) and its 95% confidence interval per problem size
search_overhead_perc = 100 .* (node_count_dist .- node_count_serial) ./ node_count_serial
avg_search_overhead = [mean(skipmissing(search_overhead_perc[i,:])) for i in axes(search_overhead_perc,1)]
conf_int = [1.96*std(skipmissing(search_overhead_perc[i,:]))/
sqrt(count(!ismissing,search_overhead_perc[i,:]))
for i in axes(search_overhead_perc,1)]
# Plot
plot(n_cities, avg_search_overhead, ribbon=conf_int, markershape=:circle, legend=false)
title!("Average search overhead (%) and 95%-CI \nin parallel algorithm")
ylabel!("Average % extra nodes visited \n in parallel algorithm")
xlabel!("Number of cities")
In [ ]: