6.3. 分布式对象存储#
Ray 分布式计算中涉及共享数据可被放在分布式对象存储(Distributed Ojbect Store)中,被放置在分布式对象存储中的数据被称为远程对象(Remote Object)中。我们可以使用 ray.get()
和 ray.put()
读写这些 Remote Object。与内存中的 Python 对象实例不同,Remote Object 是不可原地直接更改的(Immutable)。
ray.put()
与 ray.get()
#
Show code cell content
import logging
import random
from typing import Tuple
import numpy as np
import pandas as pd
import ray
import torch
if ray.is_initialized:
ray.shutdown()
ray.init(logging_level=logging.ERROR)
如 图 6.4 所示,操作 Remote Object 主要有 ray.put()
和 ray.get()
两个 API。
ray.put()
把某个计算节点中的对象数据进行序列化,并将其写入到 Ray 集群的分布式对象存储中,返回一个RefObjectID
,RefObjectID
是指向这个 Remote Object 的指针。我们可以通过引用这个RefObjectID
,在 Remote Function 或 Remote Class 中分布式地使用这个数据对象。ray.get()
使用RefObjectID
把数据从分布式对象存储中拉取回来,并进行反序列化。
def create_rand_tensor(size: Tuple[int, int, int]) -> torch.tensor:
return torch.randn(size=(size), dtype=torch.float)
torch.manual_seed(42)
# 创建 16 个张量,每个张量大小为 (X, 8, 8)
tensor_obj_ref_list = [ray.put(create_rand_tensor((i, 8, 8))) for i in range(1, 16)]
tensor_obj_ref_list[0], len(tensor_obj_ref_list)
(ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505), 15)
使用 ray.get()
从分布式对象存储中拉取数据。
val = ray.get(tensor_obj_ref_list[0])
val.size(), val
(torch.Size([1, 8, 8]),
tensor([[[ 1.9269, 1.4873, 0.9007, -2.1055, 0.6784, -1.2345, -0.0431,
-1.6047],
[-0.7521, 1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
0.7624],
[ 1.6423, -0.1596, -0.4974, 0.4396, -0.7581, 1.0783, 0.8008,
1.6806],
[ 1.2791, 1.2964, 0.6105, 1.3347, -0.2316, 0.0418, -0.2516,
0.8599],
[-1.3847, -0.8712, -0.2234, 1.7174, 0.3189, -0.4245, 0.3057,
-0.7746],
[-1.5576, 0.9956, -0.8798, -0.6011, -1.2742, 2.1228, -1.2347,
-0.4879],
[-0.9138, -0.6581, 0.0780, 0.5258, -0.4880, 1.1914, -0.8140,
-0.7360],
[-1.4032, 0.0360, -0.0635, 0.6756, -0.0978, 1.8446, -1.1845,
1.3835]]]))
或者把 ObjectRefID
列表的所有对象都拉取过来:
results = ray.get(tensor_obj_ref_list)
results[0].size(), results[0]
(torch.Size([1, 8, 8]),
tensor([[[ 1.9269, 1.4873, 0.9007, -2.1055, 0.6784, -1.2345, -0.0431,
-1.6047],
[-0.7521, 1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
0.7624],
[ 1.6423, -0.1596, -0.4974, 0.4396, -0.7581, 1.0783, 0.8008,
1.6806],
[ 1.2791, 1.2964, 0.6105, 1.3347, -0.2316, 0.0418, -0.2516,
0.8599],
[-1.3847, -0.8712, -0.2234, 1.7174, 0.3189, -0.4245, 0.3057,
-0.7746],
[-1.5576, 0.9956, -0.8798, -0.6011, -1.2742, 2.1228, -1.2347,
-0.4879],
[-0.9138, -0.6581, 0.0780, 0.5258, -0.4880, 1.1914, -0.8140,
-0.7360],
[-1.4032, 0.0360, -0.0635, 0.6756, -0.0978, 1.8446, -1.1845,
1.3835]]]))
案例:对数据进行转换#
Remote Ojbect 中的数据是不可修改的(Immutable),即无法对变量原地更改。下面的代码中,在单机上,我们可以对变量 a
进行赋值;但在 Ray 中,我们无法原地更改 Remote Object 的值。
a = torch.rand(size=(1, 8, 8))
a[0] = torch.ones(8, 8)
a
tensor([[[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1., 1., 1., 1.]]])
如果我们想使用新数据,应该使用 Remote Function 或者 Remote Class 对 Remote Object 进行转换操作,生成新的 Remote Object。
@ray.remote
def transform_tensor(tensor: torch.tensor) -> torch.tensor:
return torch.transpose(tensor, 0, 1)
transformed_object_list = [transform_tensor.remote(t_obj_ref) for t_obj_ref in tensor_obj_ref_list]
transformed_object_list[0].size()
28
传递参数#
Remote Object 可以通过 RefObjectID
在 Task、Actor 之间传递。
直接传递#
直接传递指在 Task 或者 Actor 的函数调用时将 RefObjectID
作为参数传递进去。在下面这个例子中,x_obj_ref
是一个 RefObjectID
,echo()
这个 Remote Function 将自动从 x_obj_ref
获取 x
的值。这个自动获取值的过程被称为自动反引用(De-referenced)。
@ray.remote
def echo(x):
print(f"current value of argument x: {x}")
return x
x = list(range(5))
x_obj_ref = ray.put(x)
x_obj_ref
ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)
ray.get(echo.remote(x_obj_ref))
[0, 1, 2, 3, 4]
ray.get(echo.remote(x))
(echo pid=22623) current value of argument x: [0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
复杂数据结构#
如果 RefObjectID
被包裹在一个复杂的数据结构中,Ray 并不会自动获取 RefObjectID
对应的值,即 De-referenced 并不是自动的。复杂数据结构包括:
RefObjectID
被包裹在一个dict
中,比如:.remote({"obj": x_obj_ref})
RefObjectID
被包裹在一个list
中,比如:.remote([x_obj_ref])
ray.get(echo.remote({"obj": x_obj_ref}))
{'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)}
(echo pid=22630) current value of argument x: [0, 1, 2, 3, 4]
{'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
ray.get(echo.remote([x_obj_ref]))
(echo pid=70968) current value of argument x: [0, 1, 2, 3, 4]
(echo pid=70962) current value of argument x: [0, 1, 2, 3, 4]
(echo pid=70963) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)}
(echo pid=70963) current value of argument x: [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)]
[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)]
(echo pid=22630) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)]
底层实现#
Ray 集群的每个计算节点都有一个基于共享内存的对象存储, Remote Object 的数据会存储在集群某个或者某些计算节点的对象存储中,所有计算节点的共享内存共同组成了分布式对象存储。
当某个 Remote Object 的数据量较小时(<= 100 KB),它会被存储在计算节点进程内存中;当数据量较大时,它会被存储在分布式的共享内存中;当集群的共享内存的空间不够时,数据会被外溢(Spill)到持久化的存储上,比如硬盘或者S3。
ray.shutdown()