In this notebook, we will learn the basics of asynchronous programming in Julia. In particular, we will learn about tasks, the @async and @sync macros, and channels.
Understanding these concepts is important before moving on to distributed computing later.
A task is a piece of computational work that can be run asynchronously (i.e., in the background). To create a task, we first need a function that represents the work to be done. In the next cells, we define a function that generates and sums two matrices (after sleeping for 7 seconds to mimic a slow computation), and then create a task from it.
function work()
    println("Starting work")
    sleep(7)
    a = rand(3,3)
    b = rand(3,3)
    r = a + b
    println("Finishing work")
    r
end
t = Task(work)
The task has been created, but the corresponding work has not started yet. Note that we do not see any output from the function work. To run the task, we need to schedule it.
schedule(t)
The task is now scheduled and runs in the background, but the cell above does not display its result. To get the result, we need to fetch it. Note that fetch waits until the task has finished.
fetch(t)
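The lifecycle above (create, schedule, fetch) can also be checked programmatically. Below is a minimal sketch. It assumes a shorter, hypothetical `short_work` function in place of `work` so that it finishes quickly. `istaskstarted` and `istaskdone` are Base functions that report the state of a task.

```julia
# Hypothetical stand-in for `work`: sleeps briefly, then returns a value.
function short_work()
    sleep(0.5)
    return 42
end

t = Task(short_work)
println(istaskstarted(t))  # false: the task exists but has not run yet
schedule(t)
result = fetch(t)          # waits until the task finishes, then returns its value
println(istaskdone(t))     # true
println(result)            # 42
```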
It is important to note that tasks run asynchronously. To illustrate this, let's create and schedule a new task.
t = Task(work)
schedule(t)
Note that we can execute other Julia code while the task is running. To check this, execute the next two cells before the task finishes (i.e., within about 7 seconds).
sin(4π)*exp(-0.1)
1 + 1
How is this possible? Tasks run in the background, and this particular task is sleeping most of the time. Thus, the current Julia process can be used for other operations while the task is sleeping.
It is also important to note that tasks created this way do not run in parallel. We were able to run code while the previous task was running only because that task was idle most of the time. If a task does actual work, the current process is busy running it, and we will likely not be able to run other code at the same time. Let's illustrate this with an example. The following code computes an approximation of $\pi$ using the Leibniz formula. The quality of the approximation improves as n increases.
function compute_π(n)
    s = 1.0
    for i in 1:n
        s += (isodd(i) ? -1 : 1) / (i*2+1)
    end
    4*s
end
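For reference, compute_π evaluates a partial sum of the Leibniz series. The initial value s = 1.0 is the k = 0 term, the loop adds the terms k = 1 to n, and the result is multiplied by 4. The series alternates in sign and its terms decrease, so the error is at most the first omitted term. This is why a larger n gives a better approximation:

```latex
\frac{\pi}{4} = \sum_{k=0}^{\infty} \frac{(-1)^k}{2k+1} = 1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \cdots

\text{compute\_}\pi(n) = 4 \sum_{k=0}^{n} \frac{(-1)^k}{2k+1},
\qquad
\left| \pi - \text{compute\_}\pi(n) \right| \le \frac{4}{2n+3}
```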
Call this function with a large number. Note that it will take some time.
compute_π(4_000_000_000)
Create a task that performs this computation.
fun = () -> compute_π(4_000_000_000)
t = Task(fun)
Schedule the task and then try to execute the second cell below. Note that the current process is busy running the task, so the second cell has to wait until the task finishes.
schedule(t)
1+1
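Tasks are scheduled cooperatively. A running task keeps the thread until it finishes or yields, for example by calling sleep, waiting on a channel, or calling yield() explicitly. The following sketch uses a hypothetical variant, `compute_π_yielding`, with an illustrative `chunk` parameter. The variant calls yield() every `chunk` iterations, so a second task gets a chance to run before the long computation finishes.

```julia
# Hypothetical variant of compute_π that periodically yields control
# to the scheduler so that other tasks can run in between.
function compute_π_yielding(n; chunk = 1_000_000)
    s = 1.0
    for i in 1:n
        s += (isodd(i) ? -1 : 1) / (i*2+1)
        if i % chunk == 0
            yield()  # let other runnable tasks execute
        end
    end
    4*s
end

events = Symbol[]
t_pi = @async begin
    compute_π_yielding(10_000_000)
    push!(events, :pi_done)
end
t_other = @async push!(events, :other)
wait(t_pi)
wait(t_other)
println(events)  # [:other, :pi_done]: the second task ran while the first one yielded
```

Without the yield() call, the first task would run to completion before the second one starts, and the order would be reversed.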
The function used to create a task must take zero arguments, but it can capture variables if needed. If we create a task from a function that requires arguments, the task fails with an error once it is scheduled and runs.
add(a,b) = a + b
t = Task(add)
schedule(t)
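The error is stored in the failed task and resurfaces when we fetch it, wrapped in a `TaskFailedException`. The sketch below shows how to inspect it. It also shows the usual fix: wrap the call in a zero-argument closure that supplies the arguments (the values 1 and 2 are arbitrary).

```julia
add(a, b) = a + b

# Task(add) would call add() with no arguments, which raises a MethodError.
t_bad = Task(add)
schedule(t_bad)
err = try
    fetch(t_bad)
catch e
    e
end
println(err isa TaskFailedException)         # true
println(err.task.exception isa MethodError)  # true

# Fix: pass a zero-argument closure that calls add with the desired arguments.
t_ok = Task(() -> add(1, 2))
schedule(t_ok)
println(fetch(t_ok))  # 3
```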
If needed, we can capture variables in the function run by the task, as shown in the next cells.
a = rand(3,3)
b = rand(3,3);
fun = () -> a + b
t = Task(fun)
schedule(t)
@async
So far, we have created tasks using low-level functions, but there are more convenient ways of creating and scheduling tasks, for instance the @async macro. This macro runs a piece of code asynchronously. Under the hood, it wraps the code in an anonymous function, creates a task from it, and schedules it. For instance, the next cell is equivalent to the previous one.
@async a + b
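@async returns the task it creates, so its result can be retrieved with fetch exactly as before. Below is a minimal sketch; the matrices are redefined so that the block is self-contained.

```julia
a = rand(3, 3)
b = rand(3, 3)

t = @async a + b      # creates and schedules a task, returns the Task object
r = fetch(t)          # waits for the task and returns its result
println(t isa Task)   # true
println(r == a + b)   # true
```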
@sync
This macro waits until all the tasks created with @async inside a given block of code have finished.
@sync begin
    @async sleep(3)
    @async sleep(4)
end
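The tasks inside the @sync block sleep concurrently, so the block takes roughly as long as the longest task rather than the sum of all of them. The sketch below measures this with `@elapsed`, a Base macro that returns the elapsed time in seconds. The one-second sleeps are arbitrary.

```julia
elapsed = @elapsed @sync begin
    @async sleep(1)
    @async sleep(1)
    @async sleep(1)
end
# Expect roughly 1 second (plus small overhead), not 3 seconds.
println(elapsed)
```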
chnl = Channel{Int}()
@async begin
    for i in 1:5
        put!(chnl,i)
    end
    close(chnl)
end
By executing the next cell several times, we get the values from the channel one at a time. We are indeed communicating values between two different tasks. If we execute the cell more than 5 times, it will raise an error, since the channel has been closed.
take!(chnl)
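The error raised by an extra take! is an `InvalidStateException`. The minimal sketch below repeats the producer above, takes the five values in a loop, and catches the error from the sixth call.

```julia
chnl = Channel{Int}()
@async begin
    for i in 1:5
        put!(chnl, i)
    end
    close(chnl)
end

vals = [take!(chnl) for _ in 1:5]   # each take! waits for the producer's next put!
err = try
    take!(chnl)                     # sixth call: the producer closes the channel
catch e
    e
end
println(vals)                           # [1, 2, 3, 4, 5]
println(err isa InvalidStateException)  # true
```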
Instead of taking values from a channel until an error occurs, we can also iterate over the channel in a for loop, which ends once the channel is closed and all its values have been taken.
chnl = Channel{Int}()
@async begin
    for i in 1:5
        put!(chnl,i)
    end
    close(chnl)
end
for i in chnl
    @show i
end
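Base also provides a `Channel{T}(f)` constructor that takes a function. It creates the channel, runs `f(channel)` in a new task, and closes the channel automatically when that task terminates. The sketch below rewrites the producer above in this form and gathers the values with `collect`, which iterates until the channel is closed.

```julia
chnl = Channel{Int}() do ch
    for i in 1:5
        put!(ch, i)   # blocks until the consumer takes the value
    end
end                   # the channel is closed when this function returns

vals = collect(chnl)  # iterate until the channel is closed
println(vals)         # [1, 2, 3, 4, 5]
```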
put! and take! are blocking
Note that put! and take! are blocking operations. On an unbuffered channel such as Channel{Int}(), calling put! blocks the task until another task calls take!, and vice versa. Thus, we need at least 2 tasks for this to work. If we call put! and take! from the same task, it will result in a deadlock. We have added a print statement to the previous example. Run it again and note how put! blocks until we call take!.
chnl = Channel{Int}()
@async begin
    for i in 1:5
        put!(chnl,i)
        println("I have put $i")
    end
    close(chnl)
end
take!(chnl)
We can be a bit more flexible and use a buffered channel. In this case, put! blocks only if the channel is full, and take! blocks only if the channel is empty. We repeat the previous example, but with a buffered channel of size 2. Note that we can call put! until the channel is full. At that point, we need to wait until take! is called, which removes an item from the channel and makes room for a new one.
buffer_size = 2
chnl = Channel{Int}(buffer_size)
@async begin
    for i in 1:5
        put!(chnl,i)
        println("I have put $i")
    end
    close(chnl)
end
take!(chnl)
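With a buffer, a single task can call put! without blocking as long as there is free space, and values come out in first-in, first-out order. The minimal sketch below uses the same buffer size of 2. It also uses `isready`, which returns true when a value can be taken without waiting.

```julia
buffer_size = 2
chnl = Channel{Int}(buffer_size)

put!(chnl, 10)   # does not block: the buffer has room
put!(chnl, 20)   # does not block: the buffer is now full
# A third put! here would block this task, since no other task takes values.

println(isready(chnl))   # true
x = take!(chnl)          # 10 (first in, first out)
y = take!(chnl)          # 20
println(isready(chnl))   # false
```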
@time compute_π(100_000_000)
@time for i in 1:10
    compute_π(100_000_000)
end
@time for i in 1:10
    @async compute_π(100_000_000)
end
@time @sync for i in 1:10
    @async compute_π(100_000_000)
end
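Without @sync, the loop of @async calls above only creates and schedules the tasks, so @time mostly measures that; the actual computations run afterwards. With @sync, @time also waits for all the tasks. Since @async tasks share one thread, that loop should take roughly as long as the sequential one. The small sketch below checks the first point with `istaskdone`, using short sleeps in place of compute_π.

```julia
tasks = [@async sleep(0.5) for i in 1:3]

# Right after scheduling, none of the tasks has finished:
# the current task has not yielded to them yet.
println(count(istaskdone, tasks))   # 0

foreach(wait, tasks)                # block until every task is done
println(all(istaskdone, tasks))     # true
```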
buffer_size = 4
chnl = Channel{Int}(buffer_size)
@time begin
    put!(chnl,3)
    i = take!(chnl)
    sleep(i)
end
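# Unbuffered channel: put! blocks until another task calls take!.
# Since take! is called later in the same task, this cell is expected to hang (deadlock).
chnl = Channel{Int}()
@time begin
    put!(chnl,3)
    i = take!(chnl)
    sleep(i)
end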
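With the unbuffered channel, the last cell never gets past put!, because the matching take! would have to come from the same task. The minimal sketch below shows one way around this: perform the put! in a separate task.

```julia
chnl = Channel{Int}()

putter = @async put!(chnl, 3)   # this task blocks until someone takes the value
sleep(0.1)                      # let the putter run; it is now waiting on the channel
println(istaskdone(putter))     # false

i = take!(chnl)                 # unblocks the putter
wait(putter)
println(i)                      # 3
println(istaskdone(putter))     # true
```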