DQN 简单入门

Scroll Down

DQN 简单入门

背景描述

这是一个走迷宫的过程

img

回报矩阵描述为,也就是上图的一个转化,也就是所谓的Q(S,A)的值,行代表S现在状态,列代表A动作

现在的状态S\下一步状态123456
1-1-1-1-10-1
2-1-1-10-1100
3-1-1-10-1-1
4-1-1-10-1-1
50-1-1100-1100
6-10-1-10100

DQN 流程

了解DQN之前最好了解一下Q-Learning,DQN是建立在Q-Learning的基础上的,因为现实世界中很难去描述一个状态的Q值,但是有一样东西对这个非常在行,那就是神经网络,我们用神经网络来计算一个状态的Q值,并通过贝尔曼方程去让这个神经网络学习到正确的Q值

贝尔曼方程:参考这个https://zhuanlan.zhihu.com/p/86525700

下面我将使用Tensorflow2.2.0来编码实现这个DQN网络

0. 超参数定义

在一切开始之前,我们定义一些超参数,我先在这里列出一些我们需要用到的超参数,在后面进行讲解

# 执行步数。
step_index = 0

# 状态数。
state_num = 6

# 动作数。
action_num = 6

# 训练之前观察多少步。
OBSERVE = 10.

# 选取的小批量训练样本数。
BATCH = 40

# epsilon 的最小值,当 epsilon 小于该值时,将不在随机选择行为。
FINAL_EPSILON = 0.0001

# epsilon 的初始值,epsilon 逐渐减小。
INITIAL_EPSILON = 0.1

# epsilon 衰减的总步数。
EXPLORE = 3000000.

# 探索模式计数。
epsilon = INITIAL_EPSILON

# 训练步数统计。
learn_step_counter = 0

# 学习率。(一开始用SGD用了这个参数,后来换了adam就没用了)
learning_rate = 0.001

# γ经验折损率。
gamma = 0.9

# 记忆上限。
memory_size = 5000

# 当前记忆数。
memory_counter = 0

# 保存观察到的执行过的行动的存储器,即:曾经经历过的记忆。
replay_memory_store = deque()

# 生成一个状态矩阵(6 X 6),每一行代表一个状态。
state_list = np.identity(state_num)

# 生成一个动作矩阵。
action_list = np.identity(state_num)

1. 网络初始化

DQN的网络是采用一个6X16X16X6的神经网络

输入由一个1X6的向量,由6个状态组成,如状态1我们表示为[0,1,0,0,0,0]

网络的输出是六个状态的Q值(包括自己本身的Q值),类似于[0.2,-0.6,-0.53,0.9,-0.9,0.9]这样一个输出向量,代表着对应状态的Q值

1.1 更新Q值的方法

顺便在这里讲一下方程更新的方法,也是提前讲述了

$$
Q(S,A)=R+γ(Max(Q(S_n,A)))
$$
这个公式的意思Q(S,A)的值是根据当前的回报,加上遗忘率(或者说失效率)乘上,下一个状态中Q值最大的那个动作对应的Q

我们举个例子,比如说我现在是状态2,我选择了动作3,也就是说我下一个目的地就是状态3,那么按照我们刚刚方程的计算方法,新的Q值应该为 状态2选择动作3的 Q 值,假设这个Q值为1,遗忘率伽马为0.9,状态三中的Q值最大为10,那么按照公式计算出的Q值为 1 + 0.9*10 算出来也就是10了,这就是理论Q值的计算方法,我们的神经网络会无限逼近这个值的

1.2 自定的损失函数

我这里自定了一个loss function,这里的损失函数我们定义为我们求出的一个预估值和目标值的差值的平方

def loss_function(y_true, y_pred):
    """
    自定的损失函数,仅计算输入和输出之间的一个元素的差值的平方
    这个模型是计算 Q 值的,所以我们的损失函数应描述为根据贝尔曼方程求解当前状态并选择对应的动作的 目标 Q 与 神经网络输出 Q 的差值
    :param y_true: 真实值
    :param y_pred: 预测值
    :return:
    """
    return K.square(y_true - y_pred)

这里还得说一下,我们更新的值仅仅是六个输出中的一个

比如说:我对应Q(S)的值为[1,2,5,0,2,2],我对应动作是1,也就是Q(S,1)的值是2,但是我通过上面的公式算出的值是5,我们计算的其实是(5-2)^2,我在下面的编码中通过构建一个向量(仅改变了对应A的值),比如说对应此处就是[1,5,5,0,2,2],其他值都是一样的,这样算出正确的损失

