TensorFlow强化学习入门(4)——深度Q网络(DQN)及其扩展

时间:2022-04-29
本文章向大家介绍TensorFlow强化学习入门(4)——深度Q网络(DQN)及其扩展,主要内容包括从Q网络到深度Q网络、改进2:历程重现、改进3:目标网络分离、超越DQN、Dueling DQN、综合实践、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
一个聪明的游戏agent可以学会避开危险的陷阱

本文中我们将一起创建一个深度Q网络(DQN)。它基于我们系列文章中(0)的单层Q网络,如果你是强化学习的初学者,我推荐你到文末跳转到(0)开始阅读。尽管简单的Q网路已经可以在简单的问题上和Q表表现一样出色,但是深度Q网络可以使其变得更强。要将简单的Q网络转化为深度Q网路,我们需要以下改进:

  1. 将单层的网络切换为多层卷积网络。
  2. 支持历程重现(Experience Replay),使我们的网路可以通过其记忆的历程来进行自我训练。
  3. 利用第二“目标”网络来计算更新Q值。

这三点创新也使得Google DeepMind团队的DQN agent在很多雅达利游戏上达到超越人类水平。我们将依次讲解每一点并演示如何将其集成在我们的代码中。不过这三个创新点并不是终点,深度学习的研究速度很快以至于在2014年时DQN就已经不是最先进的通用agent了。由此我还将讲解两个简单的DQN架构的提升方案——Double DQN 和 Dueling DQN,它们将为网络带来性能和稳定性的提升。最后我们将得到一个可以完成一系列具有挑战性的雅达利游戏的网络并用一个简单的任务来训练测试其效果。

从Q网络到深度Q网络

改进1:卷积层

由于我们的agent要玩电子游戏,所以它必须能像人类或其他灵长动物一样理解屏幕上的输出内容。与单独考虑每个像素的输入不同,卷积层使网络以区域为单位来理解输出,同时在向更高层的网络传递信息时,这些区域的联系也可以得到保持。这和感受野的机理类似,实际上也已经有研究表明卷积神经网络学习得到的表示与灵长类动物视觉皮层中的表示类似。因此卷积层非常适合作为我们网络的前置元素。

在TensorFlow中,我们可以利用tf.contrib.layers.convolution2d方法来快速创建一个卷积层,示例如下:

convolution_layer = tf.contrib.layers.convolution2d(inputs, num_outputs, kernel_size, stride, padding)

num_outs: 我们使用多少个卷积核来接收上一层的信息。 kernel_size:接收上一层信息所用的滑动窗大小。 Stride:滑动窗边界每次移动的像素数。 padding:是否为图像边界补充padding来保持输入输出尺寸一致,'SAME'填充,'VALID'不填充。

改进2:历程重现

DQN的第二个主要改进就是支持历程重现。其基本思想就是将agent的训练历程存储下来,然后从中随机抽取来训练网络,通过这种方式我们可以使得我们的agent在任务中的表现更加稳定健壮。通过历程的随机抽取,我们可以确保网络只能基于当前环境的状态进行学习,从而习得比原始训练历程更丰富的表示。这些历程被存储在形为<state, action, reward, next state>的元组当中。历程重现缓冲器中存储着固定长度的最近训练记录,每当有新的元素进入时,最旧的一个就将被移除。当需要训练的时候,我们只需要从缓冲器中随机提取训练记录即可。我们后面将创建一个简单的类实现记录的存储和重新提取。

改进3:目标网络分离

DQN的第三个主要改进,也是最独特的一个改进,就是在训练过程中对第二个网络的利用。第二个网络用于计算训练过程中每个行动带来的损失值。为什么不直接使用一个网络来估算损失值呢?原因是训练中的每一步都会带来Q网络中值的变化,当我们基于不断变化的值来调整我们的网络参数时,预估值的变化很容易失控。此时目标值和预估Q值作用产生的反馈会将不稳定转移至网络自身。为了规避这一风险,目标网络应当被冻结,只对主Q网络做周期性的缓慢更新。通过这一手段,训练过程可以变得更加稳定。

除了周期性地单次更新目标网络之外,我们也可以频繁地更新网络,不过更新的幅度要小。这项技术在DeepMind的另一篇论文中有介绍,这一技术也使训练过程更加稳定。

超越DQN

根据上面的改进,我们可以复现2014年提出的DQN。但是技术日新月异,现在已经有很多技术的性能和稳定性都已经达到、超越了2014年DeepMind提出的DQN架构的水平。在将你的DQN应用于你喜欢的雅达利游戏之前,我建议你先在原先的网络上添加一些新特性,下面我将着重说明其中的两个(Double DQN 和 Dueling DQN )并给出其部分代码实现,借助它们我们的网络可以在更短的时间内训练达到更优的性能。

