Ray:AI的分布式系统

时间:2022-04-26
本文章向大家介绍Ray:AI的分布式系统,主要内容包括AI的开源框架、Ray低级API、Actors、参数服务器示例、Ray高级库、更多信息、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。

随着机器学习算法和技术的进步,越来越多的机器学习应用程序需要多台机器,而且必须利用并行性。但是,在集群上进行机器学习的基础设施仍然是专门设置的。尽管针对特定用例(如参数服务器或超参数搜索)和AI之外的高质量分布式系统(如Hadoop或Spark)提供了良好的解决方案,但在边界开发算法的从业者往往从头构建自己的系统基础架构。这些努力相当于是多余的。

举个例子,采取一个概念上简单的算法,如进化策略强化学习(Evolution Strategies for reinforcement learning。该算法大约有十几行伪代码,其Python实现并不多。但是,在较大的机器或集群上高效地运行算法需要更多的软件工程。作者的实现涉及数千行代码,并且必须定义通信协议,消息序列化和反序列化策略以及各种数据处理策略。

Ray的目标之一是使实践者能够将运行在笔记本上的原型算法转换成高效的分布式应用程序,该应用程序可以高效地在集群上运行(或者在单一的多核机器上),而且代码的额外行数相对较少。这样的框架应该包括手动优化系统的性能优势,而不需要用户对调度,数据传输和机器故障进行推理。

AI的开源框架

与深度学习框架的关系: Ray与TensorFlow,PyTorch和MXNet等深度学习框架完全兼容,在许多应用中与Ray一起使用一个或多个深度学习框架是很自然的(例如,我们的强化学习库使用TensorFlow和PyTorch)。

与其他分布式系统的关系:今天使用了许多流行的分布式系统,但是其中大多数并不是用AI应用程序构建的,并且缺乏支持所需的性能以及表示AI应用程序的API。从今天的分布式系统来看,它们缺少以下功能(以各种组合方式):

  • 支持毫秒级任务和每秒数百万个任务
  • 嵌套并行(在任务内并行化任务,例如超参数搜索内部的并行模拟)(见下图)
  • 在运行时动态确定任意任务依赖关系(例如,为了避免等待缓慢的工作人员)
  • 在共享可变状态下运行的任务(例如,神经网络权重或模拟器)
  • 支持异构资源(CPU,GPU等)

一个嵌套并行的简单例子。一个应用程序运行两个并行的实验(每个都是一个长期运行的任务),每个实验运行一些并行的模拟(每个都是一个任务)。

有两种使用Ray的主要方法:通过其较低级别的API和更高级别的库。较高级别的库建立在较低级别的API之上。目前这些包括Ray RLlib,一个可扩展的强化学习库和Ray.tune,一个高效的分布式超参数搜索库。

Ray低级API

Ray API的目标是自然地表达非常普遍的计算模式和应用程序,而不受像MapReduce这样的固定模式的限制。

动态任务图

Ray应用程序或作业中的基础基元是一个动态任务图。这与TensorFlow中的计算图非常不同。而在TensorFlow中,一个计算图代表一个神经网络,并且在单个应用程序中执行多次,在Ray中,任务图代表整个应用程序,并且只执行一次。任务图不是事先知道的。它是在应用程序运行时动态构建的,执行一个任务可能会触发创建更多任务。

上图是一个计算图的例子。白色的椭圆形表示任务,蓝色的方框表示对象。箭头表示任务取决于对象或任务创建对象。

任意的Python函数都可以作为任务执行,并且可以任意依赖其他任务的输出。下面的例子给出了说明。

# 定义两个远程函数。
# 这些函数的调用创建了远程执行的任务
@ray.remote
def multiply(x, y):
    return np.dot(x, y)

@ray.remote
def zeros(size):
    return np.zeros(size)

# 开始两个并行的任务,这些会立即返回futures并在后台执行
x_id = zeros.remote((100, 100))
y_id = zeros.remote((100, 100))

# 开始第三个任务,但这并不会被提前计划,直到前两个都完成了.
z_id = multiply.remote(x_id, y_id)

# 获取结果。这个结果直到第三个任务完成才能得到
z = ray.get(z_id)

Actors

只有上述远程功能和任务不能完成的一件事情是让多个任务在相同的共享可变状态下运行。这出现在机器学习中的多个上下文中,其中共享状态可以是模拟器的状态,神经网络的权重或完全不同的东西。Ray使用actor抽象来封装多个任务之间共享的可变状态。下面一个小例子,展示了如何用Atari模拟器做到这一点。

import gym

@ray.remote
class Simulator(object):
    def __init__(self):
        self.env = gym.make("Pong-v0")
        self.env.reset()

    def step(self, action):
        return self.env.step(action)

# 创建一个模拟器,这将启动一个为Actor运行所有方法的远程进程。
simulator = Simulator.remote()

observations = []
for _ in range(4):
    # 在模拟器中采取行动0,这个调用会顺利而且它返回一个future
    observations.append(simulator.step.remote(0))

虽然简单,但actor可以以很多灵活的方式应用。例如,actor可以封装模拟器或神经网络策略,并可用于分布式培训(如使用参数服务器)或在实时应用程序中进行策略服务。

actor为为许多客户端进程提供预测/操作。 下:多个参数服务器actor使用多个工作进程执行分布式培训。

参数服务器示例

一个参数服务器可以被实现为Ray actor,如下所示:

@ray.remote
class ParameterServer(object):
    def __init__(self, keys, values):
        # 这些值将会被改变,所以我们需要创建本地副本
        values = [value.copy() for value in values]
        self.parameters = dict(zip(keys, values))

    def get(self, keys):
        return [self.parameters[key] for key in keys]

    def update(self, keys, values):
        # 这个更新函数增加了现有的值,但是更新函数可以被任意定义
        for key, value in zip(keys, values):
            self.parameters[key] += value

来看一个更完整的例子

要实例化参数服务器,请执行以下操作。

parameter_server = ParameterServer.remote(initial_keys, initial_values)

要创建连续检索和更新参数的四个长时间运行的任务,请执行以下操作。

@ray.remote
def worker_task(parameter_server):
    while True:
        keys = ['key1', 'key2', 'key3']
        # 获取最新的参数
        values = ray.get(parameter_server.get.remote(keys))
        # 计算参数更新
        updates = …
        # 更新参数
        parameter_server.update.remote(keys, updates)个

# 开始4个长时间运行的任务
for _ in range(4):
    worker_task.remote(parameter_server)

Ray高级库

Ray RLlib是一个可扩展的强化学习库,可在许多机器上运行。它可以通过示例培训脚本以及通过Python API来使用。它目前包括以下的实现:

  • A3C
  • DQN
  • 进化策略
  • PPO

我们正在努力增加更多的算法。RLlib与OpenAI体育馆完全兼容。

Ray.tune是一个高效的分布式超参数搜索库。它提供了用于深度学习,强化学习和其他计算密集型任务的Python API。这是一个说明用法的小例子:

from ray.tune import register_trainable, grid_search, run_experiments

# 函数在优化,超参数在配置参数中
def my_func(config, reporter):
    import time, numpy as np
    i = 0
    while True:
        reporter(timesteps_total=i, mean_accuracy=(i ** config['alpha']))
        i += config['beta']
        time.sleep(0.01)

register_trainable('my_func', my_func)

run_experiments({
    'my_experiment': {
        'run': 'my_func',
        'resources': {'cpu': 1, 'gpu': 0},
        'stop': {'mean_accuracy': 100},
        'config': {
            'alpha': grid_search([0.2, 0.4, 0.6]),
            'beta': grid_search([1, 2]),
        },
    }
})

正在进行的结果可以使用Tensorboard和rllab的VisKit(或者您可以直接阅读JSON日志)等工具进行实时可视化。Ray.tune支持网格搜索,随机搜索和更复杂的早期停止算法,如HyperBand。

更多信息

有关Ray的更多信息,请查看以下链接。

Ray可以安装pip install ray。我们鼓励你尝试Ray。