3.3. 将 Dask 扩展到集群#
刚刚的任务都是在单机场景下使用 Dask。dask.distributed
可以帮助我们把 Dask 任务扩展到多台计算节点。
Dask 集群#
如 图 3.3 所示,一个 Dask 集群必须包含一个调度器(Scheduler)和多个工作节点(Worker)。用户通过客户端(Client)向调度器提交计算任务,调度器对任务进行分析,生成 Task Graph,并将 Task 分发到多个 Worker 上。每个 Worker 承担一小部分计算任务,Worker 之间也要互相通信,比如计算结果的归集等。
Scheduler 和 Worker 共同组成了一个 Dask 集群。
LocalCluster
#
默认情况(不进行任何额外的设置),Dask 会启动一个本地的集群 LocalCluster
,并使用客户端 Client
连接这个集群。
from dask.distributed import LocalCluster, Client
cluster = LocalCluster()
cluster
LocalCluster
008c0d37
Dashboard: http://127.0.0.1:8787/status | Workers: 4 |
Total threads: 8 | Total memory: 16.00 GiB |
Status: running | Using processes: True |
Scheduler Info
Scheduler
Scheduler-d62c583b-949f-47ca-a09a-62e8b76b1f52
Comm: tcp://127.0.0.1:56578 | Workers: 4 |
Dashboard: http://127.0.0.1:8787/status | Total threads: 8 |
Started: Just now | Total memory: 16.00 GiB |
Workers
Worker: 0
Comm: tcp://127.0.0.1:56592 | Total threads: 2 |
Dashboard: http://127.0.0.1:56593/status | Memory: 4.00 GiB |
Nanny: tcp://127.0.0.1:56581 | |
Local directory: /var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/dask-scratch-space/worker-b2u1_h84 |
Worker: 1
Comm: tcp://127.0.0.1:56596 | Total threads: 2 |
Dashboard: http://127.0.0.1:56599/status | Memory: 4.00 GiB |
Nanny: tcp://127.0.0.1:56583 | |
Local directory: /var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/dask-scratch-space/worker-w2ws5x2t |
Worker: 2
Comm: tcp://127.0.0.1:56595 | Total threads: 2 |
Dashboard: http://127.0.0.1:56597/status | Memory: 4.00 GiB |
Nanny: tcp://127.0.0.1:56585 | |
Local directory: /var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/dask-scratch-space/worker-sg9v8s9i |
Worker: 3
Comm: tcp://127.0.0.1:56601 | Total threads: 2 |
Dashboard: http://127.0.0.1:56602/status | Memory: 4.00 GiB |
Nanny: tcp://127.0.0.1:56587 | |
Local directory: /var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/dask-scratch-space/worker-wnsy1wd3 |
Dask 探测到本地资源情况,比如本地有 4 个 CPU 核心、16GB 内存,根据本地资源创建了一个 LocalCluster
。这个 LocalCluster
有 4 个 Worker,每个 Worker 对应一个 CPU 核心。
Dask 同时提供了仪表盘(Dashboard)链接,可以在网页中查看集群和作业的具体信息。
使用 Client
连接这个 LocalCluster
,连接之后,Dask 所有的计算将提交到这个 LocalCluster
上:
client = Client(cluster)
使用命令行启动一个 Dask 集群#
启动 Dask 集群#
当我们有更多的计算节点时,可以使用命令行在不同的计算节点上启动 Dask Scheduler 和 Dask Worker。比如,在 IP 地址为 192.0.0.1
的计算节点上启动 Dask Scheduler,在命令行里输入下面的命令。
# 当前节点为 192.0.0.1
$ dask scheduler
Dask 将日志信息打印出来,其中 Dask Scheduler 的 IP 和端口号为:tcp://192.0.0.1:8786
Scheduler at: tcp://192.0.0.1:8786
dashboard at: ...
在其他计算节点上启动 Worker,这些 Worker 要连接到刚刚启动的 Scheduler 上。在 dask worker
命令后面添加刚刚得到的 Dask Scheduler 的 IP 地址和端口号。比如,在 192.0.0.2
上启动 Dask Worker:
# 当前节点为 192.0.0.2
$ dask worker tcp://192.0.0.1:8786
日志信息显示,Dask 启动了 Worker,并连接到了 Scheduler:
Start worker at: tcp://192.0.0.2:40483
Registered to: tcp://192.0.0.1:8786
Dask Scheduler 默认使用 8786
作为连接端口号,如果使用其他端口号,需要使用 --port
告知 Dask 使用该端口。
$ dask scheduler --port 9786
除了 --port
之外,Dask Scheduler 还有很多其他参数,用户可以根据自身需要设置。
连接 Dask 集群#
假如 Dask Scheduler 启动后返回的地址为 192.0.0.1:8786
,使用下面的代码连接到这个集群上,基于 Dask Array、Dask DataFrame 的 .compute()
的代码会自动调度到这个集群上。
from dask.distributed import Client
client = Client("192.0.0.1:8786")
Python 环境和依赖包管理#
在集群上使用 Dask 时,要注意所有计算节点都安装所需要的 Python 依赖包。
可以使用 distributed.diagnostics.plugin.PipInstall
在运行时给每个 Worker 安装依赖包。PipInstall
的 packages
参数接收一个列表,列表内容为依赖包的名字,它会像 pip install
那样安装这些依赖包。安装的过程发生在程序真正开始前,会消耗一定时间,因此这种方式更适合原型测试阶段,还不确定哪些包需要,哪些不需要。
from dask.distributed import PipInstall
plugin = PipInstall(packages=["scikit-learn"], pip_options=["--upgrade"])
client.register_plugin(plugin)
等最终确定了所需要的依赖包,最好使用以下方式管理集群上的 Python 和各个依赖包:
所有计算节点安装同样版本的软件:比如,使用
conda
或者pip
在每个 Dask Worker 同样的目录下安装 Python 和依赖包。所有计算节点挂载共享文件系统:Python 等软件环境安装到共享文件系统中,所有 Dask Worker 看到的是相同的目录和相同的内容。常见的共享文件系统有网络文件系统(Network File System,NFS)。
使用容器:Python 及依赖包都打包到容器中,在集群上分发容器,每个计算节点都启动相同的容器。
SSH、Kubernetes、高性能计算集群#
Dask 集群本质就是 Dask Scheduler 和一系列 Dask Worker,其他各类具体部署场景都大同小异。Dask 针对不同部署场景提供了封装好的库,用户无需像刚才那样分别登陆到不同节点,依次启动 Dask Scheduler 和 Dask Worker。下面将介绍三种类型:SSH、Kubernetes 和高性能计算集群。
SSH#
只要知道每台计算节点的 IP 地址或者主机名(Hostname),就可以在 Python 代码或者命令行中启动 Dask 集群。 dask.distributed.SSHCluster
基于 asyncssh 包进行了封装,它 SSH 登录到每个计算节点,然后在该节点启动 Dask Scheduler 或 Dask Worker。
Note
如果你在个人电脑上操作 SSHCluster
,即个人电脑是 Client,各个计算节点组成集群,你应该将个人电脑与各个计算节点之间设置免密码登录。确切地说,各个计算节点的 authorized_keys
文件中应该存储了个人电脑的公钥。
可以使用 Python 代码启动这个 Dask 集群:
from dask.distributed import Client, SSHCluster
cluster = SSHCluster(
hosts=["localhost", "node1", "node2"],
connect_options={
"username": "xxx",
"password": "yyy",
}
)
client = Client(cluster)
hosts
是要拉起 Dask 的主机列表,列表上第一个节点将启动 Dask Scheduler,其余节点启动 Dask Worker;connect_options
是 SSH 登录的一些参数,比如用户名 username
、密码 password
、端口号 port
,这些参数以 {"key": "value"}
的形式传递给 Dask。
from dask.distributed import Client, SSHCluster
cluster = SSHCluster(
hosts=["localhost", "node1", "node2"],
connect_options={
"username": "xxx",
"password": "yyy",
}
)
client = Client(cluster)
Kubernetes#
Kubernetes 已经成为云资源管理事实上的标准,使用 Kubernetes 安装软件最简单的方式是使用 Helm。Dask 封装了 dask-kubernetes,并提供了 KubeCluster
和 HelmCluster
两个类。
高性能计算集群#
高性能计算(High-Performance Computing)集群通常使用 Slurm 等调度软件。Dask 为高性能集群封装了 dask-jobqueue,兼容 Slurm 在内的调度软件。
比如,使用 SLURMCluster
向 Slurm 的 cpu 队列申请 10 个节点用于运行 Dask Worker,每个节点资源为 32 核 128GB。scale()
方法会向 Slurm 申请 10 计算节点。
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=32,
memory="128GB",
queue="cpu")
cluster.scale(n=10)
client = Client(cluster)
相比于云上资源,高性能计算集群除了配备常见的以太网,还配备了高带宽和低延迟的 RDMA 网络,(比如,InfiniBand)。RDMA 网络能加速很多对网络要求高的任务。如果想充分利用 RDMA 网络,可以在 SLURMCluster
的 interface
参数传入网卡接口。网卡接口可以在命令行里使用 ifconfig
查看,比如 InfiniBand 网卡一般对应为 ib0
,以太网一般显示为 eth0
。不同的计算节点的网卡命名可能不一样。如果不设置 interface
,Dask 默认使用以太网。
自动缩放#
前面介绍的 Kubernetes 和高性能计算集群(例如 Slurm)均支持自动缩放(Auto-Scaling 或 Adaptive Scaling),因为他们本身就是集群调度管理软件,他们管理着大量计算资源,不同的应用向他们请求资源。Dask 构建在 Kubernetes 或 Slurm 上,相当于在 Kubernetes 或 Slurm 所管理的计算资源上申请一个子集。在 Kubernetes 或 Slurm 上,我们可以使用 Dask 的自动缩放技术,自动增加或减少 Dask 所需计算资源。自动缩放主要考虑以下场景:
用户当前作业对计算资源要求很高,需要更多的资源来满足计算需求。
用户当前作业所申请的计算资源闲置,这些资源可被其他用户使用。尤其是当用户进行交互式数据可视化,而非大规模计算时。
KubeCluster
和 SLURMCluster
都提供了 adapt()
方法。下面的例子可以在 0 到 10 个 Worker 之间动态缩放。自动缩放根据 Dask Scheduler 上作业的负载来决定运行多少个 Dask Worker。Dask 收集一些信息,比如每个 Dask Worker 上已用内存和可用内存,并根据这些信息自适应地调整计算资源数量。
from dask_kubernetes import KubeCluster
cluster = KubeCluster()
cluster.adapt(minimum=0, maximum=10)
Dask Nanny#
Dask 在启动集群时,除了启动 Dask Scheduler 和 Dask Worker 外,还启动了一个叫做 Dask Nanny 的监控服务。Nanny 英文意为保姆,Dask Nanny 就是在执行保姆的作用,它监控着 Dask Worker 的 CPU 和内存使用情况,避免 Dask Worker 超出资源上限;Dask Worker 宕机时,Dask Nanny 负责重启 Dask Worker。如果某个 Dask Worker 被 Dask Nanny 重启了,该 Dask Worker 上的计算任务被重新执行,其他 Dask Worker 保留当时状态和数据,并一直等待该 Dask Worker 恢复到刚刚宕机的时间点;这会给其他 Dask Worker 增加很多负担。如果 Dask Worker 频繁重启,可能需要考虑用 rechunk()
或 repartition()
调整 Partition 的数据大小。
client.shutdown()