8.2. 数据加载、查看与保存#
Ray Data 可以兼容多种多样的数据源,包括文件、内存数据和数据库。
Show code cell content
import os
import shutil
from pathlib import Path
import sys
sys.path.append("..")
from utils import nyc_taxi
import ray
if ray.is_initialized:
    ray.shutdown()
ray.init()
2024-04-23 15:46:00,903	INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8267 
加载数据#
Ray Data 提供了很多预置的数据加载方法,包括读取文件、读取 pandas DataFrame 这种内存数据、读取数据库中的数据。这里我们以纽约出租车司机的案例来演示读取 Parquet 文件。
首先下载该数据,并使用 ray.data 提供的 read_parquet() 方法读取数据,得到一个 Dataset。。
dataset_path = nyc_taxi()
dataset = ray.data.read_parquet(dataset_path)
dataset.take(1)
Show code cell output
2024-04-23 15:46:02,588	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:02,589	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'VendorID': 1,
  'tpep_pickup_datetime': datetime.datetime(2023, 2, 1, 0, 32, 53),
  'tpep_dropoff_datetime': datetime.datetime(2023, 2, 1, 0, 34, 34),
  'passenger_count': 2.0,
  'trip_distance': 0.3,
  'RatecodeID': 1.0,
  'store_and_fwd_flag': 'N',
  'PULocationID': 142,
  'DOLocationID': 163,
  'payment_type': 2,
  'fare_amount': 4.4,
  'extra': 3.5,
  'mta_tax': 0.5,
  'tip_amount': 0.0,
  'tolls_amount': 0.0,
  'improvement_surcharge': 1.0,
  'total_amount': 9.4,
  'congestion_surcharge': 2.5,
  'airport_fee': None}]
查看这份数据集的数据模式(Schema):
dataset.schema()
Column                 Type
------                 ----
VendorID               int64
tpep_pickup_datetime   timestamp[us]
tpep_dropoff_datetime  timestamp[us]
passenger_count        double
trip_distance          double
RatecodeID             double
store_and_fwd_flag     string
PULocationID           int64
DOLocationID           int64
payment_type           int64
fare_amount            double
extra                  double
mta_tax                double
tip_amount             double
tolls_amount           double
improvement_surcharge  double
total_amount           double
congestion_surcharge   double
airport_fee            double
其他类型的文件格式(CSV、TFRecord 等)读取方法如 表 8.1 所示。
| Parquet | Text | CSV | TFRecord | 二进制 | |
|---|---|---|---|---|---|
| 方法 | 
列裁剪与行裁剪#
原始文件有很多列(Column),如果我们只关心某些特定的列,比如 passenger_count、tip_amount、payment_type 等,可以使用 read_parquet() 方法的 columns 参数。
dataset = ray.data.read_parquet(
    dataset_path, 
    columns=["passenger_count", "tip_amount", "payment_type"]
)
dataset.schema()
Column           Type
------           ----
passenger_count  double
tip_amount       double
payment_type     int64
加了 columns 限制后,只有我们关心的列被读取,其他列不会被读取,即列裁剪。除了列裁剪,Ray Data 也支持行裁剪,即满足特定条件的行被读取,比如 tip_amount 大于 6.0 的行被过滤出来:
import pyarrow as pa
dataset = ray.data.read_parquet(
    dataset_path,
    columns=["passenger_count", "tip_amount", "payment_type"],
    filter=pa.dataset.field("tip_amount") > 6.0
)
dataset.show(limit=2)
2024-04-23 15:46:04,082	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:04,083	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': 1.0, 'tip_amount': 8.82, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 16.35, 'payment_type': 1}
并行度#
章节 8.1 我们提到,Ray Data 背后使用 Task 或 Actor 将数据处理过程并行化,在数据读取时,可以设置 parallelism 参数,以优化并行数据处理过程。Ray Data 提供的各类数据读取方法(比如,read_parquet()),都可以设置 parallelism 参数,来控制底层的并行执行的过程。如果不设置 parallelism,Ray Data 通过以下方式试探 parallelism:
- Ray 获取集群中可用的 CPU 核数。 
- parallelism被设置为 CPU 核数的 2 倍。如果- parallelism小于 8,则设置为 8。
- 估计每个 - Block的大小,如果每个- Block平均大于 512 MiB,Ray 增大- parallelism,直到每个- Block小于 512 MiB。
使用者也可以根据数据的实际情况,手动设置 parallelism,比如 ray.data.read_parquet(path, parallelism=512) 将强制生成 512 个 Ray Task 并行地读取数据。
查看数据#
查看数据包括查看数据的模式、数据的某些行或某些批次。比如,刚才使用的 show() 方法,以及接下来将要介绍的 count()、take() 等。
查看数据集的样本数量:
dataset.count()
5324349
查看数据的某几行,可以使用 Dataset.take() 或 Dataset.take_all() 两个方法。take() 方法把 Dataset 中的某一行取出,以字典的形式打印出来,字典的 Key 是字段名,字典的 Value 是对应的值。
dataset.take(limit=1)
2024-04-23 15:46:06,493	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:06,493	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}]
也可以将 Dataset 拆分为小的批次(Batch),使用 Dataset.take_batch() 查看一个 Batch 的数据。take_batch() 方法的一个重要参数是 batch_size,用来设置 Batch 大小。
batch = dataset.take_batch(batch_size=2)
batch
2024-04-23 15:46:06,732	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:06,732	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': array([1., 1.]),
 'tip_amount': array([15., 10.]),
 'payment_type': array([1, 1])}
