8.2. 数据加载、查看与保存#

Ray Data 可以兼容多种多样的数据源,包括文件、内存数据和数据库。

Hide code cell content
import os
import shutil
from pathlib import Path

import sys
sys.path.append("..")
from utils import nyc_taxi

import ray

if ray.is_initialized:
    ray.shutdown()

ray.init()
2024-04-23 15:46:00,903	INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8267 

加载数据#

Ray Data 提供了很多预置的数据加载方法,包括读取文件、读取 pandas DataFrame 这种内存数据、读取数据库中的数据。这里我们以纽约出租车司机的案例来演示读取 Parquet 文件。

首先下载该数据,并使用 ray.data 提供的 read_parquet() 方法读取数据,得到一个 Dataset。。

dataset_path = nyc_taxi()
dataset = ray.data.read_parquet(dataset_path)
dataset.take(1)
Hide code cell output
2024-04-23 15:46:02,588	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:02,589	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'VendorID': 1,
  'tpep_pickup_datetime': datetime.datetime(2023, 2, 1, 0, 32, 53),
  'tpep_dropoff_datetime': datetime.datetime(2023, 2, 1, 0, 34, 34),
  'passenger_count': 2.0,
  'trip_distance': 0.3,
  'RatecodeID': 1.0,
  'store_and_fwd_flag': 'N',
  'PULocationID': 142,
  'DOLocationID': 163,
  'payment_type': 2,
  'fare_amount': 4.4,
  'extra': 3.5,
  'mta_tax': 0.5,
  'tip_amount': 0.0,
  'tolls_amount': 0.0,
  'improvement_surcharge': 1.0,
  'total_amount': 9.4,
  'congestion_surcharge': 2.5,
  'airport_fee': None}]

查看这份数据集的数据模式(Schema):

dataset.schema()
Column                 Type
------                 ----
VendorID               int64
tpep_pickup_datetime   timestamp[us]
tpep_dropoff_datetime  timestamp[us]
passenger_count        double
trip_distance          double
RatecodeID             double
store_and_fwd_flag     string
PULocationID           int64
DOLocationID           int64
payment_type           int64
fare_amount            double
extra                  double
mta_tax                double
tip_amount             double
tolls_amount           double
improvement_surcharge  double
total_amount           double
congestion_surcharge   double
airport_fee            double

其他类型的文件格式(CSV、TFRecord 等)读取方法如 表 8.1 所示。

表 8.1 Ray Data 数据读取方法#

Parquet

Text

CSV

TFRecord

二进制

方法

read_parquet()

read_text()

read_csv()

read_tfrecords()

read_binary_files()

列裁剪与行裁剪#

原始文件有很多列(Column),如果我们只关心某些特定的列,比如 passenger_counttip_amountpayment_type 等,可以使用 read_parquet() 方法的 columns 参数。

dataset = ray.data.read_parquet(
    dataset_path, 
    columns=["passenger_count", "tip_amount", "payment_type"]
)
dataset.schema()
Column           Type
------           ----
passenger_count  double
tip_amount       double
payment_type     int64

加了 columns 限制后,只有我们关心的列被读取,其他列不会被读取,即列裁剪。除了列裁剪,Ray Data 也支持行裁剪,即满足特定条件的行被读取,比如 tip_amount 大于 6.0 的行被过滤出来:

import pyarrow as pa

dataset = ray.data.read_parquet(
    dataset_path,
    columns=["passenger_count", "tip_amount", "payment_type"],
    filter=pa.dataset.field("tip_amount") > 6.0
)
dataset.show(limit=2)
2024-04-23 15:46:04,082	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:04,083	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': 1.0, 'tip_amount': 8.82, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 16.35, 'payment_type': 1}

并行度#

章节 8.1 我们提到,Ray Data 背后使用 Task 或 Actor 将数据处理过程并行化,在数据读取时,可以设置 parallelism 参数,以优化并行数据处理过程。Ray Data 提供的各类数据读取方法(比如,read_parquet()),都可以设置 parallelism 参数,来控制底层的并行执行的过程。如果不设置 parallelism,Ray Data 通过以下方式试探 parallelism

  1. Ray 获取集群中可用的 CPU 核数。

  2. parallelism 被设置为 CPU 核数的 2 倍。如果 parallelism 小于 8,则设置为 8。

  3. 估计每个 Block 的大小,如果每个 Block 平均大于 512 MiB,Ray 增大 parallelism,直到每个 Block 小于 512 MiB。

使用者也可以根据数据的实际情况,手动设置 parallelism,比如 ray.data.read_parquet(path, parallelism=512) 将强制生成 512 个 Ray Task 并行地读取数据。

查看数据#

查看数据包括查看数据的模式、数据的某些行或某些批次。比如,刚才使用的 show() 方法,以及接下来将要介绍的 count()take() 等。

查看数据集的样本数量:

dataset.count()
5324349

查看数据的某几行,可以使用 Dataset.take()Dataset.take_all() 两个方法。take() 方法把 Dataset 中的某一行取出,以字典的形式打印出来,字典的 Key 是字段名,字典的 Value 是对应的值。

dataset.take(limit=1)
2024-04-23 15:46:06,493	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:06,493	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}]

也可以将 Dataset 拆分为小的批次(Batch),使用 Dataset.take_batch() 查看一个 Batch 的数据。take_batch() 方法的一个重要参数是 batch_size,用来设置 Batch 大小。

