11.4. 集合通信#

点对点通信 介绍了点对点通信,本节主要介绍一种全局的通信方式:集合通信,它允许在组内的多个进程之间同时进行数据传输。目前,集合通信仅支持阻塞模式。

集合通信主要包括以下几类:

首字母大写的函数名表示这些操作是基于缓冲区的,比如 Comm.BcastComm.ScatterComm.GatherComm.AllgatherComm.Alltoall。而首字母小写的函数名表示这些操作可以传输 Python 对象,比如 Comm.bcastComm.scatterComm.gatherComm.allgatherComm.alltoall

同步#

MPI 计算分布在多个进程中,而这些进程的计算速度可能各不相同。Comm.Barrier 对 Communicator 里所有进程都执行同步等待,正如 “Barrier” 这个英文名所暗示的,它相当于设置了一个障碍,要求所有进程到达这一点后才能继续执行。计算速度快的进程在到达 Comm.Barrier(),不能继续执行 Comm.Barrier() 之后的计算逻辑,它们必须等待其他所有进程也到达这一点。

数据移动#

广播#

Comm.Bcast 将数据从一个发送者全局广播给组里所有进程,广播操作适用于需要将同一份数据发送给所有进程的场景,例如将一个全局变量的值发送给所有进程,如 图 11.6 所示。

../_images/broadcast.svg

图 11.6 Broadcast#

案例:广播#

代码片段 11.7 演示了如何将一个 NumPy 数组广播到所有的进程:

代码片段 11.7 broadcast.py#
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD

comm.Barrier()

N = 5
if comm.rank == 0:
    A = np.arange(N, dtype=np.float64)    # rank 0 初始化数据到变量 A
else:
    A = np.empty(N, dtype=np.float64)     # 其他节点的变量 A 为空

# 广播
comm.Bcast([A, MPI.DOUBLE])

# 验证所有节点上的 A
print("Rank:%2d, data:%s" % (comm.rank, A))
!mpiexec -np 4 python broadcast.py
Rank: 0, data:[0. 1. 2. 3. 4.]
Rank: 2, data:[0. 1. 2. 3. 4.]
Rank: 1, data:[0. 1. 2. 3. 4.]
Rank: 3, data:[0. 1. 2. 3. 4.]

Scatter 和 Gather#

Comm.ScatterComm.Gather 是一组相对应的操作。

  • Comm.Scatter 将数据从一个进程分散到组中的所有进程,一个进程将数据分散成多个块,每个块发送给对应的进程。其他进程接收并存储各自的块。Scatter 操作适用于将一个较大的数据集分割成多个小块。

  • Comm.GatherComm.Scatter 相反,将组里所有进程的小数据块归集到一个进程上。

../_images/scatter-gather.svg

图 11.7 Scatter 与 Gather#

案例:Scatter#

代码片段 11.8 演示了如何使用 Scatter 将数据分散到所有进程。

代码片段 11.8 scatter.py#
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendbuf = None
if rank == 0:
    sendbuf = np.empty([size, 8], dtype='i')
    sendbuf.T[:,:] = range(size)
    print(f"Rank: {rank}, to be scattered: \n{sendbuf}")
recvbuf = np.empty(8, dtype='i')
comm.Scatter(sendbuf, recvbuf, root=0)
print(f"Rank: {rank}, after scatter: {recvbuf}")
assert np.allclose(recvbuf, rank)
!mpiexec -np 4 python scatter.py
Rank: 0, to be scattered: 
[[0 0 0 0 0 0 0 0]
 [1 1 1 1 1 1 1 1]
 [2 2 2 2 2 2 2 2]
 [3 3 3 3 3 3 3 3]]
Rank: 0, after scatter: [0 0 0 0 0 0 0 0]
Rank: 1, after scatter: [1 1 1 1 1 1 1 1]
Rank: 2, after scatter: [2 2 2 2 2 2 2 2]
Rank: 3, after scatter: [3 3 3 3 3 3 3 3]

Allgather 和 Alltoall#

另外两个比较复杂的操作是 Comm.AllgatherComm.Alltoall

Comm.AllgatherComm.Gather 的进阶版,如 图 11.8 所示,它把散落在多个进程的多个小数据块发送给每个进程,每个进程都包含了一份相同的数据。

../_images/allgather.svg

图 11.8 Allgather#

Comm.AlltoallComm.ScatterComm.Gather 的组合,如 图 11.9 所示,先进行 Comm.Scatter,再进行 Comm.Gather。如果把数据看成一个矩阵,Comm.Alltoall 又可以被看做是一种全局的转置(Transpose)操作。

../_images/alltoall.svg

图 11.9 Alltoall#

集合计算#

集合计算是散落在不同进程的数据聚合在一起的同时对数据进行计算,比如 Comm.Reduce。如 图 11.10图 11.11 所示,数据归集到某个进程时,还执行了聚合函数 f,常用的聚合函数有求和 MPI.SUM 等。

../_images/reduce.svg

图 11.10 Reduce#

../_images/scan.svg

图 11.11 Scan#