TensorFlow强化学习入门(3)——构建仿真环境来进行强化学习

在上一篇文章中,我演示了如何设计一个基于策略的强化学习agent来解决CartPole任务。在本文中,我们将从另一个角度重新审视这个问题——如何构建仿真环境来提升agent在当前环境下的性能。

Model Network : 建模网络,本文中称为仿真环境

如果你还没有阅读本系列之前的文章并且还是强化学习的初学者,我推荐你按照顺序来阅读,文末有之前文章的链接。

仿真环境是什么,我们为什么要引入仿真环境?在上图中,仿真环境是一个用于模拟真实世界中的动态问题的神经网络。拿我们之前CartPole的问题来说,我们需要一个可以根据之前的位置和行动来预测木棒下次位置的模型。在学习得到一个精确的模型之后,我们每次就可以直接用模型来训练我们的agent而不是必须放在真实环境中训练。当然如果原始的环境就是仿真得到的(像CartPole就模拟了真实世界中的物理情景),我们就不必再这样做了。

与计算机模拟不同,真实环境的部署需要时间,而且真实世界中的物理法则也使得环境初始化等一系列操作的可行性大幅下降。相反地,通过模型来模拟环境可以节省时间和成本,agent也可以“假想”自己就在真实环境中运动,我们可以直接在这个虚拟的环境中训练决策网络。只要我们的模拟环境足够优秀,agent即使完全在虚拟环境中训练也可以在真实环境中达到很好的性能。

