8.3. 数据转换#

数据处理的核心在于对数据进行一系列的转换(Transform),比如:

  • 如何对一条、一批次进行转换

  • 如何进行分组 groupby

转换#

map() 与 map_batches()#

Ray Data 提供了两类数据转换操作,如 图 8.4 所示。这两类转换操作都是典型的 Embarrassingly Parallel 计算模式,没有 Shuffle 通信成本。

  • 对于一行数据,可以用 Dataset.map()Dataset.flat_map() 这两个 API,即对每一条数据一一进行转换。这与其他大数据框架(Spark 或者 Flink)类似。输入一条数据,输出一条数据。

  • 将多行数据打包为一个批次(Batch),对一个批次的数据进行转换:Dataset.map_batches()。输入一个 Batch,输出一个 Batch。

../_images/map-map-batches.svg

图 8.4 map() v.s. map_batches()#

案例:纽约出租车#

我们仍以纽约出租车数据为例,演示如何使用这两类转换操作。

Hide code cell content
import os
import sys
from typing import Any, Dict

sys.path.append("..")
from utils import nyc_taxi

import numpy as np
import pandas as pd
import torch
import ray

import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

if ray.is_initialized:
    ray.shutdown()

ray.init()
2024-04-23 15:42:27,262	INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265 

读取数据到 Dataset 类,查看原有的数据格式。

dataset_path = nyc_taxi()
dataset = ray.data.read_parquet(dataset_path)
dataset.take(1)
2024-04-23 15:42:28,728	INFO dataset.py:2370 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-04-23 15:42:28,731	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-42-25_575692_75375/logs
2024-04-23 15:42:28,731	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'VendorID': 1,
  'tpep_pickup_datetime': datetime.datetime(2023, 2, 1, 0, 32, 53),
  'tpep_dropoff_datetime': datetime.datetime(2023, 2, 1, 0, 34, 34),
  'passenger_count': 2.0,
  'trip_distance': 0.3,
  'RatecodeID': 1.0,
  'store_and_fwd_flag': 'N',
  'PULocationID': 142,
  'DOLocationID': 163,
  'payment_type': 2,
  'fare_amount': 4.4,
  'extra': 3.5,
  'mta_tax': 0.5,
  'tip_amount': 0.0,
  'tolls_amount': 0.0,
  'improvement_surcharge': 1.0,
  'total_amount': 9.4,
  'congestion_surcharge': 2.5,
  'airport_fee': None}]

Note

Ray Data 的各类操作都是延迟(Lazy)执行的,即这些操作不是立即执行的,而是遇到数据查看或保存操作等时,才会执行,比如:show()take()iter_rows()write_parquet() 等操作会触发转换操作。

map(fn) 的最重要的参数是一个自定义的函数 fn,这个函数对每一条输入数据进行转换,返回一条输出数据。下面这个例子中,我们自定义了函数 transform_row,提取出了每次订单的时长、距离和价格,其他的字段先忽略,重点关注 tpep_dropoff_datetimetpep_pickup_datetime 两个字段。map(fn)fn 是对一条数据进行转换,函数的输入类型是一个 Dict 键值字典,键是 Schema 的字段名。

def transform_row(row: Dict[str, Any]) -> Dict[str, Any]:
    result = {}
    result["trip_duration"] = (row["tpep_dropoff_datetime"] - row["tpep_pickup_datetime"]).total_seconds()
    result["trip_distance"] = row["trip_distance"]
    result["fare_amount"] = row["fare_amount"]
    return result

row_ds = dataset.map(transform_row)
row_ds.take(1)
2024-04-23 15:42:29,570	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-42-25_575692_75375/logs
2024-04-23 15:42:29,571	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Map(transform_row)] -> LimitOperator[limit=1]
[{'trip_duration': 506.0, 'trip_distance': 0.97, 'fare_amount': 9.3}]

map() 不同,map_batches() 是针对数据的一个批次(Batch)进行处理,它模拟在单机处理环境中对整个数据集的操作。map_batches() 的设计宗旨是为了方便开发者将原本为单机环境编写的程序代码无缝地迁移到 Ray 平台上。map_batches() 每个批次的数据格式为 Dict[str, np.ndarray]pd.DataFramepyarrow.Table,分别对应 NumPy 、pandas 和 Arrow。

下面的例子展示如何使用 pandas 的形式对每个批次(Batch)进行操作,其功能与刚才展示的 map() 类似案例,但是通过Pandas DataFrame来实现。

