4.2. Indexing#
%config InlineBackend.figure_format = 'svg'
import os
import sys
sys.path.append("..")
from utils import nyc_flights
import dask
dask.config.set({'dataframe.query-planning': False})
import dask.dataframe as dd
import pandas as pd
from dask.distributed import LocalCluster, Client
cluster = LocalCluster()
client = Client(cluster)
As shown in Figure 4.2, a pandas DataFrame mainly deals with two-dimensional tables, which carry both column labels and row labels. Row labels are often overlooked by users, but they actually play a crucial role: indexing. In most pandas DataFrames the row labels form a sorted index, for example one increasing from 0, and this sorted index keeps the data inside the DataFrame ordered.
When a pandas DataFrame is created, an index column is automatically generated on the far left. As the example below shows, the index column has no column name and does not count as a "field"; it is an extra column added on top of the fields in the input data.
df = pd.DataFrame({
    'A': ['foo', 'bar', 'baz', 'qux'],
    'B': ['one', 'one', 'two', 'three'],
    'C': [1, 2, 3, 4],
    'D': [10, 20, 30, 40]
})
df
|   | A   | B     | C | D  |
|---|-----|-------|---|----|
| 0 | foo | one   | 1 | 10 |
| 1 | bar | one   | 2 | 20 |
| 2 | baz | two   | 3 | 30 |
| 3 | qux | three | 4 | 40 |
We can also set one of the fields as the index column:
df = df.set_index('A')
df
| A   | B     | C | D  |
|-----|-------|---|----|
| foo | one   | 1 | 10 |
| bar | one   | 2 | 20 |
| baz | two   | 3 | 30 |
| qux | three | 4 | 40 |
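With `A` as the index, rows can now be selected by label rather than by position, which is exactly what the index is for. A small sketch using the `df` above:
# Label-based selection on the index column
print(df.loc["bar"])       # the row labeled "bar": a Series with B, C, D
print(df.loc["bar", "C"])  # a single value: 2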
Or reset it back to the original structure:
df = df.reset_index()
df
|   | A   | B     | C | D  |
|---|-----|-------|---|----|
| 0 | foo | one   | 1 | 10 |
| 1 | bar | one   | 2 | 20 |
| 2 | baz | two   | 3 | 30 |
| 3 | qux | three | 4 | 40 |
Ordered Row Index#
A Dask DataFrame is made up of multiple pandas DataFrames, but maintaining the row labels and row order of the whole Dask DataFrame at a global level is a major challenge. Dask DataFrame does not deliberately preserve global ordering, which is also why it cannot support all of the functionality of a pandas DataFrame.
As shown in Figure 4.3, a Dask DataFrame records `divisions` when it is partitioned.
Take `dask.datasets.timeseries`, the sample-data function provided by Dask, as an example: it generates a time series that uses timestamps as row labels, and the boundary of each partition is recorded and stored in `.divisions`. `len(divisions)` equals `npartitions + 1`.
ts_df = dask.datasets.timeseries("2018-01-01", "2023-01-01")
print(f"df.npartitions: {ts_df.npartitions}")
print(f"df.divisions: {len(ts_df.divisions)}")
df.npartitions: 1826
df.divisions: 1827
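The boundaries themselves can also be inspected directly. A quick sketch (output omitted; the exact values depend on the generated series, which is partitioned by day by default):
# Each element of .divisions is a partition boundary on the index (a timestamp here)
print(ts_df.divisions[:3])
print(ts_df.divisions[-1])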
Dask DataFrame does not record how many rows each partition contains, so it cannot support row-position-based operations at the global level, such as `iloc`.
try:
    ts_df.iloc[3].compute()
except Exception as e:
    print(f"{type(e).__name__}, {e}")
NotImplementedError, 'DataFrame.iloc' only supports selecting columns. It must be used like 'df.iloc[:, column_indexer]'.
However, selecting certain columns by column label is supported, as is selecting all rows with the `:` wildcard on the row axis:
ts_df.iloc[:, [1, 2]].compute()
| timestamp           | id   | x         |
|---------------------|------|-----------|
| 2018-01-01 00:00:00 | 992  | -0.711756 |
| 2018-01-01 00:00:01 | 1018 | -0.838596 |
| 2018-01-01 00:00:02 | 1000 | -0.735968 |
| 2018-01-01 00:00:03 | 1004 | 0.904384  |
| 2018-01-01 00:00:04 | 1021 | 0.025423  |
| ...                 | ...  | ...       |
| 2022-12-31 23:59:55 | 1020 | 0.961542  |
| 2022-12-31 23:59:56 | 963  | -0.663948 |
| 2022-12-31 23:59:57 | 1010 | 0.510401  |
| 2022-12-31 23:59:58 | 964  | -0.882126 |
| 2022-12-31 23:59:59 | 1020 | -0.532950 |

157766400 rows × 2 columns
For CSV files, Dask DataFrame does not automatically generate `divisions`.
folder_path = nyc_flights()
file_path = os.path.join(folder_path, "nyc-flights", "*.csv")
flights_ddf = dd.read_csv(file_path,
                          parse_dates={'Date': [0, 1, 2]},
                          dtype={'TailNum': object,
                                 'CRSElapsedTime': float,
                                 'Cancelled': bool})
flights_ddf.divisions
(None, None, None, None, None, None, None)
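A quick way to check whether a Dask DataFrame has usable `divisions` is its `known_divisions` attribute, which is `False` when the divisions are all `None`. A small check:
# known_divisions is True only when the partition boundaries are recorded
print(flights_ddf.known_divisions)  # False: read_csv did not compute divisions
print(ts_df.known_divisions)        # True: the generated time series records them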
Because the number of rows in each partition is not recorded, Dask DataFrame cannot properly support certain operations, such as percentile operations like `median()`, since they require (1) sorting the data and (2) locating specific rows.
try:
    flights_ddf['DepDelay'].median()
except Exception as e:
    print(f"{type(e).__name__}, {e}")
NotImplementedError, Dask doesn't implement an exact median in all cases as this is hard to do in parallel. See the `median_approximate` method instead, which uses an approximate algorithm.
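As the error message suggests, an approximate alternative is available. A short sketch using `median_approximate()` (the result is approximate and depends on the data):
# Approximate median: avoids the global sort an exact median would require
flights_ddf['DepDelay'].median_approximate().compute()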
Setting the Index Column#
set_index()#
In Dask DataFrame, we can manually set a column as the index with the `set_index()` method. Besides designating that field as the index column, this operation also sorts the global data by that field, which disrupts the existing ordering within each partition and therefore comes at a high cost.
The following example shows the changes that `set_index()` brings:
def print_partitions(ddf):
    for i in range(ddf.npartitions):
        print(ddf.partitions[i].compute())

df = pd.DataFrame(
    {"col1": ["01", "05", "02", "03", "04"], "col2": ["a", "b", "c", "d", "e"]}
)
ddf = dd.from_pandas(df, npartitions=2)
print_partitions(ddf)
  col1 col2
0   01    a
1   05    b
2   02    c
  col1 col2
3   03    d
4   04    e
ddf2 = ddf.set_index("col1")
print_partitions(ddf2)
     col2
col1
01      a
     col2
col1
02      c
03      d
04      e
05      b
This example sets the `col1` column as the index, and the data in the two partitions is reshuffled. When the data volume is large, globally sorting and redistributing the data is extremely expensive, so this operation should be avoided whenever possible. `set_index()` does have its advantages, though: it can speed up downstream computations. Data redistribution is also known as Shuffle; we will cover the computation process and cost of Shuffle in Section 4.4.
Returning to the time series data, which uses timestamps as its index column: below, `set_index()` is applied to this data in two ways, the first without specifying `divisions` and the second with `divisions` specified.
The first approach, without `divisions`, takes much longer, because Dask DataFrame computes the data distribution across all partitions and rearranges all the partitions according to that distribution; as you can see, the number of partitions also changes.
%%time
ts_df1 = ts_df.set_index("id")
nu = ts_df1.loc[[1001]].name.nunique().compute()
print(f"before set_index npartitions: {ts_df.npartitions}")
print(f"after set_index npartitions: {ts_df1.npartitions}")
before set_index npartitions: 1826
after set_index npartitions: 165
CPU times: user 6.63 s, sys: 3.65 s, total: 10.3 s
Wall time: 20.6 s
The second approach obtains the `divisions` in advance and then passes them to `set_index()`. Calling `set_index()` with predefined `divisions` is faster.
dask_computed_divisions = ts_df.set_index("id").divisions
unique_divisions = list(dict.fromkeys(list(dask_computed_divisions)))
%%time
ts_df2 = ts_df.set_index("id", divisions=unique_divisions)
nuids = ts_df2.loc[[1001]].name.nunique().compute()
CPU times: user 3.24 s, sys: 1.7 s, total: 4.94 s
Wall time: 11.9 s
If we do not set an index column at all and instead query the `id` column directly, it turns out to be even faster.
%%time
nu = ts_df.loc[ts_df["id"] == 1001].name.nunique().compute()
CPU times: user 1.88 s, sys: 1.09 s, total: 2.97 s
Wall time: 8.38 s
So `set_index()` should be used with caution in Dask DataFrame. If many of the following operations come after `set_index()`, it may be worth considering (an index-aligned merge is sketched after this list):

- Filtering on the index column with `loc`
- Merging two Dask DataFrames on the index column (`merge()`)
- Grouping and aggregating on the index column (`groupby()`)
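For instance, once both sides of a join are indexed on the same key, Dask can align partitions by their known `divisions` instead of shuffling every row. A minimal sketch with hypothetical toy data, reusing the `pd` and `dd` imports from the setup cell:
# Two small Dask DataFrames indexed (and therefore sorted) on the same key
left = dd.from_pandas(
    pd.DataFrame({"key": [1, 2, 3, 4], "lval": list("abcd")}).set_index("key"),
    npartitions=2,
)
right = dd.from_pandas(
    pd.DataFrame({"key": [2, 3, 4, 5], "rval": list("wxyz")}).set_index("key"),
    npartitions=2,
)
# Index-aligned join: matching partitions are joined using the known divisions
left.merge(right, left_index=True, right_index=True, how="inner").compute()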
reset_index()#
In pandas, `groupby` defaults to `as_index=True`, so the grouping field becomes the index column after `groupby()`. The index column is not a "formal" field of the DataFrame; if only one "formal" field remains after the group-by aggregation (not counting the index column), the result becomes a `Series`. For example, in the pandas code below, `Origin` is the grouping field: without `as_index=False`, `groupby("Origin")["DepDelay"].mean()` would produce a `Series`, whereas `groupby("Origin", as_index=False)` keeps `Origin` as a regular column and returns a DataFrame.
# pandas
file_path = os.path.join(folder_path, "1991.csv")
pdf = pd.read_csv(file_path,
                  parse_dates={'Date': [0, 1, 2]},
                  dtype={'TailNum': object,
                         'CRSElapsedTime': float,
                         'Cancelled': bool})
uncancelled_pdf = pdf[pdf["Cancelled"] == False]
avg_pdf = uncancelled_pdf.groupby("Origin", as_index=False)["DepDelay"].mean()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
|   | Origin | AvgDepDelay |
|---|--------|-------------|
| 2 | LGA    | 5.726304    |
| 0 | EWR    | 6.916220    |
| 1 | JFK    | 9.311532    |
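For comparison, with the default `as_index=True` (i.e. without passing the parameter), the same aggregation returns a `Series` indexed by `Origin`. A quick check reusing `uncancelled_pdf` from above:
# Default groupby: the grouping key becomes the index and the result is a Series
delay_series = uncancelled_pdf.groupby("Origin")["DepDelay"].mean()
print(type(delay_series))       # <class 'pandas.core.series.Series'>
print(delay_series.index.name)  # Origin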
Alternatively, use `reset_index()` to cancel the index column; the grouping field then becomes a regular field of the `DataFrame`.
avg_pdf = uncancelled_pdf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_pdf.columns = ["Origin", "AvgDepDelay"]
avg_pdf.sort_values("AvgDepDelay")
|   | Origin | AvgDepDelay |
|---|--------|-------------|
| 2 | LGA    | 5.726304    |
| 0 | EWR    | 6.916220    |
| 1 | JFK    | 9.311532    |
Dask DataFrame's `groupby()` does not support the `as_index` parameter; Dask DataFrame can only use `reset_index()` to cancel the index column.
uncancelled_ddf = flights_ddf[flights_ddf["Cancelled"] == False]
avg_ddf = uncancelled_ddf.groupby("Origin")["DepDelay"].mean().reset_index()
avg_ddf.columns = ["Origin", "AvgDepDelay"]
avg_ddf = avg_ddf.compute()
# The pandas example used only one year of data, so the results differ
avg_ddf.sort_values("AvgDepDelay")
|   | Origin | AvgDepDelay |
|---|--------|-------------|
| 2 | LGA    | 6.944939    |
| 0 | EWR    | 9.997188    |
| 1 | JFK    | 10.766914   |
client.shutdown()