3.3. 将 Dask 扩展到集群#

刚刚的任务都是在单机场景下使用 Dask。dask.distributed 可以帮助我们把 Dask 任务扩展到多台计算节点。

Dask 集群#

图 3.3 所示,一个 Dask 集群必须包含一个调度器(Scheduler)和多个工作节点(Worker)。用户通过客户端(Client)向调度器提交计算任务,调度器对任务进行分析,生成 Task Graph,并将 Task 分发到多个 Worker 上。每个 Worker 承担一小部分计算任务,Worker 之间也要互相通信,比如计算结果的归集等。

../_images/dask-distributed.svg

图 3.3 Dask Distributed#

Scheduler 和 Worker 共同组成了一个 Dask 集群。

LocalCluster#

默认情况(不进行任何额外的设置),Dask 会启动一个本地的集群 LocalCluster,并使用客户端 Client 连接这个集群。

from dask.distributed import LocalCluster, Client

cluster = LocalCluster()
cluster

Dask 探测到本地资源情况,比如本地有 4 个 CPU 核心、16GB 内存,根据本地资源创建了一个 LocalCluster。这个 LocalCluster 有 4 个 Worker,每个 Worker 对应一个 CPU 核心。

Dask 同时提供了仪表盘(Dashboard)链接,可以在网页中查看集群和作业的具体信息。

使用 Client 连接这个 LocalCluster,连接之后,Dask 所有的计算将提交到这个 LocalCluster 上:

client = Client(cluster)

使用命令行启动一个 Dask 集群#

启动 Dask 集群#

当我们有更多的计算节点时,可以使用命令行在不同的计算节点上启动 Dask Scheduler 和 Dask Worker。比如,在 IP 地址为 192.0.0.1 的计算节点上启动 Dask Scheduler,在命令行里输入下面的命令。

# 当前节点为 192.0.0.1
$ dask scheduler

Dask 将日志信息打印出来,其中 Dask Scheduler 的 IP 和端口号为:tcp://192.0.0.1:8786

Scheduler at:   tcp://192.0.0.1:8786
dashboard at: ...

在其他计算节点上启动 Worker,这些 Worker 要连接到刚刚启动的 Scheduler 上。在 dask worker 命令后面添加刚刚得到的 Dask Scheduler 的 IP 地址和端口号。比如,在 192.0.0.2 上启动 Dask Worker:

# 当前节点为 192.0.0.2
$ dask worker tcp://192.0.0.1:8786

日志信息显示,Dask 启动了 Worker,并连接到了 Scheduler:

Start worker at:  tcp://192.0.0.2:40483
Registered to:    tcp://192.0.0.1:8786

Dask Scheduler 默认使用 8786 作为连接端口号,如果使用其他端口号,需要使用 --port 告知 Dask 使用该端口。

$ dask scheduler --port 9786

除了 --port 之外,Dask Scheduler 还有很多其他参数,用户可以根据自身需要设置。

连接 Dask 集群#

假如 Dask Scheduler 启动后返回的地址为 192.0.0.1:8786,使用下面的代码连接到这个集群上,基于 Dask Array、Dask DataFrame 的 .compute() 的代码会自动调度到这个集群上。

from dask.distributed import Client

client = Client("192.0.0.1:8786")

Python 环境和依赖包管理#

在集群上使用 Dask 时,要注意所有计算节点都安装所需要的 Python 依赖包。

可以使用 distributed.diagnostics.plugin.PipInstall 在运行时给每个 Worker 安装依赖包。PipInstallpackages 参数接收一个列表,列表内容为依赖包的名字,它会像 pip install 那样安装这些依赖包。安装的过程发生在程序真正开始前,会消耗一定时间,因此这种方式更适合原型测试阶段,还不确定哪些包需要,哪些不需要。

from dask.distributed import PipInstall
plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"])
client.register_plugin(plugin)

等最终确定了所需要的依赖包,最好使用以下方式管理集群上的 Python 和各个依赖包:

  • 所有计算节点安装同样版本的软件:比如,使用 conda 或者 pip 在每个 Dask Worker 同样的目录下安装 Python 和依赖包。

  • 所有计算节点挂载共享文件系统:Python 等软件环境安装到共享文件系统中,所有 Dask Worker 看到的是相同的目录和相同的内容。常见的共享文件系统有网络文件系统(Network File System,NFS)。

  • 使用容器:Python 及依赖包都打包到容器中,在集群上分发容器,每个计算节点都启动相同的容器。

SSH、Kubernetes、高性能计算集群#

Dask 集群本质就是 Dask Scheduler 和一系列 Dask Worker,其他各类具体部署场景都大同小异。Dask 针对不同部署场景提供了封装好的库,用户无需像刚才那样分别登陆到不同节点,依次启动 Dask Scheduler 和 Dask Worker。下面将介绍三种类型:SSH、Kubernetes 和高性能计算集群。

SSH#

