<img src="https://upload.wikimedia.org/wikipedia/commons/thumb/3/39/VU_logo.png/800px-VU_logo.png?20161029201021" width="350">

### Programming large-scale parallel systems

# Intro to MPI (collectives)

## Contents


In this notebook, we will continue learning the basics of parallel computing using the Message Passing Interface (MPI) from Julia. In particular, we will learn:

- How to use basic collective communication directives
- How to use MPI communicators


## Collective communication

MPI provides collective communication functions for communication involving multiple processes. Some usual collective functions are:


- `MPI_Barrier`: Synchronize all processes
- `MPI_Bcase`: Send the same data from one to all processes
- `MPI_Gather`: Gather data from all processes to one
- `MPI_Scatter`: Scatter data from one to all all processes
- `MPI_Reduce`: Reduce data from all processes to a single one
- `MPI_Scan`: Scan (or prefix) reduction
- `MPI_Allgather`: Like a `MPI_Gather` but all processes receive the result
- `MPI_Allreduce`: Like `MPI_Reduce` but all processes receive the result
- `MPI_Alltoall`:  Exchange data from all to all processes


We will discuss some of them in greater detail in this notebook.

## Why collective primitives?

Point-to-point communication primitives provide all the building blocks needed in parallel programs and could be used to implement the collective functions described above. Then, why does MPI provide collective communication directives? There are several reasons:

- Ease of use: It is handy for users to have these functions readily available instead of having to implement them.
- Performance: Library implementations typically use efficient algorithms (such as reduction trees).
- Hardware support: Hardware vendors can optimize the MPI library for their machine and even develop hardware specially design to perform MPI operations efficiently. For instance, [IMB Blue Gene](https://en.wikipedia.org/wiki/IBM_Blue_Gene) was equipped with  auxiliary networks for MPI collectives.


## Semantics of collective operations


- Completeness: All the collective communication directives above are *complete* operations. Thus, it is safe to use and reset the buffers once the function returns.
- Standard mode: Collective directives are in standard mode only, like `MPI_Send`. Assuming that they block is erroneous, assuming that they do not block is also erroneous.
- Synchronization:  Completion of a call does not guarantee that other processes have completed the operation. A collective operation may or may not have the effect of synchronizing all processes, the only exception is `MPI_Barrier` of course.


<div class="alert alert-block alert-info">
<b>Note:</b> Recent versions of the MPI standard also include non-blocking (incomplete) versions of collective operations (not covered in this notebook). A particularly funny one is the non-blocking barrier `MPI_Ibarrier`.
</div>

## MPI_Barrier

This function is used to synchronizes a group of processes. All processes block until all have reached the barrier. It is often invoked at the end of for loops to make sure all processes have finished the current loop iteration to move to the next one. We will see an example later in another notebook when studying the traveling sales person problem (TSP).

In Julia:
```julia
MPI.Barrier(comm)
```

In C:
```c
int MPI_Barrier(MPI_Comm comm)
```

### Example

In this example the ranks sleep for a random amount of time and then they call barrier. It is guaranteed that the message "Done!" will be printed after all processes printed "I woke up" since we used a barrier. Try also to comment out the call to `MPI.Barrier`. You will see that the message can be printed in any order in this case.

In [26]:
code = quote
    using MPI
    MPI.Init()
    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    sleep(rand(1:3))
    println("[rank $rank] I woke up")
    MPI.Barrier(comm)
    println("[rank $rank] Done!")
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);

[rank 0] I woke up
[rank 1] I woke up
[rank 2] I woke up
[rank 3] I woke up
[rank 0] Done!
[rank 1] Done!
[rank 2] Done!
[rank 3] Done!


## MPI_Gather

Each rank sends a message to the root rank (the root rank also sends a message to itself). The root rank receives all these values in a buffer (e.g. a vector). This function assumes that the amount of data sent from each rank is the same. The root process can be any process and it is rank 0 by default in Julia.

In Julia:
```julia
MPI.Gather!(sendbuf, recvbuf, comm; root=0)
```

In C:
```c
int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
    void *recvbuf, int recvcount, MPI_Datatype recvtype, int root,
    MPI_Comm comm)
```

### Example

Each process sends an integer to rank 0.


<div>
<img src="attachment:g13884.png"  align="left" width="350"/>
</div>

In [34]:
code = quote
    using MPI
    MPI.Init()
    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    msg = 10*(rank+2)
    root = 0
    sndbuf = [msg]
    if rank == root
        nranks = MPI.Comm_size(comm)
        rcvbuf = zeros(Int,nranks)
    else
        # This can be anything as only the root process
        # Will receive data
        rcvbuf = nothing
    end
    println("[rank $rank] I am sending $sndbuf")
    MPI.Gather!(sndbuf, rcvbuf, comm; root)
    println("[rank $rank] I received $rcvbuf")
end
run(`$(mpiexec()) -np 3 julia --project=. -e $code`);

[rank 1] I am sending [30]
[rank 2] I am sending [40]
[rank 0] I am sending [20]
[rank 0] I received [20, 30, 40]
[rank 1] I received nothing
[rank 2] I received nothing


## MPI_Gatherv

This function is similar to `MPI_Gather`, but it is used when there is a different amount of data sent from each process.

### Example

Each processes sends `rank+1` integers to rank 0.

