6.3. 分布式对象存储#

Ray 分布式计算中涉及共享数据可被放在分布式对象存储(Distributed Ojbect Store)中,被放置在分布式对象存储中的数据被称为远程对象(Remote Object)中。我们可以使用 ray.get()ray.put() 读写这些 Remote Object。与内存中的 Python 对象实例不同,Remote Object 是不可原地直接更改的(Immutable)。

ray.put()ray.get()#

Hide code cell content
import logging
import random
from typing import Tuple
import numpy as np
import pandas as pd
import ray
import torch

if ray.is_initialized:
    ray.shutdown()
ray.init(logging_level=logging.ERROR)

图 6.4 所示,操作 Remote Object 主要有 ray.put()ray.get() 两个 API。

  • ray.put() 把某个计算节点中的对象数据进行序列化,并将其写入到 Ray 集群的分布式对象存储中,返回一个 RefObjectIDRefObjectID 是指向这个 Remote Object 的指针。我们可以通过引用这个 RefObjectID,在 Remote Function 或 Remote Class 中分布式地使用这个数据对象。

  • ray.get() 使用 RefObjectID 把数据从分布式对象存储中拉取回来,并进行反序列化。

../_images/put-get-object-store.svg

图 6.4 Ray 分布式对象存储示意图#

def create_rand_tensor(size: Tuple[int, int, int]) -> torch.tensor:
    return torch.randn(size=(size), dtype=torch.float)

torch.manual_seed(42)
# 创建 16 个张量,每个张量大小为 (X, 8, 8)
tensor_obj_ref_list = [ray.put(create_rand_tensor((i, 8, 8))) for i in range(1, 16)]
tensor_obj_ref_list[0], len(tensor_obj_ref_list)
(ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505), 15)

使用 ray.get() 从分布式对象存储中拉取数据。

val = ray.get(tensor_obj_ref_list[0])
val.size(), val
(torch.Size([1, 8, 8]),
 tensor([[[ 1.9269,  1.4873,  0.9007, -2.1055,  0.6784, -1.2345, -0.0431,
           -1.6047],
          [-0.7521,  1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
            0.7624],
          [ 1.6423, -0.1596, -0.4974,  0.4396, -0.7581,  1.0783,  0.8008,
            1.6806],
          [ 1.2791,  1.2964,  0.6105,  1.3347, -0.2316,  0.0418, -0.2516,
            0.8599],
          [-1.3847, -0.8712, -0.2234,  1.7174,  0.3189, -0.4245,  0.3057,
           -0.7746],
          [-1.5576,  0.9956, -0.8798, -0.6011, -1.2742,  2.1228, -1.2347,
           -0.4879],
          [-0.9138, -0.6581,  0.0780,  0.5258, -0.4880,  1.1914, -0.8140,
           -0.7360],
          [-1.4032,  0.0360, -0.0635,  0.6756, -0.0978,  1.8446, -1.1845,
            1.3835]]]))

或者把 ObjectRefID 列表的所有对象都拉取过来:

results = ray.get(tensor_obj_ref_list)
results[0].size(), results[0]
(torch.Size([1, 8, 8]),
 tensor([[[ 1.9269,  1.4873,  0.9007, -2.1055,  0.6784, -1.2345, -0.0431,
           -1.6047],
          [-0.7521,  1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
            0.7624],
          [ 1.6423, -0.1596, -0.4974,  0.4396, -0.7581,  1.0783,  0.8008,
            1.6806],
          [ 1.2791,  1.2964,  0.6105,  1.3347, -0.2316,  0.0418, -0.2516,
            0.8599],
          [-1.3847, -0.8712, -0.2234,  1.7174,  0.3189, -0.4245,  0.3057,
           -0.7746],
          [-1.5576,  0.9956, -0.8798, -0.6011, -1.2742,  2.1228, -1.2347,
           -0.4879],
          [-0.9138, -0.6581,  0.0780,  0.5258, -0.4880,  1.1914, -0.8140,
           -0.7360],
          [-1.4032,  0.0360, -0.0635,  0.6756, -0.0978,  1.8446, -1.1845,
            1.3835]]]))

案例:对数据进行转换#

Remote Ojbect 中的数据是不可修改的(Immutable),即无法对变量原地更改。下面的代码中,在单机上,我们可以对变量 a 进行赋值;但在 Ray 中,我们无法原地更改 Remote Object 的值。

a = torch.rand(size=(1, 8, 8))
a[0] = torch.ones(8, 8)
a
tensor([[[1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.]]])

如果我们想使用新数据,应该使用 Remote Function 或者 Remote Class 对 Remote Object 进行转换操作,生成新的 Remote Object。

@ray.remote
def transform_tensor(tensor: torch.tensor) -> torch.tensor:
    return torch.transpose(tensor, 0, 1)

transformed_object_list = [transform_tensor.remote(t_obj_ref) for t_obj_ref in tensor_obj_ref_list]
transformed_object_list[0].size()
28

传递参数#

Remote Object 可以通过 RefObjectID 在 Task、Actor 之间传递。

直接传递#

直接传递指在 Task 或者 Actor 的函数调用时将 RefObjectID 作为参数传递进去。在下面这个例子中,x_obj_ref 是一个 RefObjectIDecho() 这个 Remote Function 将自动从 x_obj_ref 获取 x 的值。这个自动获取值的过程被称为自动反引用(De-referenced)。

@ray.remote
def echo(x):
    print(f"current value of argument x: {x}")
    return x

x = list(range(5))
x_obj_ref = ray.put(x)
x_obj_ref
ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)
ray.get(echo.remote(x_obj_ref))
[0, 1, 2, 3, 4]
ray.get(echo.remote(x))
(echo pid=22623) current value of argument x: [0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]

复杂数据结构#

如果 RefObjectID 被包裹在一个复杂的数据结构中,Ray 并不会自动获取 RefObjectID 对应的值,即 De-referenced 并不是自动的。复杂数据结构包括:

  • RefObjectID 被包裹在一个 dict 中,比如:.remote({"obj": x_obj_ref})

  • RefObjectID 被包裹在一个 list 中,比如:.remote([x_obj_ref])

ray.get(echo.remote({"obj": x_obj_ref}))
{'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)}
(echo pid=22630) current value of argument x: [0, 1, 2, 3, 4]
{'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
ray.get(echo.remote([x_obj_ref]))
(echo pid=70968) current value of argument x: [0, 1, 2, 3, 4]
(echo pid=70962) current value of argument x: [0, 1, 2, 3, 4]
(echo pid=70963) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)}
(echo pid=70963) current value of argument x: [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)]
[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)]
(echo pid=22630) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)]

底层实现#

Ray 集群的每个计算节点都有一个基于共享内存的对象存储, Remote Object 的数据会存储在集群某个或者某些计算节点的对象存储中,所有计算节点的共享内存共同组成了分布式对象存储。

当某个 Remote Object 的数据量较小时(<= 100 KB),它会被存储在计算节点进程内存中;当数据量较大时,它会被存储在分布式的共享内存中;当集群的共享内存的空间不够时,数据会被外溢(Spill)到持久化的存储上,比如硬盘或者S3。

ray.shutdown()