只要知道每台计算节点的 IP 地址或者主机名(Hostname),就可以在 Python 代码或者命令行中启动 Dask 集群。 dask.distributed.SSHCluster 基于 asyncssh 包进行了封装,它 SSH 登录到每个计算节点,然后在该节点启动 Dask Scheduler 或 Dask Worker。

Note

如果你在个人电脑上操作 SSHCluster,即个人电脑是 Client,各个计算节点组成集群,你应该将个人电脑与各个计算节点之间设置免密码登录。确切地说,各个计算节点的 authorized_keys 文件中应该存储了个人电脑的公钥。

可以使用 Python 代码启动这个 Dask 集群:

from dask.distributed import Client, SSHCluster
cluster = SSHCluster(
    hosts=["localhost", "node1", "node2"],
    connect_options={
        "username": "xxx",
        "password": "yyy",
        }
)
client = Client(cluster)

hosts 是要拉起 Dask 的主机列表,列表上第一个节点将启动 Dask Scheduler,其余节点启动 Dask Worker;connect_options 是 SSH 登录的一些参数,比如用户名 username、密码 password、端口号 port,这些参数以 {"key": "value"} 的形式传递给 Dask。

from dask.distributed import Client, SSHCluster
cluster = SSHCluster(
    hosts=["localhost", "node1", "node2"],
    connect_options={
        "username": "xxx",
        "password": "yyy",
        }
)
client = Client(cluster)

Kubernetes#

Kubernetes 已经成为云资源管理事实上的标准,使用 Kubernetes 安装软件最简单的方式是使用 Helm。Dask 封装了 dask-kubernetes,并提供了 KubeClusterHelmCluster 两个类。

高性能计算集群#

高性能计算(High-Performance Computing)集群通常使用 Slurm 等调度软件。Dask 为高性能集群封装了 dask-jobqueue,兼容 Slurm 在内的调度软件。

比如,使用 SLURMCluster 向 Slurm 的 cpu 队列申请 10 个节点用于运行 Dask Worker,每个节点资源为 32 核 128GB。scale() 方法会向 Slurm 申请 10 计算节点。

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

cluster = SLURMCluster(cores=32,
                       memory="128GB",
                       queue="cpu")
cluster.scale(n=10)
client = Client(cluster)

相比于云上资源,高性能计算集群除了配备常见的以太网,还配备了高带宽和低延迟的 RDMA 网络,(比如,InfiniBand)。RDMA 网络能加速很多对网络要求高的任务。如果想充分利用 RDMA 网络,可以在 SLURMClusterinterface 参数传入网卡接口。网卡接口可以在命令行里使用 ifconfig 查看,比如 InfiniBand 网卡一般对应为 ib0,以太网一般显示为 eth0 。不同的计算节点的网卡命名可能不一样。如果不设置 interface,Dask 默认使用以太网。

自动缩放#

前面介绍的 Kubernetes 和高性能计算集群(例如 Slurm)均支持自动缩放(Auto-Scaling 或 Adaptive Scaling),因为他们本身就是集群调度管理软件,他们管理着大量计算资源,不同的应用向他们请求资源。Dask 构建在 Kubernetes 或 Slurm 上,相当于在 Kubernetes 或 Slurm 所管理的计算资源上申请一个子集。在 Kubernetes 或 Slurm 上,我们可以使用 Dask 的自动缩放技术,自动增加或减少 Dask 所需计算资源。自动缩放主要考虑以下场景:

  • 用户当前作业对计算资源要求很高,需要更多的资源来满足计算需求。

  • 用户当前作业所申请的计算资源闲置,这些资源可被其他用户使用。尤其是当用户进行交互式数据可视化,而非大规模计算时。

KubeClusterSLURMCluster 都提供了 adapt() 方法。下面的例子可以在 0 到 10 个 Worker 之间动态缩放。自动缩放根据 Dask Scheduler 上作业的负载来决定运行多少个 Dask Worker。Dask 收集一些信息,比如每个 Dask Worker 上已用内存和可用内存,并根据这些信息自适应地调整计算资源数量。

from dask_kubernetes import KubeCluster

cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=10)

Dask Nanny#

Dask 在启动集群时,除了启动 Dask Scheduler 和 Dask Worker 外,还启动了一个叫做 Dask Nanny 的监控服务。Nanny 英文意为保姆,Dask Nanny 就是在执行保姆的作用,它监控着 Dask Worker 的 CPU 和内存使用情况,避免 Dask Worker 超出资源上限;Dask Worker 宕机时,Dask Nanny 负责重启 Dask Worker。如果某个 Dask Worker 被 Dask Nanny 重启了,该 Dask Worker 上的计算任务被重新执行,其他 Dask Worker 保留当时状态和数据,并一直等待该 Dask Worker 恢复到刚刚宕机的时间点;这会给其他 Dask Worker 增加很多负担。如果 Dask Worker 频繁重启,可能需要考虑用 rechunk()repartition() 调整 Partition 的数据大小。

client.shutdown()