11.3. 点对点通信#

最简单的通信模式是点对点(Point-to-Point)通信,点对点通信又分为阻塞式(Blocking)和非阻塞式(Non-Blocking)。实现点对点时主要考虑两个问题:

  • 如何控制和识别不同的进程。比如,想让进程 0 给进程 1 发消息。

  • 如何控制数据的读写操作以及消息的发送和接收。这包括确定数据的大小和数据类型。

发送与接收#

Comm.send()Comm.recv() 是分别用于阻塞式发送和接收数据的函数。

Comm.send(obj, dest, tag=0) 的参数主要是 objdestobj 代表要发送的数据,它可以是Python的内置数据类型,如列表 list 和字典 dict 等,也可以是 NumPy 的多维数组 ndarray,甚至是 GPU 上的 CuPy 数据。 我们之前讨论了通信器(Communicator)和标识号(Rank),可以通过 Rank 来确定目标进程,其中dest 表示目标进程的 Rank。tag 参数用于标识,为程序员提供了更精细的控制选项。利用 tag 可以实现消息的有序传递和筛选。接收方可以基于特定标签选择接收消息,或者按照标签的顺序来接收消息,这提供了更灵活的控制消息发送和接收过程的能力。

案例1:发送 Python 对象#

比如,我们发送一个 Python 对象。Python 对象在通信过程中的序列化使用的是 pickle

代码片段 11.2 演示了如何发送一个 Python 对象。

代码片段 11.2 send-py-object.py#
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    comm.send(data, dest=1)
    print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
    data = comm.recv(source=0)
    print(f"Received: {data}, to rank: {rank}.")

将这份代码保存文件为 send-py-object.py,在命令行中这样启动:

!mpiexec -np 2 python send-py-object.py
Sended: {'a': 7, 'b': 3.14}, from rank: 0.
Received: {'a': 7, 'b': 3.14}, to rank: 1.

案例2:发送 NumPy ndarray#

或者发送一个 NumPy ndarray

代码片段 11.3 演示了如何发送一个 NumPy ndarray

代码片段 11.3 send-np.py#
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# 明确告知 MPI 数据类型为 int
# dtype='i', i 为 INT 的缩写
if rank == 0:
    data = np.arange(10, dtype='i')
    comm.Send([data, MPI.INT], dest=1)
    print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
    data = np.empty(10, dtype='i')
    comm.Recv([data, MPI.INT], source=0)
    print(f"Received: {data}, to rank: {rank}.")

# MPI 自动发现数据类型
if rank == 0:
    data = np.arange(10, dtype=np.float64)
    comm.Send(data, dest=1)
    print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
    data = np.empty(10, dtype=np.float64)
    comm.Recv(data, source=0)
    print(f"Received: {data}, to rank: {rank}.")

以上代码保存为 send-np.py 文件。

!mpiexec -np 2 python send-np.py
Sended: [0 1 2 3 4 5 6 7 8 9], from rank: 0.
Received: [0 1 2 3 4 5 6 7 8 9], to rank: 1.
Received: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.], to rank: 1.
Sended: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.], from rank: 0.

Note

这里的 SendRecv 函数的首字母大写,表示它们是基于缓冲区(Buffer)的操作。使用这些基于缓冲区的函数时,通常需要明确指定数据类型,例如通过传入二元组 (data, MPI.DOUBLE) 或三元组 (data, count, MPI.DOUBLE) 来指定。然而,在之前的例子中,comm.Send(data, dest=1) 并没有显式指定数据类型和大小,这是因为 mpi4py 对 NumPy 和 CuPy 的 ndarray 进行了类型和大小的自动推断。

案例3:Master-Worker#

我们现在来实现一个 Master-Worker 模式的案例,其中包含 size 个进程。size-1 个进程作为 Worker 执行随机数据生成任务,而 Rank 为 size-1 的最后一个进程则作为 Master,负责接收这些数据,并将接收到的数据大小打印出来。

以下代码演示 Master 与 Worker 进程间数据发送和接收的过程。

代码片段 11.4 master-worker.py#
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank < size - 1:
    # Worker 进程
    np.random.seed(rank)
    # 随机生成
    data_count = np.random.randint(100)
    data = np.random.randint(100, size=data_count)
    comm.send(data, dest=size - 1)
    print(f"Worker: worker ID: {rank}; count: {len(data)}")
else:
    # Master 进程
    for i in range(size - 1):
        status = MPI.Status()
        data = comm.recv(source=MPI.ANY_SOURCE, status=status)
        print(f"Master: worker ID: {status.Get_source()}; count: {len(data)}")

comm.Barrier()

在这个例子中,rank 小于 size - 1 的进程作为 Worker,生成随机数据,并发送给 Rank 为 size - 1 的 Master 进程。Master 进程接收所有 Worker 发送的数据,并打印出接收到的数据的总大小。

!mpiexec -np 8 python master-worker.py
Worker: worker ID: 0; count: 44
Worker: worker ID: 2; count: 40
Worker: worker ID: 4; count: 46
Worker: worker ID: 3; count: 24
Master: worker ID: 2; count: 40
Master: worker ID: 3; count: 24
Master: worker ID: 4; count: 46
Master: worker ID: 0; count: 44
Worker: worker ID: 5; count: 99Master: worker ID: 5; count: 99

Worker: worker ID: 1; count: 37
Master: worker ID: 1; count: 37
Worker: worker ID: 6; count: 10
Master: worker ID: 6; count: 10

