4.2. Indexing

%config InlineBackend.figure_format = 'svg'
import os
import sys
sys.path.append("..")
from utils import nyc_flights

import dask
dask.config.set({'dataframe.query-planning': False})
import dask.dataframe as dd
import pandas as pd
from dask.distributed import LocalCluster, Client

cluster = LocalCluster()
client = Client(cluster)
/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 51899 instead
  warnings.warn(

As shown in Fig. 4.2, a pandas DataFrame mainly handles two-dimensional tables, which have column labels and row labels. Row labels are often overlooked by users, but they actually play a crucial role: indexing. Most pandas DataFrames have sorted row labels, for example an index incrementing from 0. This sorted index keeps the data inside a pandas DataFrame ordered.

../_images/dataframe-model.svg

Fig. 4.2 The pandas DataFrame data model

When a pandas DataFrame is created, an index column is automatically generated on the far left. As the following example shows, the index column has no column name and is not a real "field"; it is an extra column added on top of the fields in the input data.

df = pd.DataFrame({
    'A': ['foo', 'bar', 'baz', 'qux'],
    'B': ['one', 'one', 'two', 'three'],
    'C': [1, 2, 3, 4],
    'D': [10, 20, 30, 40]
})
df
     A      B  C   D
0  foo    one  1  10
1  bar    one  2  20
2  baz    two  3  30
3  qux  three  4  40
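
We can confirm this by inspecting df.index directly: for a freshly created DataFrame it is an auto-generated RangeIndex that carries no column name.

print(df.index)       # RangeIndex(start=0, stop=4, step=1)
print(df.index.name)  # None: the index is not a named field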

We can also set one of the fields as the index column:

df = df.set_index('A')
df
         B  C   D
A
foo    one  1  10
bar    one  2  20
baz    two  3  30
qux  three  4  40

Or reset it back to the original structure:

df = df.reset_index()
df
     A      B  C   D
0  foo    one  1  10
1  bar    one  2  20
2  baz    two  3  30
3  qux  three  4  40

Ordered Row Index

A Dask DataFrame consists of multiple pandas DataFrames, but maintaining the row labels and row order of the entire Dask DataFrame at a global level is a significant challenge. Dask DataFrame makes no deliberate effort to preserve global ordering, which is also why it cannot support all the features of a pandas DataFrame.

As shown in Fig. 4.3, when a Dask DataFrame is split into partitions, the boundaries between partitions are recorded as divisions:

../_images/divisions.svg

Fig. 4.3 The divisions of a Dask DataFrame

Take dask.datasets.timeseries, a sample-data function provided by Dask, as an example. It generates a time series that uses timestamps as the row labels; the boundary of each partition is recorded and stored in .divisions. len(divisions) equals npartitions + 1:

ts_df = dask.datasets.timeseries("2018-01-01", "2023-01-01")
print(f"df.npartitions: {ts_df.npartitions}")
print(f"df.divisions: {len(ts_df.divisions)}")
df.npartitions: 1826
df.divisions: 1827
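
The known_divisions attribute indicates whether Dask has recorded these partition boundaries. A quick check on the same ts_df:

print(f"ts_df.known_divisions: {ts_df.known_divisions}")
# expected to print True: the timeseries generator records its boundaries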

Dask DataFrame does not record how many rows each partition holds, so it cannot support positional row indexing at the global level, such as iloc:

try:
    ts_df.iloc[3].compute()
except Exception as e:
    print(f"{type(e).__name__}, {e}")
NotImplementedError, 'DataFrame.iloc' only supports selecting columns. It must be used like 'df.iloc[:, column_indexer]'.

However, it does support selecting specific columns by column label, together with the : wildcard on the row labels to select all rows:

ts_df.iloc[:, [1, 2]].compute()
                       id         x
timestamp
2018-01-01 00:00:00   992 -0.711756
2018-01-01 00:00:01  1018 -0.838596
2018-01-01 00:00:02  1000 -0.735968
2018-01-01 00:00:03  1004  0.904384
2018-01-01 00:00:04  1021  0.025423
...                   ...       ...
2022-12-31 23:59:55  1020  0.961542
2022-12-31 23:59:56   963 -0.663948
2022-12-31 23:59:57  1010  0.510401
2022-12-31 23:59:58   964 -0.882126
2022-12-31 23:59:59  1020 -0.532950

[157766400 rows x 2 columns]
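
Label-based selection with loc does work here, because the timestamp index and its divisions are known: Dask only reads the partitions whose divisions overlap the requested labels. A minimal sketch:

# select a single day by index label; with known divisions, Dask prunes
# every partition that cannot contain this label
ts_df.loc["2018-01-01"].compute()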

For CSV files, Dask DataFrame does not automatically generate divisions:

folder_path = nyc_flights()
file_path = os.path.join(folder_path, "nyc-flights", "*.csv")
flights_ddf = dd.read_csv(file_path,
                          parse_dates={'Date': [0, 1, 2]},
                          dtype={'TailNum': object,
                                 'CRSElapsedTime': float,
                                 'Cancelled': bool})
flights_ddf.divisions
/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:640: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
  head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)
(None, None, None, None, None, None, None)

Because the number of rows in each partition is not recorded, Dask DataFrame struggles to support certain operations, such as the percentile operation median(), since such operations need to: (1) sort the data; (2) locate specific rows:

try:
    flights_ddf['DepDelay'].median()
except Exception as e:
    print(f"{type(e).__name__}, {e}")
NotImplementedError, Dask doesn't implement an exact median in all cases as this is hard to do in parallel. See the `median_approximate` method instead, which uses an approximate algorithm.
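
As the error message suggests, Dask provides median_approximate(), which trades exactness for a parallel-friendly approximation and therefore does not need a global sort:

# approximate median; avoids the global sort an exact median would require
flights_ddf['DepDelay'].median_approximate().compute()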

Setting the Index Column

set_index()

In Dask DataFrame, we can manually set a column as the index column with the set_index() method. Besides making that field the index, this operation also sorts the global data by that field; it disrupts the original ordering within each partition and therefore carries a high cost.

The following example shows the changes that set_index() brings about:

def print_partitions(ddf):
    for i in range(ddf.npartitions):
        print(ddf.partitions[i].compute())

df = pd.DataFrame(
    {"col1": ["01", "05", "02", "03", "04"], "col2": ["a", "b", "c", "d", "e"]}
)
ddf = dd.from_pandas(df, npartitions=2)
print_partitions(ddf)
  col1 col2
0   01    a
1   05    b
2   02    c
  col1 col2
3   03    d
4   04    e
ddf2 = ddf.set_index("col1")
print_partitions(ddf2)
2024-04-23 16:05:06,483 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 initialized by task ('shuffle-transfer-008ee90768895dabe7a3e94389222068', 0) executed on worker tcp://127.0.0.1:51911
2024-04-23 16:05:06,505 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 deactivated due to stimulus 'task-finished-1713859506.50483'
     col2
col1     
01      a
2024-04-23 16:05:06,545 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 initialized by task ('shuffle-transfer-01fddf4f11082a43a6075f7888029dd3', 1) executed on worker tcp://127.0.0.1:51912
2024-04-23 16:05:06,604 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 deactivated due to stimulus 'task-finished-1713859506.6028118'
     col2
col1     
02      c
03      d
04      e
05      b

This example sets the col1 column as the index column, and the data in the 2 partitions is shuffled and redistributed. In scenarios with large volumes of data, globally sorting and redistributing the data is extremely expensive, so this operation should be avoided whenever possible. set_index() also has its advantage, though: it can accelerate downstream computations. Data redistribution is known as Shuffle; we will cover the computation process and cost of Shuffle in Section 4.4.
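
If the data is already globally sorted by the target column, the shuffle can be avoided entirely by passing sorted=True to set_index(): Dask then only inspects partition boundaries to record divisions. A minimal sketch, under the assumption that the input really is sorted (otherwise the result would be incorrect):

# assumption: the data is already globally sorted by "col1"
sorted_pdf = df.sort_values("col1").reset_index(drop=True)
sorted_ddf = dd.from_pandas(sorted_pdf, npartitions=2)
ddf3 = sorted_ddf.set_index("col1", sorted=True)  # records divisions, no shuffle
print(ddf3.divisions)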

Back to the time series data, which uses timestamps as its index column. Below, set_index() is applied to this data in two ways: the first does not pass divisions, while the second does.

The first way, without divisions, takes a long time, because Dask DataFrame computes the distribution of the data across all partitions and reshuffles all partitions according to that distribution; note that the number of partitions also changes:

%%time
ts_df1 = ts_df.set_index("id")
nu = ts_df1.loc[[1001]].name.nunique().compute()
print(f"before set_index npartitions: {ts_df.npartitions}")
print(f"after set_index npartitions: {ts_df1.npartitions}")
2024-04-23 16:05:16,522 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914
2024-04-23 16:05:27,101 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859527.100699'
before set_index npartitions: 1826
after set_index npartitions: 165
CPU times: user 6.63 s, sys: 3.65 s, total: 10.3 s
Wall time: 20.6 s

The second way obtains the divisions ahead of time and then passes them to set_index(). With divisions specified, set_index() is faster:

dask_computed_divisions = ts_df.set_index("id").divisions
unique_divisions = list(dict.fromkeys(list(dask_computed_divisions)))
%%time
ts_df2 = ts_df.set_index("id", divisions=unique_divisions)
nuids = ts_df2.loc[[1001]].name.nunique().compute()
2024-04-23 16:05:38,056 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914
2024-04-23 16:05:49,629 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859549.629161'
CPU times: user 3.24 s, sys: 1.7 s, total: 4.94 s
Wall time: 11.9 s

If we skip setting the index column and filter directly on the id column instead, it turns out to be even faster.

%%time
nu = ts_df.loc[ts_df["id"] == 1001].name.nunique().compute()
CPU times: user 1.88 s, sys: 1.09 s, total: 2.97 s
Wall time: 8.38 s

So set_index() should be used with caution in Dask DataFrame. It can be worth the cost if many of the following operations come after it (see the sketch after this list):

  • Filtering on the index column with loc

  • Merging two Dask DataFrames on the index column (merge())

  • Grouping and aggregating on the index column (groupby())
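
For instance, once id is the index, a loc slice can use the recorded divisions to prune partitions; a minimal sketch reusing ts_df1 from above:

# with known divisions, only the partitions overlapping [995, 1005] are kept
subset = ts_df1.loc[995:1005]
print(subset.npartitions)  # expected to be much smaller than ts_df1.npartitions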

reset_index()

In pandas, groupby defaults to as_index=True, so after groupby() the grouping field becomes the index column. The index column is not a "formal" field of the DataFrame; if only one "formal" field remains after the aggregation (not counting the index column), the result degenerates into a Series. In the pandas example below, Origin is the grouping field: without as_index=False, groupby("Origin")["DepDelay"].mean() would produce a Series, whereas groupby("Origin", as_index=False) keeps the result a DataFrame.

# pandas
file_path = os.path.join(folder_path, "1991.csv")
pdf = pd.read_csv(file_path,
                  parse_dates={'Date': [0, 1, 2]},
                  dtype={'TailNum': object,
                         'CRSElapsedTime': float,
                         'Cancelled': bool})
uncancelled_pdf = pdf[pdf["Cancelled"] == False]
avg_pdf = uncancelled_pdf.groupby("Origin", as_index=False)["DepDelay"].mean()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
/var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/ipykernel_76150/639704942.py:3: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
  pdf = pd.read_csv(file_path,
  Origin  AvgDepDelay
2    LGA     5.726304
0    EWR     6.916220
1    JFK     9.311532

Alternatively, reset_index() can be called to demote the index column, so that the grouping field becomes a formal field of the DataFrame again:

avg_pdf = uncancelled_pdf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
  Origin  AvgDepDelay
2    LGA     5.726304
0    EWR     6.916220
1    JFK     9.311532

Dask DataFrame's groupby() does not support the as_index parameter, so Dask DataFrame has to use reset_index() to demote the index column.

uncancelled_ddf = flights_ddf[flights_ddf["Cancelled"] == False]
avg_ddf = uncancelled_ddf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_ddf.columns = ["Origin", "AvgDepDelay"]
avg_ddf = avg_ddf.compute()
# the pandas example used only one year of data, so the results differ
avg_ddf.sort_values("AvgDepDelay")
/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
  df = reader(bio, **kwargs)
  Origin  AvgDepDelay
2    LGA     6.944939
0    EWR     9.997188
1    JFK    10.766914

client.shutdown()