迭代数据#
顺序迭代#
Ray Data 提供了迭代数据的方法,Dataset.iter_rows() 和 Dataset.iter_batches(),iter_rows() 迭代每一行,iter_batches() 迭代每一个批次。
比如,我们迭代前 5 行:
cnt = 0
for row in dataset.iter_rows():
    cnt += 1
    if cnt > 5:
        break
    print(row)
{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.74, 'payment_type': 1}
{'passenger_count': 4.0, 'tip_amount': 7.75, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 6.22, 'payment_type': 1}
或者迭代前 5 个批次:
cnt = 0
for batch in dataset.iter_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': array([1., 1.]), 'tip_amount': array([15., 10.]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 4.]), 'tip_amount': array([10.74,  7.75]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 2.]), 'tip_amount': array([ 6.22, 13.26]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([15.85,  6.36]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([ 6.6 , 15.05]), 'payment_type': array([1, 1])}
深度学习框架 PyTorch 和 TensorFlow 经常对批次数据进行训练或推理,Ray Data 为了更好地与之融合,提供了 Dataset.iter_torch_batches() 和 Dataset.iter_tf_batches() 方法,这两个方法会把数据转化为 PyTorch 和 TensorFlow 的 Tensor 数据格式。
cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15., 10.], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 4.], dtype=torch.float64), 'tip_amount': tensor([10.7400,  7.7500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 2.], dtype=torch.float64), 'tip_amount': tensor([ 6.2200, 13.2600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15.8500,  6.3600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([ 6.6000, 15.0500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
随机迭代#
机器学习中经常要对样本数据进行随机洗牌(Shuffle),Ray Data 提供了两种方式:
- 全量数据 Shuffle 
- 本地缓存 Shuffle 
Dataset.random_shuffle() 方法对全量所有数据进行洗牌,这意味着散落在不同计算节点的数据将相互交换,将产生大量节点间通信开销,速度很慢。
本地缓存 Shuffle 指在计算节点上使用一块缓存区域,缓存区域的数据进行洗牌,这意味着随机性会大大降低,但性能会比全量数据 Shuffle 速度快很多,节点间通信开销也大大降低。我们只需要在迭代方法中使用 local_shuffle_buffer_size 参数;并用 local_shuffle_seed 设置随机种子。
下面的例子中,设置了一块缓存区域,该缓存区域至少包含 250 行数据,也就是说,在至少 250 行数据上洗牌。
cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2, local_shuffle_buffer_size=250):
    cnt += 1
    if cnt > 5:
        break
    print(batch)
{'passenger_count': tensor([2., 2.], dtype=torch.float64), 'tip_amount': tensor([8.1800, 8.0000], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([4., 1.], dtype=torch.float64), 'tip_amount': tensor([20.1400, 10.8900], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15.8600,  7.9500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([2., 1.], dtype=torch.float64), 'tip_amount': tensor([15.0000, 10.9500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([10.5600,  6.6500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
Note
本地缓存 Shuffle 是一种在随机度和性能之间平衡的策略。在机器学习中,随机度越高,机器学习模型的准确度越高。
保存数据#
数据的保存可分为两类:
- 数据保存到本地或共享的文件系统,比如本地文件系统或 S3。 
- 数据转化为其他格式的数据或写入特定的数据库,比如 pandas 或 MongoDB。 
写入文件系统#
使用 HDFS、S3 或者其他文件系统时,Ray Data 遵守 表 4.2 中提及的 URI 和文件系统 Scheme 标准,应在 URI 中明确 Scheme 信息。
表 8.2 列举了几个将 Dataset 保存为不同文件格式的 API。
| Parquet | CSV | JSON | TFRecord | |
|---|---|---|---|---|
| 方法 | 
将数据持久化到文件系统时,注意写明文件系统的 Scheme。比如,写入本地的 /tmp/trip 目录:
dataset.write_parquet("local:///tmp/trip")
2024-04-23 15:46:08,067	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:08,067	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Write]
默认情况下,Ray Data 在写数据时,是以多个文件的形式写入文件系统,每个 Block 对应一个文件,有多少个 Block 就会有多少个文件。可以使用 repartition() 修改文件数量。
if os.path.exists("/tmp/files/"):
    shutil.rmtree("/tmp/files/")
dataset.repartition(3).write_csv("/tmp/files/")
print(os.listdir("/tmp/files/"))
2024-04-23 15:46:08,669	INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:08,669	INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[Write]
['10_000002_000000.csv', '10_000000_000000.csv', '10_000001_000000.csv']
转化成其他框架格式#
我们可以将 Ray Data 的数据转化为单机运行的 pandas DataFrame 或者分布式的 Dask DataFrame,如 表 8.3 所示。
| pandas | Dask | Spark | |
|---|---|---|---|
| 方法 | 