1.3 网络结构定义

这里使用了Tensorflow 2.x 新增的类似Keras方法定义这个网络

model = Sequential()
model.add(Dense(units=6, activation='tanh', input_shape=(None, 6)))
model.add(Dense(units=16, activation='tanh'))
model.add(Dense(units=16, activation='tanh'))
model.add(Dense(units=6, activation='tanh'))
model.compile(optimizer='adam', metrics=['accuracy'],
              loss=loss_function)
# 查看一下模型的结构
model.summary()

1.4 回报矩阵定义

当然与此同时我们也需要初始化一下我们的reward矩阵,和问题背景中的值是相对应的

r = np.array([[-1, -1, -1, -1, 0, -1],
              [-1, -1, -1, 0, -1, 100.0],
              [-1, -1, -1, 0, -1, -1],
              [-1, 0, 0, -1, 0, -1],
              [0, -1, -1, 100, -1, 100.0],
              [-1, 0, -1, -1, 0, 100.0],
              ])

2. 策略动作选择

根据一定的策略进行探索,保持一定的随机性,并逐步减小epsilon,选择一个最佳的Action

这一部分是基于神奇的 ε-贪心算法,这一部分算法的核心思想就是保留一点随机性,在急于经验选择最佳的Action的时候保证一定的随机性,这一部分也是这个DQN网络能否成功的关键因素

def select_action(state_index):
    """
    根据策略选择动作。
    :param state_index: 当前状态。
    :return:
    """
    global epsilon
    current_state = state_list[state_index:state_index + 1]
		# 根据随机数选择是完全随机还是通过经验选择
    if np.random.uniform() < epsilon:
        current_action_index = np.random.randint(0, action_num)
    else:
        actions_value = model.predict(current_state)
        action = np.argmax(actions_value)
        current_action_index = action
    # 开始训练后,在 epsilon 小于一定的值之前,将逐步减小 epsilon。
    if step_index > OBSERVE and epsilon > FINAL_EPSILON:
        epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE

    return current_action_index

3. 执行动作

根据输入的动作,执行这个动作,切换当前的状态和下一步的状态,返回相对的值:

  1. 执行后下一个状态
  2. 返回的奖励
  3. 游戏是否结束
def step(state, action):
    """
    执行动作。
    :param state: 当前状态。
    :param action: 执行的动作。
    :return:
    """
    reward = r[state][action]

    next_state = action

    done = False

    if action == 5:
        done = True

    return next_state, reward, done

4. 保存记忆

探索的时候保存记忆,到一个队列中

4.1 经验池

DQN最主要的特点是引入了经验回放,即将一个五元组添加到一个经验池中,这些五元组之后将用来更新Q网络参数,动作和奖励是标量,是否结束是布尔值。

经验池的功能主要是解决相关性及非静态分布问题。具体做法是把每个时间步agent与环境交互得到的转移样本 储存到回放记忆单元,要训练时就随机拿出一些(minibatch)来训练。(其实就是将游戏的过程打成碎片存储,训练时随机抽取就避免了相关性问题)

4.2 代码实现

def save_store(current_state_index, current_action_index, current_reward, next_state_index, done):
    """
    保存记忆。
    :param current_state_index: 当前状态 index。
    :param current_action_index: 动作 index。
    :param current_reward: 奖励。
    :param next_state_index: 下一个状态 index。
    :param done: 是否结束。
    :return:
    """
    current_state = state_list[current_state_index:current_state_index + 1]
    current_action = action_list[current_action_index:current_action_index + 1]
    next_state = state_list[next_state_index:next_state_index + 1]
    # 记忆动作(当前状态, 当前执行的动作, 当前动作的得分,下一个状态)。
    replay_memory_store.append((
        current_state,
        current_action,
        current_reward,
        next_state,
        done))

    # 如果超过记忆的容量,则将最久远的记忆移除。
    if len(replay_memory_store) > memory_size:
        replay_memory_store.popleft()
    global memory_counter
    memory_counter += 1

5. 记忆回放

  1. 随机选择一小批记忆样本
  2. 预测执行完动作A到了状态S'的Q值
  3. 利用1.1中的公式算出对应的期望Q(S,A)值
  4. 用现有的Q值和期望Q值训练神经网络

注意:这里一定要绕清楚哪个值对应哪个值,以及用什么值去更新神经网络,这里出错那么这个网络就会出错

