Solutions to Notebook Exercises¶

Julia Basics: Exercise 1¶

In [ ]:
function ex1(a)
    j = 1
    m = a[j]
    for (i,ai) in enumerate(a)
        if m < ai
            m = ai
            j = i
        end
    end
    (m,j)
end

Julia Basics: Exercise 2¶

In [ ]:
ex2(f,g) = x -> f(x) + g(x)

Julia Basics: Exercise 3¶

In [ ]:
function compute_values(n,max_iters)
    x = LinRange(-1.7,0.7,n)
    y = LinRange(-1.2,1.2,n)
    values = zeros(Int,n,n)
    for j in 1:n
        for i in 1:n
            values[i,j] = mandel(x[i],y[j],max_iters)
        end
    end
    values
end
values = compute_values(1000,10)
using GLMakie
heatmap(x,y,values)

Matrix Multiplication : Exercise 1¶

In [ ]:
function matmul_dist_3!(C,A,B)
    m = size(C,1)
    n = size(C,2)
    l = size(A,2)
    @assert size(A,1) == m
    @assert size(B,2) == n
    @assert size(B,1) == l
    @assert mod(m,nworkers()) == 0
    # Implement here
    nrows_w = div(m,nworkers())
    @sync for (i,w) in enumerate(workers())
        rows_w = (1:nrows_w) .+ (i-1)*nrows_w
        Aw = A[rows_w,:]
        ftr = @spawnat w begin
            Cw = similar(Aw,nrows_w,n)
            matmul_seq!(Cw,Aw,B)
            Cw
        end
        @async C[rows_w,:] = fetch(ftr)
    end
    C
end

Jacobi Method : Exercise 1¶

In [ ]:
@mpi_do manager begin
    using MPI
    comm = MPI.Comm_dup(MPI.COMM_WORLD)
    nw = MPI.Comm_size(comm)
    iw = MPI.Comm_rank(comm)+1
    function jacobi_mpi(n,niters)
        if mod(n,nw) != 0
            println("n must be a multiple of nw")
            MPI.Abort(comm,1)
        end
        n_own = div(n,nw)
        u = zeros(n_own+2)
        u[1] = -1
        u[end] = 1
        u_new = copy(u)
        for t in 1:niters
            reqs_snd = MPI.Request[]
            reqs_rcv = MPI.Request[]
            if iw != 1
                neig_rank = (iw-1)-1
                req = MPI.Isend(view(u,2:2),comm,dest=neig_rank,tag=0)
                push!(reqs_snd,req)
                req = MPI.Irecv!(view(u,1:1),comm,source=neig_rank,tag=0)
                push!(reqs_rcv,req)
            end
            if iw != nw
                neig_rank = (iw+1)-1
                s = n_own+1
                r = n_own+2
                req = MPI.Isend(view(u,s:s),comm,dest=neig_rank,tag=0)
                push!(reqs_snd,req)
                req = MPI.Irecv!(view(u,r:r),comm,source=neig_rank,tag=0)
                push!(reqs_rcv,req)
            end
            for i in 3:n_own
                u_new[i] = 0.5*(u[i-1]+u[i+1])
            end
            MPI.Waitall(reqs_rcv)
            for i in (2,n_own+1)
                u_new[i] = 0.5*(u[i-1]+u[i+1])
            end
            MPI.Waitall(reqs_snd)
            u, u_new = u_new, u
        end
        u
        @show u
    end
    niters = 100
    load = 4
    n = load*nw
    jacobi_mpi(n,niters)
end

Exercise: Ring communication - MPI¶

In [ ]:
using MPI
using Test

MPI.Init()
comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
id = rank + 1
root = 0
size = MPI.Comm_size(comm)

dst = mod(rank + 1, size)
src = mod(rank - 1, size)

send_buf = id
recv_buf = 1

if rank == root 
    # Proc 1: Send id async to destination, then wait for receive
    MPI.isend(send_buf, comm; dest=dst, tag=0)
    recv_buf = MPI.recv(comm; source=src, tag=0)
    @show recv_buf == factorial(size)
    @test recv_buf == factorial(size)
else
    # Other procs: receive sync and send async to next process
    recv_buf = MPI.recv(comm; source=src, tag=0)
    send_buf = recv_buf * id
    MPI.isend(send_buf, comm; dest=dst, tag=0)
end

MPI.Finalize()

Exercise: Ring communication - Distributed.jl¶

In [ ]:
using Distributed 
using Test

np = 4
add_n = np - nprocs() 
addprocs(add_n)
worker_ids = workers()
@assert nprocs() > nworkers()

# Initialize id channel
id_chnl = RemoteChannel(()->Channel{Int}(1))
put!(id_chnl, 1)

# Initialize data channel
job_chnl = RemoteChannel(()->Channel{Int}(1))
put!(job_chnl, 1)

@sync for w in workers()
    @spawnat w begin
        pos = findfirst(worker_ids .== w) + 1
        dst = mod(pos, np) + 1
        src = mod(pos-2, np) + 1
        while true 
            pred = fetch(id_chnl)
            if pred == src
                take!(id_chnl)
                value = take!(job_chnl)
                put!(job_chnl, value * pos)  
                put!(id_chnl, pos)  
                break
            end
        end
    end
end

res = take!(job_chnl)
@show res
@test res == factorial(np)

rmprocs(workers())

TSP Exercise: Measure search overhead¶

