Xorbits Data

10.1. Xorbits Data#

Xorbits Data 是面向数据科学的分布式计算框架,具有 Dask 和 Modin 相似的功能,用于加速 pandas DataFrame、NumPy。Xorbits Data 通过切分大数据集,然后利用 pandas 或 NumPy 执行操作。其底层采用了自主研发的 Actor 编程框架 Xoscar,不依赖于 Ray 或者 Dask。

Xorbits 集群#

在进行计算前,Xorbits 需要在多节点环境中初始化集群。对于单机环境,可以直接使用 xorbits.init() 进行初始化。对于集群环境,可以按照以下步骤进行配置:首先启动一个管理进程(Supervisor),然后在各个计算节点上启动 Worker 进程。

# 先在管理节点启动 Supervisor
xorbits-supervisor -H <supervisor_ip> -p <supervisor_port> -w <web_port>

# 在每个计算节点启动 Worker
xorbits-worker -H <worker_ip> -p <worker_port> -s <supervisor_ip>:<supervisor_port>

其中,<supervisor_ip><supervisor_port> 为管理节点的 IP 和端口号,<web_port> 为仪表盘端口号,客户端也通过这个端口与集群连接。<worker_ip><worker_port> 为每个计算节点的 IP 和端口号。启动好 Supervisor 和 Worker 后,在代码中使用 xorbits.init("<supervisor_ip>:<web_port>") 连接到这个集群,计算任务就可以横向扩展到集群上了。

API 兼容性#

在 pandas DataFrame 的兼容性上,Modin > Xorbits > Dask DataFrame;在性能上,Xorbits > Dask DataFrame > Modin。

Xorbits 也是在多维度对数据进行切分,保留行标签和列标签,提供了绝大多数 pandas API,比如 iloc()median()

import os

import sys
sys.path.append("..")
from utils import nyc_taxi

taxi_path = nyc_taxi()
import xorbits
import xorbits.pandas as pd

df = pd.read_parquet(taxi_path, use_arrow_dtype=False)
df.iloc[3]
/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/xorbits/_mars/deploy/oscar/session.py:1953: UserWarning: No existing session found, creating a new local session now.
  warnings.warn(warning_msg)
2024-05-09 10:13:12,400 xorbits._mars.deploy.oscar.local 43280 WARNING  Web service started at http://0.0.0.0:54965
VendorID                                   1
tpep_pickup_datetime     2023-01-01 00:03:48
tpep_dropoff_datetime    2023-01-01 00:13:25
passenger_count                          0.0
trip_distance                            1.9
RatecodeID                               1.0
store_and_fwd_flag                         N
PULocationID                             138
DOLocationID                               7
payment_type                               1
fare_amount                             12.1
extra                                   7.25
mta_tax                                  0.5
tip_amount                               0.0
tolls_amount                             0.0
improvement_surcharge                    1.0
total_amount                           20.85
congestion_surcharge                     0.0
airport_fee                             1.25
Name: 3, dtype: object
df.dtypes
VendorID                          int64
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
airport_fee                     float64
dtype: object
df['trip_distance'].median()
1.79

推迟执行#

Xorbits 采用了类似 Dask 的计算图,任何计算会先转换为计算图再执行;但 Xorbits 不需要明确调用 compute() 来触发计算,这种方式被称为推迟(Deferred)执行。Xorbits 在背后构建了计算图,但只有遇到 print() 等需要将数据呈现给用户的操作,Xorbits 才会执行计算图。这种方式使得 Xorbits 与 pandas 和 NumPy 的语义更加相似。如果手动触发计算,也可以 xorbits.run(df)

以下面的数据可视化为例,gb_time 只是一个指向计算图的指针,并不是实际的数据,但当 Plotly 需要 gb_time 的结果时,Xorbits 会触发计算。

df['PU_dayofweek'] = df['tpep_pickup_datetime'].dt.dayofweek
df['PU_hour'] = df['tpep_pickup_datetime'].dt.hour
gb_time = df.groupby(by=['PU_dayofweek', 'PU_hour'], as_index=False).agg(count=('PU_dayofweek', 'count'))
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "notebook"

b = px.bar(
    gb_time,
    x='PU_hour',
    y='count',
    color='PU_dayofweek',
    color_continuous_scale='sunset_r',
)
b.show()

同样使用计算图,但 Xorbits 可以不用像 Dask DataFrame 那样关注计算图的细节,也不需要使用 repartition()。Xorbits 在后台计算图构建和执行时做了优化,出现数据倾斜时,Xorbits 会自动优化计算图,避免计算图过大。