6.4. 分布式类#

章节 6.2 展示了如何将一个无状态的函数扩展到 Ray 集群上进行分布式计算,但实际的场景中,我们经常需要进行有状态的计算。最简单的有状态计算包括维护一个计数器,每遇到某种条件,计数器加一。这类有状态的计算对于给定的输入,不一定得到确定的输出。单机场景我们可以使用 Python 的类(Class)来实现,计数器可作为类的成员变量。Ray 可以将 Python 类拓展到集群上,即远程类(Remote Class),又被称为行动者(Actor)。Actor 的名字来自 Actor 编程模型 [Hewitt et al., 1973] ,这是一个典型的分布式计算编程模型,被广泛应用在大数据和人工智能领域,但 Actor 编程模型比较抽象,我们先从计数器的案例来入手。

案例:分布式计数器#

Hide code cell content
import logging
from typing import Dict, List, Tuple
import ray

if ray.is_initialized:
    ray.shutdown()
ray.init(logging_level=logging.ERROR)

Ray 的 Remote Class 也使用 ray.remote() 来装饰。

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value

初始化一个实例,在类名 Counter 后面加上 remote(),即创建一个分布式的 Actor。

counter = Counter.remote()

接下来,我们将使用 Counter 类的计数功能,特别是 increment() 方法。在使用这个方法时,需要在函数名后附加 remote() ,即在对象实例函数调用时加上 remote()对象实例.函数名.remote()

obj_ref = counter.increment.remote()
print(ray.get(obj_ref))
1

我们可以用同一个类创建不同的 Actor 实例,不同 Actor 实例的成员函数调用可以被并行化执行,但同一个 Actor 的成员函数调用是顺序执行的。

# 创建 10 个 Actor 实例
counters = [Counter.remote() for _ in range(10)]

# 对每个 Actor 进行 increment 操作
# 这些操作可以分布式执行
results = ray.get([c.increment.remote() for c in counters])
print(results)
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

同一个 Actor 实例是互相共享状态的,所谓共享状态是指,Actor 可能被分布式地调度,无论调度到哪个计算节点,对 Actor 实例的任何操作都像对单机 Python 类和实例的操作一样,对象实例的成员变量是可被访问、修改以及实时更新的。

# 对第一个 Actor 进行5次 increment 操作
# 这5次 increment 操作是顺序执行的,5次操作共享状态数据 value
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)
[2, 3, 4, 5, 6]

Actor 编程模型#

Actor 编程模型是一种分布式编程的范式,每门编程语言或框架有自己的实现。Actor 编程模型的基本要素是 Actor 实例,即每个 Actor 对象都是唯一的。我们可以把单个 Actor 实例理解成单个带地址信息的进程。每个 Actor 都拥有地址信息,我们就可以从别的 Actor 向这个 Actor 发送信息,就像我们通过手机号或电子邮件地址互相发送信息一样。一个 Actor 可以有一个地址,也可以有多个地址,多个 Actor 可以共享同一个地址,拥有多少个地址主要取决于我们想以怎样的方式收发数据。多个 Actor 共享同一个地址,就像公司里有一个群组邮箱,群组包含了多个人,有个对外的公共地址,向这个群组发邮件,群组中的每个人都可以收到消息。

拥有地址和内存空间,Actor 可以做以下事情:

  • 存储数据,比如状态数据

  • 从别的 Actor 接收消息

  • 向别的 Actor 发送消息

  • 创建新的 Actor

Actor 存储的状态数据只能由 Actor 自己来管理,不能被其他 Actor 修改。这有点像面向对象编程语言中类的实例,如果想修改实例的数据,一般通过实例的成员函数。如果我们想修改 Actor 里面存储的状态数据,应该向 Actor 发送消息,Actor 接收到消息,并基于自己存储的数据,做出决策:决定修改状态数据,或者再向其他 Actor 发送消息。比如,刚才的计数器案例中,Actor 收到 increment() 的消息,并根据自己存储的状态,做自增操作。

为了保证 Actor 编程模型分布式环境下状态的一致性,对同一个 Actor 多次发送同样请求,多次请求是顺序执行的。就像计数器案例中,对同一个 Actor 进行 5 次 increment() 操作,这 5 次操作是顺序执行的。

Actor 编程模型是消息驱动的,给某个 Actor 发送消息,它就会对该消息进行响应,修改自身的状态或者继续给其他 Actor 发送消息。Actor 编程模型不需要显式地在多个进程之间同步数据,因此也没有锁的问题以及同步等待的时间。Actor 编程模型可被用于大量异步操作的场景。

案例:排行榜#

接下来我们基于 Actor 实现一个更加复杂的案例:成绩排行榜。这个排行榜的状态是一个键值对,名为 self.board,键是名字(name),是一个 str 类型,值是分数(score),是一个 float 类型。

@ray.remote
class Ranking:
    def __init__(self, minimal_score: float = 60.0):
        self.minimal = minimal_score
        self.board = dict()

    def add(self, name: str, score: float) -> Dict[str, float]:
        try:
            score = float(score)
            if score < self.minimal:
                return
            if name in self.board:
                self.board[name] = max(score, self.board[name])
            else:
                self.board[name] = score
            self.board = dict(sorted(self.board.items(), key=lambda item: item[1]))
            return self.board
        except Exception as e:
            print(f"The data type of score should be float but we receive {type(score)}.")
            return self.board

    def top(self, n: int = 1) -> List[Tuple[str, float]]:
        n = min(n, len(self.board))
        results = list(self.board.items())[:n]
        return results

    def pop(self) -> Dict[str, float]:
        if len(self.board) <= 0:
            raise Exception("The board is empty.")
        else:
            _, _ = self.board.popitem()
        return self.board

