11.5. 远程内存访问#

章节 11.2 中我们介绍了两种通信模式,即双边和单边。前几章节的点对点通信和集合通信主要针对的双边通信,本节主要介绍单边通信。单边通信是进程间直接访问远程内存,又称为远程内存访问(Remote Memory Access,RMA)。

Window#

任何进程所分配的内存都是私有的,即只被进程自己访问,远程内存访问要把进程自己的内存区域暴露给其他进程访问,这部分内存不再私有,而变成了公共的,需要特别处理。在 MPI 中,使用窗口(Window)定义可被远程访问的内存区域。某个内存区域被设置为允许远程访问,所有 Window 内的进程都可对这个区域进行读写。图 11.12 展示了私有的内存区域和可被远程访问的 Window。mpi4py 提供了 mpi4py.MPI.Win 类进行 Window 相关操作。

../_images/rma-window.svg

图 11.12 进程私有内存与允许远程访问的 Window#

创建 Window#

我们可以使用 mpi4py.MPI.Win.Allocatempi4py.MPI.Win.Create 创建 Window。其中 mpi4py.MPI.Win.Allocate 创建新的内存缓存,并且该缓存可被远程访问;mpi4py.MPI.Win.Create 将现有的某个内存缓存区域设置为可远程访问。这个区别主要体现在这两个方法的第一个参数,mpi4py.MPI.Win.Allocate(size) 传入的要创建内存缓存的字节数 sizempi4py.MPI.Win.Create(memory) 传入的是内存地址 memory

读写操作#

创建可远程访问的窗口 Window 后,可以通过三类方法对内存区域进行读写操作:mpi4py.MPI.Win.Putmpi4py.MPI.Win.Getmpi4py.MPI.Win.Accumulate。这些方法都接受两个参数:origintarget_rank,分别表示源进程和目标进程。其中,源进程是调用读写方法的进程,而目标进程是将要被读写的进程。

  • Win.Put: 此方法将数据从源进程移动到目标进程。

  • Win.Get: 此方法将数据从目标进程移动到源进程。

  • Win.Accumulate: 类似于 Win.Put,此方法也将数据从源进程移动到目标进程,但不同之处在于它还包括一个聚合操作,即对目标进程中的数据执行某种操作。可用的聚合操作符包括 mpi4py.MPI.SUMmpi4py.MPI.PROD 等。

数据同步#

单机程序是顺序执行的,多机环境下,因为涉及多方数据的读写,可能会出现一些数据同步的问题,如 图 11.13 所示,如果不明确读写操作的顺序,尤其是短时间内同时执行 PUT 会导致某块内存区域的数据并非为程序员所期望的结果。

../_images/rma-sync-problem.svg

图 11.13 并行数据读写时会出现数据同步的问题,P0 和 P1 表示两个不同的进程。#

为解决多机环境中的数据同步问题,需要采用一定的数据同步机制。MPI 提供了几种同步方式,包括主动目标同步(Active Target Synchronization)和被动目标同步(Passive Target Synchronization),如 图 11.14 所示。

../_images/rma-synchronization.svg

图 11.14 主动同步与被动同步#

案例:远程读写#

一个完整的 RMA 程序应该包括:

  1. 创建 Window

  2. 数据同步

  3. 数据读写

代码片段 11.9 展示了一个案例,其代码保存为 rma-lock.py

代码片段 11.9 rma-lock.py#
import numpy as np
from mpi4py import MPI
from mpi4py.util import dtlib

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

datatype = MPI.FLOAT
np_dtype = dtlib.to_numpy_dtype(datatype)
itemsize = datatype.Get_size()

N = 8
win_size = N * itemsize if rank == 0 else 0
win = MPI.Win.Allocate(win_size, comm=comm)

buf = np.empty(N, dtype=np_dtype)
if rank == 0:
    buf.fill(42)
    win.Lock(rank=0)
    win.Put(buf, target_rank=0)
    win.Unlock(rank=0)
    comm.Barrier()
else:
    comm.Barrier()
    win.Lock(rank=0)
    win.Get(buf, target_rank=0)
    win.Unlock(rank=0)
    if np.all(buf == 42):
        print(f"win.Get successfully on Rank {comm.Get_rank()}.")
    else:
        print(f"win.Get failed on Rank {comm.Get_rank()}.")
!mpiexec -np 8 python rma_lock.py
win.Get successfully on Rank 4.
win.Get successfully on Rank 5.
win.Get successfully on Rank 6.
win.Get successfully on Rank 7.
win.Get successfully on Rank 1.
win.Get successfully on Rank 2.
win.Get successfully on Rank 3.