8.2. 数据加载、查看与保存#
Ray Data 可以兼容多种多样的数据源,包括文件、内存数据和数据库。
Show code cell content
import os
import shutil
from pathlib import Path
import sys
sys.path.append("..")
from utils import nyc_taxi
import ray
if ray.is_initialized:
ray.shutdown()
ray.init()
2024-04-23 15:46:00,903 INFO worker.py:1740 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8267
加载数据#
Ray Data 提供了很多预置的数据加载方法,包括读取文件、读取 pandas DataFrame 这种内存数据、读取数据库中的数据。这里我们以纽约出租车司机的案例来演示读取 Parquet 文件。
首先下载该数据,并使用 ray.data
提供的 read_parquet()
方法读取数据,得到一个 Dataset
。。
dataset_path = nyc_taxi()
dataset = ray.data.read_parquet(dataset_path)
dataset.take(1)
Show code cell output
2024-04-23 15:46:02,588 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:02,589 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'VendorID': 1,
'tpep_pickup_datetime': datetime.datetime(2023, 2, 1, 0, 32, 53),
'tpep_dropoff_datetime': datetime.datetime(2023, 2, 1, 0, 34, 34),
'passenger_count': 2.0,
'trip_distance': 0.3,
'RatecodeID': 1.0,
'store_and_fwd_flag': 'N',
'PULocationID': 142,
'DOLocationID': 163,
'payment_type': 2,
'fare_amount': 4.4,
'extra': 3.5,
'mta_tax': 0.5,
'tip_amount': 0.0,
'tolls_amount': 0.0,
'improvement_surcharge': 1.0,
'total_amount': 9.4,
'congestion_surcharge': 2.5,
'airport_fee': None}]
查看这份数据集的数据模式(Schema):
dataset.schema()
Column Type
------ ----
VendorID int64
tpep_pickup_datetime timestamp[us]
tpep_dropoff_datetime timestamp[us]
passenger_count double
trip_distance double
RatecodeID double
store_and_fwd_flag string
PULocationID int64
DOLocationID int64
payment_type int64
fare_amount double
extra double
mta_tax double
tip_amount double
tolls_amount double
improvement_surcharge double
total_amount double
congestion_surcharge double
airport_fee double
其他类型的文件格式(CSV、TFRecord 等)读取方法如 表 8.1 所示。
Parquet |
Text |
CSV |
TFRecord |
二进制 |
|
---|---|---|---|---|---|
方法 |
列裁剪与行裁剪#
原始文件有很多列(Column),如果我们只关心某些特定的列,比如 passenger_count
、tip_amount
、payment_type
等,可以使用 read_parquet()
方法的 columns
参数。
dataset = ray.data.read_parquet(
dataset_path,
columns=["passenger_count", "tip_amount", "payment_type"]
)
dataset.schema()
Column Type
------ ----
passenger_count double
tip_amount double
payment_type int64
加了 columns
限制后,只有我们关心的列被读取,其他列不会被读取,即列裁剪。除了列裁剪,Ray Data 也支持行裁剪,即满足特定条件的行被读取,比如 tip_amount
大于 6.0 的行被过滤出来:
import pyarrow as pa
dataset = ray.data.read_parquet(
dataset_path,
columns=["passenger_count", "tip_amount", "payment_type"],
filter=pa.dataset.field("tip_amount") > 6.0
)
dataset.show(limit=2)
2024-04-23 15:46:04,082 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:04,083 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': 1.0, 'tip_amount': 8.82, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 16.35, 'payment_type': 1}
并行度#
章节 8.1 我们提到,Ray Data 背后使用 Task 或 Actor 将数据处理过程并行化,在数据读取时,可以设置 parallelism
参数,以优化并行数据处理过程。Ray Data 提供的各类数据读取方法(比如,read_parquet()
),都可以设置 parallelism
参数,来控制底层的并行执行的过程。如果不设置 parallelism
,Ray Data 通过以下方式试探 parallelism
:
Ray 获取集群中可用的 CPU 核数。
parallelism
被设置为 CPU 核数的 2 倍。如果parallelism
小于 8,则设置为 8。估计每个
Block
的大小,如果每个Block
平均大于 512 MiB,Ray 增大parallelism
,直到每个Block
小于 512 MiB。
使用者也可以根据数据的实际情况,手动设置 parallelism
,比如 ray.data.read_parquet(path, parallelism=512)
将强制生成 512 个 Ray Task 并行地读取数据。
查看数据#
查看数据包括查看数据的模式、数据的某些行或某些批次。比如,刚才使用的 show()
方法,以及接下来将要介绍的 count()
、take()
等。
查看数据集的样本数量:
dataset.count()
5324349
查看数据的某几行,可以使用 Dataset.take()
或 Dataset.take_all()
两个方法。take()
方法把 Dataset
中的某一行取出,以字典的形式打印出来,字典的 Key 是字段名,字典的 Value 是对应的值。
dataset.take(limit=1)
2024-04-23 15:46:06,493 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:06,493 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
[{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}]
也可以将 Dataset
拆分为小的批次(Batch),使用 Dataset.take_batch()
查看一个 Batch 的数据。take_batch()
方法的一个重要参数是 batch_size
,用来设置 Batch 大小。
batch = dataset.take_batch(batch_size=2)
batch
2024-04-23 15:46:06,732 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:06,732 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=2]
{'passenger_count': array([1., 1.]),
'tip_amount': array([15., 10.]),
'payment_type': array([1, 1])}
迭代数据#
顺序迭代#
Ray Data 提供了迭代数据的方法,Dataset.iter_rows()
和 Dataset.iter_batches()
,iter_rows()
迭代每一行,iter_batches()
迭代每一个批次。
比如,我们迭代前 5 行:
cnt = 0
for row in dataset.iter_rows():
cnt += 1
if cnt > 5:
break
print(row)
{'passenger_count': 1.0, 'tip_amount': 15.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.0, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 10.74, 'payment_type': 1}
{'passenger_count': 4.0, 'tip_amount': 7.75, 'payment_type': 1}
{'passenger_count': 1.0, 'tip_amount': 6.22, 'payment_type': 1}
或者迭代前 5 个批次:
cnt = 0
for batch in dataset.iter_batches(batch_size=2):
cnt += 1
if cnt > 5:
break
print(batch)
{'passenger_count': array([1., 1.]), 'tip_amount': array([15., 10.]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 4.]), 'tip_amount': array([10.74, 7.75]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 2.]), 'tip_amount': array([ 6.22, 13.26]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([15.85, 6.36]), 'payment_type': array([1, 1])}
{'passenger_count': array([1., 1.]), 'tip_amount': array([ 6.6 , 15.05]), 'payment_type': array([1, 1])}
深度学习框架 PyTorch 和 TensorFlow 经常对批次数据进行训练或推理,Ray Data 为了更好地与之融合,提供了 Dataset.iter_torch_batches()
和 Dataset.iter_tf_batches()
方法,这两个方法会把数据转化为 PyTorch 和 TensorFlow 的 Tensor
数据格式。
cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2):
cnt += 1
if cnt > 5:
break
print(batch)
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15., 10.], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 4.], dtype=torch.float64), 'tip_amount': tensor([10.7400, 7.7500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 2.], dtype=torch.float64), 'tip_amount': tensor([ 6.2200, 13.2600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15.8500, 6.3600], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([ 6.6000, 15.0500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
随机迭代#
机器学习中经常要对样本数据进行随机洗牌(Shuffle),Ray Data 提供了两种方式:
全量数据 Shuffle
本地缓存 Shuffle
Dataset.random_shuffle()
方法对全量所有数据进行洗牌,这意味着散落在不同计算节点的数据将相互交换,将产生大量节点间通信开销,速度很慢。
本地缓存 Shuffle 指在计算节点上使用一块缓存区域,缓存区域的数据进行洗牌,这意味着随机性会大大降低,但性能会比全量数据 Shuffle 速度快很多,节点间通信开销也大大降低。我们只需要在迭代方法中使用 local_shuffle_buffer_size
参数;并用 local_shuffle_seed
设置随机种子。
下面的例子中,设置了一块缓存区域,该缓存区域至少包含 250 行数据,也就是说,在至少 250 行数据上洗牌。
cnt = 0
for batch in dataset.iter_torch_batches(batch_size=2, local_shuffle_buffer_size=250):
cnt += 1
if cnt > 5:
break
print(batch)
{'passenger_count': tensor([2., 2.], dtype=torch.float64), 'tip_amount': tensor([8.1800, 8.0000], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([4., 1.], dtype=torch.float64), 'tip_amount': tensor([20.1400, 10.8900], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([15.8600, 7.9500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([2., 1.], dtype=torch.float64), 'tip_amount': tensor([15.0000, 10.9500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
{'passenger_count': tensor([1., 1.], dtype=torch.float64), 'tip_amount': tensor([10.5600, 6.6500], dtype=torch.float64), 'payment_type': tensor([1, 1])}
Note
本地缓存 Shuffle 是一种在随机度和性能之间平衡的策略。在机器学习中,随机度越高,机器学习模型的准确度越高。
保存数据#
数据的保存可分为两类:
数据保存到本地或共享的文件系统,比如本地文件系统或 S3。
数据转化为其他格式的数据或写入特定的数据库,比如 pandas 或 MongoDB。
写入文件系统#
使用 HDFS、S3 或者其他文件系统时,Ray Data 遵守 表 4.2 中提及的 URI 和文件系统 Scheme 标准,应在 URI 中明确 Scheme 信息。
表 8.2 列举了几个将 Dataset
保存为不同文件格式的 API。
Parquet |
CSV |
JSON |
TFRecord |
|
---|---|---|---|---|
方法 |
将数据持久化到文件系统时,注意写明文件系统的 Scheme。比如,写入本地的 /tmp/trip
目录:
dataset.write_parquet("local:///tmp/trip")
2024-04-23 15:46:08,067 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:08,067 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Write]
默认情况下,Ray Data 在写数据时,是以多个文件的形式写入文件系统,每个 Block
对应一个文件,有多少个 Block
就会有多少个文件。可以使用 repartition()
修改文件数量。
if os.path.exists("/tmp/files/"):
shutil.rmtree("/tmp/files/")
dataset.repartition(3).write_csv("/tmp/files/")
print(os.listdir("/tmp/files/"))
2024-04-23 15:46:08,669 INFO streaming_executor.py:112 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-04-23_15-45-59_228244_75431/logs
2024-04-23 15:46:08,669 INFO streaming_executor.py:113 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> AllToAllOperator[Repartition] -> TaskPoolMapOperator[Write]
['10_000002_000000.csv', '10_000000_000000.csv', '10_000001_000000.csv']
转化成其他框架格式#
我们可以将 Ray Data 的数据转化为单机运行的 pandas DataFrame 或者分布式的 Dask DataFrame,如 表 8.3 所示。
pandas |
Dask |
Spark |
|
---|---|---|---|
方法 |