Double DQN

上:常规的DQN和Q值的流向,下:Dueling DQN,其状态价值value和行动得分advantage被分别计算后在最后一层综合为Q值

Double DQN产生的直接原因是常规的DQN在给定状态下往往会高估具有潜力的行动。如果所有的行动总是被同样高估的,那么这个情况也不错,但是事实并非如此。你可以想象一个情形,次优的行动经常得到超过最优行动的Q值,此时agent将很难习得最优的策略。为了纠正这个错误,DDQN的作者使用了一个简单的技巧:利用主网络选择行动,目标网络来生成该行动的目标Q值,而不是在训练过程中计算目标Q值的同时选择最大Q值对应的行动。将行动选择从目标Q值生成逻辑中抽离出来后,网络高估行动的问题基本得到了解决,训练也更加快速和可信。下面给出DDQN更新目标值使用的等式:

Q-Target = r + γQ(s’,argmax(Q(s’,a,ϴ),ϴ’))

Dueling DQN

为了解释Dueling DQN中网络架构变更的原因,我们首先要解释一些额外的强化学习术语。到目前为止我们讨论的Q值对应于确定情况下某种行动的优劣,可以写作Q(s,a)。“确定状态下的行动”可以被拆分为两个更细粒度的基本变量/符号来表示。第一个是价值函数V(s),它告诉我们当前状态的优劣。第二个是决策函数(advantage function),它告诉我们和其他行动相比某一行动的优劣。我们可以将Q值视为V和A综合后的结果,即可以表示为:

Q(s,a) =V(s) + A(a)

Dueling DQN的目标是获得一个可以分别计算价值和决策并通过最后一层综合得到Q值的网络。乍一看这么做好像没有任何意义,反正最后都要整合在一起,为什么还要单独再拆开呢?这么做的好处主要体现在强化学习的agent不需要在每个时刻都同时考虑价值和决策。举例来说:想象你在坐在公园的长椅上看日落的场景,这是十分美好的,也就是说坐在长椅上这一行为会带来很高的收益。但是如果不考虑你当前所处的状态的话(日落),我们不需要作出任何动作,即思考坐在长椅上这一动作的价值是没有意义的。通过将状态价值从其绑定的动作上分离出来后,我们可以得到更加健壮的状态价值预估。

综合实践

简单的像素游戏,目标是在避开红色方块的前提下将蓝色方块移至绿色方块处

现在我们已经习得了构建我们的DQN所需的全部技巧,下面就让我们在实际的游戏环境中进行测试吧!虽然上面我们说DQN经过足够的训练后可以学会雅达利游戏,但是要在这些游戏上表现良好,至少要在强大算力的计算机上训练一天。为了做教学演示,我设计了一个我们DQN可以在比较强大的算力(我使用的是GTX970)下经过数小时训练掌握的简单游戏。在这个环境下,agent将控制一个蓝色方块,目标是避开红色方块(分值 -1)的前提下移动至绿色方块(分值 +1)。每个episode将以随机生成的5x5的网格开局,agent需要在50步内得到尽可能高的分数。由于方块的位置是随机产生的,agent不是像FrozenLake问题中那样简单地习得一个固定路径就可以了,它必须理解这些方块的空间特征。下面让我们来实际尝试一下吧!

游戏环境代码,请放在和你实验用的jupyterbook的目录下

# 译者运行环境为jupyterlab,每个分割线对应一个代码块,Python3,需要pillow库
from __future__ import division

