11.3. 点对点通信#
最简单的通信模式是点对点(Point-to-Point)通信,点对点通信又分为阻塞式(Blocking)和非阻塞式(Non-Blocking)。实现点对点时主要考虑两个问题:
如何控制和识别不同的进程。比如,想让进程 0 给进程 1 发消息。
如何控制数据的读写操作以及消息的发送和接收。这包括确定数据的大小和数据类型。
发送与接收#
Comm.send()
和 Comm.recv()
是分别用于阻塞式发送和接收数据的函数。
Comm.send(obj, dest, tag=0)
的参数主要是 obj
和 dest
。obj
代表要发送的数据,它可以是Python的内置数据类型,如列表 list
和字典 dict
等,也可以是 NumPy 的多维数组 ndarray
,甚至是 GPU 上的 CuPy 数据。
我们之前讨论了通信器(Communicator)和标识号(Rank),可以通过 Rank 来确定目标进程,其中dest
表示目标进程的 Rank。tag
参数用于标识,为程序员提供了更精细的控制选项。利用 tag
可以实现消息的有序传递和筛选。接收方可以基于特定标签选择接收消息,或者按照标签的顺序来接收消息,这提供了更灵活的控制消息发送和接收过程的能力。
案例1:发送 Python 对象#
比如,我们发送一个 Python 对象。Python 对象在通信过程中的序列化使用的是 pickle。
代码片段 11.2 演示了如何发送一个 Python 对象。
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
data = {'a': 7, 'b': 3.14}
comm.send(data, dest=1)
print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
data = comm.recv(source=0)
print(f"Received: {data}, to rank: {rank}.")
将这份代码保存文件为 send-py-object.py
,在命令行中这样启动:
!mpiexec -np 2 python send-py-object.py
Sended: {'a': 7, 'b': 3.14}, from rank: 0.
Received: {'a': 7, 'b': 3.14}, to rank: 1.
案例2:发送 NumPy ndarray
#
或者发送一个 NumPy ndarray
:
代码片段 11.3 演示了如何发送一个 NumPy ndarray
。
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# 明确告知 MPI 数据类型为 int
# dtype='i', i 为 INT 的缩写
if rank == 0:
data = np.arange(10, dtype='i')
comm.Send([data, MPI.INT], dest=1)
print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
data = np.empty(10, dtype='i')
comm.Recv([data, MPI.INT], source=0)
print(f"Received: {data}, to rank: {rank}.")
# MPI 自动发现数据类型
if rank == 0:
data = np.arange(10, dtype=np.float64)
comm.Send(data, dest=1)
print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
data = np.empty(10, dtype=np.float64)
comm.Recv(data, source=0)
print(f"Received: {data}, to rank: {rank}.")
以上代码保存为 send-np.py
文件。
!mpiexec -np 2 python send-np.py
Sended: [0 1 2 3 4 5 6 7 8 9], from rank: 0.
Received: [0 1 2 3 4 5 6 7 8 9], to rank: 1.
Received: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.], to rank: 1.
Sended: [0. 1. 2. 3. 4. 5. 6. 7. 8. 9.], from rank: 0.
Note
这里的 Send
和 Recv
函数的首字母大写,表示它们是基于缓冲区(Buffer)的操作。使用这些基于缓冲区的函数时,通常需要明确指定数据类型,例如通过传入二元组 (data, MPI.DOUBLE)
或三元组 (data, count, MPI.DOUBLE)
来指定。然而,在之前的例子中,comm.Send(data, dest=1)
并没有显式指定数据类型和大小,这是因为 mpi4py 对 NumPy 和 CuPy 的 ndarray
进行了类型和大小的自动推断。
案例3:Master-Worker#
我们现在来实现一个 Master-Worker 模式的案例,其中包含 size
个进程。size-1
个进程作为 Worker 执行随机数据生成任务,而 Rank 为 size-1 的最后一个进程则作为 Master,负责接收这些数据,并将接收到的数据大小打印出来。
以下代码演示 Master 与 Worker 进程间数据发送和接收的过程。
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
if rank < size - 1:
# Worker 进程
np.random.seed(rank)
# 随机生成
data_count = np.random.randint(100)
data = np.random.randint(100, size=data_count)
comm.send(data, dest=size - 1)
print(f"Worker: worker ID: {rank}; count: {len(data)}")
else:
# Master 进程
for i in range(size - 1):
status = MPI.Status()
data = comm.recv(source=MPI.ANY_SOURCE, status=status)
print(f"Master: worker ID: {status.Get_source()}; count: {len(data)}")
comm.Barrier()
在这个例子中,rank
小于 size - 1
的进程作为 Worker,生成随机数据,并发送给 Rank 为 size - 1 的 Master 进程。Master 进程接收所有 Worker 发送的数据,并打印出接收到的数据的总大小。
!mpiexec -np 8 python master-worker.py
Worker: worker ID: 0; count: 44
Worker: worker ID: 2; count: 40
Worker: worker ID: 4; count: 46
Worker: worker ID: 3; count: 24
Master: worker ID: 2; count: 40
Master: worker ID: 3; count: 24
Master: worker ID: 4; count: 46
Master: worker ID: 0; count: 44
Worker: worker ID: 5; count: 99Master: worker ID: 5; count: 99
Worker: worker ID: 1; count: 37
Master: worker ID: 1; count: 37
Worker: worker ID: 6; count: 10
Master: worker ID: 6; count: 10
案例 4:长方形模拟求 \(\pi\) 值#
对半径为 R 的圆,我们可以采用微分方法将圆切分成 N 个小长方形,当长方形数量达到无穷大的 N 时, 所有长方形总面积接近于 1/4 圆面积,如 图 11.3 所示。
假设此时有 size
个进程参与计算,首先求每个进程需要处理的长方形数量 (N/size
)。每个进程各自计算长方形面积之和,并发送给 Master 进程。第一个进程作为 Master,接收各 Worker 发送数据,汇总所有矩形面积,从而近似计算出 \(\pi\) 值。
代码片段 11.5 演示了长方形模拟求 \(\pi\) 值的过程。
import math
import time
from mpi4py import MPI
communicator = MPI.COMM_WORLD
rank = communicator.Get_rank() # 进程唯一的标识Rank
process_nums = communicator.Get_size()
"""
参数设置:
R=1
N=64*1024*1024
"""
t0 = time.time()
rect_num = 64 * 1024 * 1024
rect_width = 1 / rect_num
step_size = rect_num // process_nums
def cal_rect_area(process_no, step_size, rect_width):
total_area = 0.0
rect_start = (process_no * step_size + 1) * rect_width
for i in range(step_size):
x = rect_start + i * rect_width
# (x,y) 对应于第i个小矩形唯一在圆弧上的顶点
# x^2+y^2=1 => y=sqrt(1-x^2)
rect_length = math.pow(1 - x * x, 0.5)
total_area += rect_width * rect_length
return total_area
# 在每个进程上执行计算
total_area = cal_rect_area(rank, step_size, rect_width)
if rank == 0:
# Master
for i in range(1, process_nums):
total_area += communicator.recv(source=i)
p_i = total_area * 4
t1 = time.time()
print("Simulated PI: {:.10f}, Relative Error:{:.10f}".format(p_i, abs(1 - p_i / math.pi)))
print("Time:{:.3f}s".format(t1 - t0))
else:
# Worker
communicator.send(total_area, dest=0)
上述例子中,我们设置参数为:R=1
, N=64*1024*1024
,保存文件为 rectangle_pi.py
:
!mpiexec -np 8 python rectangle_pi.py
Simulated PI: 3.1415926238, Relative Error:0.0000000095
Time:7.361s
阻塞 v.s. 非阻塞#
阻塞#
我们先分析一下阻塞式通信。Send
和 Recv
这两个基于缓存的方法:
Send
方法会在缓冲区的数据全部发送完毕之后才返回(return
)。该缓冲区可以被其他Send
操作再次使用。Recv
方法会在缓冲区被数据填满之后才返回。
如 图 11.1 所示,阻塞式通信是数据完全传输完成后才会结束等待状态并返回。
阻塞式通信的代码更容易去设计,但容易出现死锁,比如类似下面的逻辑,Rank 1 进程产生了死锁,应该将 Send
和 Recv
调用顺序互换
if rank == 0:
comm.Send(..to rank 1..)
comm.Recv(..from rank 1..)
else if (rank == 1): <- 该进程死锁
comm.Send(..to rank 0..) <- 应将 Send Revc 互换
comm.Recv(..from rank 0..)
非阻塞#
与阻塞式通信相比,非阻塞通信不会等待数据传输完成才开始执行后续操作。它允许通信和计算任务并行执行,从而提高性能:网络负责数据传输,而 CPU 则可以同时进行计算任务。isend
和 irecv
是用于非阻塞通信的函数:
isend
:启动一个非阻塞发送操作,并立即将控制权返回给调用者,允许执行后续代码。irecv
:启动一个非阻塞接收操作,并立即将控制权返回给调用者,允许执行后续代码。
非阻塞式通信调用会立即返回一个 Request
句柄(Handle),调用者随后可以对这个 Request
句柄进行进一步处理,例如等待数据传输完成。带有大写 I 前缀的函数是基于缓冲区的,而带有小写 i 前缀的则不是。isend
的函数参数与 send
类似,但 isend
返回的是一个 Request
对象。Request
类提供了 wait
方法,调用该方法可以显式地等待数据传输完成。原本使用 send
编写的阻塞式代码可以通过使用 isend
加上 Request.wait()
来改写为非阻塞方式。
以下是展示非阻塞通信的一个示例代码。
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
if rank == 0:
data = {'a': 7, 'b': 3.14}
req = comm.isend(data, dest=1, tag=11)
print(f"Sending: {data}, from rank: {rank}.")
req.wait()
print(f"Sended: {data}, from rank: {rank}.")
elif rank == 1:
req = comm.irecv(source=0, tag=11)
print(f"Receiving: to rank: {rank}.")
data = req.wait()
print(f"Received: {data}, to rank: {rank}.")
!mpiexec -np 8 python non-blocking.py
Receiving: to rank: 1.
Sending: {'a': 7, 'b': 3.14}, from rank: 0.
Sended: {'a': 7, 'b': 3.14}, from rank: 0.
Received: {'a': 7, 'b': 3.14}, to rank: 1.
图 11.5 展示了在非阻塞通信中加入 wait()
调用后,数据流的变化情况。