def experience_replay():
    """
    记忆回放。
    :return:
    """
    global learn_step_counter
    # 随机选择一小批记忆样本。
    batch = BATCH if memory_counter > BATCH else memory_counter
    minibatch = random.sample(replay_memory_store, batch)

    batch_state = None
    batch_action = None
    batch_reward = None
    batch_next_state = None
    batch_done = None
		# 把minibatch中的值取出来
    for index in range(len(minibatch)):
        if batch_state is None:
            batch_state = minibatch[index][0]
        elif batch_state is not None:
            batch_state = np.vstack((batch_state, minibatch[index][0]))

        if batch_action is None:
            batch_action = minibatch[index][1]
        elif batch_action is not None:
            batch_action = np.vstack((batch_action, minibatch[index][1]))

        if batch_reward is None:
            batch_reward = minibatch[index][2]
        elif batch_reward is not None:
            batch_reward = np.vstack((batch_reward, minibatch[index][2]))

        if batch_next_state is None:
            batch_next_state = minibatch[index][3]
        elif batch_next_state is not None:
            batch_next_state = np.vstack((batch_next_state, minibatch[index][3]))

        if batch_done is None:
            batch_done = minibatch[index][4]
        elif batch_done is not None:
            batch_done = np.vstack((batch_done, minibatch[index][4]))

    # q_predict_next:下一个状态的 Q 值。
    q_predict_next = model.predict(batch_next_state)

    # 当前state的对应q值表 
    q_current_value = model.predict(batch_state)
    
    # 目标 Q 值
    q_target = []
    
    # 这里可以参考第一部分中所描述的计算方法
    # 循环计算目标Q值
    for i in range(len(minibatch)):

        # 当前即时得分。
        current_reward = batch_reward[i][0]

        # 更新 Q 值。
        new_q_value = current_reward + gamma * np.max(q_predict_next[i])

        # 获取自身Q值的预测
        cur_q_value = q_current_value[i]

        # 当得分小于 -1 时,表示走了不可走的位置。
        if current_reward <= -1:
            cur_q_value[batch_next_state[i].argmax()] = current_reward
            q_target.append(cur_q_value)
        else:
            cur_q_value[batch_next_state[i].argmax()] = new_q_value
            q_target.append(cur_q_value)

    q_target = np.asarray(q_target)
    # 更新网络
    model.fit(batch_state, q_target, epochs=1, batch_size=len(minibatch))
    learn_step_counter += 1

6. 训练模型

按照以下步骤执行

  1. 根据策略搜索,选择动作
  2. 执行这个动作
  3. 保存记忆
  4. 判断记忆是否足够多
    1. 如果足够多:进行训练
    2. 如果不够多:继续探索
def train():
    """
    训练。
    :return:
    """
    # 初始化当前状态。
    current_state = np.random.randint(0, action_num - 1)
    epsilon = INITIAL_EPSILON

    global step_index

    while True:
        # 选择动作。
        action = select_action(current_state)

        # 执行动作,得到:下一个状态,执行动作的得分,是否结束。
        next_state, reward, done = step(current_state, action)

        # 保存记忆。
        save_store(current_state, action, reward, next_state, done)

        # 先观察一段时间累积足够的记忆在进行训练。
        if step_index > OBSERVE:
            experience_replay()

        # 训练次数
        if step_index > 1000:
            break
				
        # 如果状态为游戏结束,那么就随机生成一个状态
        if done:
            current_state = np.random.randint(0, action_num - 1)
        else:
            current_state = next_state

        step_index += 1
        if step_index % 100 == 0:
            print("current step:{}".format(step_index))
            print(model.predict(action_list))

测试这个模型

def test():
    """
    运行并测试。
    :return:
    """
    # 首先进行训练
    train()
	
  	# 每个房间都走一遍,尝试走到5号房间
    for index in range(5):
	
        start_room = index

        print("#############################", "Agent 在", start_room, "开始行动", "#############################")

        current_state = start_room

        step = 0

        target_state = 5

        while current_state != target_state:
            out_result = model.predict(state_list[current_state:current_state + 1])

            next_state = np.argmax(out_result[0])

            print("Agent 由", current_state, "号房间移动到了", next_state, "号房间")

            current_state = next_state

            step += 1

        print("Agent 在", start_room, "号房间开始移动了", step, "步到达了目标房间 5")

        print("#############################", "Agent 在", 5, "结束行动", "#############################")

参考文献

Deep Q-Network 学习笔记(二)—— Q-Learning与神经网络结合使用(有代码实现)

深度强化学习——DQN