4.4. Shuffle#

对于一个分布式系统,在不同 Worker 之间交换数据被称为 Shuffle,或者说 Shuffle 将数据从某个 Partition 移动到其他 Partition。有些 Shuffle 是显式的,比如 repartition,从 API 名称可以看出它会在不同的 Partition 之间交换数据;有些 Shuffle 是隐式的,比如 sortmergegroupby 背后都有 Shuffle 过程。Shuffle 一直是分布式大数据计算领域的难题之一,像 sortmergegroupby 这些 pandas 算子在单机上实现起来相对简单,但是在大数据分布式计算场景,实现起来并不那么容易。

Shuffle 实现机制#

章节 3.5 介绍了 Dask 主要基于 Task Graph 构建, Dask 的 Task Graph 是一个有向无环图。有向边表示下游 Partition 的输入依赖上游 Partition 的输出,任何数据移动都会在 Task Graph 上生成一条有向边。很多计算任务的 Shuffle 有大量的数据移动,有的场景下,所有数据都会打散,也意味着上游会有多条指向下游的边。这种基于 Task Graph 的 Shuffle 会使得 Task Graph 非常庞大,Task Graph 过大会使得 Dask Scheduler 的负载过重,进一步导致计算极其缓慢。如 图 4.5 左侧所示,tasks 是基于 Task Graph 的机制,上游和下游之间建立了有向边,如果有中间层(通常因为上游流入的数据太大,需要将数据进一步切分成多个 Partition),那么中间层还会进一步增加 Task Graph 的复杂程度。

为解决 Task Graph 过大的问题,Dask 设计了一种点对点(Peer-to-peer)的 Shuffle 机制。如 图 4.5 右侧所示,p2p 在 Task Graph 中引入了一个虚拟的障碍(Barrier)节点。Barrier 并不是一个真正的 Task,引入 Barrier 节点可以使 Task Graph 复杂度显著下降。

../_images/shuffle-tasks-p2p.svg

图 4.5 Dask Shuffle: tasks v.s. p2p#

目前,Dask 提供了两类 Shuffle 实现策略:单机和分布式。

  • 单机。如果数据大小超出了内存空间,可以将中间数据写到磁盘上。单机场景默认使用这种策略。

  • 分布式。如 图 4.5 所示,分布式场景提供了两种 Shuffle 策略,tasksp2ptasks 是基于 Task Graph 的 Shuffle 实现,很多场景效率比较低,会遇到刚提到的 Task Graph 过大的问题。p2p 基于点对点的 Shuffle 实现,Task Graph 的复杂性显著降低,性能也显著提升。Dask 会优先选择 p2p

dask.config.set({"dataframe.shuffle.method": "p2p"}) 对当前 Python 脚本的所有计算都使用 p2p 方式进行 Shuffle。也可以针对某个算子设置 Shuffle 策略,比如 ddf.merge(shuffle_method="p2p")

为对比两种分布式场景的 Shuffle 机制性能,这里搭建了一个两节点的 Dask 集群,并用 shuffle 进行了测试,读者也可以使用单机的 LocalCluster,调大数据量,观察不同的 Shuffle 机制的性能表现。

import dask
from dask.distributed import Client, LocalCluster

dask.config.set({'dataframe.query-planning': False})

# 将 `10.0.0.3:8786` 更换为你的 Scheduler 地址
# 如果没有 Dask 集群,可以使用 LocalCluster
# client = Client(LocalCluster())
client = Client("10.0.0.3:8786")

ddf = dask.datasets.timeseries(
        start="2024-01-01",
        end="2024-07-01",
        dtypes={"x": float, "y": float},
        freq="1 h",
    )
%%time
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    shuffled = ddf.shuffle(on="x")
    shuffled.compute()
CPU times: user 138 ms, sys: 19 ms, total: 157 ms
Wall time: 5.58 s
%%time
with dask.config.set({"dataframe.shuffle.method": "tasks"}):
    shuffled = ddf.shuffle(on="x")
    shuffled.compute()
CPU times: user 136 ms, sys: 8.48 ms, total: 144 ms
Wall time: 15.8 s

数据重分布#

Dask DataFrame 提供了三种数据重分布方法:set_index()repartition()shuffle(),这三种都可能在全局层面对数据进行重分布。

表 4.3 Dask 三种数据重分布方法#

方法名

用途

是否修改索引

是否可以修改 Partition 数量

DataFrame.set_index()

修改索引列,加速后续基于索引列的计算

DataFrame.repartition()

修改 Partition 数量,多用于数据倾斜场景

DataFrame.shuffle()

将 DataFrame 重新打散并排列

章节 4.2 我们提过,set_index() 将某字段设置为索引列,后续一系列计算非常依赖这个字段,set_index() 能显著加速后续计算。repartition() 主要解决数据倾斜的问题,即某些 Partiton 上的数据过大,过大的 Partition 有可能导致内存不足。

案例分析:groupby#

我们以 groupby(by=key).sum() 为例,了解其背后的 Shuffle 过程。图 4.6 展示了计算过程,它主要有三个阶段:分组(Split)、组内聚合(Apply Aggregation,比如 sum)、组间聚合(Combine)。

../_images/groupby.svg

图 4.6 DataFrame groupby 示意图#

分布式场景下,不同的数据分布在不同的 Partition 下,涉及到 Shuffle 的阶段有:

  • 分组:按照 by 指定的分组字段进行分组,相同的分组字段被分到一起,这里涉及到大量 Shuffle 操作。

  • 组内聚合:组内聚合的 Shuffle 操作相对比较少。

  • 组间聚合:组间聚合的 Shuffle 操作相对比较少。

根据 Shuffle 操作的数量,不难得出结论:

  • groupby(by=indexed_columns).agg()groupby(by=indexed_columns).apply(user_def_fn) 性能最好。indexed_columns 指的是分组字段 by 是索引列(章节 4.2set_index 的列);agg 指的是 Dask DataFrame 提供的官方的 summean 等聚合方法。因为 indexed_columns 是排过序的了,可以很快地对 indexed_columns 进行分组和数据分发。

  • groupby(by=non_indexed_columns).agg() 的数据交换量要更大一些,Dask 官方提供的 agg 方法做过一些优化。

  • groupby(by=non_indexed_columns).apply(user_def_fn) 的成本最高。它既要对所有数据进行交换,又要执行用户自定义的函数,用户自定义函数的效率比 Dask 官方的低。