I implemented the Double DQN algorithm in TensorFlow 2 code, targeting the CartPole-v1 environment provided by OpenAI Gym.
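To recap, the only change from DQN is in the TD target: the online Q network selects the greedy action at the next state, while the target network evaluates it, which mitigates the overestimation bias of the max operator:

$$y_k = r_k + \gamma \, Q_{\phi^-}\!\left( s_{k+1}, \arg\max_{a} Q_{\phi}(s_{k+1}, a) \right)$$

where $\phi$ and $\phi^-$ denote the online and target network parameters, and $y_k = r_k$ at terminal states. This is exactly what the td_target function below computes.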
The training results are shown below.
The following shows the cart-pole's motion after training is complete.
The Double DQN code consists of doubledqn_learn.py, which implements and trains the Q network; doubledqn_main.py, which runs the training; doubledqn_load_play.py, which loads the trained network parameters and runs the agent; and replaybuffer.py, which implements the replay buffer (a minimal sketch of it is given at the end of this post).
The overall code structure is as follows.
Here is the TensorFlow 2 code.
doubledqn_learn.py
# Double DQN learn (tf2 subclassing API version)
# coded by St.Watermelon

import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import tensorflow as tf

from replaybuffer import ReplayBuffer


## Q network
class DoubleDQN(Model):

    def __init__(self, action_n):
        super(DoubleDQN, self).__init__()

        self.h1 = Dense(64, activation='relu')
        self.h2 = Dense(32, activation='relu')
        self.h3 = Dense(16, activation='relu')
        self.q = Dense(action_n, activation='linear')

    def call(self, x):
        x = self.h1(x)
        x = self.h2(x)
        x = self.h3(x)
        q = self.q(x)
        return q
class DoubleDQNagent(object):

    def __init__(self, env):

        ## hyperparameters
        self.GAMMA = 0.95
        self.BATCH_SIZE = 32
        self.BUFFER_SIZE = 20000
        self.DDQN_LEARNING_RATE = 0.001
        self.TAU = 0.001
        self.EPSILON = 1.0
        self.EPSILON_DECAY = 0.995
        self.EPSILON_MIN = 0.01

        self.env = env
        # get state dimension and action number
        self.state_dim = env.observation_space.shape[0]  # 4
        self.action_n = env.action_space.n  # 2

        ## create Q networks
        self.doubledqn = DoubleDQN(self.action_n)
        self.target_doubledqn = DoubleDQN(self.action_n)

        self.doubledqn.build(input_shape=(None, self.state_dim))
        self.target_doubledqn.build(input_shape=(None, self.state_dim))

        self.doubledqn.summary()

        # optimizer
        self.doubledqn_opt = Adam(self.DDQN_LEARNING_RATE)

        ## initialize replay buffer
        self.buffer = ReplayBuffer(self.BUFFER_SIZE)

        # save the results
        self.save_epi_reward = []
    ## choose an action with an epsilon-greedy policy
    def choose_action(self, state):
        if np.random.random() <= self.EPSILON:
            return self.env.action_space.sample()
        else:
            qs = self.doubledqn(tf.convert_to_tensor([state], dtype=tf.float32))
            return np.argmax(qs.numpy())
    ## soft-update the target network weights with a factor tau
    def update_target_network(self, TAU):
        phi = self.doubledqn.get_weights()
        target_phi = self.target_doubledqn.get_weights()
        for i in range(len(phi)):
            target_phi[i] = TAU * phi[i] + (1 - TAU) * target_phi[i]
        self.target_doubledqn.set_weights(target_phi)
    ## single gradient update on a sampled batch
    def doubledqn_learn(self, states, actions, td_targets):
        with tf.GradientTape() as tape:
            one_hot_actions = tf.one_hot(actions, self.action_n)
            q = self.doubledqn(states, training=True)
            q_values = tf.reduce_sum(one_hot_actions * q, axis=1, keepdims=True)
            loss = tf.reduce_mean(tf.square(q_values - td_targets))

        grads = tape.gradient(loss, self.doubledqn.trainable_variables)
        self.doubledqn_opt.apply_gradients(zip(grads, self.doubledqn.trainable_variables))
    ## compute Double DQN TD target: y_k = r_k + gamma * Q_target(s_(k+1), argmax_a Q(s_(k+1), a))
    def td_target(self, rewards, target_qs, max_a, dones):
        one_hot_max_a = tf.one_hot(max_a, self.action_n)
        max_q = tf.reduce_sum(one_hot_max_a * target_qs, axis=1, keepdims=True)
        y_k = np.zeros(max_q.shape)
        for i in range(max_q.shape[0]):  # number of batch
            if dones[i]:
                y_k[i] = rewards[i]
            else:
                y_k[i] = rewards[i] + self.GAMMA * max_q[i]
        return y_k
    ## load Q-network weights
    def load_weights(self, path):
        self.doubledqn.load_weights(path + 'cartpole_ddqn.h5')
    ## train the agent
    def train(self, max_episode_num):

        # initially transfer model weights to target model network
        self.update_target_network(1.0)

        for ep in range(int(max_episode_num)):

            # reset episode
            time, episode_reward, done = 0, 0, False
            # reset the environment and observe the first state
            # (old Gym API, < 0.26: reset() returns the observation only)
            state = self.env.reset()

            while not done:
                # visualize the environment
                #self.env.render()
                # pick an action
                action = self.choose_action(state)
                # observe reward and next state
                next_state, reward, done, _ = self.env.step(action)
                # shaped reward: small bonus for surviving longer
                train_reward = reward + time * 0.01

                # add transition to replay buffer
                self.buffer.add_buffer(state, action, train_reward, next_state, done)

                # start training after the buffer holds enough transitions
                if self.buffer.buffer_count() > 1000:

                    # decay EPSILON
                    if self.EPSILON > self.EPSILON_MIN:
                        self.EPSILON *= self.EPSILON_DECAY

                    # sample transitions from replay buffer
                    states, actions, rewards, next_states, dones = self.buffer.sample_batch(self.BATCH_SIZE)

                    # compute max_a = argmax_a Q_phi(next_states, a) with the online network
                    curr_net_qs = self.doubledqn(tf.convert_to_tensor(next_states, dtype=tf.float32))
                    max_a = np.argmax(curr_net_qs.numpy(), axis=1)

                    # predict next-state Q-values with the target network
                    target_qs = self.target_doubledqn(tf.convert_to_tensor(next_states, dtype=tf.float32))

                    # compute TD targets
                    y_i = self.td_target(rewards, target_qs.numpy(), max_a, dones)

                    # train the Q network using the sampled batch
                    self.doubledqn_learn(tf.convert_to_tensor(states, dtype=tf.float32),
                                         actions,
                                         tf.convert_to_tensor(y_i, dtype=tf.float32))

                    # soft-update the target network
                    self.update_target_network(self.TAU)

                # update current state
                state = next_state
                episode_reward += reward
                time += 1

            ## display rewards every episode
            print('Episode: ', ep+1, 'Time: ', time, 'Reward: ', episode_reward)
            self.save_epi_reward.append(episode_reward)

            ## save weights every episode
            self.doubledqn.save_weights("./save_weights/cartpole_ddqn.h5")

        np.savetxt('./save_weights/cartpole_epi_reward.txt', self.save_epi_reward)
    ## plot the episode rewards saved during training
    def plot_result(self):
        plt.plot(self.save_epi_reward)
        plt.show()
doubledqn_main.py
# DoubleDQN main
# coded by St.Watermelon

import gym
from doubledqn_learn import DoubleDQNagent


def main():

    max_episode_num = 500
    env_name = 'CartPole-v1'
    env = gym.make(env_name)
    agent = DoubleDQNagent(env)

    agent.train(max_episode_num)

    agent.plot_result()


if __name__=="__main__":
    main()
doubledqn_load_play.py
# DoubleDQN load and play
# coded by St.Watermelon

import gym
import numpy as np
import tensorflow as tf

from doubledqn_learn import DoubleDQNagent


def main():

    env_name = 'CartPole-v1'
    env = gym.make(env_name)

    # print state dimension and the observation/action spaces
    print(env.observation_space.shape[0])  # 4
    print(env.action_space, env.observation_space)

    agent = DoubleDQNagent(env)
    agent.load_weights('./save_weights/')

    time = 0
    state = env.reset()

    while True:
        env.render()
        # act greedily with respect to the trained Q network
        qs = agent.doubledqn(tf.convert_to_tensor([state], dtype=tf.float32))
        action = np.argmax(qs.numpy())
        state, reward, done, _ = env.step(action)
        time += 1

        print('Time: ', time, 'Reward: ', reward)

        if done:
            break

    env.close()


if __name__=="__main__":
    main()
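The original post does not list replaybuffer.py itself. Below is a minimal sketch that is consistent with the interface used above (add_buffer, buffer_count, sample_batch); the deque-based storage and the way the batch is unpacked into per-field NumPy arrays are my assumptions, not necessarily the author's exact implementation.

replaybuffer.py

# Replay buffer (minimal sketch; only the interface is taken from the code above)
import random
from collections import deque
import numpy as np


class ReplayBuffer(object):

    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        # a deque drops the oldest transitions automatically once full
        self.buffer = deque(maxlen=buffer_size)

    ## add a transition (s, a, r, s', done) to the buffer
    def add_buffer(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    ## current number of stored transitions
    def buffer_count(self):
        return len(self.buffer)

    ## sample a random mini-batch and unpack it into per-field arrays
    def sample_batch(self, batch_size):
        batch = random.sample(self.buffer, min(batch_size, len(self.buffer)))
        states = np.asarray([t[0] for t in batch])
        actions = np.asarray([t[1] for t in batch])
        rewards = np.asarray([t[2] for t in batch])
        next_states = np.asarray([t[3] for t in batch])
        dones = np.asarray([t[4] for t in batch])
        return states, actions, rewards, next_states, dones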