在这个排行榜的例子中,一共三个函数:

  • __init__() :构造器。

  • add():添加一条新记录,同时对输入进行解析,如果 score 不能转换成 float 会抛出异常;并对已有记录排序。

  • pop():删除最大值的那条记录,如果 self.board 为空,会抛出异常。

使用 .remote() 函数来创建这个 Remote Class 对应的 Actor 实例。

# 创建排行榜
ranking = Ranking.remote()

这里的 ranking 是一个 Actor 的引用(Actor Handle),有点像 ObjectRef,我们用 ranking 这个 Actor Handle 来管理这个 Actor。一旦 Actor Handle 被销毁,对应的 Actor 以及其状态也被销毁。

我们可以创建多个 Actor 实例,每个实例管理自己的状态。还可以用 ActorClass.options() 给这些 Actor 实例设置一些选项,起名字,设置 CPU、GPU 计算资源等。

# 创建一个数学排行榜 math_ranking
# 它与刚创建的 ranking 相互独立
math_ranking = Ranking.remote(minimal_score=80)

# 创建一个化学排行榜 chem_ranking
# 并且有一个名字
chem_ranking = Ranking.options(name="Chemistry").remote()

有了名字之后,就可以通过 ray.get_actor() 来获取 Actor Handle,

# 获取名为 Chemistry 的 Actor Handle
cr = ray.get_actor("Chemistry")

ranking 排行榜内添加新记录,即调用 add() 函数。调用类成员函数,都要记得加上 .remote() ,否则会报错。

# 增加新记录
ranking.add.remote("Alice", 90)
ranking.add.remote("Bob", 60)

print(f"Current ranking: {ray.get(ranking.top.remote(3))}")
Current ranking: [('Bob', 60.0), ('Alice', 90.0)]
ray.get(ranking.add.remote("Mark", 'a88'))
{'Bob': 60.0, 'Alice': 90.0}

在上面的案例中,有些调用会引发异常,比如插入一个字符串,Ray 通常会处理异常并打印出来,但是为了保险起见,你也可以在调用这些 Remote Class 的成员方法时手动做好 try/except 的异常捕获:

try:
    ray.get(ranking.pop.remote())
    ray.get(ranking.pop.remote())
    ray.get(ranking.pop.remote())
except Exception as e:
    print(e)
(Ranking pid=94276) The data type of score should be float but we receive <class 'str'>.
ray::Ranking.pop() (pid=94276, ip=127.0.0.1, actor_id=ad9e8acb97292b765b95c42501000000, repr=<__main__.Ranking object at 0x10348d690>)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/ipykernel_94239/1570506600.py", line 29, in pop
Exception: The board is empty.

案例:Actor Pool#

实践上,经常创建一个 Actor 资源池(Actor Pool),ActorPool 有点像 multiprocessing.Pool,Actor Pool 中有包含多个 Actor,每个 Actor 功能一样,而且可以分式地在多个计算节点上运行。

from ray.util import ActorPool

@ray.remote
class PoolActor:
    def add(self, operands):
        (a, b) = operands
        return a + b

    def double(self, operand):
        return operand * 2

# 将创建的 Actor 添加至 ActorPool 中
a1, a2, a3 = PoolActor.remote(), PoolActor.remote(), PoolActor.remote()
pool = ActorPool([a1, a2, a3])

如果我们想调用 ActorPool 中的 Actor,可以使用 map(fn, values)submit(fn, value) 方法。这两个方法非常相似,所接收的参数是一个函数 fn 和参数 value 或者参数列表 valuesmap()values 是一个列表,让函数并行地分发给多个 Actor 去处理;submit()value 是单个值,每次从 ActorPool 中选择一个 Actor 去执行。fn 是一个 Lambda 表达式,或者说是一个匿名函数。这个 Lambda 表达式有两个参数:actorvalueactor 就是我们定义的单个 Actor 的函数调用,value 是这个函数的参数。匿名函数 fn 的第一个参数是 ActorPool 中的 Actor,第二个参数是函数的参数。

pool.map(lambda a, v: a.double.remote(v), [3, 4, 5, 4])

pool.submit(lambda a, v: a.double.remote(v), 3)
pool.submit(lambda a, v: a.double.remote(v), 4)

map()submit() 将计算任务提交到了 ActorPool 中,ActorPool 并不是直接返回结果,而是异步地分发给后台不同的 Actor 去执行。需要使用 get_next() 阻塞地返回结果。

try:
    print(pool.get_next())
    print(pool.get_next())
    print(pool.get_next())
except Exception as e:
    print(e)
6
8
10

当然,如果已经把所有结果都取回,仍然再去 get_next(),将会抛出异常。

在这里,submit()value 参数只能是单个对象,不能是参数列表,如果想传入多个参数,可以把参数包裹成元组。比如 add() 方法对两个操作数做计算,我们把两个操作数包裹为一个元组,实现 add() 函数时使用 (a, b) = operands 解析这个元组。

pool.submit(lambda a, v: a.add.remote(v), (1, 10))
print(pool.get_next())
8
ray.shutdown()