import gym
import numpy as np
import random
import tensorflow as tf
import tensorflow.contrib.slim as slim
import matplotlib.pyplot as plt
import scipy.misc
import os
%matplotlib inline
# --------------------------------------------------
# 加载游戏环境
# 你可以自行调整游戏难度(网格大小),小的网格可以使网络训练更快,大的网格可以提升游戏难度
from gridworld import gameEnv
env = gameEnv(partial=False, size=5)
# --------------------------------------------------
# 实现网络
class Qnetwork():
    def __init__(self, h_size):
        # 网络接收到游戏传递出的一帧图像并将之转化为数组
        # 之后调整大小并通过四个卷积层
        self.scalarInput = tf.placeholder(shape=[None, 21168], dtype=tf.float32)
        self.imageIn = tf.reshape(self.scalarInput, shape=[-1, 84, 84, 3])
        self.conv1 = slim.conv2d(inputs=self.imageIn, num_outputs=32, kernel_size=[8,8], stride=[4,4], padding='VALID', biases_initializer=None)
        self.conv2 = slim.conv2d(inputs=self.conv1, num_outputs=64, kernel_size=[4,4], stride=[2,2], padding='VALID', biases_initializer=None)
        self.conv3 = slim.conv2d(inputs=self.conv2, num_outputs=64, kernel_size=[3,3], stride=[1,1], padding='VALID', biases_initializer=None)
        self.conv4 = slim.conv2d(inputs=self.conv3, num_outputs=h_size, kernel_size=[7,7], stride=[1,1], padding='VALID', biases_initializer=None)
        # 取得最后一层卷积层的输出进行拆分,分别计算价值与决策
        self.streamAC, self.streamVC = tf.split(self.conv4, 2, 3)
        self.streamA = slim.flatten(self.streamAC)
        self.streamV = slim.flatten(self.streamVC)
        xavier_init = tf.contrib.layers.xavier_initializer()
        self.AW = tf.Variable(xavier_init([h_size//2, env.actions]))
        self.VW = tf.Variable(xavier_init([h_size//2, 1]))
        self.Advantage = tf.matmul(self.streamA, self.AW)
        self.Value = tf.matmul(self.streamV, self.VW)
        
        # 综合得到最终的Q值
        self.Qout = self.Value + tf.subtract(self.Advantage, tf.reduce_mean(self.Advantage, axis=1, keep_dims=True))
        self.predict = tf.argmax(self.Qout, 1)
        
        # 将目标Q值和预测Q值作差平方和作为损失值
        self.targetQ = tf.placeholder(shape=[None], dtype=tf.float32)
        self.actions = tf.placeholder(shape=[None], dtype=tf.int32)
        self.actions_onehot = tf.one_hot(self.actions, env.actions, dtype=tf.float32)
        
        self.Q = tf.reduce_sum(tf.multiply(self.Qout, self.actions_onehot), axis=1)
        
        self.td_error = tf.square(self.targetQ - self.Q)
        self.loss = tf.reduce_mean(self.td_error)
        self.trainer = tf.train.AdamOptimizer(learning_rate=0.0001)
        self.updateModel = self.trainer.minimize(self.loss)
# --------------------------------------------------
# 历程重现
# 这个类赋予了网络存储、重采样来进行训练的能力
class experience_buffer():
    def __init__(self, buffer_size = 50000):
        self.buffer = []
        self.buffer_size = buffer_size
    
    def add(self, experience):
        if len(self.buffer) + len(experience) >= self.buffer_size:
            self.buffer[0:(len(experience) + len(self.buffer)) - self.buffer_size] = []
        self.buffer.extend(experience)
    
    def sample(self, size):
        return np.reshape(np.array(random.sample(self.buffer, size)), [size, 5])
# --------------------------------------------------
# 用于处理游戏返回帧的函数
def processState(states):
    return np.reshape(states, [21168])
# --------------------------------------------------
# 利用主网络参数更新目标网络
def updateTargetGraph(tfVars, tau):
    total_vars = len(tfVars)
    op_holder = []
    for idx, var in enumerate(tfVars[0: total_vars//2]):
        op_holder.append(tfVars[idx+total_vars//2].assign((var.value()*tau) + ((1-tau)*tfVars[idx+total_vars//2].value())))
    return op_holder

def updateTarget(op_holder,sess):
    for op in op_holder:
        sess.run(op)
# --------------------------------------------------
batch_size = 32 #每次训练使用多少训练记录
update_freq = 4 # 多久执行一次训练操作
y = .99 # Q 值的折算因子
startE = 1 # 随机行动的初始概率
endE = 0.1 # 随机行动的最低概率
annealing_steps = 10000. # startE衰减至endE所需的步骤数
num_episodes = 10000 # 网络在游戏环境下训练的episodes数
pre_train_steps = 10000 # 训练开始前允许的随机行动次数
max_epLength = 50 # episode的最大允许值
load_model = False # 是否载入保存的模型
path = "./dqn" # 我们模型的保存路径
h_size = 512 # 最后一个卷积层的尺寸
tau = 0.001 # 目标网络更新至主网络的速率
# --------------------------------------------------
tf.reset_default_graph()
mainQN = Qnetwork(h_size)
targetQN = Qnetwork(h_size)

init = tf.global_variables_initializer()

saver = tf.train.Saver()

trainables = tf.trainable_variables()

targetOps = updateTargetGraph(trainables,tau)

myBuffer = experience_buffer()

# 设置随机决策的衰减速率
e = startE
stepDrop = (startE - endE)/annealing_steps

#创建每个episode中包含所有收益和操作记录的列表
jList = []
rList = []
total_steps = 0

#创建用于保存模型的目录
if not os.path.exists(path):
    os.makedirs(path)

with tf.Session() as sess:
    sess.run(init)
    if load_model == True:
        print('Loading Model...')
        ckpt = tf.train.get_checkpoint_state(path)
        saver.restore(sess,ckpt.model_checkpoint_path)
    for i in range(num_episodes):
        episodeBuffer = experience_buffer()
        # 初始化环境
        s = env.reset()
        s = processState(s)
        d = False
        rAll = 0
        j = 0
        # Q网络
        while j < max_epLength: # 如果agent移动了超过200次还没有接触任何方块,停止本次训练
            j+=1
            # 根据Q网络和贪心法则选取行动(有随机行动的可能性)
            if np.random.rand(1) < e or total_steps < pre_train_steps:
                a = np.random.randint(0,4)
            else:
                a = sess.run(mainQN.predict,feed_dict={mainQN.scalarInput:[s]})[0]
            s1,r,d = env.step(a)
            s1 = processState(s1)
            total_steps += 1
            episodeBuffer.add(np.reshape(np.array([s,a,r,s1,d]),[1,5])) # 保存训练记录至缓冲器
            
            if total_steps > pre_train_steps:
                if e > endE:
                    e -= stepDrop
                
                if total_steps % (update_freq) == 0:
                    trainBatch = myBuffer.sample(batch_size) # 从记录中随机获取训练批次数据
                    # 使用 Double-DQN 更新目标Q值
                    Q1 = sess.run(mainQN.predict,feed_dict={mainQN.scalarInput:np.vstack(trainBatch[:,3])})
                    Q2 = sess.run(targetQN.Qout,feed_dict={targetQN.scalarInput:np.vstack(trainBatch[:,3])})
                    end_multiplier = -(trainBatch[:,4] - 1)
                    doubleQ = Q2[range(batch_size),Q1]
                    targetQ = trainBatch[:,2] + (y*doubleQ * end_multiplier)
                    # 利用目标值更新网络
                    _ = sess.run(mainQN.updateModel, 
                        feed_dict={mainQN.scalarInput:np.vstack(trainBatch[:,0]),mainQN.targetQ:targetQ, mainQN.actions:trainBatch[:,1]})
                    
                    updateTarget(targetOps,sess) # 更新目标网络至主网络
            rAll += r
            s = s1
            
            if d == True:

                break
        
        myBuffer.add(episodeBuffer.buffer)
        jList.append(j)
        rList.append(rAll)
        # 周期性保存训练结果
        if i % 1000 == 0:
            saver.save(sess,path+'/model-'+str(i)+'.ckpt')
            print("Saved Model")
        if len(rList) % 10 == 0:
            print(total_steps,np.mean(rList[-10:]), e)
    saver.save(sess,path+'/model-'+str(i)+'.ckpt')
print("平均得分: " + str(sum(rList)/num_episodes))
# --------------------------------------------------
rMat = np.resize(np.array(rList),[len(rList)//100,100])
rMean = np.average(rMat,1)
plt.plot(rMean)
# --------------------------------------------------

结果输出:

···
498000 24.0 0.09999999999985551
498500 22.0 0.09999999999985551
499000 22.8 0.09999999999985551
499500 21.5 0.09999999999985551
500000 22.1 0.09999999999985551
平均得分: 20.7166
···
游戏得分曲线

游戏环境输出84x84x3的彩色图片,使用和OpenAI gym相似的函数回调,这使得代码可以轻松地移植至OpenAI的雅达利游戏上。在计算资源和时间允许的情况下,我建议你在其他的雅达利游戏上进行尝试。超参数可能需要一些调整,但是一定是有可行解的,祝你好运!

系列文章(翻译进度):

  1. (0) Q-Learning的查找表实现和神经网络实现
  2. (1) 双臂赌博机
  3. (1.5) — 上下文赌博机
  4. (2) —— 基于策略的Agents
  5. (3) —— 构建仿真环境来进行强化学习
  6. (4)—— 深度Q网络及扩展
  7. Part 5 — Visualizing an Agent’s Thoughts and Actions
  8. Part 6 — Partial Observability and Deep Recurrent Q-Networks
  9. Part 7 — Action-Selection Strategies for Exploration
  10. Part 8 — Asynchronous Actor-Critic Agents (A3C)