3.3. 将 Dask 扩展到集群#
刚刚的任务都是在单机场景下使用 Dask。dask.distributed 可以帮助我们把 Dask 任务扩展到多台计算节点。
Dask 集群#
如 图 3.3 所示,一个 Dask 集群必须包含一个调度器(Scheduler)和多个工作节点(Worker)。用户通过客户端(Client)向调度器提交计算任务,调度器对任务进行分析,生成 Task Graph,并将 Task 分发到多个 Worker 上。每个 Worker 承担一小部分计算任务,Worker 之间也要互相通信,比如计算结果的归集等。
图 3.3 Dask Distributed#
Scheduler 和 Worker 共同组成了一个 Dask 集群。
LocalCluster#
默认情况(不进行任何额外的设置),Dask 会启动一个本地的集群 LocalCluster,并使用客户端 Client 连接这个集群。
from dask.distributed import LocalCluster, Client
cluster = LocalCluster()
cluster
LocalCluster
008c0d37
| Dashboard: http://127.0.0.1:8787/status | Workers: 4 |
| Total threads: 8 | Total memory: 16.00 GiB |
| Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-d62c583b-949f-47ca-a09a-62e8b76b1f52
| Comm: tcp://127.0.0.1:56578 | Workers: 4 |
| Dashboard: http://127.0.0.1:8787/status | Total threads: 8 |
| Started: Just now | Total memory: 16.00 GiB |
Workers
Worker: 0
| Comm: tcp://127.0.0.1:56592 | Total threads: 2 |
| Dashboard: http://127.0.0.1:56593/status | Memory: 4.00 GiB |
| Nanny: tcp://127.0.0.1:56581 | |
| Local directory: /var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/dask-scratch-space/worker-b2u1_h84 | |
Worker: 1
| Comm: tcp://127.0.0.1:56596 | Total threads: 2 |
| Dashboard: http://127.0.0.1:56599/status | Memory: 4.00 GiB |
| Nanny: tcp://127.0.0.1:56583 | |
| Local directory: /var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/dask-scratch-space/worker-w2ws5x2t | |
Worker: 2
| Comm: tcp://127.0.0.1:56595 | Total threads: 2 |
| Dashboard: http://127.0.0.1:56597/status | Memory: 4.00 GiB |
| Nanny: tcp://127.0.0.1:56585 | |
| Local directory: /var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/dask-scratch-space/worker-sg9v8s9i | |
Worker: 3
| Comm: tcp://127.0.0.1:56601 | Total threads: 2 |
| Dashboard: http://127.0.0.1:56602/status | Memory: 4.00 GiB |
| Nanny: tcp://127.0.0.1:56587 | |
| Local directory: /var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/dask-scratch-space/worker-wnsy1wd3 | |
Dask 探测到本地资源情况,比如本地有 4 个 CPU 核心、16GB 内存,根据本地资源创建了一个 LocalCluster。这个 LocalCluster 有 4 个 Worker,每个 Worker 对应一个 CPU 核心。
Dask 同时提供了仪表盘(Dashboard)链接,可以在网页中查看集群和作业的具体信息。
使用 Client 连接这个 LocalCluster,连接之后,Dask 所有的计算将提交到这个 LocalCluster 上:
client = Client(cluster)
使用命令行启动一个 Dask 集群#
启动 Dask 集群#
当我们有更多的计算节点时,可以使用命令行在不同的计算节点上启动 Dask Scheduler 和 Dask Worker。比如,在 IP 地址为 192.0.0.1 的计算节点上启动 Dask Scheduler,在命令行里输入下面的命令。
# 当前节点为 192.0.0.1
$ dask scheduler
Dask 将日志信息打印出来,其中 Dask Scheduler 的 IP 和端口号为:tcp://192.0.0.1:8786
Scheduler at: tcp://192.0.0.1:8786
dashboard at: ...
在其他计算节点上启动 Worker,这些 Worker 要连接到刚刚启动的 Scheduler 上。在 dask worker 命令后面添加刚刚得到的 Dask Scheduler 的 IP 地址和端口号。比如,在 192.0.0.2 上启动 Dask Worker:
# 当前节点为 192.0.0.2
$ dask worker tcp://192.0.0.1:8786
日志信息显示,Dask 启动了 Worker,并连接到了 Scheduler:
Start worker at: tcp://192.0.0.2:40483
Registered to: tcp://192.0.0.1:8786
Dask Scheduler 默认使用 8786 作为连接端口号,如果使用其他端口号,需要使用 --port 告知 Dask 使用该端口。
$ dask scheduler --port 9786
除了 --port 之外,Dask Scheduler 还有很多其他参数,用户可以根据自身需要设置。
连接 Dask 集群#
假如 Dask Scheduler 启动后返回的地址为 192.0.0.1:8786,使用下面的代码连接到这个集群上,基于 Dask Array、Dask DataFrame 的 .compute() 的代码会自动调度到这个集群上。
from dask.distributed import Client
client = Client("192.0.0.1:8786")
Python 环境和依赖包管理#
在集群上使用 Dask 时,要注意所有计算节点都安装所需要的 Python 依赖包。
可以使用 distributed.diagnostics.plugin.PipInstall 在运行时给每个 Worker 安装依赖包。PipInstall 的 packages 参数接收一个列表,列表内容为依赖包的名字,它会像 pip install 那样安装这些依赖包。安装的过程发生在程序真正开始前,会消耗一定时间,因此这种方式更适合原型测试阶段,还不确定哪些包需要,哪些不需要。
from dask.distributed import PipInstall
plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"])
client.register_plugin(plugin)
等最终确定了所需要的依赖包,最好使用以下方式管理集群上的 Python 和各个依赖包:
所有计算节点安装同样版本的软件:比如,使用
conda或者pip在每个 Dask Worker 同样的目录下安装 Python 和依赖包。所有计算节点挂载共享文件系统:Python 等软件环境安装到共享文件系统中,所有 Dask Worker 看到的是相同的目录和相同的内容。常见的共享文件系统有网络文件系统(Network File System,NFS)。
使用容器:Python 及依赖包都打包到容器中,在集群上分发容器,每个计算节点都启动相同的容器。
SSH、Kubernetes、高性能计算集群#
Dask 集群本质就是 Dask Scheduler 和一系列 Dask Worker,其他各类具体部署场景都大同小异。Dask 针对不同部署场景提供了封装好的库,用户无需像刚才那样分别登陆到不同节点,依次启动 Dask Scheduler 和 Dask Worker。下面将介绍三种类型:SSH、Kubernetes 和高性能计算集群。
SSH#
只要知道每台计算节点的 IP 地址或者主机名(Hostname),就可以在 Python 代码或者命令行中启动 Dask 集群。 dask.distributed.SSHCluster 基于 asyncssh 包进行了封装,它 SSH 登录到每个计算节点,然后在该节点启动 Dask Scheduler 或 Dask Worker。
Note
如果你在个人电脑上操作 SSHCluster,即个人电脑是 Client,各个计算节点组成集群,你应该将个人电脑与各个计算节点之间设置免密码登录。确切地说,各个计算节点的 authorized_keys 文件中应该存储了个人电脑的公钥。
可以使用 Python 代码启动这个 Dask 集群:
from dask.distributed import Client, SSHCluster
cluster = SSHCluster(
hosts=["localhost", "node1", "node2"],
connect_options={
"username": "xxx",
"password": "yyy",
}
)
client = Client(cluster)
hosts 是要拉起 Dask 的主机列表,列表上第一个节点将启动 Dask Scheduler,其余节点启动 Dask Worker;connect_options 是 SSH 登录的一些参数,比如用户名 username、密码 password、端口号 port,这些参数以 {"key": "value"} 的形式传递给 Dask。
from dask.distributed import Client, SSHCluster
cluster = SSHCluster(
hosts=["localhost", "node1", "node2"],
connect_options={
"username": "xxx",
"password": "yyy",
}
)
client = Client(cluster)
Kubernetes#
Kubernetes 已经成为云资源管理事实上的标准,使用 Kubernetes 安装软件最简单的方式是使用 Helm。Dask 封装了 dask-kubernetes,并提供了 KubeCluster 和 HelmCluster 两个类。
高性能计算集群#
高性能计算(High-Performance Computing)集群通常使用 Slurm 等调度软件。Dask 为高性能集群封装了 dask-jobqueue,兼容 Slurm 在内的调度软件。
比如,使用 SLURMCluster 向 Slurm 的 cpu 队列申请 10 个节点用于运行 Dask Worker,每个节点资源为 32 核 128GB。scale() 方法会向 Slurm 申请 10 计算节点。
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=32,
memory="128GB",
queue="cpu")
cluster.scale(n=10)
client = Client(cluster)
相比于云上资源,高性能计算集群除了配备常见的以太网,还配备了高带宽和低延迟的 RDMA 网络,(比如,InfiniBand)。RDMA 网络能加速很多对网络要求高的任务。如果想充分利用 RDMA 网络,可以在 SLURMCluster 的 interface 参数传入网卡接口。网卡接口可以在命令行里使用 ifconfig 查看,比如 InfiniBand 网卡一般对应为 ib0,以太网一般显示为 eth0 。不同的计算节点的网卡命名可能不一样。如果不设置 interface,Dask 默认使用以太网。
自动缩放#
前面介绍的 Kubernetes 和高性能计算集群(例如 Slurm)均支持自动缩放(Auto-Scaling 或 Adaptive Scaling),因为他们本身就是集群调度管理软件,他们管理着大量计算资源,不同的应用向他们请求资源。Dask 构建在 Kubernetes 或 Slurm 上,相当于在 Kubernetes 或 Slurm 所管理的计算资源上申请一个子集。在 Kubernetes 或 Slurm 上,我们可以使用 Dask 的自动缩放技术,自动增加或减少 Dask 所需计算资源。自动缩放主要考虑以下场景:
用户当前作业对计算资源要求很高,需要更多的资源来满足计算需求。
用户当前作业所申请的计算资源闲置,这些资源可被其他用户使用。尤其是当用户进行交互式数据可视化,而非大规模计算时。
KubeCluster 和 SLURMCluster 都提供了 adapt() 方法。下面的例子可以在 0 到 10 个 Worker 之间动态缩放。自动缩放根据 Dask Scheduler 上作业的负载来决定运行多少个 Dask Worker。Dask 收集一些信息,比如每个 Dask Worker 上已用内存和可用内存,并根据这些信息自适应地调整计算资源数量。
from dask_kubernetes import KubeCluster
cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=10)
Dask Nanny#
Dask 在启动集群时,除了启动 Dask Scheduler 和 Dask Worker 外,还启动了一个叫做 Dask Nanny 的监控服务。Nanny 英文意为保姆,Dask Nanny 就是在执行保姆的作用,它监控着 Dask Worker 的 CPU 和内存使用情况,避免 Dask Worker 超出资源上限;Dask Worker 宕机时,Dask Nanny 负责重启 Dask Worker。如果某个 Dask Worker 被 Dask Nanny 重启了,该 Dask Worker 上的计算任务被重新执行,其他 Dask Worker 保留当时状态和数据,并一直等待该 Dask Worker 恢复到刚刚宕机的时间点;这会给其他 Dask Worker 增加很多负担。如果 Dask Worker 频繁重启,可能需要考虑用 rechunk() 或 repartition() 调整 Partition 的数据大小。
client.shutdown()