batch = dataset.take_batch(batch_size=2)
batch
2024-04-23 15:46:06,732	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:06,732	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': array([1., 1.]),
 'tip_amount': array([15., 10.]),
 'payment_type': array([1, 1])}

迭代数据#

顺序迭代#

Ray Data 提供了迭代数据的方法,Dataset.iter_rows()Dataset.iter_batches()iter_rows() 迭代每一行,iter_batches() 迭代每一个批次。

比如,我们迭代前 5 行:

cnt = 0
for row in dataset.iter_rows():
    cnt += 1
    if cnt > 5:
        break
    print(row)
{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.74, 'payment_type': 1}
{'passenger_count': 4.0, 'tip_amount': 7.75, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 6.22, 'payment_type': 1}

或者迭代前 5 个批次:

cnt = 0
for batch in dataset.iter_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': array([1., 1.]), 'tip_amount': array([15., 10.]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 4.]), 'tip_amount': array([10.74,  7.75]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 2.]), 'tip_amount': array([ 6.22, 13.26]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([15.85,  6.36]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([ 6.6 , 15.05]), 'payment_type': array([1, 1])}

深度学习框架 PyTorch 和 TensorFlow 经常对批次数据进行训练或推理,Ray Data 为了更好地与之融合,提供了 Dataset.iter_torch_batches()Dataset.iter_tf_batches() 方法,这两个方法会把数据转化为 PyTorch 和 TensorFlow 的 Tensor 数据格式。

cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15., 10.], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 4.], dtype=torch.float64), 'tip_amount': tensor([10.7400,  7.7500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 2.], dtype=torch.float64), 'tip_amount': tensor([ 6.2200, 13.2600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15.8500,  6.3600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([ 6.6000, 15.0500], dtype=torch.float64), 'payment_type': tensor([1, 1])}

随机迭代#

机器学习中经常要对样本数据进行随机洗牌(Shuffle),Ray Data 提供了两种方式:

  • 全量数据 Shuffle

  • 本地缓存 Shuffle

Dataset.random_shuffle() 方法对全量所有数据进行洗牌,这意味着散落在不同计算节点的数据将相互交换,将产生大量节点间通信开销,速度很慢。

本地缓存 Shuffle 指在计算节点上使用一块缓存区域,缓存区域的数据进行洗牌,这意味着随机性会大大降低,但性能会比全量数据 Shuffle 速度快很多,节点间通信开销也大大降低。我们只需要在迭代方法中使用 local_shuffle_buffer_size 参数;并用 local_shuffle_seed 设置随机种子。

下面的例子中,设置了一块缓存区域,该缓存区域至少包含 250 行数据,也就是说,在至少 250 行数据上洗牌。

cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2, local_shuffle_buffer_size=250):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': tensor([2., 2.], dtype=torch.float64), 'tip_amount': tensor([8.1800, 8.0000], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([4., 1.], dtype=torch.float64), 'tip_amount': tensor([20.1400, 10.8900], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15.8600,  7.9500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([2., 1.], dtype=torch.float64), 'tip_amount': tensor([15.0000, 10.9500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([10.5600,  6.6500], dtype=torch.float64), 'payment_type': tensor([1, 1])}

Note

本地缓存 Shuffle 是一种在随机度和性能之间平衡的策略。在机器学习中,随机度越高,机器学习模型的准确度越高。

保存数据#

数据的保存可分为两类:

  • 数据保存到本地或共享的文件系统,比如本地文件系统或 S3。

  • 数据转化为其他格式的数据或写入特定的数据库,比如 pandas 或 MongoDB。

写入文件系统#

使用 HDFS、S3 或者其他文件系统时,Ray Data 遵守 表 4.2 中提及的 URI 和文件系统 Scheme 标准,应在 URI 中明确 Scheme 信息。

表 8.2 列举了几个将 Dataset 保存为不同文件格式的 API。

表 8.2 将 Dataset 写入文件系统#

Parquet

CSV

JSON

TFRecord

方法

Dataset.write_parquet()

Dataset.write_csv()

Dataset.write_json()

Dataset.write_tfrecords()

将数据持久化到文件系统时,注意写明文件系统的 Scheme。比如,写入本地的 /tmp/trip 目录:

dataset.write_parquet("local:///tmp/trip")
2024-04-23 15:46:08,067	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:08,067	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Write]

默认情况下,Ray Data 在写数据时,是以多个文件的形式写入文件系统,每个 Block 对应一个文件,有多少个 Block 就会有多少个文件。可以使用 repartition() 修改文件数量。

if os.path.exists("/tmp/files/"):
    shutil.rmtree("/tmp/files/")
dataset.repartition(3).write_csv("/tmp/files/")
print(os.listdir("/tmp/files/"))
2024-04-23 15:46:08,669	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:08,669	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[Write]
['10_000002_000000.csv', '10_000000_000000.csv', '10_000001_000000.csv']

转化成其他框架格式#

我们可以将 Ray Data 的数据转化为单机运行的 pandas DataFrame 或者分布式的 Dask DataFrame,如 表 8.3 所示。

表 8.3 将 Dataset 保存为其他框架数据格式#

pandas

Dask

Spark

方法

Dataset.to_pandas()

Dataset.to_dask()

Dataset.to_spark()