In [ ]:
## TSP serial 
function tsp_serial(connections,city)
    num_cities = length(connections)
    path=zeros(Int,num_cities)
    hops = 1
    path[hops] = city
    min_path = zeros(Int, num_cities)
    current_distance = 0
    min_distance = typemax(Int)
    # Collect search time 
    search_time = @elapsed min_path, min_distance = tsp_serial_impl(connections,hops,path,current_distance, min_path, min_distance)
    (;path=min_path,distance=min_distance, search_time)
end
In [ ]:
## TSP distributed
@everywhere function tsp_dist_impl(wait_time, connections,hops,path,current_distance,min_dist_chnl, max_hops,jobs_chnl,ftr_result)
    num_cities = length(connections)
    if hops == num_cities
        min_distance = fetch(min_dist_chnl)
        if current_distance < min_distance
            take!(min_dist_chnl)
            # Collect wait time to substract from overall search time 
            if ftr_result !== nothing
                wait_time += @elapsed @spawnat 1 begin
                    result = fetch(ftr_result)
                    result.path .= path
                    result.min_distance_ref[] = current_distance
                end |> wait
            end
            put!(min_dist_chnl, current_distance)
        end
    elseif hops <= max_hops
        current_city = path[hops]
        next_hops = hops + 1
        for (next_city,distance_increment) in connections[current_city]
            if !visited(next_city,hops,path)
                path[next_hops] = next_city
                next_distance = current_distance + distance_increment
                # Collect wait time because fetch may block
                wait_time += @elapsed min_distance = fetch(min_dist_chnl)
                if next_distance < min_distance
                    tsp_dist_impl(wait_time, connections,next_hops,path,next_distance,min_dist_chnl,max_hops,jobs_chnl,ftr_result)
                end
            end
        end 
    else
        # Collect communication time and add to wait time
        wait_time += @elapsed if jobs_chnl !== nothing 
            path_copy = copy(path) 
            put!(jobs_chnl,(;hops,path=path_copy,current_distance))
        end
    end
    # Return wait time
    wait_time
end

function tsp_dist(connections,city)
    max_hops = 2
    num_cities = length(connections)
    path=zeros(Int,num_cities)
    result_path=zeros(Int, num_cities)
    wait_time = 0
    search_time = 0
    hops = 1
    path[hops] = city
    current_distance = 0
    min_distance = typemax(Int)
    jobs_chnl = RemoteChannel(()->Channel{Any}(10))
    min_dist_chnl = RemoteChannel(()->Channel{Int}(1))
    put!(min_dist_chnl, min_distance)
    ftr_result = @spawnat 1 (;path=result_path,min_distance_ref=Ref(min_distance))
    @async begin
        # Collect search time from master process
        search_time += @elapsed wait_time += tsp_dist_impl(wait_time,connections,hops,path,current_distance,min_dist_chnl,max_hops,jobs_chnl,nothing)
        for w in workers()
            put!(jobs_chnl,nothing)
        end
    end
    @sync for w in workers()
        @spawnat w begin
            path = zeros(Int, num_cities)
            max_hops = typemax(Int)
            while true
                job = take!(jobs_chnl)
                if job == nothing
                    break
                end
                hops = job.hops
                path = job.path 
                current_distance = job.current_distance
                min_distance = fetch(min_dist_chnl)
                if current_distance < min_distance
                    # Collect search time from worker processes 
                    search_time += @elapsed wait_time += tsp_dist_impl(wait_time,connections,hops,path,current_distance,min_dist_chnl,max_hops,nothing,ftr_result)
                end
            end
        end
    end 
    result = fetch(ftr_result)
    (;path = result.path, distance = result.min_distance_ref[], search_time, wait_time)
end
In [ ]:
using Distributed
using RandomMatrix
using Plots

function generate_rand_connections(city_range, distance_range)
    # generate random connections matrix 
    n_cities = rand(city_range)
    matrix = randTriangular(distance_range, n_cities; Diag=false)

    connections = Array{Array{Tuple{Int64,Int64},1},1}(undef, n_cities)
    for i in 1:n_cities
        connections[i] = Array{Tuple{Int64,Int64},1}(undef, n_cities)
    end
    for i in 1:n_cities
        for j in i:n_cities
            distance = matrix[i,j]
            connections[i][j] = (j,distance)
            connections[j][i] = (i,distance)
        end
    end
    return connections
end

# Run once so compile times are not measured
distance_range = 1:100
connections = generate_rand_connections(4:4, distance_range)
tsp_dist(connections,1)
tsp_serial(connections,1)

# Measure runtimes of serial and parallel algorithm
n_it = 5
city_ranges = [4:4, 6:6, 8:8, 10:10]
search_overhead = zeros(Float64, length(city_ranges), n_it )
for (i, n) in enumerate(city_ranges)
    for k in 1:n_it
        connections = generate_rand_connections(n, distance_range)
        @show n, k
        path_dist, distance_dist, search_time_dist, wait_time_dist = tsp_dist(connections,1)
        path_serial, distance_serial, search_time_serial = tsp_serial(connections,1)
        # Compute search overhead as difference between distributed program and serial program
        # (without time spent communicating or waiting)
        search_overhead[i, k] = search_time_dist - wait_time_dist - search_time_serial
    end
end

min_search_oh = minimum(search_overhead, dims=2)
city_sizes = [4,6,8,10]
plot(city_sizes, min_search_oh, yaxis=:log, seriestype=:scatter,legend=false)
plot!(city_sizes, min_search_oh, yaxis=:log, legend=false)

xlabel!("Number of cities")
ylabel!("Search overhead (s)")
title!("Minimum search overhead for different problem sizes")

License¶

TODO: replace link to website

This notebook is part of the course Programming Large Scale Parallel Systems at Vrije Universiteit Amsterdam and may be used under a CC BY 4.0 license.

In [ ]: