8.5. Modin#

Ray Data 提供的各类数据处理工具设计上较为简单,只能做一些比较简单的转换,能提供的复杂的数据处理功能可能不够多。Modin 是一个旨在提高 pandas 性能的框架,它通过将大数据分割并分布到多核和集群上,实现了对大数据集的高效处理。最初,Modin 底层使用了 Ray 作为其分布式执行引擎,有时也被称为 Ray 上的 pandas。随后,Modin 又集成了 Dask 和 unidist 执行引擎,其中 unidist 是 Modin 团队开发的分布式执行引擎。

安装 Modin 时,用户可以根据需要选择并安装相应的执行引擎,例如使用 pip install "modin[ray]"pip install "modin[dask]"。Modin 默认使用 Ray 作为其执行引擎。

API 兼容性#

Dask DataFrame 与 pandas DataFrame 存在不少差异,许多 pandas 工作流不能快速迁移到 Dask DataFrame。Modin 特别重视与 pandas 的兼容性,用户通过 import modin.pandas as pd,大部分 pandas 工作流可以快速迁移到 Modin。

Dask DataFrame 按列对大数据进行切分,并未记录每个分区的数据量。相比之下,Modin 在多个维度上对数据进行切分,并保留行标签和列标签。Modin 支持 iloc() 行索引;记录了每个数据块的数据量,从而可以支持median()quantile() 等操作;同时支持行和列的转换,比如,pivot()transpose() 等。关于 Modin 的设计,可以参考其两篇论文 [Petersohn et al., 2020] [Petersohn et al., 2021]

import os

import sys
sys.path.append("..")
from utils import nyc_flights

folder_path = nyc_flights()
file_path = os.path.join(folder_path, "nyc-flights", "*.csv")

Note

Modin 的 API 设计旨在与 pandas 保持一致,例如,pandas 的 read_csv() 函数只能读单个文件,不支持 *.csv 这样的通配符。Modin 拓展了 read_csv(),引入了 read_csv_glob() 方法,该方法可以读取 *.csv 等通配符文件,适用于处理大规模数据集。这些新增的 API 在 modin.experimental.pandas 模块中。

import modin.experimental.pandas as pd
df = pd.read_csv_glob(file_path, parse_dates={'Date': [0, 1, 2]})
df.iloc[3]
2024-07-09 17:02:35,641	INFO worker.py:1749 -- Started a local Ray instance.
FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.
UserWarning: `read_*` implementation has mismatches with pandas:
Data types of partitions are different! Please refer to the troubleshooting section of the Modin documentation to fix this issue.
Date                 1991-01-11 00:00:00
DayOfWeek                              5
DepTime                           1303.0
CRSDepTime                          1215
ArrTime                           1439.0
CRSArrTime                          1336
UniqueCarrier                         US
FlightNum                            121
TailNum                              NaN
ActualElapsedTime                   96.0
CRSElapsedTime                        81
AirTime                              NaN
ArrDelay                            63.0
DepDelay                            48.0
Origin                               EWR
Dest                                 PIT
Distance                           319.0
TaxiIn                               NaN
TaxiOut                              NaN
Cancelled                              0
Diverted                               0
Name: 3, dtype: object
df['ArrDelay'].median()
0.0

如果Modin中尚未实现某些 API,Modin 会回退到 pandas,以确保兼容性。然而,这种设计也存在明显的缺点:将 Modin 的 DataFrame 转换为 pandas DataFrame时会产生额外开销;若 DataFrame 原本分布在多个节点上,转换过程中数据将被集中到单机内存,这可能会超出单机内存的承载能力。

立即执行#

Modin 的计算是立即执行的,与 pandas 一致。用户无须像使用 Dask 那样调用 .compute() 方法来触发计算。Modin 不需要 Dask DataFrame 的数据类型推断功能。在 章节 4.1 中提到的飞机起降数据示例中,Dask DataFrame 的 tail() 方法可能会抛出异常,而 Modin 则能够提供与pandas 相同的语义。