那么我们如何使用TensorFlow实现这个需求呢?按照我上面所说,我们需要一个能够根据之前的观测和行动转化输出得到新的观测值,收益和状态的神经网络。我们将使用真实环境来训练我们的仿真模型,然后使用仿真模型来训练我们的agent。通过这个方法,我们可以在让agent在不直接接触到真实环境的情况下习得行动策略!下面给出是实现代码(提供版本为评论区重写的版本(译者对代码做了一定修正),作者的原始代码点这里(这个版本有bug,底部有评论指正)查看)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# 译者运行环境为jupyterlab,每个分割线对应一个代码块
import numpy as np
import tensorflow as tf
%matplotlib inline
import matplotlib.pyplot as plt
# --------------------------------------------------
import gym
env = gym.make("CartPole-v0")
# --------------------------------------------------
# 超参数
learning_rate = 1e-2
# 收益的折算因子
gamma = 0.99
# RMSProp中的衰减因子
decay_rate = 0.99
model_batch_size = 3
policy_batch_size = 3
dimen = 4 # 环境中的维度数
# --------------------------------------------------
# 辅助函数
def discount(r, gamma=0.99, standardize=False):
"""
输入一维的收益数组,输出折算后的收益值,例:f([1, 1, 1], 0.99) -> [1, 0.99, 0.9801],折算后根据要求选择进行归一化
"""
discounted = np.array([val * (gamma ** i) for i, val in enumerate(r)])
if standardize:
discounted -= np.mean(discounted)
discounted /= np.std(discounted)
return discounted
def step_model(sess, xs, action):
""" 使用神经网络模型根据之前的状态和行动来生成新的状态 """
# 上一状态
x = xs[-1].reshape(1, -1)
# 存储行动
x = np.hstack([x, [[action]]])
# 预测输出
output_y = sess.run(predicted_state_m, feed_dict={input_x_m: x})
# predicted_state_m == [state_0, state_1, state_2, state_3, reward, done]
output_next_state = output_y[:,:4]
output_reward = output_y[:,4]
output_done = output_y[:,5]
# 限制输出范围
output_next_state[:,0] = np.clip(output_next_state[:,0], -2.4, 2.4)
output_next_state[:,2] = np.clip(output_next_state[:,2], -0.4, 0.4)
# 完成的阀值设置
output_done = True if output_done > 0.01 or len(xs) > 500 else False
return output_next_state, output_reward, output_done
# --------------------------------------------------
# 用于仿真的神经网络
# 架构
# 网络中包含两个具有256个神经元的层,relu函数为激活函数。共有三个输出层,分别输出下一个观测值,收益值和游戏结束的标志
tf.reset_default_graph()
num_hidden_m = 256
# 由于要输入决策网络输出的行动,维度+1
dimen_m = dimen + 1
# 输入占位符
input_x_m = tf.placeholder(tf.float32, [None, dimen_m])
# 第一层
W1_m = tf.get_variable("W1_m", shape=[dimen_m, num_hidden_m], initializer=tf.contrib.layers.xavier_initializer())
B1_m = tf.Variable(tf.zeros([num_hidden_m]), name="B1M")
layer1_m = tf.nn.relu(tf.matmul(input_x_m, W1_m) + B1_m)
# 第二层
W2_m = tf.get_variable("W2_m", shape=[num_hidden_m, num_hidden_m], initializer=tf.contrib.layers.xavier_initializer())
B2_m = tf.Variable(tf.zeros([num_hidden_m]), name="B2_m")
layer2_m = tf.nn.relu(tf.matmul(layer1_m, W2_m) + B2_m)
# 第三层(输出层)
# 注意这里有三个单独的输出层
W_obs_m = tf.get_variable("W_obs_m", shape=[num_hidden_m, 4], initializer=tf.contrib.layers.xavier_initializer())
B_obs_m = tf.Variable(tf.zeros([4]), name="B_obs_m")
W_reward_m = tf.get_variable("W_reward_m", shape=[num_hidden_m, 1], initializer=tf.contrib.layers.xavier_initializer())
B_reward_m = tf.Variable(tf.zeros([1]), name="B_reward_m")
W_done_m = tf.get_variable("W_done_m", shape=[num_hidden_m, 1], initializer=tf.contrib.layers.xavier_initializer())
B_done_m = tf.Variable(tf.zeros([1]), name="B_done_m")
output_obs_m = tf.matmul(layer2_m, W_obs_m) + B_obs_m
output_reward_m = tf.matmul(layer2_m, W_reward_m) + B_reward_m
output_done_m = tf.sigmoid(tf.matmul(layer2_m, W_done_m) + B_done_m)
# 训练所需的输入占位符
actual_obs_m = tf.placeholder(tf.float32, [None, dimen_m], name="actual_obs")
actual_reward_m = tf.placeholder(tf.float32, [None, 1], name="actual_reward")
actual_done_m = tf.placeholder(tf.float32, [None, 1], name="actual_done")
# 整合输出
predicted_state_m = tf.concat([output_obs_m, output_reward_m, output_done_m], axis=1)
# 损失函数
loss_obs_m = tf.square(actual_obs_m[-1, 0:4] - output_obs_m)
loss_reward_m = tf.square(actual_reward_m - output_reward_m)
loss_done_m = -tf.log(actual_done_m * output_done_m + (1 - actual_done_m) * (1 - output_done_m))
# 模型损失为三个输出损失的平均值
loss_m = tf.reduce_max(loss_obs_m + loss_reward_m + loss_done_m)
adam_m = tf.train.AdamOptimizer(learning_rate=learning_rate)
update_m = adam_m.minimize(loss_m)
# --------------------------------------------------
# 决策网络
num_hidden_p = 10 # 决策网络中隐藏层神经元个数
input_x_p = tf.placeholder(tf.float32, [None, dimen], name="input_x")
# 第一层
W1_p = tf.get_variable("W1", shape=[dimen,num_hidden_p],
initializer=tf.contrib.layers.xavier_initializer())
layer1_p = tf.nn.relu(tf.matmul(input_x_p, W1_p))
# 第二层
W2_p = tf.get_variable("W2", shape=[num_hidden_p, 1],
initializer=tf.contrib.layers.xavier_initializer())
output_p = tf.nn.sigmoid(tf.matmul(layer1_p, W2_p))
# 训练所需的输入占位符
input_y_p = tf.placeholder(tf.float32, shape=[None, 1], name="input_y")
advantages_p = tf.placeholder(tf.float32, shape=[None,1], name="reward_signal")
# 损失函数
# 下面表达式等价于 0 if input_y_p == output_p else 1
log_lik_p = tf.log(input_y_p * (input_y_p - output_p) +
(1 - input_y_p) * (input_y_p + output_p))
# We'll be trying to maximize log liklihood
loss_p = -tf.reduce_mean(log_lik_p * advantages_p)
# 梯度
W1_grad_p = tf.placeholder(tf.float32,name="W1_grad")
W2_grad_p = tf.placeholder(tf.float32,name="W2_grad")
batch_grad_p = [W1_grad_p, W2_grad_p]
trainable_vars_p = [W1_p, W2_p]
grads_p = tf.gradients(loss_p, trainable_vars_p)
# 优化器
adam_p = tf.train.AdamOptimizer(learning_rate=learning_rate)
# 更新函数
update_grads_p = adam_p.apply_gradients(zip(batch_grad_p, [W1_p, W2_p]))
# --------------------------------------------------
# 初始化并测试模型运行情况
init = tf.global_variables_initializer()
sess = tf.Session()
sess.run(init)
random_obs = np.random.random(size=[1, env.observation_space.shape[0]])
random_action = env.action_space.sample()
print("obs: {}\naction: {}\noutput obs: {}\nouput reward: {}\noutput done: {}\noutput policy: {}".format(
random_obs,
random_action,
sess.run(output_obs_m,feed_dict={input_x_m: np.hstack([random_obs, [[random_action]]])}),
sess.run(output_reward_m,feed_dict={input_x_m: np.hstack([random_obs, [[random_action]]])}),
sess.run(output_done_m,feed_dict={input_x_m: np.hstack([random_obs, [[random_action]]])}),
sess.run(output_p,feed_dict={input_x_p: random_obs})))
# --------------------------------------------------
# 在真实环境中训练
real_rewards = []
num_episodes = 5000
# Trigger used to decide whether we should train from model or from real environment
train_from_model = False
train_first_steps = 500
# 初始化变量跟踪观测值,收益和行动
observations = np.empty(0).reshape(0,dimen)
rewards = np.empty(0).reshape(0,1)
actions = np.empty(0).reshape(0,1)
# 梯度
grads = np.array([np.zeros(var.get_shape().as_list()) for var in trainable_vars_p])
num_episode = 0
observation = env.reset()
while num_episode < num_episodes:
observation = observation.reshape(1,-1)
# 输出决策
policy = sess.run(output_p, feed_dict={input_x_p: observation})
# 根据策略选定行为,引入一定概率的随机决策
action = 0 if policy > np.random.uniform() else 1
# 跟踪观测值和行动
observations = np.vstack([observations, observation])
actions = np.vstack([actions, action])
# 从仿真环境或者真实环境中获取下一个观测值
if train_from_model:
observation, reward, done = step_model(sess, observations, action)
else:
observation, reward, done, _ = env.step(action)
# 跟踪收益
rewards = np.vstack([rewards, reward])
dones = np.zeros(shape=(len(observations),1))
# 游戏结束或者迭代次数够多
if done or len(observations) > 300:
print("\r{} / {} ".format(num_episode, num_episodes),end="")
# 判断训练环境
if not train_from_model:
# 训练模型的上一个状态
states = np.hstack([observations, actions])
prev_states = states[:-1,:]
next_states = states[1:, :]
next_rewards = rewards[1:, :]
next_dones = dones[1:, :]
feed_dict = {input_x_m: prev_states.astype(np.float32),
actual_obs_m: next_states.astype(np.float32),
actual_done_m: next_dones.astype(np.float32),
actual_reward_m: next_rewards.astype(np.float32)}
loss, _ = sess.run([loss_m, update_m], feed_dict=feed_dict)
real_rewards.append(sum(rewards))
# 折算收益
disc_rewards = discount(rewards, standardize=True)
# 计算梯度
grads += sess.run(grads_p, feed_dict={input_x_p: observations,
input_y_p: actions,
advantages_p: disc_rewards})
num_episode += 1
observation = env.reset()
# 重置变量
observations = np.empty(0).reshape(0,dimen)
rewards = np.empty(0).reshape(0,1)
actions = np.empty(0).reshape(0,1)
# Toggle between training from model and from real environment allowing sufficient time
# to train the model before its used for learning policy
if num_episode > train_first_steps:
train_from_model = not train_from_model
# If batch full
if num_episode % policy_batch_size == 0:
# 更新梯度
sess.run(update_grads_p, feed_dict={W1_grad_p: grads[0], W2_grad_p: grads[1]})
# 重置梯度
grads = np.array([np.zeros(var.get_shape().as_list()) for var in trainable_vars_p])
# 周期性输出提示信息
if (num_episode % (100 * policy_batch_size) == 0):
print("Episode {} last batch rewards: {}".format(
num_episode, sum(real_rewards[-policy_batch_size:])/policy_batch_size))
# 模型性能足够好时退出
if (sum(real_rewards[-10:]) / 10. >= 190): # 可以调至199等更高的值(200为满分)
print("Episode {} Training complete with total score of: {}".format(
num_episode, sum(real_rewards[-policy_batch_size:])/policy_batch_size))
break
# --------------------------------------------------
# 测试模型效果
observation = env.reset()
reward_sum = 0
model_losses = []
while True:
env.render()
observation = np.reshape(observation, [1, -1])
policy = sess.run(output_p, feed_dict={input_x_p: observation})
action = 0 if policy > 0.5 else 1
observation, reward, done, _ = env.step(action)
reward_sum += reward
if done:
print("Total score: {}".format(reward_sum))
break
1
2
3
4
5
6
7
299 / 5000 Episode 300 last batch rewards: [34.66666667]
599 / 5000 Episode 600 last batch rewards: [75.66666667]
899 / 5000 Episode 900 last batch rewards: [61.]
1199 / 5000 Episode 1200 last batch rewards: [200.]
1499 / 5000 Episode 1500 last batch rewards: [194.33333333]
1799 / 5000 Episode 1800 last batch rewards: [169.33333333]
1979 / 5000 Episode 1980 Training complete with total score of: [200.]

到这里,我们引入了两个神经网络,有很多超参数,鼓励读者尝试自己调整超参数来使模型训练更好更快。在下一节我们会探究如何使用卷积神经网络来在更复杂的环境(如雅达利游戏)中学习。

系列文章(翻译进度):

  1. (0) Q-Learning的查找表实现和神经网络实现
  2. (1) 双臂赌博机
  3. (1.5) — 上下文赌博机
  4. (2) —— 基于策略的Agents
  5. (3) —— 构建仿真环境来进行强化学习
  6. Part 4 — Deep Q-Networks and Beyond
  7. Part 5 — Visualizing an Agent’s Thoughts and Actions
  8. Part 6 — Partial Observability and Deep Recurrent Q-Networks
  9. Part 7 — Action-Selection Strategies for Exploration
  10. Part 8 — Asynchronous Actor-Critic Agents (A3C)
您的打赏是对我最大的鼓励!