案例 4:长方形模拟求 \(\pi\)#

对半径为 R 的圆,我们可以采用微分方法将圆切分成 N 个小长方形,当长方形数量达到无穷大的 N 时, 所有长方形总面积接近于 1/4 圆面积,如 图 11.3 所示。

../_images/rectangle-pi.svg

图 11.3 使用 N 个小长方形模拟 1/4 圆#

假设此时有 size 个进程参与计算,首先求每个进程需要处理的长方形数量 (N/size)。每个进程各自计算长方形面积之和,并发送给 Master 进程。第一个进程作为 Master,接收各 Worker 发送数据,汇总所有矩形面积,从而近似计算出 \(\pi\) 值。

代码片段 11.5 演示了长方形模拟求 \(\pi\) 值的过程。

代码片段 11.5 rectangle-pi.py#
import math
import time

from mpi4py import MPI

communicator = MPI.COMM_WORLD
rank = communicator.Get_rank()  # 进程唯一的标识Rank
process_nums = communicator.Get_size()
"""
参数设置:
R=1
N=64*1024*1024
"""
t0 = time.time()
rect_num = 64 * 1024 * 1024
rect_width = 1 / rect_num
step_size = rect_num // process_nums

def cal_rect_area(process_no, step_size, rect_width):
    total_area = 0.0
    rect_start = (process_no * step_size + 1) * rect_width

    for i in range(step_size):
        x = rect_start + i * rect_width
        # (x,y) 对应于第i个小矩形唯一在圆弧上的顶点
        # x^2+y^2=1 => y=sqrt(1-x^2)
        rect_length = math.pow(1 - x * x, 0.5)
        total_area += rect_width * rect_length
    return total_area

# 在每个进程上执行计算
total_area = cal_rect_area(rank, step_size, rect_width)

if rank == 0:
    # Master
    for i in range(1, process_nums):
        total_area += communicator.recv(source=i)
    p_i = total_area * 4
    t1 = time.time()
    print("Simulated PI: {:.10f}, Relative Error:{:.10f}".format(p_i, abs(1 - p_i / math.pi)))
    print("Time:{:.3f}s".format(t1 - t0))
else:
    # Worker
    communicator.send(total_area, dest=0)

上述例子中,我们设置参数为:R=1, N=64*1024*1024,保存文件为 rectangle_pi.py

!mpiexec -np 8 python rectangle_pi.py
Simulated PI: 3.1415926238, Relative Error:0.0000000095
Time:7.361s

阻塞 v.s. 非阻塞#

阻塞#

我们先分析一下阻塞式通信。SendRecv 这两个基于缓存的方法:

  • Send 方法会在缓冲区的数据全部发送完毕之后才返回(return)。该缓冲区可以被其他 Send 操作再次使用。

  • Recv 方法会在缓冲区被数据填满之后才返回。

图 11.1 所示,阻塞式通信是数据完全传输完成后才会结束等待状态并返回。

../_images/blocking.svg

图 11.4 阻塞式通信示意图#

阻塞式通信的代码更容易去设计,但容易出现死锁,比如类似下面的逻辑,Rank 1 进程产生了死锁,应该将 SendRecv 调用顺序互换

if rank == 0:
	comm.Send(..to rank 1..)
    comm.Recv(..from rank 1..)
else if (rank == 1): <- 该进程死锁
    comm.Send(..to rank 0..)       <- 应将 Send Revc 互换
    comm.Recv(..from rank 0..)

非阻塞#

与阻塞式通信相比,非阻塞通信不会等待数据传输完成才开始执行后续操作。它允许通信和计算任务并行执行,从而提高性能:网络负责数据传输,而 CPU 则可以同时进行计算任务。isendirecv 是用于非阻塞通信的函数:

  • isend:启动一个非阻塞发送操作,并立即将控制权返回给调用者,允许执行后续代码。

  • irecv:启动一个非阻塞接收操作,并立即将控制权返回给调用者,允许执行后续代码。

非阻塞式通信调用会立即返回一个 Request 句柄(Handle),调用者随后可以对这个 Request 句柄进行进一步处理,例如等待数据传输完成。带有大写 I 前缀的函数是基于缓冲区的,而带有小写 i 前缀的则不是。isend 的函数参数与 send 类似,但 isend 返回的是一个 Request 对象。Request 类提供了 wait 方法,调用该方法可以显式地等待数据传输完成。原本使用 send 编写的阻塞式代码可以通过使用 isend 加上 Request.wait() 来改写为非阻塞方式。

以下是展示非阻塞通信的一个示例代码。

代码片段 11.6 non-blocking.py#
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    data = {'a': 7, 'b': 3.14}
    req = comm.isend(data, dest=1, tag=11)
    print(f"Sending: {data}, from rank: {rank}.")
    req.wait()
    print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
    req = comm.irecv(source=0, tag=11)
    print(f"Receiving: to rank: {rank}.")
    data = req.wait()
    print(f"Received: {data}, to rank: {rank}.")
!mpiexec -np 8 python non-blocking.py
Receiving: to rank: 1.
Sending: {'a': 7, 'b': 3.14}, from rank: 0.
Sended: {'a': 7, 'b': 3.14}, from rank: 0.
Received: {'a': 7, 'b': 3.14}, to rank: 1.

图 11.5 展示了在非阻塞通信中加入 wait() 调用后,数据流的变化情况。

../_images/non-blocking.svg

图 11.5 非阻塞式通信示意图#