3.4. GPU
GPUs and other heterogeneous accelerators are widely used to speed up deep learning. Together with NVIDIA, the Dask community provides a GPU-based data science toolkit to accelerate a wide range of workloads.
Dask GPU Clusters
Dask-CUDA extends dask.distributed with the ability to discover and manage GPU devices. Before using these GPUs, install Dask-CUDA with pip install dask-cuda. Similar to the dask.distributed API introduced in Section 3.3, Dask-CUDA provides a single-machine LocalCUDACluster. A LocalCUDACluster automatically discovers and registers all the GPUs on the node and pairs each GPU with a number of CPU cores. For example, my environment has 4 GPUs, so starting a single-machine Dask cluster automatically launches 4 Dask workers, one GPU per worker.
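The cluster shown below was created roughly as follows (a minimal sketch, assuming dask_cuda is installed; the original launch code is not shown in this section):

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Discover all local GPUs and start one Dask worker per GPU
cluster = LocalCUDACluster()
client = Client(cluster)
client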
/fs/fast/u20200002/envs/dispy/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37111 instead
warnings.warn(
Client: Client-5c3311bf-0ce5-11ef-bd8c-000012e4fe80
Cluster: LocalCUDACluster (209b2784), 4 workers, 1 thread and 22.50 GiB memory per worker
We can also launch a multi-machine Dask GPU cluster. First, start the Dask Scheduler:

dask scheduler
Then start a GPU-aware worker on each GPU node; together they form a Dask GPU cluster:

dask cuda worker tcp://scheduler:8786
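Once the Scheduler and workers are up, we can connect a client from Python (a sketch; the scheduler address tcp://10.0.0.3:8786 is taken from the cluster shown below):

from dask.distributed import Client

# Connect to the Dask GPU cluster through the scheduler's address
client = Client("tcp://10.0.0.3:8786")
client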
Client: Client-6039933f-0ce3-11ef-b163-000012e4fe80
Scheduler: tcp://10.0.0.3:8786, Dashboard: http://10.0.0.3:8787/status
Workers: 8 (on 10.0.0.2 and 10.0.0.3), 1 thread and 22.50 GiB memory each, 180.00 GiB total memory
Note
Dask-CUDA only discovers and registers the GPUs; it cannot isolate them, so non-Dask workloads can still preempt these GPUs. True GPU resource isolation requires container technologies such as Kubernetes.
GPU Tasks
Not every workload can be accelerated by GPUs; GPUs mainly speed up compute-intensive tasks such as machine learning and deep learning. Currently, the GPU-backed libraries that Dask supports include:
- CuPy: a GPU counterpart of NumPy that can serve as the backend of Dask Array.
- cuDF: a GPU DataFrame library from NVIDIA RAPIDS that can serve as the backend of Dask DataFrame.
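For example, Dask DataFrame can switch its backend to cuDF in the same spirit as the CuPy example later in this section (a sketch, assuming cuDF is installed; data.csv is a hypothetical file):

import dask
import dask.dataframe as dd

# Switch the Dask DataFrame backend from pandas to GPU-based cuDF
dask.config.set({"dataframe.backend": "cudf"})

# Partitions of this DataFrame are cudf DataFrames living on the GPU
df = dd.read_csv("data.csv")  # data.csv is a hypothetical example file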
Note
When using libraries that target NVIDIA GPUs, add the CUDA installation directory to the PATH and LD_LIBRARY_PATH environment variables: CuPy and cuDF depend on the CUDA shared libraries installed there.
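For example (a sketch; /usr/local/cuda is the conventional CUDA installation path and may differ on your system):

export PATH=/usr/local/cuda/bin:$PATH
export LD_LIBRARY_PATH=/usr/local/cuda/lib64:$LD_LIBRARY_PATH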
Example: Singular Value Decomposition
The following code performs singular value decomposition (SVD) on the GPU, a workload well suited to GPU acceleration. Setting dask.config.set({"array.backend": "cupy"}) switches the Dask Array execution backend to CuPy on the GPU.
import cupy
import dask
import dask.array as da

# Switch the Dask Array backend to CuPy so that chunks live in GPU memory
dask.config.set({"array.backend": "cupy"})

# Use CuPy's random number generator to create the data directly on the GPU
rs = da.random.RandomState(RandomState=cupy.random.RandomState)
x = rs.random((10000, 1000), chunks=(1000, 1000))

# Lazily build the task graph for the singular value decomposition
u, s, v = da.linalg.svd(x)
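Note that svd only builds the task graph; the factorization actually runs on the GPUs when the results are requested, for example (a sketch):

# Trigger execution; u, s, and v become CuPy-backed results
u, s, v = dask.compute(u, s, v)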