df.tail(3)
Date DayOfWeek DepTime CRSDepTime ArrTime CRSArrTime UniqueCarrier FlightNum TailNum ActualElapsedTime ... AirTime ArrDelay DepDelay Origin Dest Distance TaxiIn TaxiOut Cancelled Diverted
1555982 1994-12-27 2 1721.0 1715 1930.0 1945 DL 149 NaN 129.0 ... NaN -15.0 6.0 JFK ATL 760.0 NaN NaN 0 0
1555983 1994-12-28 3 1715.0 1715 1934.0 1945 DL 149 NaN 139.0 ... NaN -11.0 0.0 JFK ATL 760.0 NaN NaN 0 0
1555984 1994-12-29 4 1715.0 1715 1941.0 1945 DL 149 NaN 146.0 ... NaN -4.0 0.0 JFK ATL 760.0 NaN NaN 0 0

3 rows x 21 columns

执行引擎#

Modin 支持 Ray、Dask 和 unidist 分布式执行引擎:可以利用单机多核,也可以运行在集群上。以 Ray 为例,用户可以向 Ray 集群上提交作业,在代码中初始 Ray 运行时 ray.init(address="auto") 后,会将作业运行 Ray 集群。

Modin 默认使用 Ray 作为执行后端,也可以通过环境变量 MODIN_ENGINE 来设置执行后端,在命令行里:export MODIN_ENGINE=dask;或在 Jupyter Notebook 中:

import modin.config as modin_cfg
modin_cfg.Engine.put("ray")

undist 是 Modin 自己实现的一个执行后端,它支持 MPI,如果想用 undist MPI,除了设置 MODIN_ENGINE 还要设置 UNIDIST_BACKEND

export MODIN_ENGINE=unidist
export UNIDIST_BACKEND=mpi 

案例:纽约市出租车数据分析#

我们将使用 Modin 对出租车数据进行数据分析。

%matplotlib inline
import matplotlib_inline
matplotlib_inline.backend_inline.set_matplotlib_formats('svg')
import matplotlib.pyplot as plt
from utils import nyc_taxi

taxi_path = nyc_taxi()

我们先读取数据。我们可以使用 read_parquet_glob() 与通配符 * 直接读取多个 Parquet 文件,在这里我们仅使用 read_parquet()

df = pd.read_parquet(os.path.join(taxi_path, "yellow_tripdata_2023-01.parquet"))

df.head()
FutureWarning: Passing 'use_legacy_dataset' is deprecated as of pyarrow 15.0.0 and will be removed in a future version.
VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge airport_fee
0 2 2023-01-01 00:32:10 2023-01-01 00:40:36 1.0 0.97 1.0 N 161 141 2 9.3 1.00 0.5 0.00 0.0 1.0 14.30 2.5 0.00
1 2 2023-01-01 00:55:08 2023-01-01 01:01:27 1.0 1.10 1.0 N 43 237 1 7.9 1.00 0.5 4.00 0.0 1.0 16.90 2.5 0.00
2 2 2023-01-01 00:25:04 2023-01-01 00:37:49 1.0 2.51 1.0 N 48 238 1 14.9 1.00 0.5 15.00 0.0 1.0 34.90 2.5 0.00
3 1 2023-01-01 00:03:48 2023-01-01 00:13:25 0.0 1.90 1.0 N 138 7 1 12.1 7.25 0.5 0.00 0.0 1.0 20.85 0.0 1.25
4 2 2023-01-01 00:10:29 2023-01-01 00:21:19 1.0 1.43 1.0 N 107 79 1 11.4 1.00 0.5 3.28 0.0 1.0 19.68 2.5 0.00

下面我们展示数据预处理部分。

df = df.dropna(subset=['total_amount', 'RatecodeID'])

# 转换数据类型
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

