Commit 9866f13a authored by udoedo

initial push

parent 6aad48b2
# deep_q_network.py
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim


class DQN(nn.Module):
    '''
    CNN based on https://storage.googleapis.com/deepmind-media/dqn/DQNNaturePaper.pdf
    '''
    def __init__(self, input_shape, num_of_actions, lr, device='cpu'):
        '''
        CNN model for predicting Q values
        '''
        super(DQN, self).__init__()
        self.input_shape = input_shape
        self.conv1 = nn.Conv2d(input_shape[0], 32, 8, stride=4)
        self.conv2 = nn.Conv2d(32, 64, 4, stride=2)
        self.conv3 = nn.Conv2d(64, 64, 3, stride=1)
        fc_input = self.calculate_fc_input(self.conv1, self.conv2, self.conv3)
        self.fc1 = nn.Linear(in_features=fc_input, out_features=512)
        self.fc2 = nn.Linear(in_features=512, out_features=num_of_actions)
        self.optimizer = optim.RMSprop(self.parameters(), lr=lr)
        self.loss = nn.MSELoss()
        self.to(device)

    def calculate_fc_input(self, *conv_layers):
        '''
        returns the input size of the first fully connected layer
        '''
        width = self.input_shape[1]
        for layer in conv_layers:
            width = self.conv_output(width, layer.kernel_size, layer.padding, layer.stride)
        # the feature maps are square, so the flattened size is width * width * channels
        fc_input = int(width * width * conv_layers[-1].out_channels)
        return fc_input

    def conv_output(self, width, filter_, padding, stride):
        '''
        returns the output size of a Conv2d layer:
        w' = (width - filter + 2 * padding) // stride + 1
        (3136 for the standard 84x84 Atari input; see the sanity check at the end of this file)
        '''
        return (width - filter_[0] + 2 * padding[0]) // stride[0] + 1

    def forward(self, state):
        '''
        returns Q values for each action given the input observations
        '''
        conv1 = F.relu(self.conv1(state))
        conv2 = F.relu(self.conv2(conv1))
        conv3 = F.relu(self.conv3(conv2))
        conv_state = conv3.view(conv3.size()[0], -1)
        flat1 = F.relu(self.fc1(conv_state))
        actions = self.fc2(flat1)
        return actions
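# Sanity check (illustrative addition, not part of the original file): for the
# standard Atari preprocessing of 4 stacked 84x84 grayscale frames, the conv
# stack shrinks the width 84 -> 20 -> 9 -> 7, so the flattened size is
# 7 * 7 * 64 = 3136, as noted in the conv_output docstring. The action count
# below is an arbitrary placeholder.
if __name__ == '__main__':
    net = DQN(input_shape=(4, 84, 84), num_of_actions=9, lr=0.00025)
    assert net.fc1.in_features == 3136
    print('fc input size:', net.fc1.in_features)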
# dqn_agent.py
import random
import numpy as np
import torch as T
from deep_q_network import DQN
from replay_memory import ReplayMemory


class Agent():
    '''
    DQN agent for Atari games
    '''
    def __init__(self, epsilon_start, num_of_actions, learning_rate, device, input_shape,
                 epsilon_dec, epsilon_min, memory_size, discount_factor, batch_size):
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.device = device
        self.input_shape = input_shape
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.epsilon = epsilon_start
        self.eps_dec = epsilon_dec
        self.eps_min = epsilon_min
        self.num_of_actions = num_of_actions
        self.actions = [i for i in range(self.num_of_actions)]
        self.memory = ReplayMemory(max_len=self.memory_size, batch_size=self.batch_size)
        self.network = DQN(lr=self.learning_rate,
                           num_of_actions=self.num_of_actions,
                           device=self.device,
                           input_shape=input_shape)
        self.target_network = DQN(lr=self.learning_rate,
                                  num_of_actions=self.num_of_actions,
                                  device=self.device,
                                  input_shape=input_shape)

    def choose_action(self, observation):
        '''
        chooses an action using the epsilon-greedy strategy
        '''
        if random.random() > self.epsilon:
            observation = T.tensor(np.array([observation]), dtype=T.float32).to(self.device)
            actions = self.network.forward(observation)
            action = T.argmax(actions).item()
        else:
            action = random.choice(self.actions)
        return action

    def decaying_epsilon(self):
        '''
        multiplicative epsilon decay, clipped at eps_min
        '''
        self.epsilon = max(self.epsilon * self.eps_dec, self.eps_min)

    def replace_weights(self):
        '''
        copies the weights of the online network into the target network
        '''
        self.target_network.load_state_dict(self.network.state_dict())

    def is_train_process_possible(self):
        '''
        returns True if the memory holds at least batch_size experiences
        '''
        return len(self.memory) >= self.batch_size

    def sample_memory(self):
        '''
        returns a batch of extracted experiences as tensors on the device
        '''
        experiences = self.memory.sample()
        observations, actions, rewards, new_observations, dones = self.extract_experiences(experiences)
        observations = T.tensor(np.array(observations), dtype=T.float32).to(self.device)
        rewards = T.tensor(rewards, dtype=T.float32).to(self.device)
        actions = T.tensor(actions, dtype=T.int64).to(self.device)
        next_observations = T.tensor(np.array(new_observations), dtype=T.float32).to(self.device)
        dones = T.tensor(dones, dtype=T.bool).to(self.device)
        return observations, actions, rewards, next_observations, dones

    def extract_experiences(self, experiences):
        '''
        extracts observation, action, reward, new_observation and done flag from each experience
        '''
        observation, action, new_observation, reward, dones = [], [], [], [], []
        for experience in experiences:
            observation.append(experience.state)
            action.append(experience.action)
            new_observation.append(experience.next_state)
            reward.append(experience.reward)
            dones.append(experience.dones)
        return observation, action, reward, new_observation, dones

    def train(self):
        '''
        trains the agent's online network on one sampled batch
        '''
        self.network.optimizer.zero_grad()
        observations, actions, rewards, new_observations, dones = self.sample_memory()
        # Q(s, a) of the actions actually taken, kept in the graph so gradients reach the network
        q_values_current_state = self.network.forward(observations).gather(1, actions.unsqueeze(1)).squeeze(1)
        # max_a' Q_target(s', a') from the target network, zeroed for terminal transitions;
        # computed without tracking gradients
        with T.no_grad():
            q_values_next_state = self.target_network.forward(new_observations).max(dim=1)[0]
            q_values_next_state[dones] = 0.0
            q_target = rewards + self.discount_factor * q_values_next_state
        # optimize network
        loss = self.network.loss(q_values_current_state, q_target)
        loss.backward()
        self.network.optimizer.step()
        # decaying epsilon
        self.decaying_epsilon()
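# Illustrative note (an addition, not part of the original file): with the
# training script's constants (epsilon_start = 1.0, epsilon_dec = 0.9995,
# epsilon_min = 0.1), the multiplicative decay in decaying_epsilon() reaches
# the minimum after roughly log(0.1) / log(0.9995), about 4600 training steps.
if __name__ == '__main__':
    import math
    print('steps until epsilon_min:', math.ceil(math.log(0.1) / math.log(0.9995)))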
# main training script
import gym
import torch
import numpy as np
import time
from torch.utils.tensorboard import SummaryWriter
from collections import namedtuple
import datetime
from dqn_agent import Agent
from gym.wrappers import AtariPreprocessing
from gym.wrappers import FrameStack

# constants
LEARNING_RATE = 0.00025
DISCOUNT_FACTOR = 0.99
EPSILON_START = 1
EPSILON_DECREMENT = 0.9995
EPSILON_MINIMUM = 0.1
NUM_OF_EPISODES = 1_000
TARGET_UPDATE = 100
BATCH_SIZE = 32
REPLAY_MEMORY_SIZE = 50_000
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Experience = namedtuple('Experience', ('state', 'action', 'next_state', 'reward', 'dones'))

if __name__ == '__main__':
    # Initialize the env using gym's Atari preprocessing wrappers
    env = gym.make("MsPacmanNoFrameskip-v4")
    env = AtariPreprocessing(env)
    env = FrameStack(env, 4)
    best_score = 0
    # Init tensorboard
    log_name = "logs/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    tb = SummaryWriter(log_name)
    # Initialize the agent with its hyperparameters
    agent = Agent(learning_rate=LEARNING_RATE,
                  discount_factor=DISCOUNT_FACTOR,
                  epsilon_start=EPSILON_START,
                  epsilon_dec=EPSILON_DECREMENT,
                  epsilon_min=EPSILON_MINIMUM,
                  memory_size=REPLAY_MEMORY_SIZE,
                  batch_size=BATCH_SIZE,
                  num_of_actions=env.action_space.n,
                  input_shape=env.observation_space.shape,
                  device=DEVICE)
    # add_graph needs an example input to trace the network
    tb.add_graph(agent.network, torch.zeros(1, *env.observation_space.shape, device=DEVICE))
    learn_steps = 0
    for episode in range(NUM_OF_EPISODES):
        # initialization of each episode
        observation = env.reset()
        done = False
        score = 0
        # Write the current epsilon and best score to TensorBoard
        tb.add_scalar('Epsilon', agent.epsilon, learn_steps)
        tb.add_scalar('Best Score', best_score, learn_steps)
        # repeat until the episode ends
        while not done:
            # env.render()
            action = agent.choose_action(observation)
            new_observation, reward, done, info = env.step(action)
            # updating agent memory
            experience = Experience(observation, action, new_observation, reward, done)
            agent.memory.update(experience)
            # train the agent once it has enough experiences in memory
            if agent.is_train_process_possible():
                agent.train()
            score += reward
            learn_steps += 1
            observation = new_observation
            # every n steps, copy the weights into the target network
            if learn_steps % TARGET_UPDATE == 0:
                agent.replace_weights()
        # keep the best score seen so far
        best_score = score if score > best_score else best_score
        print('episode', episode, 'score:', score, 'best score', best_score)
        tb.add_scalar('Score', score, learn_steps)
    tb.close()
    env.close()
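# Usage note (an addition, not part of the original file): to inspect the logged
# scalars (Epsilon, Best Score, Score) during or after training, point
# TensorBoard at the log directory created above, e.g.:
#   tensorboard --logdir logs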
# replay_memory.py
import random


class ReplayMemory:
    '''
    memory of experiences gathered during training
    '''
    def __init__(self, max_len, batch_size):
        self.max_len = max_len
        self.memory = []
        self.batch_size = batch_size
        self.position = 0

    def update(self, experience):
        '''
        appends an experience to the memory, overwriting the oldest entry when full
        '''
        if len(self.memory) < self.max_len:
            self.memory.append(None)
        self.memory[self.position] = experience
        self.position = (self.position + 1) % self.max_len

    def sample(self):
        '''
        returns batch_size randomly chosen experiences from memory
        '''
        return random.sample(self.memory, self.batch_size)

    def __len__(self):
        return len(self.memory)
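# Minimal usage sketch (illustrative addition, not part of the original file):
# fill the buffer with dummy experiences and draw one random batch. The
# Experience namedtuple mirrors the one defined in the training script.
if __name__ == '__main__':
    from collections import namedtuple
    Experience = namedtuple('Experience', ('state', 'action', 'next_state', 'reward', 'dones'))
    memory = ReplayMemory(max_len=100, batch_size=4)
    for step in range(10):
        memory.update(Experience(state=step, action=0, next_state=step + 1, reward=1.0, dones=False))
    print(memory.sample())  # four randomly drawn Experience tuples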