10.1. Xorbits Data#
Xorbits Data 是面向数据科学的分布式计算框架,具有 Dask 和 Modin 相似的功能,用于加速 pandas DataFrame、NumPy。Xorbits Data 通过切分大数据集,然后利用 pandas 或 NumPy 执行操作。其底层采用了自主研发的 Actor 编程框架 Xoscar,不依赖于 Ray 或者 Dask。
Xorbits 集群#
在进行计算前,Xorbits 需要在多节点环境中初始化集群。对于单机环境,可以直接使用 xorbits.init()
进行初始化。对于集群环境,可以按照以下步骤进行配置:首先启动一个管理进程(Supervisor),然后在各个计算节点上启动 Worker 进程。
# 先在管理节点启动 Supervisor
xorbits-supervisor -H <supervisor_ip> -p <supervisor_port> -w <web_port>
# 在每个计算节点启动 Worker
xorbits-worker -H <worker_ip> -p <worker_port> -s <supervisor_ip>:<supervisor_port>
其中,<supervisor_ip>
和 <supervisor_port>
为管理节点的 IP 和端口号,<web_port>
为仪表盘端口号,客户端也通过这个端口与集群连接。<worker_ip>
和 <worker_port>
为每个计算节点的 IP 和端口号。启动好 Supervisor 和 Worker 后,在代码中使用 xorbits.init("<supervisor_ip>:<web_port>")
连接到这个集群,计算任务就可以横向扩展到集群上了。
API 兼容性#
在 pandas DataFrame 的兼容性上,Modin > Xorbits > Dask DataFrame;在性能上,Xorbits > Dask DataFrame > Modin。
Xorbits 也是在多维度对数据进行切分,保留行标签和列标签,提供了绝大多数 pandas API,比如 iloc()
、median()
。
import os
import sys
sys.path.append("..")
from utils import nyc_taxi
taxi_path = nyc_taxi()
import xorbits
import xorbits.pandas as pd
df = pd.read_parquet(taxi_path, use_arrow_dtype=False)
df.iloc[3]
/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/xorbits/_mars/deploy/oscar/session.py:1953: UserWarning: No existing session found, creating a new local session now.
warnings.warn(warning_msg)
2024-05-09 10:13:12,400 xorbits._mars.deploy.oscar.local 43280 WARNING Web service started at http://0.0.0.0:54965
VendorID 1
tpep_pickup_datetime 2023-01-01 00:03:48
tpep_dropoff_datetime 2023-01-01 00:13:25
passenger_count 0.0
trip_distance 1.9
RatecodeID 1.0
store_and_fwd_flag N
PULocationID 138
DOLocationID 7
payment_type 1
fare_amount 12.1
extra 7.25
mta_tax 0.5
tip_amount 0.0
tolls_amount 0.0
improvement_surcharge 1.0
total_amount 20.85
congestion_surcharge 0.0
airport_fee 1.25
Name: 3, dtype: object
df.dtypes
VendorID int64
tpep_pickup_datetime datetime64[us]
tpep_dropoff_datetime datetime64[us]
passenger_count float64
trip_distance float64
RatecodeID float64
store_and_fwd_flag object
PULocationID int64
DOLocationID int64
payment_type int64
fare_amount float64
extra float64
mta_tax float64
tip_amount float64
tolls_amount float64
improvement_surcharge float64
total_amount float64
congestion_surcharge float64
airport_fee float64
dtype: object
df['trip_distance'].median()
1.79
推迟执行#
Xorbits 采用了类似 Dask 的计算图,任何计算会先转换为计算图再执行;但 Xorbits 不需要明确调用 compute()
来触发计算,这种方式被称为推迟(Deferred)执行。Xorbits 在背后构建了计算图,但只有遇到 print()
等需要将数据呈现给用户的操作,Xorbits 才会执行计算图。这种方式使得 Xorbits 与 pandas 和 NumPy 的语义更加相似。如果手动触发计算,也可以 xorbits.run(df)
。
以下面的数据可视化为例,gb_time
只是一个指向计算图的指针,并不是实际的数据,但当 Plotly 需要 gb_time
的结果时,Xorbits 会触发计算。
df['PU_dayofweek'] = df['tpep_pickup_datetime'].dt.dayofweek
df['PU_hour'] = df['tpep_pickup_datetime'].dt.hour
gb_time = df.groupby(by=['PU_dayofweek', 'PU_hour'], as_index=False).agg(count=('PU_dayofweek', 'count'))
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "notebook"
b = px.bar(
gb_time,
x='PU_hour',
y='count',
color='PU_dayofweek',
color_continuous_scale='sunset_r',
)
b.show()
同样使用计算图,但 Xorbits 可以不用像 Dask DataFrame 那样关注计算图的细节,也不需要使用 repartition()
。Xorbits 在后台计算图构建和执行时做了优化,出现数据倾斜时,Xorbits 会自动优化计算图,避免计算图过大。