Project: Iveta Kaminská / 2020smad

Commit dd3e3e12, authored Jun 30, 2020 by Ladislav Hlatky: "Upload New File" (parent 2fff7e16)

Changes: 1 file

DQN/dqn-test3.py (new file, mode 100644)
import gym
import numpy as np
import pandas as pd
from collections import deque
import random
import keras
import time
import os
import sys
from io import StringIO
from keras import Sequential
from keras.layers import Dense
from keras.activations import relu, linear
from keras.optimizers import Adam
from keras.losses import mean_squared_error
from keras.models import load_model


# callback giving access to the loss values recorded during training
class LossHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.losses = []

    def on_batch_end(self, batch, logs={}):
        self.losses.append(logs.get('loss'))
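
# on_train_begin resets self.losses at the start of every fit() call and
# on_batch_end appends one value per batch, so losses[0] is the loss of the
# first batch of the most recent fit; the agent below reads exactly that value
# after each single-epoch update.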


class DQN:
    def __init__(self, env, lr, gamma, epsilon, epsilon_decay, epsilon_min,
                 batch_size, memory_size):
        # agent initialisation
        self.env = env
        self.action_space = env.action_space
        self.observation_space = env.observation_space
        self.counter = 0

        self.lr = lr
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.rewards_list = []
        self.loss_callback = LossHistory()

        self.replay_memory_buffer = deque(maxlen=memory_size)
        self.batch_size = batch_size
        self.epsilon_min = epsilon_min
        self.num_action_space = self.action_space.n
        self.num_observation_space = env.observation_space.shape[0]
        self.model = self.initialize_model()
        self.loss = 0

    def initialize_model(self):
        model = Sequential()
        model.add(Dense(512, input_dim=self.num_observation_space, activation=relu))
        model.add(Dense(256, activation=relu))
        model.add(Dense(self.num_action_space, activation=linear))

        # compile the model
        model.compile(loss=mean_squared_error, optimizer=Adam(lr=self.lr))
        print(model.summary())
        return model
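
    # For the LunarLander-v2 environment used in __main__, num_observation_space
    # is 8 and num_action_space is 4, so the network maps an 8-dimensional state
    # to 4 Q-values through two hidden ReLU layers of 512 and 256 units.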

    def get_action(self, state):
        if np.random.rand() < self.epsilon:
            return random.randrange(self.num_action_space)

        predicted_actions = self.model.predict(state)
        return np.argmax(predicted_actions[0])
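
    # Epsilon-greedy selection: with probability epsilon a random action is
    # taken, otherwise the action with the largest predicted Q-value; epsilon
    # itself is decayed once per episode in train().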

    def add_to_replay_memory(self, state, action, reward, next_state, done):
        self.replay_memory_buffer.append((state, action, reward, next_state, done))

    def learn_and_update_weights_by_reply(self):
        # check the size of the replay_memory_buffer
        if len(self.replay_memory_buffer) < self.batch_size or self.counter != 0:
            return

        # early stopping to prevent overtraining
        if np.mean(self.rewards_list[-10:]) > 180:
            return

        random_sample = self.get_random_sample_from_replay_mem()
        states, actions, rewards, next_states, done_list = self.get_attribues_from_sample(random_sample)

        # update the targets according to the DQN rule
        targets = rewards + self.gamma * (np.amax(self.model.predict_on_batch(next_states), axis=1)) * (1 - done_list)
        target_vec = self.model.predict_on_batch(states)
        indexes = np.array([i for i in range(self.batch_size)])
        target_vec[[indexes], [actions]] = targets

        # the validation split is there because we need the loss callback during training
        self.model.fit(states, target_vec, epochs=1, verbose=0, batch_size=self.batch_size,
                       validation_split=0.2, callbacks=[self.loss_callback])
        self.loss = self.loss_callback.losses[0]
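
    # The targets computed above are the standard one-step Q-learning targets
    # (a restatement of the line above, not a change to it):
    #     y_i = r_i + gamma * max_a' Q(s'_i, a')   while the episode continues
    #     y_i = r_i                                when done_i is True,
    # which is what rewards + gamma * amax(Q(next_states), axis=1) * (1 - done_list)
    # evaluates to; only the Q-value of the action actually taken is overwritten
    # in target_vec.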

    def get_attribues_from_sample(self, random_sample):
        states = np.array([i[0] for i in random_sample])
        actions = np.array([i[1] for i in random_sample])
        rewards = np.array([i[2] for i in random_sample])
        next_states = np.array([i[3] for i in random_sample])
        done_list = np.array([i[4] for i in random_sample])
        states = np.squeeze(states)
        next_states = np.squeeze(next_states)
        return np.squeeze(states), actions, rewards, next_states, done_list

    def get_random_sample_from_replay_mem(self):
        random_sample = random.sample(self.replay_memory_buffer, self.batch_size)
        return random_sample

    def train(self, number_of_tests, num_episodes=50, number_of_steps=1000, can_stop=True):
        tests_duration = []
        tests_num_of_ep_to_solve = np.full(number_of_tests, -1)
        tests_duration_when_solved = np.full(number_of_tests, -1)
        f_duration_in_ep = open(test_directory_name + "/duration_in_episodes.txt", "w", buffering=1)
        f_duration_when_solved = open(test_directory_name + "/duration_when_solved.txt", "w", buffering=1)

        for test in range(number_of_tests):
            test_start = int(time.time())
            rewards_per_episode = []
            f_rewards_per_episode = open("./" + test_directory_name + "/" + str(test) + "_rewards.txt", "a", buffering=1)
            f_epsilons_per_episode = open("./" + test_directory_name + "/" + str(test) + "_epsilons.txt", "a", buffering=1)
            f_avg_rewards_per_episode = open("./" + test_directory_name + "/" + str(test) + "_avg.txt", "a", buffering=1)
            f_steps_per_episode = open("./" + test_directory_name + "/" + str(test) + "_steps.txt", "a", buffering=1)
            f_avg_loss_values_per_episode = open("./" + test_directory_name + "/" + str(test) + "_loss.txt", "a", buffering=1)
            episode = 0
            step = 0
            loss_history = 0
            print(tests_num_of_ep_to_solve)
            print(tests_duration_when_solved)

            for episode in range(num_episodes):
                state = env.reset()
                reward_for_episode = 0
                loss_values_per_step = [0]
                state = np.reshape(state, [1, self.num_observation_space])
                for step in range(number_of_steps):
                    env.render()
                    received_action = self.get_action(state)
                    next_state, reward, done, info = env.step(received_action)
                    next_state = np.reshape(next_state, [1, self.num_observation_space])
                    self.add_to_replay_memory(state, received_action, reward, next_state, done)
                    reward_for_episode += reward
                    state = next_state
                    self.update_counter()
                    self.learn_and_update_weights_by_reply()
                    loss_history += self.loss
                    loss_values_per_step.append(loss_history)
                    if done:
                        break

                # decrease epsilon after each episode
                if self.epsilon > self.epsilon_min:
                    self.epsilon *= self.epsilon_decay

                self.rewards_list.append(reward_for_episode)
                last_rewards_mean = np.mean(self.rewards_list[-100:])
                f_rewards_per_episode.write('%d\n' % reward_for_episode)
                f_epsilons_per_episode.write('%f\n' % self.epsilon)
                f_avg_rewards_per_episode.write('%f\n' % last_rewards_mean)
                f_steps_per_episode.write('%d\n' % step)
                mean_loss = np.mean(loss_values_per_step)
                f_avg_loss_values_per_episode.write('%f\n' % mean_loss)

                if (last_rewards_mean >= 200 and len(rewards_per_episode) >= 100) and (not can_stop):
                    self.model.save("./" + test_directory_name + "/q_saved-model_t" + str(test))
                    self.model.save("./" + test_directory_name + "/t_saved-model_t" + str(test))
                    tests_num_of_ep_to_solve[test] = episode
                    tests_duration_when_solved[test] = int(time.time()) - test_start
                    print("DQN Training Complete...")
                    # break

                print(episode, "\t: Episode || Reward: ", reward_for_episode,
                      "\t|| Average Reward: ", last_rewards_mean, "\tepsilon: ", self.epsilon)

            test_end = int(time.time())
            tests_duration.append(test_end - test_start)
            f_rewards_per_episode.close()
            f_epsilons_per_episode.close()
            f_avg_rewards_per_episode.close()
            f_steps_per_episode.close()
            f_avg_loss_values_per_episode.close()
            f_report.write("Test %d\nduration:\n%d\nepisodes to LAST_100_REWARD_GOAL\n%d\n"
                           % (test, tests_duration[test], tests_num_of_ep_to_solve[test]))
            f_duration_in_ep.write("%d\n" % (tests_num_of_ep_to_solve[test]))
            f_duration_when_solved.write("%d\n" % (tests_duration_when_solved[test]))
            # creates a fresh agent with the default hyperparameters; the result is
            # not assigned, so it does not affect the running solver
            DQN(env, 0.001, 0.99, 1.0, 0.995, 0.01, 64, 500000)

        f_report.close()
        f_duration_in_ep.close()
        f_duration_when_solved.close()
        print("End of tests.")

    def update_counter(self):
        self.counter += 1
        step_size = 5
        self.counter = self.counter % step_size
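
    # Because the counter cycles through 0..4 and learn_and_update_weights_by_reply
    # returns early unless counter == 0, a network update is performed only on
    # every fifth environment step.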


def create_report(f, model, lr, epsilon, epsilon_decay, gamma, training_episodes,
                  number_of_tests, number_of_steps, epsilon_min):
    timestamp = int(time.time())
    f.write("testing_id: %d\n" % timestamp)
    f.write("lander-doubleDQN-v13\n")
    f.write("number of tests:\t%d\n" % number_of_tests)
    f.write("NUMBER_OF_EPISODES_MAX:\t%d\n" % training_episodes)
    f.write("NUMBER_OF_STEPS_MAX:\t%d\n" % number_of_steps)
    f.write("LEARNING_RATE:\t%f\n" % lr)
    f.write("GAMMA:\t%f\n" % gamma)
    f.write("EPSILON_MAX:\t%f\n" % epsilon)
    f.write("EPSILON_MIN:\t%f\n" % epsilon_min)
    f.write("EPSILON_DECAY:\t%f\n" % epsilon_decay)
    f.write("MEMORY_SIZE:\t%d\n" % memory_size)
    f.write("BATCH_SIZE:\t%d\n" % batch_size)
    f.write("LAST_100_REWARD_GOAL:\t%f\r\n" % 200)
    tmp_smry = StringIO()
    model.summary(print_fn=lambda x: tmp_smry.write(x + '\n'))
    summary = tmp_smry.getvalue()
    f.write(summary)


if __name__ == '__main__':
    test_directory_name = sys.argv[1]
    number_of_tests = int(sys.argv[2])

    # parameter settings
    lr = 0.001
    epsilon = 1.0
    epsilon_decay = 0.995
    epsilon_min = 0.01
    gamma = 0.99
    training_episodes = 50
    number_of_steps = 1000
    batch_size = 64
    memory_size = 500000

    if not os.path.exists(test_directory_name):
        os.makedirs(test_directory_name)

    env = gym.make('LunarLander-v2')
    env.seed(21)
    np.random.seed(21)
    solver = DQN(env, lr, gamma, epsilon, epsilon_decay, epsilon_min, batch_size, memory_size)

    # create the report at the start of training
    f_report = open(test_directory_name + "/testing_report.txt", "w+")
    create_report(f_report, solver.model, lr, epsilon, epsilon_decay, gamma,
                  training_episodes, number_of_tests, number_of_steps, epsilon_min)
    solver.train(number_of_tests, training_episodes, number_of_steps, True)
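
# Usage sketch (the file name below is the one from this commit; adjust it if the
# script is saved under a different name):
#     python DQN/dqn-test3.py <test_directory_name> <number_of_tests>
# The first argument is the output directory for the per-test log files and the
# testing report, the second is the number of independent training runs.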