In [32]:
code = quote
    using MPI
    MPI.Init()
    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    sndbuf = fill(10*rank,rank+1)
    root = 0
    if rank == root
        nranks = MPI.Comm_size(comm)
        # Define the amount of data to be received
        # from each rank
        counts = [ rank+1 for rank in 0:(nranks-1)]
        # Total amount of receive data
        ndata = sum(counts)
        data = zeros(Int,ndata)
        rcvbuf = MPI.VBuffer(data,counts)
    else
        # This can be anything as only the root process
        # Will receive data
        rcvbuf = nothing
    end
    println("[rank $rank] I am sending $sndbuf")
    MPI.Gatherv!(sndbuf, rcvbuf, comm; root)
    if rank == root
        println("[rank $rank] I received $data")
    end
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);

[rank 1] I am sending [10, 10]
[rank 2] I am sending [20, 20, 20]
[rank 3] I am sending [30, 30, 30, 30]
[rank 0] I am sending [0]
[rank 0] I received [0, 10, 10, 20, 20, 20, 30, 30, 30, 30]


## How to get the amount of data to be received?

Remember that `MPI_Probe` allows you to query the amount of data to be received with `MPI_Recv`. Unfortunately, this mechanism does not work for MPI collectives. A workaround is to first communicate the amount of data to be gathered, allocate the receive buffer accordingly, and then communicate the actual message.


### Example

Each process sends a random amount of integers to rank 0.

In [39]:
code = quote
    using MPI
    MPI.Init()
    comm = MPI.COMM_WORLD
    rank = MPI.Comm_rank(comm)
    # Get number of ints to send
    count = rand(1:4)
    # Generate data to send
    sndbuf = fill(rank,count)
    # Communicate counts
    root = 0
    if rank == root
        nranks = MPI.Comm_size(comm)
        counts = zeros(Int,nranks)
    else
        counts = nothing
    end
    MPI.Gather!([count], counts, comm; root)
    # Allocate receive buffer
    if rank == root
        ndata = sum(counts)
        data = zeros(Int,ndata)
        rcvbuf = MPI.VBuffer(data,counts)
    else
        rcvbuf = nothing
    end
    # Communicate data
    println("[rank $rank] I am sending $sndbuf")
    MPI.Gatherv!(sndbuf, rcvbuf, comm; root)
    if rank == root
        println("[rank $rank] I received $data")
    end
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);

[rank 1] I am sending [1, 1]
[rank 2] I am sending [2]
[rank 3] I am sending [3, 3, 3, 3]
[rank 0] I am sending [0]
[rank 0] I received [0, 1, 1, 2, 3, 3, 3, 3]


## MPI_Scatter

This is the inverse operation of `MPI_Gather`. The root rank sends a distinct message to each rank.

In Julia:
```julia
MPI.Scatter!(sendbuf, recvbuf, comm; root=0)
```

In C:
```c
int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
    void *recvbuf, int recvcount, MPI_Datatype recvtype, int root,
    MPI_Comm comm)
```

<div class="alert alert-block alert-info">
<b>Note:</b> `MPI_Scatter` sends the same amount of data to each rank. To send a variable amount of data there is `MPI_Scatterv` (`MPI.Scatterv!` in julia), which works similar to `MPI_Gatherv`. 
</div>



### Example

Rank 0 sends a unique integer to each rank. 

<div>
<img src="attachment:g13389.png"  align="left" width="350"/>
</div>

In [44]:
code = quote
    using MPI
    MPI.Init()
    comm = MPI.COMM_WORLD
    nranks = MPI.Comm_size(comm)
    rank = MPI.Comm_rank(comm)
    root = 0
    rcv = Ref(0) 
    if rank == root
        snd = [10*(i+1) for i in 1:nranks]
        println("[rank $rank] I am sending: $snd")
    else
        # Only the root rank sends data
        # so this can be anything
        snd = nothing
    end    
    MPI.Scatter!(snd,rcv,comm;root)
    println("[rank $rank] I have received: $(rcv[])")
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);

[rank 0] I am sending: [20, 30, 40, 50]
[rank 0] I have received: 20
[rank 1] I have received: 30
[rank 2] I have received: 40
[rank 3] I have received: 50


## MPI_Bcast

Similar to `MPI_Scatter`, but we send the same message to all processes.

In Julia:
```julia
MPI.Bcast!(buf, comm; root)
```

In C:
```c
int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype,
    int root, MPI_Comm comm)
```


### Example

Rank 0 sends the same integer to all processes.

<div>
<img src="attachment:g1657.png"  align="left" width="350"/>
</div>

In [46]:
code = quote
    using MPI
    MPI.Init()
    comm = MPI.COMM_WORLD
    nranks = MPI.Comm_size(comm)
    rank = MPI.Comm_rank(comm)
    root = 0
    buffer = Ref(0)
    if rank == root
        buffer[] = 20
        println("[rank $rank] I am sending: $(buffer[])")
    end    
    MPI.Bcast!(buffer,comm;root)
    println("[rank $rank] I have received: $(buffer[])")
end
run(`$(mpiexec()) -np 4 julia --project=. -e $code`);

[rank 0] I am sending: 20
[rank 0] I have received: 20
[rank 1] I have received: 20
[rank 2] I have received: 20
[rank 3] I have received: 20


# License

This notebook is part of the course [Programming Large Scale Parallel Systems](https://www.francescverdugo.com/XM_40017) at Vrije Universiteit Amsterdam and may be used under a [CC BY 4.0](https://creativecommons.org/licenses/by/4.0/) license.