Exercise 9
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This directory contains the exercises for week 9."""
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
from irlc import savepdf
import matplotlib.pyplot as plt
from irlc.ex09.value_iteration import value_iteration
from irlc.ex09.mdp import MDP
class GamblerMDP(MDP):
r"""
The gambler's problem (see the description in (SB18, Example 4.3)).
See the MDP class for more information about the methods. In summary:
- The state is the amount of money you have. If state = goal or state = 0 the game ends (use this for is_terminal).
- A are the available actions (a list). Note that these depend on the state; see below or the example for details.
- Psr are the transitions (see MDP class for documentation)
"""
def __init__(self, goal=100, p_heads=0.4):
super().__init__(initial_state=goal//2)
self.goal = goal
self.p_heads = p_heads
def is_terminal(self, state):
""" Implement if the state is terminal (0 or self.goal) """
# TODO: 1 lines missing.
raise NotImplementedError("Return true only if state is terminal.")
def A(self, s):
r""" The action is the amount you choose to gamble.
You can gamble from 0 up to the amount of money you have (the state),
but not so much that you would exceed the goal amount (see (SB18) for details).
In other words, return this as a list, and the number of elements should depend on the state s. """
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
def Psr(self, s, a):
""" Implement transition probabilities here.
The reward is 1 if you win (obtain the goal amount) and otherwise 0. Remember the format: you should
return a dictionary with entries:
> { (sp, r) : probability }
You can see the small-gridworld example (see the exercise description) for an example of how to use this function,
but keep in mind that since the coin toss can be either won or lost, the dictionary you return should have two entries:
one with probability self.p_heads (winning) and one with probability 1-self.p_heads (losing). A commented sketch is given just after this class.
"""
# TODO: 4 lines missing.
raise NotImplementedError("Implement function body")
return outcome_dict
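# The comments below give a minimal, illustrative sketch of the three methods above, assuming the
# state/action conventions described in the docstrings. It is not necessarily the intended solution:
#
#   def is_terminal(self, state):
#       return state == 0 or state == self.goal
#
#   def A(self, s):
#       return list(range(0, min(s, self.goal - s) + 1))
#
#   def Psr(self, s, a):
#       outcome_dict = {}
#       win_outcome = (s + a, 1.0 if s + a == self.goal else 0.0)   # heads: capital increases by the stake
#       lose_outcome = (s - a, 0.0)                                 # tails: capital decreases by the stake
#       outcome_dict[win_outcome] = self.p_heads
#       outcome_dict[lose_outcome] = outcome_dict.get(lose_outcome, 0.0) + (1 - self.p_heads)
#       return outcome_dict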
def gambler():
r"""
Gambler's problem from (SB18, Example 4.3)
"""
mdp = GamblerMDP(p_heads=0.4)
pi, V = value_iteration(mdp, gamma=1., theta=1e-11)
V = [V[s] for s in mdp.states]
plt.bar(mdp.states, V)
plt.xlabel('Capital')
plt.ylabel('Value Estimates')
plt.title('Final value function (expected return) vs State (Capital)')
plt.grid()
savepdf("gambler_valuefunction")
plt.show()
y = [pi[s] for s in mdp.nonterminal_states]
plt.bar(mdp.nonterminal_states, y, align='center', alpha=0.5)
plt.xlabel('Capital')
plt.ylabel('Final policy (stake)')
plt.title('Capital vs Final Policy')
plt.grid()
savepdf("gambler_policy")
plt.show()
if __name__ == "__main__":
gambler()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
import numpy as np
import gymnasium as gym
from gymnasium import Env
from collections import defaultdict
from tqdm import tqdm
import sys
class MDP:
r"""
This class represents a Markov Decision Process. It defines three main components:
- The actions available in a given state :math:`A(s)`
- The transition probabilities :math:`p(s', r | s, a)`
- A terminal check to determine if a state :math:`s` is terminal
- A way to specify the initial state:
- As a single state the MDP always begins in (most common)
- As a general distribution :math:`p(s_0)`.
In addition to this it allows you to access either
- The set of all states (including terminal states) as ``mdp.states``
- The set of all non-terminal states as ``mdp.nonterminal_states``
.. note::
The ``states`` and ``nonterminal_states`` properties are computed lazily. This means that if you don't access them, they won't use memory.
This allows you to specify MDPs with an infinite number of states without running out of memory.
"""
def __init__(self, initial_state=None, verbose=False):
"""
Initialize the MDP. In the case where ``initial_state`` is set to a value :math:`s_0`, the initial state distribution will be
.. math::
p(s_0) = 1
:param initial_state: An optional initial state.
:param verbose: If ``True``, the class will print out debug information (useful for very large MDPs)
"""
self.verbose=verbose
self.initial_state = initial_state # Starting state s_0 of the MDP.
# The following variables that begin with _ are used to cache computations. The reason why we don't compute them
# up-front is because their computation may be time-consuming and they might not be needed.
self._states = None
self._nonterminal_states = None
self._terminal_states = None
def is_terminal(self, state) -> bool:
r"""
Determines if a state is terminal (i.e., the episode has ended). In (SB18), the terminal
state is written as :math:`s_T`.
.. runblock:: pycon
>>> from irlc.gridworld.gridworld_environments import FrozenLake
>>> mdp = FrozenLake().mdp
>>> mdp.is_terminal(mdp.initial_state) # False, obviously.
:param state: The state :math:`s` to check
:return: ``True`` if the state is terminal and otherwise ``False``.
"""
return False # Return true if the given state is terminal.
def Psr(self, state, action) -> dict:
r"""
Represents the transition probabilities:
.. math::
P(s', r | s, a)
When called with state ``state`` and action ``action``, the function returns a dictionary of the form
``{(s1, r1): p1, (s2, r2): p2, ...}``, so that ``p2`` is the probability of transitioning to ``s2`` (and obtaining
reward ``r2``) given we are in state ``state`` and take action ``action``:
.. math::
P(s_2, r_2 | s,a) = p_2
An example:
.. runblock:: pycon
>>> from irlc.gridworld.gridworld_environments import FrozenLake
>>> mdp = FrozenLake().mdp
>>> transitions = mdp.Psr(mdp.initial_state, 0) # P( ... | s0, a=0)
>>> for (sp, r), p in transitions.items():
... print(f"P(s'={sp}, r={r} | s={mdp.initial_state}, a=0) = {p}")
:param state: The state to compute the transition probabilities in
:param action: The action to compute the transition probabilities in
:return: A dictionary where the keys are state, reward pairs we will transition to, :math:`p(s', r | ...)`, and the values are their probability.
"""
raise NotImplementedError("Return state distribution as a dictionary (see class documentation)")
def A(self, state) -> list:
r"""
Returns a list of actions available in the given state:
.. math::
A(s)
An example to get the actions in the initial state:
.. runblock:: pycon
>>> from irlc.gridworld.gridworld_environments import FrozenLake
>>> mdp = FrozenLake().mdp
>>> mdp.A(mdp.initial_state)
:param state: State to compute the actions in :math:`s`
:return: The list of available actions :math:`\mathcal A(s) = \{0, 1, ..., n-1\}`
"""
raise NotImplementedError("Return set/list of actions in given state A(s) = {a1, a2, ...}")
def initial_state_distribution(self):
"""
(**Optional**) specify the initial state distribution. Should return a dictionary of the form:
``{s0: p0, s1: p1, ..., sn: pn}``, in which case :math:`p(S_0 = s_k) = p_k`.
You will typically not overwrite this function but just set the initial state. In that case the initial state distribution
is deterministic:
.. runblock:: pycon
>>> from irlc.gridworld.gridworld_environments import FrozenLake
>>> mdp = FrozenLake().mdp
>>> mdp.initial_state_distribution()
:return: An initial state distribution as a dictionary, where the keys are states and the values are their probabilities.
"""
if self.initial_state is not None:
return {self.initial_state: 1}
else:
raise Exception("Either specify the initial state, or implement this method.")
@property
def nonterminal_states(self):
r"""
The list of non-terminal states, i.e. :math:`\mathcal{S}` in (SB18)
.. runblock:: pycon
>>> from irlc.gridworld.gridworld_environments import FrozenLake
>>> mdp = FrozenLake().mdp
>>> mdp.nonterminal_states
:return: The list of non-terminal states :math:`\mathcal{S}`
"""
if self._nonterminal_states is None:
self._nonterminal_states = [s for s in self.states if not self.is_terminal(s)]
return self._nonterminal_states
@property
def states(self):
r"""
The list of all states including terminal ones, i.e. :math:`\mathcal{S}^+` in (SB18).
The terminal states are those where ``is_terminal(state)`` is true.
.. runblock:: pycon
>>> from irlc.gridworld.gridworld_environments import FrozenLake
>>> mdp = FrozenLake().mdp
>>> mdp.states
:return: The list of all states :math:`\mathcal{S}^+`
"""
if self._states is None:
next_chunk = set(self.initial_state_distribution().keys())
all_states = list(next_chunk)
while True:
new_states = set()
for s in tqdm(next_chunk, file=sys.stdout) if self.verbose else next_chunk:
if self.is_terminal(s):
continue
for a in self.A(s):
new_states = new_states | {sp for sp, r in self.Psr(s, a)}
new_states = [s for s in new_states if s not in all_states]
if len(new_states) == 0:
break
all_states += new_states
next_chunk = new_states
self._states = list(set(all_states))
return self._states
def rng_from_dict(d):
""" Helper function. If d is a dictionary {x1: p1, x2: p2, ...} then this will sample an x_i with probability p_i """
w, pw = zip(*d.items()) # separate w and p(w)
i = np.random.choice(len(w), p=pw) # Required because numpy would cast w to an array (and w may contain tuples)
return w[i]
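# Example (with made-up numbers): rng_from_dict({'a': 0.3, 'b': 0.7}) returns 'a' with probability 0.3
# and 'b' with probability 0.7. It is used below to sample initial states and transitions.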
class MDP2GymEnv(Env):
def A(self, state):
raise Exception("Don't use this function; it is here for legacy reasons")
def __init__(self, mdp, render_mode=None):
# We ignore this variable in this class, however, the Gridworld environment will check if
# render_mode == "human" and use it to render the environment. See:
# https://younis.dev/blog/render-api/
self.render_mode = render_mode
self.mdp = mdp
self.state = None
# actions = set
all_actions = set.union(*[set(self.mdp.A(s)) for s in self.mdp.nonterminal_states ])
n = max(all_actions) - min(all_actions) + 1
assert isinstance(n, int)
self.action_space = gym.spaces.Discrete(n=n, start=min(all_actions))
# Make observation space:
states = self.mdp.nonterminal_states
if not hasattr(self, 'observation_space'):
if isinstance(states[0], tuple):
self.observation_space = gym.spaces.Tuple([gym.spaces.Discrete(n+1) for n in np.asarray(states).max(axis=0)])
else:
print("Could not guess observation space. Set it manually.")
def reset(self, seed=None, options=None):
info = {}
if seed is not None:
np.random.seed(seed)
self.action_space.seed(seed)
self.observation_space.seed(seed)
info['seed'] = seed
ps = self.mdp.initial_state_distribution()
self.state = rng_from_dict(ps)
if self.render_mode == "human":
self.render()
info['mask'] = self._mk_mask(self.state)
return self.state, info
def step(self, action):
ps = self.mdp.Psr(self.state, action)
self.state, reward = rng_from_dict(ps)
terminated = self.mdp.is_terminal(self.state)
if self.render_mode == "human":
self.render()
info = {'mask': self._mk_mask(self.state)} if not terminated else None
return self.state, reward, terminated, False, info
def _mk_mask(self, state):
# self.A(state)
mask = np.zeros((self.action_space.n,), dtype=np.int8)
for a in self.mdp.A(state):
mask[a - self.action_space.start] = 1
return mask
class GymEnv2MDP(MDP):
def __init__(self, env):
super().__init__()
self._states = list(range(env.observation_space.n))
if hasattr(env, 'env'):
env = env.env
self._terminal_states = []
for s in env.unwrapped.P:
for a in env.unwrapped.P[s]:
for (pr, sp, reward, done) in env.unwrapped.P[s][a]:
if done:
self._terminal_states.append(sp)
self._terminal_states = set(self._terminal_states)
self.env = env
def is_terminal(self, state):
return state in self._terminal_states
def A(self, state):
return list(self.env.unwrapped.P[state].keys())
def Psr(self, state, action):
d = defaultdict(float)
for (pr, sp, reward, done) in self.env.unwrapped.P[state][action]:
d[ (sp, reward)] += pr
return d
if __name__ == '__main__':
"""A handful of examples of using the MDP-class in conjunction with a gym environment:"""
env = gym.make("FrozenLake-v1")
mdp = GymEnv2MDP(env)
from irlc.ex09.value_iteration import value_iteration
value_iteration(mdp)
mdp = GymEnv2MDP(gym.make("FrozenLake-v1"))
print("N = ", mdp.nonterminal_states)
print("S = ", mdp.states)
print("Is state 3 terminal?", mdp.is_terminal(3), "is state 11 terminal?", mdp.is_terminal(11))
state = 0
print("A(S=0) =", mdp.A(state))
action = 2
mdp.Psr(state, action) # Get transition probabilities
for (next_state, reward), Pr in mdp.Psr(state, action).items():
print(f"P(S'={next_state},R={reward} | S={state}, A={action} ) = {Pr:.2f}")
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
from irlc.ex09.mdp import MDP
def value_function2q_function(mdp : MDP, s, gamma, v : dict) -> dict:
r"""This helper function converts a value function to an action-value function.
Given a value-function ``v`` and a state ``s``, this function implements the update:
.. math::
Q(s,a) = \mathbb{E}[r + \gamma v(s') | s, a] = \sum_{s', r} (r + \gamma v(s')) p(s', r | s, a)
as described in (SB18). It should return a dictionary of the form::
{a1: Q(s,a1), a2: Q(s,a2), ..., an: Q(s,an)}
where the actions are keys. You can compute these using ``mdp.A(s)``. When done the following should work::
Qs = value_function2q_function(mdp, s, gamma, v)
Qs[a] # This is the Q-value Q(s,a)
Hints:
* Remember that ``v[s'] = 0`` if ``s'`` is a terminal state (this is explained in (SB18)).
:param mdp: An MDP instance. Use this to compute :math:`p(s', r| s,a)`
:param s: A state
:param gamma: The discount factor :math:`\gamma`
:param v: The value function represented as a dictionary.
:return: A dictionary representing :math:`Q` of the form ``{a1: Q(s,a1), a2: Q(s,a2), ..., an: Q(s,an)}``
"""
# TODO: 1 lines missing.
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
return q_dict
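# A minimal illustrative sketch of the conversion above (hypothetical helper; not the graded solution):
def _value_function2q_function_sketch(mdp: MDP, s, gamma, v: dict) -> dict:
    """Compute Q(s,a) = sum_{s', r} (r + gamma * v(s')) p(s', r | s, a) for each a in A(s).
    Terminal successor states are handled by v.get(sp, 0), i.e. their value is taken to be 0."""
    return {a: sum(p * (r + gamma * v.get(sp, 0)) for (sp, r), p in mdp.Psr(s, a).items())
            for a in mdp.A(s)}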
def expected_reward(mdp : MDP, s, a) -> float:
# TODO: 1 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
return expected_reward
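# A possible sketch of the expected reward (hypothetical helper; not the graded solution):
def _expected_reward_sketch(mdp: MDP, s, a) -> float:
    """E[r | s, a] = sum_{s', r} r * p(s', r | s, a)."""
    return sum(p * r for (sp, r), p in mdp.Psr(s, a).items())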
def q_function2value_function(policy : dict, Q : dict, s) -> float:
# TODO: 1 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
return V_s
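# A possible sketch of the Q-to-V conversion (hypothetical helper; not the graded solution). It assumes the
# policy is a dictionary {a: pi(a|s)} and Q is indexed as Q[s, a], as in the example in __main__ below:
def _q_function2value_function_sketch(policy: dict, Q: dict, s) -> float:
    """V(s) = sum_a pi(a|s) * Q(s, a)."""
    return sum(p_a * Q[s, a] for a, p_a in policy.items())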
if __name__ == "__main__":
from irlc.gridworld.gridworld_environments import FrozenLake
mdp = FrozenLake(living_reward=0.2).mdp # Get the MDP of this environment.
## Part 1: Expected reward
s0 = mdp.initial_state
s0 = (0, 3) # initial state
a = 3 # Go east.
print("Expected reward E[r | s0, a] =", expected_reward(mdp, s=s0, a=0), "should be 0.2")
print("Expected reward E[r | s, a] =", expected_reward(mdp, s=(1, 2), a=0), "should be 0")
## Part 2
# First let's create a non-trivial value function
V = {}
for s in mdp.nonterminal_states:
V[s] = s[0] + 2*s[1]
print("Value function is", V)
# Compute the corresponding Q(s,a)-values in state s0:
q_ = value_function2q_function(mdp, s=s0, gamma=0.9, v=V)
print(f"Q-values in {s0=} is", q_)
## Part 3
# Create a non-trivial Q-function for this problem.
Q = {}
for s in mdp.nonterminal_states:
for a in mdp.A(s):
Q[s,a] = s[0] + 2*s[1] - 10*a # The particular values are not important in this example
# Create a policy. In this case pi(a=3) = 0.4.
pi = {0: 0.2,
1: 0.2,
2: 0.2,
3: 0.4}
print(f"Value-function in {s0=} is", q_function2value_function(pi, Q, s=s0))
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
from collections import defaultdict
import numpy as np
import matplotlib.pyplot as plt
from irlc.ex09.mdp_warmup import value_function2q_function
from irlc.ex09.small_gridworld import SmallGridworldMDP, plot_value_function
from irlc import savepdf
def policy_evaluation(pi, mdp, gamma=.99, theta=0.00001):
r""" Implements the iterative policy-evaluation algorithm (SB18, Section 4.1).
The algorithm is given a policy pi which is represented as a dictionary so that
> pi[s][a] = p
is the probability p of taking action a in state s. The 'mdp' is a MDP-instance and the other terms have the same meaning as in the algorithm.
It should return a dictionary v so that
> v[s]
is the value-function evaluated in state s. I recommend using the value_function2q_function-function imported above.
"""
v = defaultdict(float)
Delta = theta # Initialize Delta to theta to make sure the first iteration of the while-loop runs.
while Delta >= theta: # Outer loop in (SB18)
Delta = 0 # Remember to update Delta (same meaning as in (SB18))
# Remember that 'S' in (SB18) is actually just the set of non-terminal states (NOT including terminal states!)
for s in mdp.nonterminal_states: # See the MDP class if you are curious about how this variable is defined.
""" Implement the main body of the policy evaluation algorithm here. You can do this directly,
or implement (and use) the value_function2q_function-function (consider what it does and compare to the algorithm).
If you do so, note that value_function2q_function(mdp, s, gamma, v) computes the equivalent of Q(s,a) (as a dictionary),
and in the algorithm, you then need to compute the expectation over pi:
> sum_a pi(a|s) Q(s,a)
In code it would be more akin to
q = value_function2q_function(...)
sum_a pi[s][a] * q[a]
Don't be afraid to use a few more lines than I do.
"""
# TODO: 2 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
r""" stop condition. v_ is the current value of the value function (see algorithm listing in (SB18)) which you need to update. """
Delta = max(Delta, np.abs(v_ - v[s]))
return v
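# A possible sketch of the two missing lines in the inner loop above (one way to do it, reusing
# value_function2q_function; not necessarily the intended solution):
#
#   v_ = v[s]   # the old value, used in the Delta-update below
#   v[s] = sum(pi[s][a] * q for a, q in value_function2q_function(mdp, s, gamma, v).items())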
if __name__ == "__main__":
mdp = SmallGridworldMDP()
"""
Create the random policy pi0 below. The policy is defined as a nested dict, i.e.
> pi0[s][a] = (probability to take action a in state s)
"""
pi0 = {s: {a: 1/len(mdp.A(s)) for a in mdp.A(s) } for s in mdp.nonterminal_states }
V = policy_evaluation(pi0, mdp, gamma=1)
plot_value_function(mdp, V)
plt.title("Value function using random policy")
savepdf("policy_eval")
plt.show()
expected_v = np.array([0, -14, -20, -22,
-14, -18, -20, -20,
-20, -20, -18, -14,
-22, -20, -14, 0])
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
import numpy as np
from irlc.ex09.small_gridworld import SmallGridworldMDP
import matplotlib.pyplot as plt
from irlc.ex09.policy_evaluation import policy_evaluation
from irlc.ex09.mdp_warmup import value_function2q_function
def policy_iteration(mdp, gamma=1.0):
r"""
Implement policy iteration (see (SB18, Section 4.3)).
Note that policy iteration only considers deterministic policies. We will therefore use the shortcut of representing the policy pi
as a dictionary (similar to the DP-problem in week 2!) so that
> a = pi[s]
is the action in state s.
"""
pi = {s: np.random.choice(mdp.A(s)) for s in mdp.nonterminal_states}
policy_stable = False
V = None # Sutton has an initialization-step, but it can actually be skipped if we initialize the policy randomly.
while not policy_stable:
# Evaluate the current policy using your code from the previous exercise.
# The main complication is that we need to transform our deterministic policy, pi[s], into a stochastic one pi[s][a].
# It will be defined as:
# >>> pi_prob[s][a] = 1 if a = pi[s] and otherwise 0.
pi_prob = {s: {a: 1 if pi[s] == a else 0 for a in mdp.A(s)} for s in mdp.nonterminal_states}
V = policy_evaluation(pi_prob, mdp, gamma)
r""" Implement the method. This is step (3) in (SB18). """
policy_stable = True # Will be set to False if the policy pi changes
r""" Implement the steps for policy improvement here. Start by writing a for-loop over all non-terminal states.
You can see the policy_evaluation function for how to do this, but
I recommend using the property mdp.nonterminal_states (see the MDP class for more information).
Hints:
* In the algorithm in (SB18), you need to perform an argmax_a over what is actually Q-values. The function
value_function2q_function(mdp, s, gamma, V) can compute these.
* The argmax itself, assuming you follow the above procedure, involves a dictionary. It can be computed
using methods similar to those we saw in week2 of the DP problem.
It is not a coincidence these algorithms are very similar -- if you think about it, the maximization step closely resembles the DP algorithm!
"""
# TODO: 6 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
return pi, V
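# A possible sketch of the policy-improvement step above (hedged; not necessarily the intended solution):
#
#   for s in mdp.nonterminal_states:
#       old_action = pi[s]
#       Qs = value_function2q_function(mdp, s, gamma, V)
#       pi[s] = max(Qs, key=Qs.get)       # argmax_a Q(s, a)
#       if old_action != pi[s]:
#           policy_stable = False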
if __name__ == "__main__":
mdp = SmallGridworldMDP()
pi, v = policy_iteration(mdp, gamma=0.99)
expected_v = np.array([ 0, -1, -2, -3,
-1, -2, -3, -2,
-2, -3, -2, -1,
-3, -2, -1, 0])
from irlc.ex09.small_gridworld import plot_value_function
plot_value_function(mdp, v)
plt.title("Value function using policy iteration to find optimal policy")
from irlc import savepdf
savepdf("policy_iteration")
plt.show()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import numpy as np
from irlc.utils.common import defaultdict2
from irlc import Agent
class TabularAgent(Agent):
"""
This helper class will simplify the implementation of most basic reinforcement-learning agents. Specifically it provides:
- A :math:`Q(s,a)`-table data structure
- An epsilon-greedy exploration method
The code for the class is very simple, and I think it is a good idea to at least skim it.
The Q-data structure can be used as follows:
.. runblock:: pycon
>>> from irlc.ex09.rl_agent import TabularAgent
>>> from irlc.gridworld.gridworld_environments import BookGridEnvironment
>>> env = BookGridEnvironment()
>>> agent = TabularAgent(env)
>>> state, info = env.reset() # Get the info-dictionary corresponding to s
>>> agent.Q[state, 1] = 2.5 # Update a Q-value; action a=1 is now optimal.
>>> agent.Q[state, 1] # Check it has indeed been updated.
>>> agent.Q[state, 0] # Q-values are 0 by default.
>>> agent.Q.get_optimal_action(state, info) # Note we pass along the info-dictionary corresponding to this state
.. note::
The ``get_optimal_action``-function requires an ``info`` dictionary. This is required since the info dictionary
contains information about which actions are available. To read more about the Q-values, see :class:`~irlc.ex09.rl_agent.TabularQ`.
"""
def __init__(self, env, gamma=0.99, epsilon=0):
r"""
Initialize a tabular environment. For convenience, it stores the discount factor :math:`\gamma` and
exploration parameter :math:`\varepsilon` for epsilon-greedy exploration. Access them as e.g. ``self.gamma``
When you implement an agent and overwrite the ``__init__``-method, you should include a call such as
``super().__init__(gamma, epsilon)``.
:param env: The gym environment
:param gamma: The discount factor :math:`\gamma`
:param epsilon: Exploration parameter :math:`\varepsilon` for epsilon-greedy exploration
"""
super().__init__(env)
self.gamma, self.epsilon = gamma, epsilon
self.Q = TabularQ(env)
def pi_eps(self, s, info):
"""
Performs :math:`\\varepsilon`-greedy exploration with :math:`\\varepsilon =` ``self.epsilon`` and returns the
action. Recall this means that with probability :math:`\\varepsilon` it returns a random action, and otherwise
it returns an action associated with a maximal Q-value (:math:`\\arg\\max_a Q(s,a)`). An example:
.. runblock:: pycon
>>> from irlc.ex09.rl_agent import TabularAgent
>>> from irlc.gridworld.gridworld_environments import BookGridEnvironment
>>> env = BookGridEnvironment()
>>> agent = TabularAgent(env)
>>> state, info = env.reset()
>>> agent.pi_eps(state, info) # Note we pass along the info-dictionary corresponding to this state
.. note::
The ``info`` dictionary is used to mask (exclude) actions that are not possible in the state.
It is similar to the info dictionary in ``agent.pi(s,info)``.
:param s: A state :math:`s_t`
:param info: The corresponding ``info``-dictionary returned by the gym environment
:return: An action computed using :math:`\\varepsilon`-greedy action selection based the Q-values stored in the ``self.Q`` class.
"""
if info is not None and 'seed' in info: # In case info contains a seed, reset the random number generator.
np.random.seed(info['seed'])
return Agent.pi(self, s, k=0, info=info) if np.random.rand() < self.epsilon else self.Q.get_optimal_action(s, info)
class ValueAgent(TabularAgent):
"""
This is a simple wrapper class around the Agent class above. It fixes the policy and is therefore useful for doing
value estimation.
"""
def __init__(self, env, gamma=0.95, policy=None, v_init_fun=None):
self.env = env
self.policy = policy # policy to evaluate
""" self.v holds the value estimates.
Initially v[s] = 0 unless v_init_fun is given in which case v[s] = v_init_fun(s). """
self.v = defaultdict2(float if v_init_fun is None else v_init_fun)
super().__init__(env, gamma=gamma)
self.Q = None # Blank out the Q-values which will not be used.
def pi(self, s, k, info=None):
return TabularAgent.pi(self, s, k, info) if self.policy is None else self.policy(s)
def value(self, s):
return self.v[s]
def _masked_actions(action_space, mask):
"""Helper function which applies a mask to the action space."""
from irlc.utils.common import DiscreteTextActionSpace
if isinstance(action_space, DiscreteTextActionSpace):
return [a for a in range(action_space.n) if mask[a] == 1]
else:
return [a for a in range(action_space.n) if mask[a - action_space.start] == 1]
class TabularQ:
"""
This is a helper class for storing Q-values. It is used by the :class:`~irlc.ex09.rl_agent.TabularAgent` to store
Q-values which can be accessed as ``self.Q[s,a]``.
"""
def __init__(self, env):
"""
Initialize the table. It requires a gym environment to know how many actions there are for each state.
:param env: A gym environment.
"""
self._known_masks = {} # Cache the known action masks.
def q_default(s):
if s in self._known_masks:
return {a: 0 for a in range(self.env.action_space.n) if self._known_masks[s][a- self.env.action_space.start] == 1}
else:
return {a: 0 for a in range(self.env.action_space.n)}
# qfun = lambda s: OrderedDict({a: 0 for a in (env.P[s] if hasattr(env, 'P') else range(env.action_space.n))})
self.q_ = defaultdict2(lambda s: q_default(s))
self.env = env
def get_Qs(self, state, info_s=None):
"""
Get a list of all known Q-values for this particular state. That is, in a given state, it will return the two
lists:
.. math::
\\begin{bmatrix} a_1 \\\\ a_2 \\\\ \\vdots \\\\ a_k \\end{bmatrix}, \\quad
\\begin{bmatrix} Q(s,a_1) \\\\ Q(s,a_2) \\\\ \\vdots \\\\ Q(s,a_k) \\end{bmatrix} \\\\
the ``info_s`` parameter will ensure actions are correctly masked. An example of how to use this function from
a policy:
.. runblock:: pycon
>>> from irlc.ex09.rl_agent import TabularAgent
>>> class MyAgent(TabularAgent):
... def pi(self, s, k, info=None):
... actions, q_values = self.Q.get_Qs(s, info)
:param state: The state to query
:param info_s: The info-dictionary returned by the environment for this state. Used for action-masking.
:return:
- actions - A tuple containing all actions available in this state ``(a_1, a_2, ..., a_k)``
- Qs - A tuple containing all Q-values available in this state ``(Q[s,a1], Q[s, a2], ..., Q[s,ak])``
"""
if info_s is not None and 'mask' in info_s:
if state not in self._known_masks:
self._known_masks[state] = info_s['mask']
# Probably a good idea to check the Q-values are okay...
avail_actions = _masked_actions(self.env.action_space, info_s['mask'])
self.q_[state] = {a: self.q_[state][a] for a in avail_actions}
(actions, Qa) = zip(*self.q_[state].items())
return tuple(actions), tuple(Qa)
def get_optimal_action(self, state, info_s):
"""
For a given state ``state``, this function returns the optimal action for that state.
.. math::
a^* = \\arg\\max_a Q(s,a)
An example:
.. runblock:: pycon
>>> from irlc.ex09.rl_agent import TabularAgent
>>> class MyAgent(TabularAgent):
... def pi(self, s, k, info=None):
... a_star = self.Q.get_optimal_action(s, info)
:param state: State to find the optimal action in :math:`s`
:param info_s: The ``info``-dictionary corresponding to this state
:return: The optimal action according to the Q-table :math:`a^*`
"""
actions, Qa = self.get_Qs(state, info_s)
a_ = np.argmax(np.asarray(Qa) + np.random.rand(len(Qa)) * 1e-8)
return actions[a_]
def _chk_mask(self, s, a):
if s in self._known_masks:
mask = self._known_masks[s]
if mask[a - self.env.action_space.start] == 0:
raise Exception(f" Invalid action. You tried to access Q[{s}, {a}], however the action {a} has been previously masked and therefore cannot exist in this state. The mask for {s} is mask={mask}.")
def __getitem__(self, state_comma_action):
s, a = state_comma_action
self._chk_mask(s, a)
return self.q_[s][a]
def __setitem__(self, state_comma_action, q_value):
s, a = state_comma_action
self._chk_mask(s, a)
self.q_[s][a] = q_value
def to_dict(self):
"""
This helper function converts the known Q-values to a dictionary. This function is only used for
visualization purposes in some of the examples.
:return: A dictionary ``q`` of all known Q-values of the form ``q[s][a]``
"""
# Convert to a regular dictionary
d = {s: {a: Q for a, Q in Qs.items() } for s,Qs in self.q_.items()}
return d
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import numpy as np
from irlc.ex09.mdp import MDP
import seaborn as sns
# action space available to the agent
UP,RIGHT, DOWN, LEFT = 0, 1, 2, 3
class SmallGridworldMDP(MDP):
def __init__(self, rows=4, cols=4):
self.rows, self.cols = rows, cols # Number of rows, columns.
super().__init__(initial_state=(rows//2, cols//2) ) # Initial state is in the middle of the board.
def A(self, state):
return [UP, DOWN, RIGHT, LEFT] # All four directions available.
def Psr(self, state, action):
row, col = state # state is in the format state = (row, col)
if action == UP: row -= 1
if action == DOWN: row += 1
if action == LEFT: col -= 1
if action == RIGHT: col += 1
col = min(self.cols-1, max(col, 0)) # Check boundary conditions.
row = min(self.rows-1, max(row, 0))
reward = -1 # Always get a reward of -1
next_state = (row, col)
# Note that P(next_state, reward | state, action) = 1 because environment is deterministic
return {(next_state, reward): 1}
def is_terminal(self, state):
row, col = state
return (row == 0 and col == 0) or (row == self.rows-1 and col == self.cols-1)
def plot_value_function(env, v):
A = np.zeros((env.rows, env.cols))
for (row, col) in env.nonterminal_states:
A[row, col] = v[(row,col)]
sns.heatmap(A, cmap="YlGnBu", annot=True, cbar=False, square=True, fmt='g')
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""
References:
[SB18] Richard S. Sutton and Andrew G. Barto. Reinforcement Learning: An Introduction. The MIT Press, second edition, 2018. (Freely available online).
"""
import matplotlib.pyplot as plt
from collections import defaultdict
import numpy as np
from irlc.ex09.mdp_warmup import value_function2q_function
from irlc import savepdf
def value_iteration(mdp, gamma=.99, theta=0.0001, max_iters=10 ** 6, verbose=False):
r"""Implement the value-iteration algorithm defined in (SB18, Section 4.4).
The inputs should be self-explanatory given the pseudo-code.
I have also included a max_iters variable which represents an upper bound on the total number of iterations. This is useful
if you want to check what the algorithm does after a certain (e.g. 1 or 2) steps.
The verbose-variable makes the algorithm print out the biggest change in the value-function in a single step.
This is useful if you run it on a large problem and want to know how much time remains, or simply get an idea of
how quickly it converges.
"""
V = defaultdict(lambda: 0) # value function
for i in range(max_iters):
Delta = 0
for s in mdp.nonterminal_states:
""" Perform the update of the value function V[s] here for the given state.
Note that this has a lot of similarity to the policy-evaluation algorithm, and you can re-use
a lot of that solution, including value_function2q_function(...) (assuming you used that function). """
# TODO: 2 lines missing.
raise NotImplementedError("Complete the algorithm here.")
if verbose:
print(i, Delta)
if Delta < theta:
break
# Turn the value-function into a policy. It implements the last line of the algorithm.
pi = values2policy(mdp, V, gamma)
return pi, V
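# A possible sketch of the two missing lines in the loop above (assuming value_function2q_function is
# reused; not necessarily the intended solution):
#
#   v_old, V[s] = V[s], max(value_function2q_function(mdp, s, gamma, V).values())
#   Delta = max(Delta, np.abs(v_old - V[s]))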
def values2policy(mdp, V, gamma):
r""" Turn the value-function V into a policy. The value function V is implemented as a dictionary so that
> value = V[s]
is the value-function in state s.
The procedure you implement is the very last line of the value-iteration algorithm (SB18, Section 4.4), and it should return
a policy pi as a dictionary so that
> a = pi[s]
is the action in state s.
Note once again that you can re-use the value_function2q_function-function and the argmax -- in fact, the solution is very similar to your solution to the
policy-iteration problem in policy_iteration.py.
As you have probably noticed, even though we implement different algorithms, they are all built using the same
building blocks.
"""
pi = {}
for s in mdp.nonterminal_states:
# Create the policy here. pi[s] = a is the action to be taken in state s.
# You can use the value_function2q_function helper function to simplify things and perhaps
# re-use ideas from the dp.py problem from week 2.
# TODO: 2 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
return pi
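# A possible sketch of the two missing lines in values2policy (hedged; not necessarily the intended solution):
#
#   Qs = value_function2q_function(mdp, s, gamma, V)
#   pi[s] = max(Qs, key=Qs.get)   # the action with the largest Q-value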
if __name__ == "__main__":
import seaborn as sns
from irlc.ex09.small_gridworld import SmallGridworldMDP, plot_value_function
env = SmallGridworldMDP()
policy, v = value_iteration(env, gamma=0.99, theta=1e-6)
plot_value_function(env, v)
plt.title("Value function obtained using value iteration to find optimal policy")
savepdf("value_iteration")
plt.show()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.ex09.value_iteration import value_iteration
from irlc import TabularAgent
import numpy as np
class ValueIterationAgent(TabularAgent):
def __init__(self, env, mdp=None, gamma=1, epsilon=0, **kwargs):
super().__init__(env)
self.epsilon = epsilon
# TODO: 1 lines missing.
raise NotImplementedError("Call the value_iteration function and store the policy for later.")
def pi(self, s, k, info=None):
""" With probability (1-epsilon), take the optimal action as computed using value iteration.
With probability epsilon, take a random action; this is done below by falling back to super().pi(s, k, info).
"""
if np.random.rand() < self.epsilon:
return super().pi(s, k, info) # Recall that by default the policy takes random actions.
else:
""" Return the optimal action here. This should be computed using value-iteration.
To speed things up, I recommend calling value-iteration from the __init__-method and store the policy. """
# TODO: 1 lines missing.
raise NotImplementedError("Compute and return optimal action according to value-iteration.")
return action
def __str__(self):
return f"ValueIteration(epsilon={self.epsilon})"
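# A possible sketch (not necessarily the intended solution): in __init__ one could run value iteration once
# and store the resulting policy,
#
#   self.policy, self.v = value_iteration(mdp if mdp is not None else env.mdp, gamma=gamma)
#
# and in pi() simply return self.policy[s] when not exploring.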
if __name__ == "__main__":
from irlc.gridworld.gridworld_environments import SuttonCornerGridEnvironment
env = SuttonCornerGridEnvironment(living_reward=-1, render_mode='human')
from irlc import train, interactive
# Note you can access the MDP for a gridworld using env.mdp. The mdp will be an instance of the MDP class we have used for planning so far.
agent = ValueIterationAgent(env, mdp=env.mdp) # Make a ValueIteration-based agent
# Visualize & interactivity. Press P or space to follow the policy.
agent.Q = None # This ensures that the value function is visualized.
env, agent = interactive(env, agent)
train(env, agent, num_episodes=20) # Train for 20 episodes
env.savepdf("smallgrid.pdf") # Take a snapshot of the final configuration
env.close() # Whenever you use a VideoMonitor, call this to avoid an OpenGL-related error message on exit