{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "f48b9a60",
   "metadata": {},
   "source": [
    "# Solutions to Notebook Exercises\n",
    "\n",
    "## Julia Basics: Exercise 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a06fd02a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Return the largest entry of `a` and the index of its first occurrence.\n",
    "# `a` must be non-empty, since `a[1]` is read before the loop.\n",
    "function ex1(a)\n",
    "    j = 1\n",
    "    m = a[j]\n",
    "    for (i,ai) in enumerate(a)\n",
    "        if m < ai\n",
    "            m = ai\n",
    "            j = i\n",
    "        end\n",
    "    end\n",
    "    (m,j)\n",
    "end"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "175b6c35",
   "metadata": {},
   "source": [
    "## Julia Basics: Exercise 2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bb289acd",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Return a new function that evaluates f(x) + g(x)\n",
    "ex2(f,g) = x -> f(x) + g(x)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "86250e27",
   "metadata": {},
   "source": [
    "## Julia Basics: Exercise 3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "41b537ab",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Evaluate `mandel` on an n-by-n grid covering [-1.7,0.7] x [-1.2,1.2].\n",
    "# values[i,j] corresponds to the point (x[i],y[j]).\n",
    "function compute_values(n,max_iters)\n",
    "    x = LinRange(-1.7,0.7,n)\n",
    "    y = LinRange(-1.2,1.2,n)\n",
    "    values = zeros(Int,n,n)\n",
    "    for j in 1:n\n",
    "        for i in 1:n\n",
    "            values[i,j] = mandel(x[i],y[j],max_iters)\n",
    "        end\n",
    "    end\n",
    "    values\n",
    "end\n",
    "n = 1000\n",
    "values = compute_values(n,10)\n",
    "# x and y are local to compute_values, so the plot needs its own copy\n",
    "# of the grid coordinates (same ranges as used inside the function).\n",
    "x = LinRange(-1.7,0.7,n)\n",
    "y = LinRange(-1.2,1.2,n)\n",
    "using GLMakie\n",
    "heatmap(x,y,values)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d6d12733",
   "metadata": {},
   "source": [
    "## Matrix Multiplication : Exercise 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "be73e87a",
   "metadata": {},
   "outputs": [],
   "source": [
    "function matmul_dist_3!(C,A,B)\n",
    "    m = size(C,1)\n",
    "    n = size(C,2)\n",
    "    l = size(A,2)\n",
    "    @assert size(A,1) == m\n",
    "    @assert size(B,2) == n\n",
    "    @assert size(B,1) == l\n",
    "    @assert mod(m,nworkers()) == 0\n",
    "    # Implement here\n",
    "    # Each worker receives a contiguous block of rows of A and all of B\n",
    "    nrows_w = div(m,nworkers())\n",
    "    @sync for (i,w) in enumerate(workers())\n",
    "        rows_w = (1:nrows_w) .+ (i-1)*nrows_w\n",
    "        Aw = A[rows_w,:]\n",
    "        ftr = @spawnat w begin\n",
    "            Cw = similar(Aw,nrows_w,n)\n",
    "            matmul_seq!(Cw,Aw,B)\n",
    "            Cw\n",
    "        end\n",
    "        # Fetch in a task so that the loop can launch the next worker right away;\n",
    "        # @sync waits for all these tasks before C is returned.\n",
    "        @async C[rows_w,:] = fetch(ftr)\n",
    "    end\n",
    "    C\n",
    "end"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2d9f4813",
   "metadata": {},
   "source": [
    "## Jacobi Method : Exercise 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cf3b1e72",
   "metadata": {},
   "outputs": [],
   "source": [
    "@mpi_do manager begin\n",
    "    using MPI\n",
    "    comm = MPI.Comm_dup(MPI.COMM_WORLD)\n",
    "    nw = MPI.Comm_size(comm)\n",
    "    iw = MPI.Comm_rank(comm)+1\n",
    "    function jacobi_mpi(n,niters)\n",
    "        if mod(n,nw) != 0\n",
    "            println(\"n must be a multiple of nw\")\n",
    "            MPI.Abort(comm,1)\n",
    "        end\n",
    "        n_own = div(n,nw)\n",
    "        # Entries 2:n_own+1 are owned; entries 1 and end are ghost/boundary cells\n",
    "        u = zeros(n_own+2)\n",
    "        u[1] = -1\n",
    "        u[end] = 1\n",
    "        u_new = copy(u)\n",
    "        for t in 1:niters\n",
    "            reqs_snd = MPI.Request[]\n",
    "            reqs_rcv = MPI.Request[]\n",
    "            if iw != 1\n",
    "                neig_rank = (iw-1)-1\n",
    "                req = MPI.Isend(view(u,2:2),comm,dest=neig_rank,tag=0)\n",
    "                push!(reqs_snd,req)\n",
    "                req = MPI.Irecv!(view(u,1:1),comm,source=neig_rank,tag=0)\n",
    "                push!(reqs_rcv,req)\n",
    "            end\n",
    "            if iw != nw\n",
    "                neig_rank = (iw+1)-1\n",
    "                s = n_own+1\n",
    "                r = n_own+2\n",
    "                req = MPI.Isend(view(u,s:s),comm,dest=neig_rank,tag=0)\n",
    "                push!(reqs_snd,req)\n",
    "                req = MPI.Irecv!(view(u,r:r),comm,source=neig_rank,tag=0)\n",
    "                push!(reqs_rcv,req)\n",
    "            end\n",
    "            # Update the cells that do not read ghost values while messages are in flight\n",
    "            for i in 3:n_own\n",
    "                u_new[i] = 0.5*(u[i-1]+u[i+1])\n",
    "            end\n",
    "            # The first and last owned cells read the ghost cells: wait for the receives\n",
    "            MPI.Waitall(reqs_rcv)\n",
    "            for i in (2,n_own+1)\n",
    "                u_new[i] = 0.5*(u[i-1]+u[i+1])\n",
    "            end\n",
    "            # The send buffers are views into u: complete the sends before swapping\n",
    "            MPI.Waitall(reqs_snd)\n",
    "            u, u_new = u_new, u\n",
    "        end\n",
    "        # @show prints u and also yields it as the return value\n",
    "        @show u\n",
    "    end\n",
    "    niters = 100\n",
    "    load = 4\n",
    "    n = load*nw\n",
    "    jacobi_mpi(n,niters)\n",
    "end"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "2f343157",
   "metadata": {},
   "source": [
    "## Exercise: Ring communication - MPI"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a49be691",
   "metadata": {},
   "outputs": [],
   "source": [
    "using MPI\n",
    "using Test\n",
    "\n",
    "MPI.Init()\n",
    "comm = MPI.COMM_WORLD\n",
    "rank = MPI.Comm_rank(comm)\n",
    "id = rank + 1\n",
    "root = 0\n",
    "# Called nranks (not size) so that Base.size is not shadowed\n",
    "nranks = MPI.Comm_size(comm)\n",
    "\n",
    "dst = mod(rank + 1, nranks)\n",
    "src = mod(rank - 1, nranks)\n",
    "\n",
    "send_buf = id\n",
    "recv_buf = 1\n",
    "\n",
    "if rank == root\n",
    "    # Proc 1: Send id async to destination, then wait for receive\n",
    "    req = MPI.isend(send_buf, comm; dest=dst, tag=0)\n",
    "    recv_buf = MPI.recv(comm; source=src, tag=0)\n",
    "    @show recv_buf == factorial(nranks)\n",
    "    @test recv_buf == factorial(nranks)\n",
    "else\n",
    "    # Other procs: receive sync and send async to next process\n",
    "    recv_buf = MPI.recv(comm; source=src, tag=0)\n",
    "    send_buf = recv_buf * id\n",
    "    req = MPI.isend(send_buf, comm; dest=dst, tag=0)\n",
    "end\n",
    "\n",
    "# Complete the non-blocking send before MPI is finalized\n",
    "MPI.Wait(req)\n",
    "\n",
    "MPI.Finalize()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6cbbf074",
   "metadata": {},
   "source": [
    "## Exercise: Ring communication - Distributed.jl"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dc156523",
   "metadata": {},
   "outputs": [],
   "source": [
    "using Distributed\n",
    "using Test\n",
    "\n",
    "np = 4\n",
    "add_n = np - nprocs()\n",
    "addprocs(add_n)\n",
    "worker_ids = workers()\n",
    "@assert nprocs() > nworkers()\n",
    "\n",
    "# Initialize id channel\n",
    "# (holds the ring position of the process that updated the value last)\n",
    "id_chnl = RemoteChannel(()->Channel{Int}(1))\n",
    "put!(id_chnl, 1)\n",
    "\n",
    "# Initialize data channel\n",
    "job_chnl = RemoteChannel(()->Channel{Int}(1))\n",
    "put!(job_chnl, 1)\n",
    "\n",
    "@sync for w in workers()\n",
    "    @spawnat w begin\n",
    "        # The main process has position 1, workers have positions 2:np\n",
    "        pos = findfirst(worker_ids .== w) + 1\n",
    "        # dst is not needed by the algorithm; kept to show the ring topology\n",
    "        dst = mod(pos, np) + 1\n",
    "        src = mod(pos-2, np) + 1\n",
    "        # Poll until the predecessor in the ring has written its value\n",
    "        while true\n",
    "            pred = fetch(id_chnl)\n",
    "            if pred == src\n",
    "                take!(id_chnl)\n",
    "                value = take!(job_chnl)\n",
    "                put!(job_chnl, value * pos)\n",
    "                put!(id_chnl, pos)\n",
    "                break\n",
    "            end\n",
    "        end\n",
    "    end\n",
    "end\n",
    "\n",
    "res = take!(job_chnl)\n",
    "@show res\n",
    "@test res == factorial(np)\n",
    "\n",
    "rmprocs(workers())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "19641daf",
   "metadata": {},
   "source": [
    "## TSP Exercise: Measure search overhead"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f00557a0",
   "metadata": {},
   "outputs": [],
   "source": [
    "## TSP serial\n",
    "# Same as the serial solver, but also reports the time spent in the search\n",
    "function tsp_serial(connections,city)\n",
    "    num_cities = length(connections)\n",
    "    path=zeros(Int,num_cities)\n",
    "    hops = 1\n",
    "    path[hops] = city\n",
    "    min_path = zeros(Int, num_cities)\n",
    "    current_distance = 0\n",
    "    min_distance = typemax(Int)\n",
    "    # Collect search time\n",
    "    search_time = @elapsed min_path, min_distance = tsp_serial_impl(connections,hops,path,current_distance, min_path, min_distance)\n",
    "    (;path=min_path,distance=min_distance, search_time)\n",
    "end"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "30784da2",
   "metadata": {},
   "outputs": [],
   "source": [
    "## TSP distributed\n",
    "# Returns `wait_time` plus the time that this call and its recursive calls\n",
    "# spent communicating or blocked on the channels.\n",
    "@everywhere function tsp_dist_impl(wait_time, connections,hops,path,current_distance,min_dist_chnl, max_hops,jobs_chnl,ftr_result)\n",
    "    num_cities = length(connections)\n",
    "    if hops == num_cities\n",
    "        min_distance = fetch(min_dist_chnl)\n",
    "        if current_distance < min_distance\n",
    "            # take! empties the channel and thus acts as a lock. Check the bound\n",
    "            # again, since another process may have improved it after the fetch.\n",
    "            min_distance = take!(min_dist_chnl)\n",
    "            if current_distance < min_distance\n",
    "                # Collect wait time to subtract from overall search time\n",
    "                if ftr_result !== nothing\n",
    "                    wait_time += @elapsed begin\n",
    "                        ftr = @spawnat 1 begin\n",
    "                            result = fetch(ftr_result)\n",
    "                            result.path .= path\n",
    "                            result.min_distance_ref[] = current_distance\n",
    "                        end\n",
    "                        # Block until process 1 has stored the new best path\n",
    "                        wait(ftr)\n",
    "                    end\n",
    "                end\n",
    "                min_distance = current_distance\n",
    "            end\n",
    "            # Always put a value back, otherwise the other processes block forever\n",
    "            put!(min_dist_chnl, min_distance)\n",
    "        end\n",
    "    elseif hops <= max_hops\n",
    "        current_city = path[hops]\n",
    "        next_hops = hops + 1\n",
    "        for (next_city,distance_increment) in connections[current_city]\n",
    "            if !visited(next_city,hops,path)\n",
    "                path[next_hops] = next_city\n",
    "                next_distance = current_distance + distance_increment\n",
    "                # Collect wait time because fetch may block\n",
    "                wait_time += @elapsed min_distance = fetch(min_dist_chnl)\n",
    "                if next_distance < min_distance\n",
    "                    # The callee returns the accumulated total: assign it, do not add it\n",
    "                    wait_time = tsp_dist_impl(wait_time, connections,next_hops,path,next_distance,min_dist_chnl,max_hops,jobs_chnl,ftr_result)\n",
    "                end\n",
    "            end\n",
    "        end\n",
    "    else\n",
    "        # Collect communication time and add to wait time\n",
    "        wait_time += @elapsed if jobs_chnl !== nothing\n",
    "            path_copy = copy(path)\n",
    "            put!(jobs_chnl,(;hops,path=path_copy,current_distance))\n",
    "        end\n",
    "    end\n",
    "    # Return wait time\n",
    "    wait_time\n",
    "end\n",
    "\n",
    "# Distributed TSP solver. In addition to the best path and its distance, it returns\n",
    "# `search_time` and `wait_time`, each summed over the main process and all workers.\n",
    "function tsp_dist(connections,city)\n",
    "    max_hops = 2\n",
    "    num_cities = length(connections)\n",
    "    path=zeros(Int,num_cities)\n",
    "    result_path=zeros(Int, num_cities)\n",
    "    hops = 1\n",
    "    path[hops] = city\n",
    "    current_distance = 0\n",
    "    min_distance = typemax(Int)\n",
    "    jobs_chnl = RemoteChannel(()->Channel{Any}(10))\n",
    "    min_dist_chnl = RemoteChannel(()->Channel{Int}(1))\n",
    "    put!(min_dist_chnl, min_distance)\n",
    "    ftr_result = @spawnat 1 (;path=result_path,min_distance_ref=Ref(min_distance))\n",
    "    master_task = @async begin\n",
    "        # Collect search time from master process\n",
    "        master_wait = 0.0\n",
    "        master_search = @elapsed master_wait = tsp_dist_impl(master_wait,connections,hops,path,current_distance,min_dist_chnl,max_hops,jobs_chnl,nothing)\n",
    "        # One `nothing` per worker signals that there are no more jobs\n",
    "        for w in workers()\n",
    "            put!(jobs_chnl,nothing)\n",
    "        end\n",
    "        (master_search, master_wait)\n",
    "    end\n",
    "    # Variables assigned inside @spawnat live on the worker, so each worker\n",
    "    # returns its timings through the future instead of updating outer variables.\n",
    "    worker_ftrs = map(workers()) do w\n",
    "        @spawnat w begin\n",
    "            worker_search = 0.0\n",
    "            worker_wait = 0.0\n",
    "            while true\n",
    "                job = take!(jobs_chnl)\n",
    "                if job === nothing\n",
    "                    break\n",
    "                end\n",
    "                if job.current_distance < fetch(min_dist_chnl)\n",
    "                    # Collect search time from worker processes\n",
    "                    # (workers explore their jobs to the end: no hop limit)\n",
    "                    worker_search += @elapsed worker_wait = tsp_dist_impl(worker_wait,connections,job.hops,job.path,job.current_distance,min_dist_chnl,typemax(Int),nothing,ftr_result)\n",
    "                end\n",
    "            end\n",
    "            (worker_search, worker_wait)\n",
    "        end\n",
    "    end\n",
    "    search_time, wait_time = fetch(master_task)\n",
    "    # Fetching also waits for the workers and rethrows any error raised on them\n",
    "    for ftr in worker_ftrs\n",
    "        t_search, t_wait = fetch(ftr)\n",
    "        search_time += t_search\n",
    "        wait_time += t_wait\n",
    "    end\n",
    "    result = fetch(ftr_result)\n",
    "    (;path = result.path, distance = result.min_distance_ref[], search_time, wait_time)\n",
    "end\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "694de934",
   "metadata": {},
   "outputs": [],
   "source": [
    "using Distributed\n",
    "using RandomMatrix\n",
    "using Plots\n",
    "\n",
    "# Build a random, symmetric list of connections in the format used by the TSP\n",
    "# solvers: connections[i][j] == (j, distance between city i and city j).\n",
    "function generate_rand_connections(city_range, distance_range)\n",
    "    # generate random connections matrix\n",
    "    n_cities = rand(city_range)\n",
    "    matrix = randTriangular(distance_range, n_cities; Diag=false)\n",
    "\n",
    "    connections = Array{Array{Tuple{Int64,Int64},1},1}(undef, n_cities)\n",
    "    for i in 1:n_cities\n",
    "        connections[i] = Array{Tuple{Int64,Int64},1}(undef, n_cities)\n",
    "    end\n",
    "    for i in 1:n_cities\n",
    "        for j in i:n_cities\n",
    "            # Mirror each entry so that both directions have the same distance\n",
    "            distance = matrix[i,j]\n",
    "            connections[i][j] = (j,distance)\n",
    "            connections[j][i] = (i,distance)\n",
    "        end\n",
    "    end\n",
    "    return connections\n",
    "end\n",
    "\n",
    "# Run once so compile times are not measured\n",
    "distance_range = 1:100\n",
    "connections = generate_rand_connections(4:4, distance_range)\n",
    "tsp_dist(connections,1)\n",
    "tsp_serial(connections,1)\n",
    "\n",
    "# Measure runtimes of serial and parallel algorithm\n",
    "n_it = 5\n",
    "city_ranges = [4:4, 6:6, 8:8, 10:10]\n",
    "search_overhead = zeros(Float64, length(city_ranges), n_it )\n",
    "for (i, n) in enumerate(city_ranges)\n",
    "    for k in 1:n_it\n",
    "        connections = generate_rand_connections(n, distance_range)\n",
    "        @show n, k\n",
    "        path_dist, distance_dist, search_time_dist, wait_time_dist = tsp_dist(connections,1)\n",
    "        path_serial, distance_serial, search_time_serial = tsp_serial(connections,1)\n",
    "        # Compute search overhead as difference between distributed program and serial program\n",
    "        # (without time spent communicating or waiting)\n",
    "        search_overhead[i, k] = search_time_dist - wait_time_dist - search_time_serial\n",
    "    end\n",
    "end\n",
    "\n",
    "min_search_oh = minimum(search_overhead, dims=2)\n",
    "city_sizes = [4,6,8,10]\n",
    "plot(city_sizes, min_search_oh, yaxis=:log, seriestype=:scatter,legend=false)\n",
    "plot!(city_sizes, min_search_oh, yaxis=:log, legend=false)\n",
    "\n",
    "xlabel!(\"Number of cities\")\n",
    "ylabel!(\"Search overhead (s)\")\n",
    "title!(\"Minimum search overhead for different problem sizes\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "47d88e7a",
   "metadata": {},
   "source": [
    "# License\n",
    "\n",
    "TODO: replace link to website\n",
    "\n",
    "This notebook is part of the course [Programming Large Scale Parallel Systems](http://localhost:8000/) at Vrije Universiteit Amsterdam and may be used under a [CC BY 4.0](https://creativecommons.org/licenses/by/4.0/) license."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "968304a6",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Julia 1.9.1",
   "language": "julia",
   "name": "julia-1.9"
  },
  "language_info": {
   "file_extension": ".jl",
   "mimetype": "application/julia",
   "name": "julia",
   "version": "1.9.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}