Commit c0735fcd authored by Eduard Pizur

initial double DQN

parent 60bb9f39
# atari_wrappers.py
import collections
import cv2
import numpy as np
import matplotlib.pyplot as plt
import gym


def plot_learning_curve(x, scores, epsilons, filename, lines=None):
    fig = plt.figure()
    ax = fig.add_subplot(111, label="1")
    ax2 = fig.add_subplot(111, label="2", frame_on=False)

    ax.plot(x, epsilons, color="C0")
    ax.set_xlabel("Training Steps", color="C0")
    ax.set_ylabel("Epsilon", color="C0")
    ax.tick_params(axis='x', colors="C0")
    ax.tick_params(axis='y', colors="C0")

    N = len(scores)
    running_avg = np.empty(N)
    for t in range(N):
        running_avg[t] = np.mean(scores[max(0, t-20):(t+1)])

    ax2.scatter(x, running_avg, color="C1")
    ax2.axes.get_xaxis().set_visible(False)
    ax2.yaxis.tick_right()
    ax2.set_ylabel('Score', color="C1")
    ax2.yaxis.set_label_position('right')
    ax2.tick_params(axis='y', colors="C1")

    if lines is not None:
        for line in lines:
            plt.axvline(x=line)

    plt.savefig(filename)
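
# Usage sketch for plot_learning_curve (hypothetical data, not from a real run):
# epsilon goes on the left axis, a 20-point running average of the scores on the
# right axis, and the figure is written to filename, e.g.
#
#   steps = list(range(200))
#   plot_learning_curve(steps,
#                       scores=list(np.random.rand(200) * 100),
#                       epsilons=list(np.linspace(1.0, 0.05, 200)),
#                       filename='ddqn_learning_curve.png')
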
class RepeatActionAndMaxFrame(gym.Wrapper):
    def __init__(self, env=None, repeat=4, clip_reward=False, no_ops=0,
                 fire_first=False):
        super(RepeatActionAndMaxFrame, self).__init__(env)
        self.repeat = repeat
        self.shape = env.observation_space.low.shape
        # buffer holding the two most recent raw frames
        self.frame_buffer = np.zeros((2, *self.shape))
        self.clip_reward = clip_reward
        self.no_ops = no_ops
        self.fire_first = fire_first

    def step(self, action):
        t_reward = 0.0
        done = False
        for i in range(self.repeat):
            obs, reward, done, info = self.env.step(action)
            if self.clip_reward:
                reward = np.clip(np.array([reward]), -1, 1)[0]
            t_reward += reward
            idx = i % 2
            self.frame_buffer[idx] = obs
            if done:
                break
        # element-wise maximum of the last two frames removes sprite flicker
        max_frame = np.maximum(self.frame_buffer[0], self.frame_buffer[1])
        return max_frame, t_reward, done, info

    def reset(self):
        obs = self.env.reset()
        no_ops = np.random.randint(self.no_ops) + 1 if self.no_ops > 0 else 0
        for _ in range(no_ops):
            obs, _, done, _ = self.env.step(0)
            if done:
                obs = self.env.reset()
        if self.fire_first:
            assert self.env.unwrapped.get_action_meanings()[1] == 'FIRE'
            obs, _, _, _ = self.env.step(1)
        self.frame_buffer = np.zeros((2, *self.shape))
        self.frame_buffer[0] = obs
        return obs
class PreprocessFrame(gym.ObservationWrapper):
    def __init__(self, shape, env=None):
        super(PreprocessFrame, self).__init__(env)
        self.shape = (shape[2], shape[0], shape[1])
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0,
                                                shape=self.shape, dtype=np.float32)

    def observation(self, obs):
        new_frame = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        resized_screen = cv2.resize(new_frame, self.shape[1:],
                                    interpolation=cv2.INTER_AREA)
        new_obs = np.array(resized_screen, dtype=np.uint8).reshape(self.shape)
        new_obs = new_obs / 255.0
        return new_obs


class StackFrames(gym.ObservationWrapper):
    def __init__(self, env, repeat):
        super(StackFrames, self).__init__(env)
        self.observation_space = gym.spaces.Box(
            env.observation_space.low.repeat(repeat, axis=0),
            env.observation_space.high.repeat(repeat, axis=0),
            dtype=np.float32)
        self.stack = collections.deque(maxlen=repeat)

    def reset(self):
        self.stack.clear()
        observation = self.env.reset()
        for _ in range(self.stack.maxlen):
            self.stack.append(observation)
        return np.array(self.stack).reshape(self.observation_space.low.shape)

    def observation(self, observation):
        self.stack.append(observation)
        return np.array(self.stack).reshape(self.observation_space.low.shape)
def make_env(env_name, shape=(84, 84, 1), repeat=4, clip_rewards=False,
             no_ops=0, fire_first=False):
    env = gym.make(env_name)
    env = RepeatActionAndMaxFrame(env, repeat, clip_rewards, no_ops, fire_first)
    env = PreprocessFrame(shape, env)
    env = StackFrames(env, repeat)
    return env
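

# Quick sanity check, kept as a sketch: any NoFrameskip Atari environment should work
# (PongNoFrameskip-v4 is just an example and assumes the Atari ROMs are installed).
# The wrapper chain should yield observations of shape (4, 84, 84): four stacked
# 84x84 grayscale frames scaled to [0, 1].
if __name__ == '__main__':
    env = make_env('PongNoFrameskip-v4')
    obs = env.reset()
    print(obs.shape, obs.min(), obs.max())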
# deep_q_network.py
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

from parameters import *


class DQN(nn.Module):
    '''
    CNN based on https://storage.googleapis.com/deepmind-media/dqn/DQNNaturePaper.pdf
    '''
    def __init__(self, input_shape, num_of_actions):
        '''
        CNN model for predicting Q values
        '''
        super(DQN, self).__init__()
        self.input_shape = input_shape

        self.cnn_layers = nn.Sequential(
            nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )

        self.fc_input = self.calculate_linear_input()

        self.linear_layers = nn.Sequential(
            nn.Linear(in_features=self.fc_input, out_features=512),
            nn.ReLU(),
            nn.Linear(in_features=512, out_features=num_of_actions)
        )

        self.optimizer = optim.RMSprop(self.parameters(), lr=LEARNING_RATE)
        self.loss = nn.MSELoss().to(DEVICE)
        self.to(DEVICE)

    def calculate_linear_input(self):
        '''
        returns the flattened size of the conv output, used as the input size
        for the fully connected layers
        '''
        fc_input = self.cnn_layers(T.zeros(1, *self.input_shape))
        return int(np.prod(fc_input.size()))

    def forward(self, state):
        '''
        returns the Q-value of every action for the input observations
        '''
        state = self.cnn_layers(state)
        state = state.view(state.size()[0], -1)
        actions = self.linear_layers(state)
        return actions
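

# Minimal shape check, kept as a sketch (assumes the (4, 84, 84) stacked observations
# produced by atari_wrappers and a hypothetical 6-action game):
if __name__ == '__main__':
    net = DQN(input_shape=(4, 84, 84), num_of_actions=6)
    dummy = T.zeros(1, 4, 84, 84).to(DEVICE)
    print(net(dummy).shape)  # expected: torch.Size([1, 6]), one Q-value per action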
# double_dqn_agent.py
import random
import numpy as np
import torch as T

from deep_q_network import DQN
from replay_memory import ReplayMemory
from parameters import *


class Agent():
    '''
    double DQN agent for Atari games
    '''
    def __init__(self, num_of_actions, input_shape):
        self.device = DEVICE
        self.epsilon = EPSILON_START
        self.eps_dec = EPSILON_DECREMENT
        self.eps_min = EPSILON_MINIMUM

        self.input_shape = input_shape
        self.num_of_actions = num_of_actions
        self.actions = list(range(self.num_of_actions))
        self.training_loss = 0.0

        # init replay memory
        self.memory = ReplayMemory()

        # init online network
        self.network = DQN(num_of_actions=self.num_of_actions,
                           input_shape=input_shape)

        # init target network
        self.target_network = DQN(num_of_actions=self.num_of_actions,
                                  input_shape=input_shape)
        self.target_network.load_state_dict(self.network.state_dict())
        self.target_network.eval()

    def choose_action(self, state):
        '''
        chooses an action with an epsilon-greedy strategy
        '''
        if random.random() > self.epsilon:
            state_ = T.tensor(np.array([state]), dtype=T.float32).to(DEVICE)
            actions = self.network.forward(state_)
            action = T.argmax(actions).item()
        else:
            action = random.choice(self.actions)
        return action

    def decay_epsilon(self):
        '''
        decays epsilon linearly down to its minimum
        '''
        self.epsilon = self.epsilon - self.eps_dec \
            if self.epsilon > self.eps_min else self.eps_min

    def replace_weights(self):
        '''
        copies the weights of the online network into the target network
        '''
        self.target_network.load_state_dict(self.network.state_dict())

    def is_train_process_possible(self):
        '''
        returns True if the memory holds at least BATCH_SIZE experiences
        '''
        return len(self.memory) >= BATCH_SIZE

    def extract_batch_of_memory(self):
        '''
        returns a sampled batch of experiences as tensors on the device
        '''
        experiences = self.memory.sample()
        states, actions, next_states, rewards, dones = zip(*experiences)

        states = T.tensor(np.array(states), dtype=T.float32).to(self.device)
        rewards = T.tensor(rewards, dtype=T.float32).to(self.device)
        actions = T.tensor(actions, dtype=T.int64).to(self.device)
        next_states = T.tensor(np.array(next_states), dtype=T.float32).to(self.device)
        dones = T.tensor(dones, dtype=T.bool).to(self.device)

        return states, actions, next_states, rewards, dones

    def train(self):
        '''
        trains the agent with the double DQN update
        '''
        states, actions, next_states, rewards, dones = self.extract_batch_of_memory()

        # Q-values of the taken actions, predicted by the online network
        q_vals_net = self.network.forward(states)
        q_vals_net = q_vals_net.gather(1, actions.unsqueeze(-1)).squeeze(-1)

        # the online network selects the greedy next actions ...
        q_vals_next_net = self.network.forward(next_states)
        max_next_actions = T.argmax(q_vals_next_net, dim=1)

        # ... and the target network evaluates them
        q_vals_next_target_net = self.target_network.forward(next_states)
        q_vals_next_target_net = q_vals_next_target_net.gather(
            1, max_next_actions.unsqueeze(-1)).squeeze(-1)
        q_vals_next_target_net[dones] = 0.0
        q_vals_next_target_net = q_vals_next_target_net.detach()

        q_target = rewards + DISCOUNT_FACTOR * q_vals_next_target_net

        # optimize the online network
        loss = self.network.loss(q_vals_net, q_target)
        self.network.optimizer.zero_grad()
        loss.backward()
        self.network.optimizer.step()

        self.training_loss += loss.item()
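

# For reference, train() above computes the double DQN target: the online network
# selects the greedy next action and the target network evaluates it,
#     y = r + DISCOUNT_FACTOR * Q_target(s', argmax_a Q_online(s', a)),
# with the bootstrap term zeroed for terminal transitions.
# A tiny numeric sketch with made-up values (not taken from a real run):
if __name__ == '__main__':
    r, gamma = 1.0, 0.99
    q_online_next = T.tensor([0.2, 0.7])    # online net's Q-values for s'
    q_target_next = T.tensor([0.5, 0.4])    # target net's Q-values for s'
    a_star = T.argmax(q_online_next)        # online net picks action 1
    y = r + gamma * q_target_next[a_star]   # 1.0 + 0.99 * 0.4 = 1.396
    print(y.item())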
# training script
import gym
import torch
import numpy as np
import time
from torch.utils.tensorboard import SummaryWriter
import datetime
import sys
import os
from double_dqn_agent import Agent
from gym.wrappers import AtariPreprocessing
from gym.wrappers import FrameStack
# import sys
# sys.path.append('../utils/atari_wrappers.py')
from atari_wrappers import make_env
# from .utils.atari_wrappers import make_env
from parameters import *
if __name__ == '__main__':
    # initialize the env with the Atari preprocessing wrappers
    env = make_env(ENVIRONMENT)
    # env = gym.make(ENVIRONMENT)
    # env = AtariPreprocessing(env, noop_max=0)
    # env = FrameStack(env, 4)

    # initialize the agent
    agent = Agent(num_of_actions=env.action_space.n,
                  input_shape=env.observation_space.shape)

    # init tensorboard
    run_name = "runs/{}/{}".format("DDQN",
                                   datetime.datetime.now().strftime("%Y-%m-%d_%H-%M"))
    writer = SummaryWriter(run_name)

    best_score = 0
    learn_steps = 0
    scores = []

    writer.add_scalar('Epsilon', agent.epsilon, learn_steps)
    writer.add_scalar('Best Score', best_score, learn_steps)
    writer.add_scalar('Score', best_score, learn_steps)

    for episode in range(NUM_OF_EPISODES):
        # initialization of each episode
        state = env.reset()
        done = False
        score = 0

        # repeat until the episode ends
        while not done:
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)

            # update the agent's memory
            experience = (state, action, next_state, reward, done)
            agent.memory.add(experience)

            # train the agent once it has enough experiences in memory
            if agent.is_train_process_possible():
                agent.train()
                agent.decay_epsilon()

            score += reward
            learn_steps += 1

            if learn_steps % 2000 == 0:
                writer.add_scalar('Training loss',
                                  agent.training_loss / 2000, learn_steps)
                agent.training_loss = 0.0

            state = next_state

            # every TARGET_NET_UPDATE steps copy the weights into the target network
            if learn_steps % TARGET_NET_UPDATE == 0:
                agent.replace_weights()

        # keep the best episode score seen so far
        best_score = score if best_score < score else best_score

        scores.append(score)
        avg_score = np.mean(scores[-100:])

        print('episode: ', episode, 'score: ', score,
              ' average score %.1f' % avg_score, 'best score %.2f' % best_score,
              'epsilon %.2f' % agent.epsilon, 'steps', learn_steps)

        writer.add_scalar('Epsilon', agent.epsilon, learn_steps)
        writer.add_scalar('Best Score', best_score, learn_steps)
        writer.add_scalar('Score', score, learn_steps)

    writer.close()
    env.close()
# parameters.py
import torch
# constants
ENVIRONMENT = "MsPacmanNoFrameskip-v4"
LEARNING_RATE = 0.0001
DISCOUNT_FACTOR = 0.99
EPSILON_START = 1
EPSILON_DECREMENT = 0.00001
EPSILON_MINIMUM = 0.05
NUM_OF_EPISODES = 600
TARGET_NET_UPDATE = 1_000
BATCH_SIZE = 32
REPLAY_MEMORY_SIZE = 10_000
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
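
# With these values epsilon decays linearly by EPSILON_DECREMENT per training step,
# so it reaches EPSILON_MINIMUM after roughly (1 - 0.05) / 0.00001 = 95,000 steps,
# while the target network is refreshed every 1,000 steps.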
# replay_memory.py
import random
from collections import deque

from parameters import *


class ReplayMemory:
    '''
    memory of experiences during training
    '''
    def __init__(self):
        self.size = REPLAY_MEMORY_SIZE
        self.batch_size = BATCH_SIZE
        self.memory = deque(maxlen=self.size)

    def __len__(self):
        return len(self.memory)

    def add(self, experience):
        '''
        appends experience to the memory
        '''
        self.memory.append(experience)

    def sample(self):
        '''
        returns batch_size number of experiences from memory
        '''
        return random.sample(self.memory, self.batch_size)
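

# Usage sketch (the transition variables are hypothetical; BATCH_SIZE and
# REPLAY_MEMORY_SIZE come from parameters.py):
#
#   memory = ReplayMemory()
#   memory.add((state, action, next_state, reward, done))  # one transition tuple
#   batch = memory.sample()  # BATCH_SIZE random transitions, once enough are stored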