4.4. Shuffle#
对于一个分布式系统,在不同 Worker 之间交换数据被称为 Shuffle,或者说 Shuffle 将数据从某个 Partition 移动到其他 Partition。有些 Shuffle 是显式的,比如 repartition
,从 API 名称可以看出它会在不同的 Partition 之间交换数据;有些 Shuffle 是隐式的,比如 sort
,merge
或 groupby
背后都有 Shuffle 过程。Shuffle 一直是分布式大数据计算领域的难题之一,像 sort
、merge
或 groupby
这些 pandas 算子在单机上实现起来相对简单,但是在大数据分布式计算场景,实现起来并不那么容易。
Shuffle 实现机制#
章节 3.5 介绍了 Dask 主要基于 Task Graph 构建, Dask 的 Task Graph 是一个有向无环图。有向边表示下游 Partition 的输入依赖上游 Partition 的输出,任何数据移动都会在 Task Graph 上生成一条有向边。很多计算任务的 Shuffle 有大量的数据移动,有的场景下,所有数据都会打散,也意味着上游会有多条指向下游的边。这种基于 Task Graph 的 Shuffle 会使得 Task Graph 非常庞大,Task Graph 过大会使得 Dask Scheduler 的负载过重,进一步导致计算极其缓慢。如 图 4.5 左侧所示,tasks
是基于 Task Graph 的机制,上游和下游之间建立了有向边,如果有中间层(通常因为上游流入的数据太大,需要将数据进一步切分成多个 Partition),那么中间层还会进一步增加 Task Graph 的复杂程度。
为解决 Task Graph 过大的问题,Dask 设计了一种点对点(Peer-to-peer)的 Shuffle 机制。如 图 4.5 右侧所示,p2p
在 Task Graph 中引入了一个虚拟的障碍(Barrier)节点。Barrier 并不是一个真正的 Task,引入 Barrier 节点可以使 Task Graph 复杂度显著下降。
目前,Dask 提供了两类 Shuffle 实现策略:单机和分布式。
单机。如果数据大小超出了内存空间,可以将中间数据写到磁盘上。单机场景默认使用这种策略。
分布式。如 图 4.5 所示,分布式场景提供了两种 Shuffle 策略,
tasks
和p2p
。tasks
是基于 Task Graph 的 Shuffle 实现,很多场景效率比较低,会遇到刚提到的 Task Graph 过大的问题。p2p
基于点对点的 Shuffle 实现,Task Graph 的复杂性显著降低,性能也显著提升。Dask 会优先选择p2p
。
dask.config.set({"dataframe.shuffle.method": "p2p"})
对当前 Python 脚本的所有计算都使用 p2p
方式进行 Shuffle。也可以针对某个算子设置 Shuffle 策略,比如 ddf.merge(shuffle_method="p2p")
。
为对比两种分布式场景的 Shuffle 机制性能,这里搭建了一个两节点的 Dask 集群,并用 shuffle
进行了测试,读者也可以使用单机的 LocalCluster
,调大数据量,观察不同的 Shuffle 机制的性能表现。
import dask
from dask.distributed import Client, LocalCluster
dask.config.set({'dataframe.query-planning': False})
# 将 `10.0.0.3:8786` 更换为你的 Scheduler 地址
# 如果没有 Dask 集群,可以使用 LocalCluster
# client = Client(LocalCluster())
client = Client("10.0.0.3:8786")
ddf = dask.datasets.timeseries(
start="2024-01-01",
end="2024-07-01",
dtypes={"x": float, "y": float},
freq="1 h",
)
%%time
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
shuffled = ddf.shuffle(on="x")
shuffled.compute()
CPU times: user 138 ms, sys: 19 ms, total: 157 ms
Wall time: 5.58 s
%%time
with dask.config.set({"dataframe.shuffle.method": "tasks"}):
shuffled = ddf.shuffle(on="x")
shuffled.compute()
CPU times: user 136 ms, sys: 8.48 ms, total: 144 ms
Wall time: 15.8 s
数据重分布#
Dask DataFrame 提供了三种数据重分布方法:set_index()
,repartition()
和 shuffle()
,这三种都可能在全局层面对数据进行重分布。
方法名 |
用途 |
是否修改索引 |
是否可以修改 Partition 数量 |
---|---|---|---|
修改索引列,加速后续基于索引列的计算 |
是 |
是 |
|
修改 Partition 数量,多用于数据倾斜场景 |
否 |
是 |
|
将 DataFrame 重新打散并排列 |
否 |
是 |
在 章节 4.2 我们提过,set_index()
将某字段设置为索引列,后续一系列计算非常依赖这个字段,set_index()
能显著加速后续计算。repartition()
主要解决数据倾斜的问题,即某些 Partiton 上的数据过大,过大的 Partition 有可能导致内存不足。
案例分析:groupby
#
我们以 groupby(by=key).sum()
为例,了解其背后的 Shuffle 过程。图 4.6 展示了计算过程,它主要有三个阶段:分组(Split)、组内聚合(Apply Aggregation,比如 sum
)、组间聚合(Combine)。
分布式场景下,不同的数据分布在不同的 Partition 下,涉及到 Shuffle 的阶段有:
分组:按照
by
指定的分组字段进行分组,相同的分组字段被分到一起,这里涉及到大量 Shuffle 操作。组内聚合:组内聚合的 Shuffle 操作相对比较少。
组间聚合:组间聚合的 Shuffle 操作相对比较少。
根据 Shuffle 操作的数量,不难得出结论:
groupby(by=indexed_columns).agg()
和groupby(by=indexed_columns).apply(user_def_fn)
性能最好。indexed_columns
指的是分组字段by
是索引列(章节 4.2 中set_index
的列);agg
指的是 Dask DataFrame 提供的官方的sum
,mean
等聚合方法。因为indexed_columns
是排过序的了,可以很快地对indexed_columns
进行分组和数据分发。groupby(by=non_indexed_columns).agg()
的数据交换量要更大一些,Dask 官方提供的agg
方法做过一些优化。groupby(by=non_indexed_columns).apply(user_def_fn)
的成本最高。它既要对所有数据进行交换,又要执行用户自定义的函数,用户自定义函数的效率比 Dask 官方的低。