# 计算行程时长
df['trip_duration'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60

接下来我们进行数据的条件筛选并使用 groupby() 切分数据。

# 选择含有非零乘客的行程
non_zero_passenger_df = df[df['passenger_count'] > 0]
average_trip_duration = non_zero_passenger_df['trip_duration'].mean()
print("平均行程时长(非零乘客):")
print(average_trip_duration)

# 按支付方式分组,计算总费用的分布
total_amount_by_payment = non_zero_passenger_df.groupby('payment_type')['total_amount'].sum()
print("按支付方式分组的总费用:")
print(total_amount_by_payment)

# 对支付方式分组的数据进行排序
sorted_total_by_payment = total_amount_by_payment.sort_values(ascending=False)
print("\n按总费用降序排序的支付方式:")
print(sorted_total_by_payment)

# 按乘客数量分组,计算平均行程距离和总费用
avg_distance_amount_by_passenger = df.groupby('passenger_count').agg({
    'trip_distance': 'mean',
    'total_amount': 'mean'
})
print("\n按乘客数量分组的平均行程距离和总费用:")
print(avg_distance_amount_by_passenger)
(raylet) Spilled 4064 MiB, 399 objects, write throughput 2484 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
平均行程时长(非零乘客):
15.68557239550762
按支付方式分组的总费用:
payment_type
1    67232359.58
2    12062049.01
3      165402.32
4       79019.28
Name: total_amount, dtype: float64

按总费用降序排序的支付方式:
payment_type
1    67232359.58
2    12062049.01
3      165402.32
4       79019.28
Name: total_amount, dtype: float64

按乘客数量分组的平均行程距离和总费用:
                 trip_distance  total_amount
passenger_count                             
0.0                   2.761904     24.162124
1.0                   3.338169     26.443472
2.0                   3.931051     29.313282
3.0                   3.664393     28.475420
4.0                   3.812581     29.611602
5.0                   3.282478     26.588261
6.0                   3.250963     26.558484
7.0                   4.238333     85.111667
8.0                   4.270769     99.336923
9.0                   0.000000     92.250000

我们再使用 apply() 对数据进行操作,以及使用 pivot_table() 创建数据透视表。

# 应用函数计算含税车费
df['fare_with_tax'] = df.apply(lambda row: row['fare_amount'] + row['mta_tax'], axis=1)
print("计算含税车费:")
print(df[['fare_amount', 'mta_tax', 'fare_with_tax']].head())

# 重塑数据创建数据透视表
pivot_table = df.pivot_table(values='total_amount', index='DOLocationID', columns='payment_type', aggfunc='mean')
print("\n数据透视表(总费用按下车地点和支付方式):")
print(pivot_table.head())
计算含税车费:
   fare_amount  mta_tax  fare_with_tax
0          9.3      0.5            9.8
1          7.9      0.5            8.4
2         14.9      0.5           15.4
3         12.1      0.5           12.6
4         11.4      0.5           11.9

数据透视表(总费用按下车地点和支付方式):
payment_type           1           2          3          4
DOLocationID                                              
1             130.041868  108.956124  73.196774   1.598767
2              58.446250   31.411538        NaN   0.000000
3              62.407438   65.358209   1.500000  12.360000
4              27.395179   20.115848  12.228704   2.635455
5             103.328113  135.500000        NaN        NaN

最后我们展示可视化部分。

# 绘制乘客数量分组的平均行程距离和总费用柱状图
fig, ax1 = plt.subplots(figsize=(10, 6))
avg_distance_amount_by_passenger['trip_distance'].plot(kind='bar', color='blue', ax=ax1, position=1, width=0.4)
ax1.set_ylabel('Average Trip Distance (miles)', color='blue')
ax2 = ax1.twinx()
avg_distance_amount_by_passenger['total_amount'].plot(kind='bar', color='red', ax=ax2, position=0, width=0.4)
ax2.set_ylabel('Average Total Amount ($)', color='red')
plt.title('Average Trip Distance and Total Amount by Passenger Count')
ax1.set_xlabel('Passenger Count')
plt.grid(True)
plt.show()
../_images/a738fcf917f7066357d28584296c9d91be7be10a88e1a0cdfcd03eb9f4987a0d.svg