def transform_df(input_df: pd.DataFrame) -> pd.DataFrame:
    result_df = pd.DataFrame()
    result_df["trip_duration"] = (input_df["tpep_dropoff_datetime"] - input_df["tpep_pickup_datetime"]).dt.seconds
    result_df["trip_distance"] = input_df["trip_distance"]
    result_df["fare_amount"] = input_df["fare_amount"]
    return result_df

batch_ds = dataset.map_batches(transform_df, batch_format="pandas")
batch_ds.take(1)
2024-04-23 15:42:43,882	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-42-25_575692_75375/logs
2024-04-23 15:42:43,882	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(transform_df)] -> LimitOperator[limit=1]
[{'trip_duration': 506, 'trip_distance': 0.97, 'fare_amount': 9.3}]

在实现 map() 或者 map_batch() 时,也可以使用 Python 的 lambda 表达式,即一个匿名的 Python 函数。比如:

filtered_dataset = dataset.map_batches(lambda df: df[df["trip_distance"] > 4], batch_format="pandas")
print(f"过滤后的行数:{filtered_dataset.count()}")
2024-04-23 15:42:44,780	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-42-25_575692_75375/logs
2024-04-23 15:42:44,780	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(<lambda>)]
过滤后的行数:8109022

Task 与 Actor#

转换操作本质上执行函数 fn ,该函数接收收入数据,进行处理转换,产生输出数据。在默认设置下,Ray Data 通过 Ray Task 并行执行这些转换操作。Ray Task 适用于无状态的计算场景,即函数 fn 内部不依赖于外部数据。当涉及有状态的计算时,例如需要重复使用的数据或资源,应该使用 Ray Actor。例如,在机器学习模型预测的场景中,模型需要被加载并用于对不同的数据集进行预测。这种情况下,模型的状态是被反复使用的,因此属于有状态的计算。以下示例演示了模拟的机器学习模型预测过程。需要注意的是,示例中使用的并非一个训练好的模型,而是一个等价的变换 torch.nn.Identity(),它将输入数据直接作为输出,不做任何改变。

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity()
        self.model.eval()

    def __call__(self, df: pd.DataFrame) -> Dict[str, np.ndarray]:
        pred = {}
        inputs = torch.as_tensor(df['trip_distance'], dtype=torch.float32)
        with torch.inference_mode():
            pred["output"] = self.model(inputs).detach().numpy()
        return pred

pred_ds = batch_ds.limit(100).map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
pred_ds.take(3)
2024-04-23 15:42:58,123	WARNING util.py:560 -- The argument ``compute`` is deprecated in Ray 2.9. Please specify argument ``concurrency`` instead. For more information, see https://docs.ray.io/en/master/data/transforming-data.html#stateful-transforms.
2024-04-23 15:42:58,126	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-42-25_575692_75375/logs
2024-04-23 15:42:58,126	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(transform_df)] -> LimitOperator[limit=100] -> ActorPoolMapOperator[MapBatches(TorchPredictor)] -> LimitOperator[limit=3]
2024-04-23 15:43:00,636	WARNING actor_pool_map_operator.py:292 -- To ensure full parallelization across an actor pool of size 2, the Dataset should consist of at least 2 distinct blocks. Consider increasing the parallelism when creating the Dataset.
[{'output': 0.9700000286102295},
 {'output': 1.100000023841858},
 {'output': 2.509999990463257}]

使用 Actor 大概分为 3 步骤:

  1. 创建一个类,这个类包含一个 __init__() 方法和一个 __call__() 方法。__init__() 方法初始化一些可被反复使用的状态数据,__call__() 方法实现转换操作。可以参考刚才实现的 TorchPredictor 类。

  2. 创建一个 ActorPoolStrategy,指定一共多少个 Worker。

  3. 调用 map_batch() 方法,将 ActorPoolStrategy 传递给 compute 参数。

分组#

数据处理中,另一个常用的操作是分组聚合。Ray Data 提供了 groupby() 函数。首先使用 groupby() 对数据按照特定字段进行分组,然后通过 map_groups() 对分组后的数据执行聚合操作。 groupby(key) 的参数 key 指定了用于分组的字段;map_groups(fn) 的参数 fn 定义了对同一组数据执行的操作。Ray Data 内置了一些聚合函数,包括常用的求和 sum()、最大值 max()、平均值 mean() 等。例如,可以使用 mean()value 字段进行聚合。

ds = ray.data.from_items([
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4}])
mean_ds = ds.groupby("group").mean("value")
mean_ds.show()
2024-04-23 15:43:00,727	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-42-25_575692_75375/logs
2024-04-23 15:43:00,727	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=20]
{'group': 1, 'mean(value)': 1.5}
{'group': 2, 'mean(value)': 3.5}