Commit 2ad8b9f
chenyingyinglalala committed Nov 3, 2019 (0 parents)
Showing 60 changed files with 5,559 additions and 0 deletions.
@@ -0,0 +1,98 @@
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

# Hyperparameters for tabular Q-learning on a 1-D corridor
ALPHA = 0.1           # learning rate
GAMMA = 0.95          # discount factor
EPSILION = 0.9        # probability of taking the greedy action
N_STATE = 20          # number of states in the corridor
ACTIONS = ['left', 'right']
MAX_EPISODES = 200
FRESH_TIME = 0.1      # delay between rendered frames (seconds)


def build_q_table(n_state, actions):
    # One row per state, one column per action, all values initialised to zero.
    q_table = pd.DataFrame(
        np.zeros((n_state, len(actions))),
        index=np.arange(n_state),
        columns=actions
    )
    return q_table

def choose_action(state, q_table):
    # epsilon-greedy policy: explore with probability 1 - EPSILION,
    # or whenever the row is still all zeros; otherwise act greedily.
    state_action = q_table.loc[state, :]
    if np.random.uniform() > EPSILION or (state_action == 0).all():
        action_name = np.random.choice(ACTIONS)
    else:
        action_name = state_action.idxmax()
    return action_name

def get_env_feedback(state, action):
    # Returns (next_state, reward). The goal 'T' sits at the right end of the
    # corridor; every non-terminal step costs -0.5, reaching the goal gives +1.
    if action == 'right':
        if state == N_STATE - 2:
            next_state = 'terminal'
            reward = 1
        else:
            next_state = state + 1
            reward = -0.5
    else:
        if state == 0:
            next_state = 0      # bumping into the left wall keeps the agent in place
            reward = -0.5
        else:
            next_state = state - 1
            reward = -0.5
    return next_state, reward

def update_env(state, episode, step_counter):
    # Render the corridor, e.g. '--*-----------------T', and report episode stats.
    env = ['-'] * (N_STATE - 1) + ['T']
    if state == 'terminal':
        print("Episode {}, the total step is {}".format(episode + 1, step_counter))
        return True, step_counter
    else:
        env[state] = '*'
        print(''.join(env))
        time.sleep(FRESH_TIME)
        return False, step_counter


def q_learning():
    q_table = build_q_table(N_STATE, ACTIONS)
    step_counter_times = []
    for episode in range(MAX_EPISODES):
        state = 0
        is_terminal = False
        step_counter = 0
        update_env(state, episode, step_counter)
        while not is_terminal:
            action = choose_action(state, q_table)
            next_state, reward = get_env_feedback(state, action)
            if next_state == 'terminal':
                q_target = reward
            else:
                # Off-policy target: bootstrap from the best action in the next state.
                q_target = reward + GAMMA * q_table.loc[next_state, :].max()
            q_table.loc[state, action] += ALPHA * (q_target - q_table.loc[state, action])
            state = next_state
            is_terminal, steps = update_env(state, episode, step_counter + 1)
            step_counter += 1
            if is_terminal:
                step_counter_times.append(steps)
    return q_table, step_counter_times


def main():
    q_table, step_counter_times = q_learning()
    print("Q table\n{}\n".format(q_table))
    print('end')

    plt.plot(step_counter_times, 'g-')
    plt.ylabel("steps")
    plt.show()
    print("The step_counter_times is {}".format(step_counter_times))


if __name__ == '__main__':
    main()
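As a standalone reference for the update rule used in q_learning() above, here is a minimal sketch of a single Q-learning step on a toy table; the states, action, reward, and table size below are made up for illustration and are not part of the commit.

import numpy as np
import pandas as pd

ALPHA, GAMMA = 0.1, 0.95
q = pd.DataFrame(np.zeros((3, 2)), index=range(3), columns=['left', 'right'])

# One observed transition: in state 1 the agent went 'right',
# received reward -0.5 and landed in state 2.
state, action, reward, next_state = 1, 'right', -0.5, 2

# The Q-learning target bootstraps from the greedy value of the next state.
q_target = reward + GAMMA * q.loc[next_state, :].max()
q.loc[state, action] += ALPHA * (q_target - q.loc[state, action])
print(q.loc[state, action])   # -0.05: one tenth of the TD error (-0.5 - 0.0)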
@@ -0,0 +1,104 @@

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time

# Hyperparameters for tabular SARSA on a 1-D corridor
ALPHA = 0.1           # learning rate
GAMMA = 0.95          # discount factor
EPSILION = 0.9        # probability of taking the greedy action
N_STATE = 6           # number of states in the corridor
ACTIONS = ['left', 'right']
MAX_EPISODES = 200
FRESH_TIME = 0.1      # delay between rendered frames (seconds)


def build_q_table(n_state, actions):
    # One row per state, one column per action, all values initialised to zero.
    q_table = pd.DataFrame(
        np.zeros((n_state, len(actions))),
        index=np.arange(n_state),
        columns=actions
    )
    return q_table

def choose_action(state, q_table):
    # epsilon-greedy policy: explore with probability 1 - EPSILION,
    # or whenever the row is still all zeros; otherwise act greedily.
    state_action = q_table.loc[state, :]
    if np.random.uniform() > EPSILION or (state_action == 0).all():
        action_name = np.random.choice(ACTIONS)
    else:
        action_name = state_action.idxmax()
    return action_name

def get_env_feedback(state, action):
    # Returns (next_state, reward). The goal 'T' sits at the right end of the
    # corridor; every non-terminal step costs -0.5, reaching the goal gives +1.
    if action == 'right':
        if state == N_STATE - 2:
            next_state = 'terminal'
            reward = 1
        else:
            next_state = state + 1
            reward = -0.5
    else:
        if state == 0:
            next_state = 0      # bumping into the left wall keeps the agent in place
            reward = -0.5
        else:
            next_state = state - 1
            reward = -0.5
    return next_state, reward

def update_env(state, episode, step_counter):
    # Render the corridor, e.g. '--*--T', and report episode stats.
    env = ['-'] * (N_STATE - 1) + ['T']
    if state == 'terminal':
        print("Episode {}, the total step is {}".format(episode + 1, step_counter))
        return True, step_counter
    else:
        env[state] = '*'
        print(''.join(env))
        time.sleep(FRESH_TIME)
        return False, step_counter


def sarsa_learning():
    q_table = build_q_table(N_STATE, ACTIONS)
    step_counter_times = []
    for episode in range(MAX_EPISODES):
        state = 0
        is_terminal = False
        step_counter = 0
        update_env(state, episode, step_counter)
        action = choose_action(state, q_table)
        while not is_terminal:
            next_state, reward = get_env_feedback(state, action)
            if next_state == 'terminal':
                q_target = reward
                next_action = None
            else:
                # On-policy (SARSA) target: bootstrap from the action that will
                # actually be executed in the next state.
                next_action = choose_action(next_state, q_table)
                q_target = reward + GAMMA * q_table.loc[next_state, next_action]
            q_table.loc[state, action] += ALPHA * (q_target - q_table.loc[state, action])
            state = next_state
            action = next_action
            is_terminal, steps = update_env(state, episode, step_counter + 1)
            step_counter += 1
            if is_terminal:
                step_counter_times.append(steps)
    return q_table, step_counter_times


def main():
    q_table, step_counter_times = sarsa_learning()
    print("Q table\n{}\n".format(q_table))
    print('end')

    plt.plot(step_counter_times, 'g-')
    plt.ylabel("steps")
    plt.show()
    print("The step_counter_times is {}".format(step_counter_times))


if __name__ == '__main__':
    main()
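For comparison with the Q-learning script earlier in this commit, a short sketch of how the two bootstrap targets differ for the same transition; the table values, reward, and sampled action below are invented for illustration only.

import pandas as pd

GAMMA = 0.95
q = pd.DataFrame([[0.0, 0.2], [0.5, -0.1]], index=[0, 1], columns=['left', 'right'])

reward, next_state = -0.5, 1
next_action = 'right'   # suppose the epsilon-greedy policy sampled 'right' next

q_learning_target = reward + GAMMA * q.loc[next_state, :].max()    # -0.025 (max over actions)
sarsa_target = reward + GAMMA * q.loc[next_state, next_action]     # -0.595 (action actually taken)
print(q_learning_target, sarsa_target)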
@@ -0,0 +1,152 @@

import numpy as np


class GridWorld:

    def __init__(self, tot_row, tot_col):
        self.action_space_size = 4
        self.world_row = tot_row
        self.world_col = tot_col
        #The world is a matrix of size row x col x 2
        #The first layer contains the obstacles
        #The second layer contains the rewards
        #self.world_matrix = np.zeros((tot_row, tot_col, 2))
        self.transition_matrix = np.ones((self.action_space_size, self.action_space_size)) / self.action_space_size
        #self.transition_array = np.ones(self.action_space_size) / self.action_space_size
        self.reward_matrix = np.zeros((tot_row, tot_col))
        self.state_matrix = np.zeros((tot_row, tot_col))
        self.position = [np.random.randint(tot_row), np.random.randint(tot_col)]

    #def setTransitionArray(self, transition_array):
    #    if(transition_array.shape != self.transition_array):
    #        raise ValueError('The shape of the two matrices must be the same.')
    #    self.transition_array = transition_array

    def setTransitionMatrix(self, transition_matrix):
        '''Set the transition matrix.
        The transition matrix here is intended as a matrix which has a row
        for each action; the elements of the row are the probabilities of
        executing each action when a command is given. For example:
        [[0.55, 0.25, 0.10, 0.10]
         [0.25, 0.25, 0.25, 0.25]
         [0.30, 0.20, 0.40, 0.10]
         [0.10, 0.20, 0.10, 0.60]]
        This matrix defines the transition rules for all the 4 possible actions.
        The first row corresponds to the probabilities of executing each one of
        the 4 actions when the policy orders the robot to go UP. In this case
        the transition model says that with a probability of 0.55 the robot will
        go UP, with a probability of 0.25 RIGHT, 0.10 DOWN and 0.10 LEFT.
        '''
        if(transition_matrix.shape != self.transition_matrix.shape):
            raise ValueError('The shape of the two matrices must be the same.')
        self.transition_matrix = transition_matrix
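
    # Illustrative usage (not part of the original file), assuming a GridWorld
    # instance named env: the classic "80% intended, 10% each lateral" noise
    # model would be set as
    #   env.setTransitionMatrix(np.array([[0.8, 0.1, 0.0, 0.1],
    #                                     [0.1, 0.8, 0.1, 0.0],
    #                                     [0.0, 0.1, 0.8, 0.1],
    #                                     [0.1, 0.0, 0.1, 0.8]]))
    # and a fully deterministic model simply as env.setTransitionMatrix(np.eye(4)).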

    def setRewardMatrix(self, reward_matrix):
        '''Set the reward matrix.
        The matrix must have the same shape as the world; each entry is the
        reward received when the robot lands on that cell.
        '''
        if(reward_matrix.shape != self.reward_matrix.shape):
            raise ValueError('The shape of the matrix does not match with the shape of the world.')
        self.reward_matrix = reward_matrix
    def setStateMatrix(self, state_matrix):
        '''Set the obstacles in the world.
        The input to the function is a matrix with the
        same size of the world:
        -1 for states which are not walkable,
        +1 for terminal states,
         0 for all the walkable states (non-terminal).
        The following matrix represents the 4x3 world
        used in the series "dissecting reinforcement learning":
        [[0, 0, 0, +1]
         [0, -1, 0, +1]
         [0, 0, 0, 0]]
        '''
        if(state_matrix.shape != self.state_matrix.shape):
            raise ValueError('The shape of the matrix does not match with the shape of the world.')
        self.state_matrix = state_matrix

    def setPosition(self, index_row=None, index_col=None):
        ''' Set the position of the robot in a specific state.
        If no indices are given, a random position is chosen.
        '''
        if(index_row is None or index_col is None):
            self.position = [np.random.randint(self.world_row), np.random.randint(self.world_col)]
        else:
            self.position = [index_row, index_col]
    def render(self):
        ''' Print the current world in the terminal.
        O represents the robot position
        - represents empty states
        # represents obstacles
        * represents terminal states
        '''
        graph = ""
        for row in range(self.world_row):
            row_string = ""
            for col in range(self.world_col):
                if(self.position == [row, col]):
                    row_string += u" \u25CB "  # open circle marks the robot
                else:
                    if(self.state_matrix[row, col] == 0): row_string += ' - '
                    elif(self.state_matrix[row, col] == -1): row_string += ' # '
                    elif(self.state_matrix[row, col] == +1): row_string += ' * '
            row_string += '\n'
            graph += row_string
        print(graph)

    def reset(self, exploring_starts=False):
        ''' Reset the robot position and return the first observation.
        With exploring_starts the robot is placed in a random walkable state,
        otherwise in the bottom-left corner.
        '''
        if exploring_starts:
            while(True):
                row = np.random.randint(0, self.world_row)
                col = np.random.randint(0, self.world_col)
                if(self.state_matrix[row, col] == 0): break
            self.position = [row, col]
        else:
            self.position = [self.world_row-1, 0]
        #reward = self.reward_matrix[self.position[0], self.position[1]]
        return self.position

    def step(self, action):
        ''' One step in the world.
        [observation, reward, done = env.step(action)]
        The robot moves one step in the world based on the action given.
        The action can be 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT
        @return observation the position of the robot after the step
        @return reward the reward associated with the next state
        @return done True if the state is terminal
        '''
        if(action >= self.action_space_size):
            raise ValueError('The action is not included in the action space.')

        #Based on the commanded action and the probabilities of the transition
        #model, sample the action that is actually performed.
        action = np.random.choice(self.action_space_size, p=self.transition_matrix[int(action), :])

        #Generate a new position based on the current position and action
        if(action == 0): new_position = [self.position[0]-1, self.position[1]]   #UP
        elif(action == 1): new_position = [self.position[0], self.position[1]+1] #RIGHT
        elif(action == 2): new_position = [self.position[0]+1, self.position[1]] #DOWN
        elif(action == 3): new_position = [self.position[0], self.position[1]-1]  #LEFT
        else: raise ValueError('The action is not included in the action space.')

        #Move only if the new position is inside the grid and not an obstacle
        if (new_position[0] >= 0 and new_position[0] < self.world_row):
            if (new_position[1] >= 0 and new_position[1] < self.world_col):
                if (self.state_matrix[new_position[0], new_position[1]] != -1):
                    self.position = new_position

        reward = self.reward_matrix[self.position[0], self.position[1]]
        #Done is True if the new state is a terminal state
        done = bool(self.state_matrix[self.position[0], self.position[1]])
        return self.position, reward, done
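A minimal usage sketch of the GridWorld class above, assuming the class is available in the same module or session. The 3x4 layout follows the setStateMatrix docstring; the reward values, the deterministic transition model, and the random policy are illustrative assumptions, not part of the commit.

import numpy as np

env = GridWorld(tot_row=3, tot_col=4)

# Obstacle/terminal layout taken from the setStateMatrix docstring.
state_matrix = np.array([[0,  0, 0, 1],
                         [0, -1, 0, 1],
                         [0,  0, 0, 0]])
# Illustrative rewards: small step cost everywhere, +1 / -1 at the terminals.
reward_matrix = np.full((3, 4), -0.04)
reward_matrix[0, 3] = 1.0
reward_matrix[1, 3] = -1.0

env.setStateMatrix(state_matrix)
env.setRewardMatrix(reward_matrix)
env.setTransitionMatrix(np.eye(4))   # deterministic actions for this demo

observation = env.reset(exploring_starts=False)   # start in the bottom-left corner
done = False
while not done:
    action = np.random.randint(4)                 # random policy: 0=UP, 1=RIGHT, 2=DOWN, 3=LEFT
    observation, reward, done = env.step(action)
    env.render()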