6.3. 分布式对象存储#
Ray 分布式计算中涉及共享数据可被放在分布式对象存储(Distributed Ojbect Store)中,被放置在分布式对象存储中的数据被称为远程对象(Remote Object)中。我们可以使用 ray.get() 和 ray.put() 读写这些 Remote Object。与内存中的 Python 对象实例不同,Remote Object 是不可原地直接更改的(Immutable)。
ray.put() 与 ray.get()#
Show code cell content
import logging
import random
from typing import Tuple
import numpy as np
import pandas as pd
import ray
import torch
if ray.is_initialized:
    ray.shutdown()
ray.init(logging_level=logging.ERROR)
如 图 6.4 所示,操作 Remote Object 主要有 ray.put() 和 ray.get() 两个 API。
- ray.put()把某个计算节点中的对象数据进行序列化,并将其写入到 Ray 集群的分布式对象存储中,返回一个- RefObjectID,- RefObjectID是指向这个 Remote Object 的指针。我们可以通过引用这个- RefObjectID,在 Remote Function 或 Remote Class 中分布式地使用这个数据对象。
- ray.get()使用- RefObjectID把数据从分布式对象存储中拉取回来,并进行反序列化。
图 6.4 Ray 分布式对象存储示意图#
def create_rand_tensor(size: Tuple[int, int, int]) -> torch.tensor:
    return torch.randn(size=(size), dtype=torch.float)
torch.manual_seed(42)
# 创建 16 个张量,每个张量大小为 (X, 8, 8)
tensor_obj_ref_list = [ray.put(create_rand_tensor((i, 8, 8))) for i in range(1, 16)]
tensor_obj_ref_list[0], len(tensor_obj_ref_list)
(ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001e1f505), 15)
使用 ray.get() 从分布式对象存储中拉取数据。
val = ray.get(tensor_obj_ref_list[0])
val.size(), val
(torch.Size([1, 8, 8]),
 tensor([[[ 1.9269,  1.4873,  0.9007, -2.1055,  0.6784, -1.2345, -0.0431,
           -1.6047],
          [-0.7521,  1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
            0.7624],
          [ 1.6423, -0.1596, -0.4974,  0.4396, -0.7581,  1.0783,  0.8008,
            1.6806],
          [ 1.2791,  1.2964,  0.6105,  1.3347, -0.2316,  0.0418, -0.2516,
            0.8599],
          [-1.3847, -0.8712, -0.2234,  1.7174,  0.3189, -0.4245,  0.3057,
           -0.7746],
          [-1.5576,  0.9956, -0.8798, -0.6011, -1.2742,  2.1228, -1.2347,
           -0.4879],
          [-0.9138, -0.6581,  0.0780,  0.5258, -0.4880,  1.1914, -0.8140,
           -0.7360],
          [-1.4032,  0.0360, -0.0635,  0.6756, -0.0978,  1.8446, -1.1845,
            1.3835]]]))
或者把 ObjectRefID 列表的所有对象都拉取过来:
results = ray.get(tensor_obj_ref_list)
results[0].size(), results[0]
(torch.Size([1, 8, 8]),
 tensor([[[ 1.9269,  1.4873,  0.9007, -2.1055,  0.6784, -1.2345, -0.0431,
           -1.6047],
          [-0.7521,  1.6487, -0.3925, -1.4036, -0.7279, -0.5594, -0.7688,
            0.7624],
          [ 1.6423, -0.1596, -0.4974,  0.4396, -0.7581,  1.0783,  0.8008,
            1.6806],
          [ 1.2791,  1.2964,  0.6105,  1.3347, -0.2316,  0.0418, -0.2516,
            0.8599],
          [-1.3847, -0.8712, -0.2234,  1.7174,  0.3189, -0.4245,  0.3057,
           -0.7746],
          [-1.5576,  0.9956, -0.8798, -0.6011, -1.2742,  2.1228, -1.2347,
           -0.4879],
          [-0.9138, -0.6581,  0.0780,  0.5258, -0.4880,  1.1914, -0.8140,
           -0.7360],
          [-1.4032,  0.0360, -0.0635,  0.6756, -0.0978,  1.8446, -1.1845,
            1.3835]]]))
案例:对数据进行转换#
Remote Ojbect 中的数据是不可修改的(Immutable),即无法对变量原地更改。下面的代码中,在单机上,我们可以对变量 a 进行赋值;但在 Ray 中,我们无法原地更改 Remote Object 的值。
a = torch.rand(size=(1, 8, 8))
a[0] = torch.ones(8, 8)
a
tensor([[[1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.],
         [1., 1., 1., 1., 1., 1., 1., 1.]]])
如果我们想使用新数据,应该使用 Remote Function 或者 Remote Class 对 Remote Object 进行转换操作,生成新的 Remote Object。
@ray.remote
def transform_tensor(tensor: torch.tensor) -> torch.tensor:
    return torch.transpose(tensor, 0, 1)
transformed_object_list = [transform_tensor.remote(t_obj_ref) for t_obj_ref in tensor_obj_ref_list]
transformed_object_list[0].size()
28
传递参数#
Remote Object 可以通过 RefObjectID 在 Task、Actor 之间传递。
直接传递#
直接传递指在 Task 或者 Actor 的函数调用时将 RefObjectID 作为参数传递进去。在下面这个例子中,x_obj_ref 是一个 RefObjectID ,echo() 这个 Remote Function 将自动从 x_obj_ref 获取 x 的值。这个自动获取值的过程被称为自动反引用(De-referenced)。
@ray.remote
def echo(x):
    print(f"current value of argument x: {x}")
    return x
x = list(range(5))
x_obj_ref = ray.put(x)
x_obj_ref
ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)
ray.get(echo.remote(x_obj_ref))
[0, 1, 2, 3, 4]
ray.get(echo.remote(x))
(echo pid=22623) current value of argument x: [0, 1, 2, 3, 4]
[0, 1, 2, 3, 4]
复杂数据结构#
如果 RefObjectID 被包裹在一个复杂的数据结构中,Ray 并不会自动获取 RefObjectID 对应的值,即 De-referenced 并不是自动的。复杂数据结构包括:
- RefObjectID被包裹在一个- dict中,比如:- .remote({"obj": x_obj_ref})
- RefObjectID被包裹在一个- list中,比如:- .remote([x_obj_ref])
ray.get(echo.remote({"obj": x_obj_ref}))
{'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)}
(echo pid=22630) current value of argument x: [0, 1, 2, 3, 4]
{'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
ray.get(echo.remote([x_obj_ref]))
(echo pid=70968) current value of argument x: [0, 1, 2, 3, 4]
(echo pid=70962) current value of argument x: [0, 1, 2, 3, 4]
(echo pid=70963) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)}
(echo pid=70963) current value of argument x: [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)]
[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010000000)]
(echo pid=22630) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)}
[ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000010e1f505)]
底层实现#
Ray 集群的每个计算节点都有一个基于共享内存的对象存储, Remote Object 的数据会存储在集群某个或者某些计算节点的对象存储中,所有计算节点的共享内存共同组成了分布式对象存储。
当某个 Remote Object 的数据量较小时(<= 100 KB),它会被存储在计算节点进程内存中;当数据量较大时,它会被存储在分布式的共享内存中;当集群的共享内存的空间不够时,数据会被外溢(Spill)到持久化的存储上,比如硬盘或者S3。
ray.shutdown()
