Commit 0ce6ad9f authored by tuhe

updates

parent 7afc1009
""" Source code for 02466, Introduction to reinforcement learning and control, offered at DTU """
__version__ = "0.0.1"
# Do not import matplotlib here (or modules which themselves import matplotlib), since the package must remain importable in headless mode.
import shutil
import inspect
import lzma, pickle
import gymnasium
import numpy as np
import os
# Global imports from across the API. Allows imports like
# > from irlc import Agent, train
from irlc.utils.irlc_plot import main_plot as main_plot
from irlc.utils.irlc_plot import plot_trajectory as plot_trajectory
try:
from irlc.ex01.agent import Agent as Agent, train as train
from irlc.ex09.rl_agent import TabularAgent, ValueAgent
except ImportError:
pass
from irlc.utils.player_wrapper import interactive as interactive
from irlc.utils.lazylog import LazyLog # This one is unclear. Is it required?
from irlc.utils.timer import Timer
def get_irlc_base():
dir_path = os.path.dirname(os.path.realpath(__file__))
return dir_path
def get_students_base():
return os.path.join(get_irlc_base(), "../../../02465students/")
def pd2latex_(pd, index=False, escape=False, column_spec=None, **kwargs): # You can add column specs.
for c in pd.columns:
if pd[c].values.dtype == 'float64' and all(pd[c].values - np.round(pd[c].values)==0):
pd[c] = pd[c].astype(int)
ss = pd.to_latex(index=index, escape=escape, **kwargs)
return fix_bookstabs_latex_(ss,column_spec=column_spec)
def fix_bookstabs_latex_(ss, linewidth=True, first_column_left=True, column_spec=None):
to_tabular_x = linewidth
if to_tabular_x:
ss = ss.replace("tabular", "tabularx")
lines = ss.split("\n")
hd = lines[0].split("{")
if column_spec is None:
adj = (('l' if to_tabular_x else 'l') if first_column_left else 'C') + ("".join(["C"] * (len(hd[-1][:-1]) - 1)))
else:
adj = column_spec
# adj = ( ('l' if to_tabular_x else 'l') if first_column_left else 'C') + ("".join(["C"] * (len(hd[-1][:-1])-1)))
if linewidth:
lines[0] = "\\begin{tabularx}{\\linewidth}{" + adj + "}"
else:
lines[0] = "\\begin{tabular}{" + adj.lower() + "}"
ss = '\n'.join(lines)
return ss
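# Usage sketch for pd2latex_ (illustrative only: assumes pandas is installed, and the DataFrame below is made up):
#   import pandas as pd
#   df = pd.DataFrame({'Episode': [1.0, 2.0, 3.0], 'Accumulated Reward': [0.5, 1.25, 2.0]})
#   print(pd2latex_(df, column_spec='lC'))  # The Episode column is cast to int; the output uses a tabularx environment.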
def plotenv(env : gymnasium.Env):
"""
Given a Gymnasium environment instance, this function will plot the environment as a matplotlib image. Remember to call ``plt.show()`` to actually see the image.
For this function to work, you must create the environment with :python:`render_mode='human'`.
.. note::
This function may not work for all gymnasium environments, however, it will work for most environments we use in this course.
:param env: The environment to plot.
"""
from PIL import Image
import matplotlib.pyplot as plt
rmt = None
if hasattr(env, 'render_mode') and env.render_mode != 'rgb_array':
env.render_mode, rmt = 'rgb_array', env.render_mode
frame = env.render()
if rmt is not None:
env.render_mode = rmt # Restore the original render mode.
im = Image.fromarray(frame)
plt.figure(figsize=(16, 16))
plt.imshow(im)
plt.axis('off')
plt.tight_layout()
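# Usage sketch for plotenv (illustrative; it assumes a course environment such as PacmanEnvironment accepts a
# render_mode argument like the other environments in this package -- adapt it to the environment you actually use):
#   import matplotlib.pyplot as plt
#   from irlc.pacman.pacman_environment import PacmanEnvironment
#   env = PacmanEnvironment(render_mode='human')
#   env.reset()
#   plotenv(env)
#   plt.show()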
def _savepdf_env(file, env):
from PIL import Image
import matplotlib.pyplot as plt
rmt = None
if hasattr(env, 'render_mode') and env.render_mode != 'rgb_array':
env.render_mode, rmt = 'rgb_array', env.render_mode
frame = env.render()
if rmt is not None:
env.render_mode = rmt # Restore the original render mode.
im = Image.fromarray(frame)
snapshot_base = file
if snapshot_base.endswith(".png"):
sf = snapshot_base[:-4]
fext = 'png'
else:
fext = 'pdf'
if snapshot_base.endswith(".pdf"):
sf = snapshot_base[:-4]
else:
sf = snapshot_base
sf = f"{sf}.{fext}"
dn = os.path.dirname(sf)
if len(dn) > 0 and not os.path.isdir(dn):
os.makedirs(dn)
print("Saving snapshot of environment to", os.path.abspath(sf))
if fext == 'png':
im.save(sf)
from irlc import _move_to_output_directory
_move_to_output_directory(sf)
else:
plt.figure(figsize=(16, 16))
plt.imshow(im)
plt.axis('off')
plt.tight_layout()
from irlc import savepdf
savepdf(sf, verbose=True)
# plt.show()
def savepdf(pdf, verbose=False, watermark=False, env=None):
"""
Convenience function for saving PDFs. Just call it after you have created your plot as ``savepdf('my_file.pdf')``
to save a PDF of the plot.
You can also pass an environment, in which case the environment will be stored to a pdf file.
:param pdf: The file to save to, for instance ``"my_pdf.pdf"``
:param verbose: Print output destination (optional)
:param watermark: Include a watermark (optional)
:return: Full path of the created PDF.
"""
if env is not None:
_savepdf_env(pdf, env)
return
import matplotlib.pyplot as plt
pdf = os.path.normpath(pdf.strip())
pdf = pdf+".pdf" if not pdf.endswith(".pdf") else pdf
if os.sep in pdf:
pdf = os.path.abspath(pdf)
else:
pdf = os.path.join(os.getcwd(), "pdf", pdf)
if not os.path.isdir(os.path.dirname(pdf)):
os.makedirs(os.path.dirname(pdf))
# filename = None
stack = inspect.stack()
modules = [inspect.getmodule(s[0]) for s in inspect.stack()]
files = [m.__file__ for m in modules if m is not None]
if any( [f.endswith("RUN_OUTPUT_CAPTURE.py") for f in files] ):
return
# for s in stack:
# print(s)
# print(stack)
# for k in range(len(stack)-1, -1, -1):
# frame = stack[k]
# module = inspect.getmodule(frame[0])
# filename = module.__file__
# print(filename)
# if not any([filename.endswith(f) for f in ["pydev_code_executor.py", "pydevd.py", "_pydev_execfile.py", "pydevconsole.py", "pydev_ipython_console.py"] ]):
# # print("breaking c. debugger", filename)
# break
# if any( [filename.endswith(f) for f in ["pydevd.py", "_pydev_execfile.py"]]):
# print("pdf path could not be resolved due to debug mode being active in pycharm", filename)
# return
# print("Selected filename", filename)
# wd = os.path.dirname(filename)
# pdf_dir = wd +"/pdf"
# if filename.endswith("_RUN_OUTPUT_CAPTURE.py"):
# return
# if not os.path.isdir(pdf_dir):
# os.mkdir(pdf_dir)
wd = os.getcwd()
irlc_base = os.path.dirname(__file__)
plt.savefig(fname=pdf)
outf = os.path.normpath(os.path.abspath(pdf))
print("> [savepdf]", pdf + (f" [full path: {outf}]" if verbose else ""))
return outf
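# Usage sketch for savepdf (a minimal, illustrative example):
#   import matplotlib.pyplot as plt
#   plt.plot([0, 1, 2], [0, 1, 4])
#   savepdf('my_plot')  # saved as ./pdf/my_plot.pdf since no directory is part of the name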
def _move_to_output_directory(file):
"""
Hidden function: Move file given file to static output dir.
"""
if not is_this_my_computer():
return
CDIR = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')
shared_output_dir = CDIR + "/../../shared/output"
shutil.copy(file, shared_output_dir + "/"+ os.path.basename(file) )
def bmatrix(a):
"""Returns a LaTeX bmatrix
:a: numpy array
:returns: LaTeX bmatrix as a string
"""
np.set_printoptions(suppress=True)
if len(a.shape) > 2:
raise ValueError('bmatrix can at most display two dimensions')
lines = str(a).replace('[', '').replace(']', '').splitlines()
rv = [r'\begin{bmatrix}']
rv += [' ' + ' & '.join(l.split()) + r'\\' for l in lines]
rv += [r'\end{bmatrix}']
return '\n'.join(rv)
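# Usage sketch for bmatrix:
#   A = np.array([[1, 2], [3, 4]])
#   print(bmatrix(A))  # prints a \begin{bmatrix} ... \end{bmatrix} string spanning several lines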
def is_this_my_computer():
CDIR = os.path.dirname(os.path.realpath(__file__)).replace('\\', '/')
return os.path.exists(CDIR + "/../../Exercises")
def cache_write(object, file_name, only_on_professors_computer=False, verbose=True, protocol=-1): # -1 is default protocol. Fix crash issue with large files.
if only_on_professors_computer and not is_this_my_computer():
""" Probably for your own good :-). """
return
dn = os.path.dirname(file_name)
if len(dn) > 0 and not os.path.isdir(dn):
os.makedirs(dn)
if verbose: print("Writing cache...", file_name)
with lzma.open(file_name, 'wb') as f:
pickle.dump(object, f, protocol=protocol)
# compress_pickle.dump(object, f, compression="lzma", protocol=protocol)
if verbose:
print("Done!")
def cache_exists(file_name):
return os.path.exists(file_name)
def cache_read(file_name):
if os.path.exists(file_name):
with lzma.open(file_name, 'rb') as f:
return pickle.load(f)
else:
return None
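# Usage sketch for the cache helpers (lzma-compressed pickle; the file name below is made up):
#   values = {'Q': np.zeros((4, 2))}
#   if not cache_exists('cache/q_values.pkl'):
#       cache_write(values, 'cache/q_values.pkl')
#   values = cache_read('cache/q_values.pkl')  # returns None if the file does not exist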
File added
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
# from irlc.car.car_viewer import CarViewer
from irlc.car.car_viewer import CarViewerPygame
import numpy as np
import sympy as sym
from scipy.optimize import Bounds
from gymnasium.spaces import Box
from irlc.car.sym_map import SymMap, wrap_angle
from irlc.ex03.control_model import ControlModel
from irlc.ex03.control_cost import SymbolicQRCost
from irlc.ex04.discrete_control_model import DiscreteControlModel
from irlc.ex04.control_environment import ControlEnvironment
# from irlc.ex03.control_specification import ControlSpecification
"""
class MySpecification():
def get_bounds(self):
return bounds
def get_cost(self):
pass
def sym_f(self):
return ...
def simulate(self):
# Simulate using RK4.
pass
spec = MySpecification()
model = Model(spec)
model.simulate(...)
"""
class SymbolicBicycleModel(ControlModel):
metadata = {
'render.modes': ['human', 'rgb_array'],
'video.frames_per_second': 30
}
def __init__(self, map_width=0.8, simple_bounds=None, cost=None, hot_start=False, verbose=True):
s = """
Coordinate system of the car:
State x consists of
x[0] = Vx (speed in direction of the car body)
x[1] = Vy (speed perpendicular to car body)
x[2] = wz (Yaw rate; how fast the car is turning)
x[3] = e_psi (Angle of rotation between car body and centerline)
x[4] = s (How far we are along the track)
x[5] = e_y (Distance between car body and closest point on centerline)
Meanwhile the actions are
u[0] : Angle between wheels and car body (i.e. are we steering to the right or to the left)
u[1] : Engine force (applied to the rear wheels, i.e. accelerates car)
"""
if verbose:
print(s)
# if simple_bounds is None:
# simple_bounds = dict()
self.map = SymMap(width=map_width)
self.v_max = 3.0
self.viewer = None # rendering
self.hot_start = hot_start
# self.observation_space = Box(low=np.asarray([-np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -map_width], dtype=float),
# high=np.asarray([v_max, np.inf, np.inf, np.inf, np.inf, map_width]), dtype=float)
# self.action_space = Box(low=np.asarray([-0.5, -1]), high=np.asarray([0.5, 1]), dtype=float)
# xl = np.zeros((6,))
# xl[4] = self.map.TrackLength
# simple_bounds = {'x0': Bounds([-np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -map_width], [v_max, np.inf, np.inf, np.inf, np.inf, map_width]),
# 'xF': Bounds(list(xl), list(xl)), **simple_bounds}
# n = 6
# d = 2
# if cost is None:
# cost = SymbolicQRCost(Q=np.zeros((6,6)), R=np.eye(2)*10, qc=0*1.)
# bounds = dict(x_low=[-np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -map_width], x_high=[self.v_max, np.inf, np.inf, np.inf, np.inf, map_width],
# u_low=[-0.5, -1], u_high=[0.5, 1])
super().__init__()
def get_cost(self) -> SymbolicQRCost:
return SymbolicQRCost(Q=np.zeros((6,6)), R=np.eye(2)*10, qc=1.*0)
def x_bound(self) -> Box:
return Box(np.asarray([-np.inf, -np.inf, -np.inf, -np.inf, -np.inf, -self.map.width]),
np.asarray([self.v_max, np.inf, np.inf, np.inf, np.inf, self.map.width]))
def u_bound(self) -> Box:
return Box(np.asarray([-0.5, -1]),np.asarray([0.5, 1]))
def render(self, x, render_mode='human'):
if self.viewer == None:
self.viewer = CarViewerPygame(self)
self.viewer.update(self.x_curv2x_XY(x))
return self.viewer.blit(render_mode=render_mode)
# return self.viewer.render(return_rgb_array=mode == 'rgb_array')
def close(self):
if self.viewer is not None:
self.viewer.close()
def x_curv2x_XY(self, x_curv):
'''
Utility function for converting x (including velocities, etc.) from local (curvilinear) coordinates to global XY position.
'''
Xc, Yc, vangle = self.map.getGlobalPosition(s=x_curv[4], ey=x_curv[5], epsi=x_curv[3])
dglob = np.asarray([x_curv[0], x_curv[1], x_curv[2], vangle, Xc, Yc])
return dglob
def sym_f(self, x, u, t=None, curvelinear_coordinates=True, curvature_s=None):
r'''
Create the derivative function
\dot{x} = f(x, u)
The dynamics can be expressed either in curvilinear (track-relative) coordinates or in normal (global) coordinates.
'''
# Vehicle Parameters
m = 1.98
lf = 0.125
lr = 0.125
Iz = 0.024
Df = 0.8 * m * 9.81 / 2.0
Cf = 1.25
Bf = 1.0
Dr = 0.8 * m * 9.81 / 2.0
Cr = 1.25
Br = 1.0
vx = x[0]
vy = x[1]
wz = x[2]
if curvelinear_coordinates:
epsi = x[3]
s = x[4]
ey = x[5]
else:
psi = x[3]
delta = u[0]
a = u[1]
alpha_f = delta - sym.atan2(vy + lf * wz, vx)
alpha_r = -sym.atan2(vy - lf * wz, vx)
# Compute lateral force at front and rear tire
Fyf = 2 * Df * sym.sin(Cf * sym.atan(Bf * alpha_f))
Fyr = 2 * Dr * sym.sin(Cr * sym.atan(Br * alpha_r))
d_vx = (a - 1 / m * Fyf * sym.sin(delta) + wz * vy)
d_vy = (1 / m * (Fyf * sym.cos(delta) + Fyr) - wz * vx)
d_wz = (1 / Iz * (lf * Fyf * sym.cos(delta) - lr * Fyr))
if curvelinear_coordinates:
cur = self.map.sym_curvature(s)
d_epsi = (wz - (vx * sym.cos(epsi) - vy * sym.sin(epsi)) / (1 - cur * ey) * cur)
d_s = ((vx * sym.cos(epsi) - vy * sym.sin(epsi)) / (1 - cur * ey))
"""
Compute derivative of e_y here (d_ey). See paper for details.
"""
d_ey = (vx * sym.sin(epsi) + vy * sym.cos(epsi)) # ODE governing e_y (distance from the center of the road) in curvilinear coordinates
xp = [d_vx, d_vy, d_wz, d_epsi, d_s, d_ey]
else:
d_psi = wz
d_X = ((vx * sym.cos(psi) - vy * sym.sin(psi)))
d_Y = (vx * sym.sin(psi) + vy * sym.cos(psi))
xp = [d_vx, d_vy, d_wz, d_psi, d_X, d_Y]
return xp
def fix_angles(self, x):
# fix angular component of x
if x.size == self.state_size:
x[3] = wrap_angle(x[3])
elif x.shape[1] == self.state_size:
x[:,3] = wrap_angle(x[:,3])
return x
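# For reference, the curvilinear part of the dynamics implemented in sym_f above reads as follows
# (a plain transcription of the code, with kappa(s) = self.map.sym_curvature(s)):
#   d(e_psi)/dt = w_z - (v_x cos(e_psi) - v_y sin(e_psi)) / (1 - kappa(s) e_y) * kappa(s)
#   d(s)/dt     =       (v_x cos(e_psi) - v_y sin(e_psi)) / (1 - kappa(s) e_y)
#   d(e_y)/dt   =        v_x sin(e_psi) + v_y cos(e_psi)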
class DiscreteCarModel(DiscreteControlModel):
def __init__(self, dt=0.1, cost=None, **kwargs):
model = SymbolicBicycleModel(**kwargs)
# self.observation_space = model.observation_space
# self.action_space = model.action_space
# n = 6
# d = 2
# if cost is None:
# from irlc.ex04.cost_discrete import DiscreteQRCost
# cost = DiscreteQRCost(Q=np.zeros((model.state_size, model.state_size)), R=np.eye(model.action_size))
super().__init__(model=model, dt=dt, cost=cost)
# self.cost = cost
self.map = model.map
class CarEnvironment(ControlEnvironment):
def __init__(self, Tmax=10, noise_scale=1.0, cost=None, max_laps=10, hot_start=False, render_mode=None, **kwargs):
discrete_model = DiscreteCarModel(cost=cost, hot_start=hot_start, **kwargs)
super().__init__(discrete_model, Tmax=Tmax, render_mode=render_mode)
self.map = discrete_model.map
self.noise_scale = noise_scale
self.cost = cost
self.completed_laps = 0
self.max_laps = max_laps
def simple_bounds(self):
simple_bounds = {'x': Bounds(self.observation_space.low, self.observation_space.high),
't0': Bounds([0], [0]),
'u': Bounds(self.action_space.low, self.action_space.high)}
return simple_bounds
""" We add a bit of noise for backward compatibility. """
def step(self, u):
# We don't want to render the car before we have added jitter (below). These lines therefore disable rendering
self.render_mode, rmt_ = None, self.render_mode
xp, cost, terminated, truncated, info = super().step(u)
self.render_mode = rmt_
x = xp
if hasattr(self, 'seed') and self.seed is not None and not callable(self.seed):
np.random.seed(self.seed)
noise_vx = np.maximum(-0.05, np.minimum(np.random.randn() * 0.01, 0.05))
noise_vy = np.maximum(-0.1, np.minimum(np.random.randn() * 0.01, 0.1))
noise_wz = np.maximum(-0.05, np.minimum(np.random.randn() * 0.005, 0.05))
if True: #self.noise_scale > 0:
x[0] = x[0] + 0.03 * noise_vx #* self.noise_scale
x[1] = x[1] + 0.03 * noise_vy #* self.noise_scale
x[2] = x[2] + 0.03 * noise_wz #* self.noise_scale
if x[4] > self.map.TrackLength:
self.completed_laps += 1
x[4] -= self.map.TrackLength
done = self.completed_laps >= self.max_laps
assert x[4] >= 0, "The track position s should never be negative"
if self.render_mode == 'human':
self.render()
return x, cost, done, False, info
def L(self, x):
'''
Return whether the terminal condition has been reached, i.e. whether the lap has been completed. See eq. (4) in "Autonomous Racing using LMPC".
:param x:
:return:
'''
return x[4] > self.map.TrackLength
def epoch_reset(self, x):
'''
After completing one epoch, i.e. when L(x) == True, reset the x-vector using this method to
restart the epoch. In practice, take one more lap on the track.
:param x:
:return:
'''
x = x.copy()
x[4] -= self.map.TrackLength
return x
def _get_initial_state(self):
x0 = np.zeros((6,))
if self.discrete_model.continuous_model.hot_start:
x0[0] = 0.5 # Start velocity is 0.5
# self.render()
return x0
if __name__ == "__main__":
# car = SymbolicBicycleModel()
# car.render(car.reset())
# sleep(2.0)
# car.close()
# print("Hello world")
env = CarEnvironment(render_mode='human')
env.metadata['video.frames_per_second'] = 10000
# from irlc import VideoMonitor
# env = wrappers.Monitor(env, "carvid2", force=True, video_callable=lambda episode_id: True)
# env = VideoMonitor(env)
env.reset()
import time
t0 = time.time()
n = 300
for _ in range(n):
u = env.action_space.sample()
# print(u)
# u *= 0
u[0] = 0
u[1] = 0.01
s, cost, done, truncated, info = env.step(u)
# print(s)
# sleep(5)
env.close()
tpf = (time.time()- t0)/n
print("TPF", tpf, "fps", 1/tpf)
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
# from pyglet.shapes import Rectangle, Circle
# from irlc.utils.pyglet_rendering import PygletViewer, PolygonOutline, GroupedElement
import pygame
from irlc.utils.graphics_util_pygame import UpgradedGraphicsUtil
import numpy as np
track_outline = (0, 0, 0)
track_middle = (220, 25, 25)
class CarViewerPygame(UpgradedGraphicsUtil):
def __init__(self, car):
n = int(10 * (car.map.PointAndTangent[-1, 3] + car.map.PointAndTangent[-1, 4]))
center = [car.map.getGlobalPosition(i * 0.1, 0) for i in range(n)]
outer = [car.map.getGlobalPosition(i * 0.1, -car.map.width) for i in range(n)]
inner = [car.map.getGlobalPosition(i * 0.1, car.map.width) for i in range(n)]
fudge = 0.2
xs, ys = zip(*outer)
super().__init__(screen_width=1000, xmin=min(xs) - fudge, xmax=max(xs) + fudge,
ymax=min(ys) - fudge, ymin=max(ys) + fudge, title="Racecar environment")
self.center = center
self.outer = outer
self.inner = inner
# Load ze sprite.
from irlc.utils.graphics_util_pygame import Object
self.car = Object("car.png", image_width=90)
def render(self):
green = (126, 200, 80)
track = (144,)*3
self.draw_background(background_color=green)
self.polygon("safd", self.outer, fillColor=track, outlineColor=track_outline, width=3)
self.polygon("in", self.inner, fillColor=green, outlineColor=track_outline, width=3)
self.polygon("in", self.center, fillColor=None, filled=False, outlineColor=(100, 100, 100), width=5)
# Now draw the pretty car.
x, y, psi = self.xglob[4], self.xglob[5], self.xglob[3]
xy = self.fixxy((x,y))
# self.car.rect.move()
self.car.rect.center = xy
# self.car.rect.center = xy[1]
self.car.rotate(psi / (2*np.pi) * 360)
# self.car.rotate(45)
self.car.blit(self.surf)
self.circle("in", (x,y), 4, fillColor=(255, 0, 0)) # drawn on the center of the car.
def update(self, xglob):
self.xglob = xglob
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import pdb
import matplotlib.pyplot as plt
import numpy as np
import numpy.linalg as la
import sympy as sym
"""
This is a bunch of pretty awful code to define a map and compute useful quantities like tangents, etc.
Defining a map is pretty straightforward (it consists of circle arcs and lines), but
you are not expected to read any further.
"""
class SymMap:
def plot(self, show=False):
PointAndTangent, TrackLength, extra = self.spec2PointAndTangent(self.spec)
for i in range(PointAndTangent.shape[0]-1):
extra_ = extra[i]
if 'CenterX' in extra_:
CenterX, CenterY = extra_['CenterX'], extra_['CenterY']
angle, spanAng = extra_['angle'], extra_['spanAng']
r = self.spec[i,1]
direction = 1 if r >= 0 else -1
# Plotting. Ignore this
plt.plot(CenterX, CenterY, 'ro')
tt = np.linspace(angle, angle + direction * spanAng)
plt.plot(CenterX + np.cos(tt) * np.abs(r), CenterY + np.abs(r) * np.sin(tt), 'r-')
x, y = PointAndTangent[:, 0], PointAndTangent[:, 1]
plt.plot(x, y, '.-')
print(np.sum(np.sum(np.abs(self.PointAndTangent - PointAndTangent))))
if show:
plt.show()
'''
Format:
PointAndTangent = [x,
y,
psi: angle of tangent vector at the last point of segment,
total-distance-travelled,
segment-length, curvature]
Also creates a symbolic expression to evaluate track position.
'''
def spec2PointAndTangent(self, spec):
# also create a symbolic piecewise expression to evaluate the curvature as a function of track length location.
# spec = self.spec
# PointAndTangent = self.PointAndTangent.copy()
PointAndTangent = np.zeros((spec.shape[0] + 1, 6))
extra = []
N = spec.shape[0]
segment_s_cur = 0 # Distance travelled to start of segment (s-coordinate).
angle_prev = 0 # Angle of the tangent vector at the starting point of the segment
x_prev, y_prev = 0, 0 # x,y coordinate of last point of previous segment.
for i in range(N):
l, r = spec[i,0], spec[i,1] # Length of segment and radius of curvature
ang = angle_prev # Angle of the tangent vector at the starting point of the segment
if r == 0.0: # If the current segment is a straight line
x = x_prev + l * np.cos(ang) # x coordinate of the last point of the segment
y = y_prev + l * np.sin(ang) # y coordinate of the last point of the segment
psi = ang # Angle of the tangent vector at the last point of the segment
curvature = 0
extra_ = {}
else:
direction = 1 if r >= 0 else -1
CenterX = x_prev + np.abs(r) * np.cos(ang + direction * np.pi / 2) # x coordinate center of circle
CenterY = y_prev + np.abs(r) * np.sin(ang + direction * np.pi / 2) # y coordinate center of circle
spanAng = l / np.abs(r) # Angle spanned by the circle
psi = wrap_angle(ang + spanAng * np.sign(r)) # Angle of the tangent vector at the last point of the segment
angleNormal = wrap_angle((direction * np.pi / 2 + ang))
angle = -(np.pi - np.abs(angleNormal)) * (sign(angleNormal))
x = CenterX + np.abs(r) * np.cos(angle + direction * spanAng) # x coordinate of the last point of the segment
y = CenterY + np.abs(r) * np.sin(angle + direction * spanAng) # y coordinate of the last point of the segment
curvature = 1/r
extra_ = {'CenterX': CenterX,
'CenterY': CenterY,
'angle': angle,
'direction': direction,
'spanAng': spanAng}
extra.append(extra_)
NewLine = np.array([x, y, psi, segment_s_cur, l, curvature])
PointAndTangent[i, :] = NewLine # Write the new info
x_prev, y_prev, angle_prev = PointAndTangent[i, 0], PointAndTangent[i, 1], PointAndTangent[i, 2]
segment_s_cur += l
xs = PointAndTangent[-2, 0]
ys = PointAndTangent[-2, 1]
xf = 0
yf = 0
psif = 0
l = np.sqrt((xf - xs) ** 2 + (yf - ys) ** 2)
NewLine = np.array([xf, yf, psif, PointAndTangent[-2, 3] + PointAndTangent[-2, 4], l, 0])
PointAndTangent[-1, :] = NewLine
TrackLength = PointAndTangent[-1, 3] + PointAndTangent[-1, 4]
return PointAndTangent, TrackLength, extra
"""map object
Attributes:
getGlobalPosition: convert position from (s, ey) to (X,Y)
"""
def __init__(self, width):
"""Initialization
width: track width
Modify the vector spec to change the geometry of the track
"""
self.width = width
self.halfWidth = 0.4
self.slack = 0.45
lengthCurve = 3.5 # 3.0
straight = 1.0
spec = np.array([[1.0, 0],
[lengthCurve, lengthCurve / np.pi],
# Note s = 1 * np.pi / 2 and r = -1 ---> Angle spanned = np.pi / 2
[straight, 0],
[lengthCurve / 2, -lengthCurve / np.pi],
[straight, 0],
[lengthCurve, lengthCurve / np.pi],
[lengthCurve / np.pi * 2 + 1.0, 0],
[lengthCurve / 2, lengthCurve / np.pi]])
PointAndTangent, TrackLength, extra = self.spec2PointAndTangent(spec)
self.PointAndTangent = PointAndTangent
self.TrackLength = TrackLength
self.spec = spec
'''
Creates a symbolic expression for the curvature
def Curvature(s, PointAndTangent):
"""curvature computation
s: curvilinear abscissa at which the curvature has to be evaluated
PointAndTangent: points and tangent vectors defining the map (these quantities are initialized in the map object)
"""
TrackLength = PointAndTangent[-1,3]+PointAndTangent[-1,4]
# In case on a lap after the first one
while (s > TrackLength):
s = s - TrackLength
# Given s \in [0, TrackLength] compute the curvature
# Compute the segment in which system is evolving
index = np.all([[s >= PointAndTangent[:, 3]], [s < PointAndTangent[:, 3] + PointAndTangent[:, 4]]], axis=0)
i = int(np.where(np.squeeze(index))[0])
curvature = PointAndTangent[i, 5]
return curvature
'''
def sym_curvature(self, s):
s = s - self.TrackLength * sym.floor(s / self.TrackLength)
n = self.PointAndTangent.shape[0]
pw = []
for i in range(n):
pw.append( (self.PointAndTangent[i,5], s - (self.PointAndTangent[i, 3] + self.PointAndTangent[i, 4]) <= 0) )
p = sym.Piecewise(*pw)
return p
def getGlobalPosition(self, s, ey, epsi=None, vangle_true=None):
"""coordinate transformation from curvilinear reference frame (e, ey) to inertial reference frame (X, Y)
(s, ey): position in the curvilinear reference frame
"""
# wrap s along the track
# while (s > self.TrackLength):
# s = s - self.TrackLength
s = np.mod(s, self.TrackLength)
# Compute the segment in which system is evolving
PointAndTangent = self.PointAndTangent
index = np.all([[s >= PointAndTangent[:, 3]], [s < PointAndTangent[:, 3] + PointAndTangent[:, 4]]], axis=0)
dx = np.where(np.squeeze(index))
if len(dx[0]) < 1:
raise Exception("Could not determine which track segment contains s=%s" % s)
i = int(np.where(np.squeeze(index))[0])
if PointAndTangent[i, 5] == 0.0: # If segment is a straight line
# Extract the first final and initial point of the segment
xf = PointAndTangent[i, 0]
yf = PointAndTangent[i, 1]
xs = PointAndTangent[i - 1, 0]
ys = PointAndTangent[i - 1, 1]
psi = PointAndTangent[i, 2]
# Compute the segment length
deltaL = PointAndTangent[i, 4]
reltaL = s - PointAndTangent[i, 3]
# Do the linear combination
x = (1 - reltaL / deltaL) * xs + reltaL / deltaL * xf + ey * np.cos(psi + np.pi / 2)
y = (1 - reltaL / deltaL) * ys + reltaL / deltaL * yf + ey * np.sin(psi + np.pi / 2)
if epsi is not None:
vangle = psi + epsi
else:
r = 1 / PointAndTangent[i, 5] # Extract curvature
ang = PointAndTangent[i - 1, 2] # Extract angle of the tangent at the initial point (i-1)
# Compute the center of the arc
direction = 1 if r >= 0 else -1
# if r >= 0:
# direction = 1
# else:
# direction = -1
CenterX = PointAndTangent[i - 1, 0] + np.abs(r) * np.cos(ang + direction * np.pi / 2) # x coordinate center of circle
CenterY = PointAndTangent[i - 1, 1] + np.abs(r) * np.sin(ang + direction * np.pi / 2) # y coordinate center of circle
spanAng = (s - PointAndTangent[i, 3]) / (np.pi * np.abs(r)) * np.pi
angleNormal = wrap_angle(direction * np.pi / 2 + ang)
angle = -(np.pi - np.abs(angleNormal)) * (sign(angleNormal))
x = CenterX + (np.abs(r) - direction * ey) * np.cos(angle + direction * spanAng) # x coordinate of the last point of the segment
y = CenterY + (np.abs(r) - direction * ey) * np.sin(angle + direction * spanAng) # y coordinate of the last point of the segment
if epsi is not None:
vangle = epsi + direction * spanAng + PointAndTangent[i - 1, 2]
if epsi is None:
return x,y
else:
vangle = wrap_angle(vangle)
if vangle_true is not None:
vangle_true = wrap_angle(vangle_true)
# vangle, vangle_true = np.unwrap([vangle, vangle_true])
if err(vangle - vangle_true, exception=False) > 1e-3: # debug code
print([vangle_true, vangle])
print("Bad angle, delta: ", vangle - vangle_true)
raise Exception("bad angle")
return x, y, vangle
def getLocalPosition(self, x, y, psi):
"""coordinate transformation from inertial reference frame (X, Y) to curvilinear reference frame (s, ey)
(X, Y): position in the inertial reference frame
"""
PointAndTangent = self.PointAndTangent
CompletedFlag = 0
for i in range(0, PointAndTangent.shape[0]):
if CompletedFlag == 1:
break
if PointAndTangent[i, 5] == 0.0: # If segment is a straight line
# Extract the first final and initial point of the segment
xf = PointAndTangent[i, 0]
yf = PointAndTangent[i, 1]
xs = PointAndTangent[i - 1, 0]
ys = PointAndTangent[i - 1, 1]
psi_unwrap = np.unwrap([PointAndTangent[i - 1, 2], psi])[1]
epsi = psi_unwrap - PointAndTangent[i - 1, 2]
# Check if on the segment using angles
if (la.norm(np.array([xs, ys]) - np.array([x, y]))) == 0:
s = PointAndTangent[i, 3]
ey = 0
CompletedFlag = 1
elif (la.norm(np.array([xf, yf]) - np.array([x, y]))) == 0:
s = PointAndTangent[i, 3] + PointAndTangent[i, 4]
ey = 0
CompletedFlag = 1
else:
if np.abs(computeAngle( [x,y] , [xs, ys], [xf, yf])) <= np.pi/2 and np.abs(computeAngle( [x,y] , [xf, yf], [xs, ys])) <= np.pi/2:
v1 = np.array([x,y]) - np.array([xs, ys])
angle = computeAngle( [xf,yf] , [xs, ys], [x, y])
s_local = la.norm(v1) * np.cos(angle)
s = s_local + PointAndTangent[i, 3]
ey = la.norm(v1) * np.sin(angle)
if np.abs(ey)<= self.width:
CompletedFlag = 1
else:
xf = PointAndTangent[i, 0]
yf = PointAndTangent[i, 1]
xs = PointAndTangent[i - 1, 0]
ys = PointAndTangent[i - 1, 1]
r = 1 / PointAndTangent[i, 5] # Extract curvature
direction = 1 if r >= 0 else -1
# if r >= 0:
# direction = 1
# else:
# direction = -1
ang = PointAndTangent[i - 1, 2] # Extract angle of the tangent at the initial point (i-1)
# Compute the center of the arc
CenterX = xs + np.abs(r) * np.cos(ang + direction * np.pi / 2) # x coordinate center of circle
CenterY = ys + np.abs(r) * np.sin(ang + direction * np.pi / 2) # y coordinate center of circle
# Check if on the segment using angles
if (la.norm(np.array([xs, ys]) - np.array([x, y]))) == 0:
ey = 0
psi_unwrap = np.unwrap([ang, psi])[1]
epsi = psi_unwrap - ang
s = PointAndTangent[i, 3]
CompletedFlag = 1
elif (la.norm(np.array([xf, yf]) - np.array([x, y]))) == 0:
s = PointAndTangent[i, 3] + PointAndTangent[i, 4]
ey = 0
psi_unwrap = np.unwrap([PointAndTangent[i, 2], psi])[1]
epsi = psi_unwrap - PointAndTangent[i, 2]
CompletedFlag = 1
else:
arc1 = PointAndTangent[i, 4] * PointAndTangent[i, 5]
arc2 = computeAngle([xs, ys], [CenterX, CenterY], [x, y])
if np.sign(arc1) == np.sign(arc2) and np.abs(arc1) >= np.abs(arc2):
v = np.array([x, y]) - np.array([CenterX, CenterY])
s_local = np.abs(arc2)*np.abs(r)
s = s_local + PointAndTangent[i, 3]
ey = -np.sign(direction) * (la.norm(v) - np.abs(r))
psi_unwrap = np.unwrap([ang + arc2, psi])[1]
epsi = psi_unwrap - (ang + arc2)
if np.abs(ey) <= self.width:
CompletedFlag = 1
if epsi > 1.0:
raise Exception("epsi is very large; the car is headed in the wrong direction")
if CompletedFlag == 0:
s = 10000
ey = 10000
epsi = 10000
print("Error!! POINT OUT OF THE TRACK!!!! <==================")
raise Exception("car outside track")
# pdb.set_trace()
return s, ey, epsi, CompletedFlag
def curvature_and_angle(self, s):
"""curvature computation
s: curvilinear abscissa at which the curvature has to be evaluated
PointAndTangent: points and tangent vectors defining the map (these quantities are initialized in the map object)
"""
PointAndTangent = self.PointAndTangent
TrackLength = PointAndTangent[-1, 3] + PointAndTangent[-1, 4]
# In case on a lap after the first one
while (s > TrackLength):
s = s - TrackLength
# Given s \in [0, TrackLength] compute the curvature
# Compute the segment in which system is evolving
index = np.all([[s >= PointAndTangent[:, 3]], [s < PointAndTangent[:, 3] + PointAndTangent[:, 4]]], axis=0)
i = int(np.where(np.squeeze(index))[0])
curvature = PointAndTangent[i, 5]
angle = PointAndTangent[i, 4] # tangent angle of path
return curvature, angle, i
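# Usage sketch for SymMap (traces the centerline in global coordinates, mirroring what CarViewerPygame does):
#   m = SymMap(width=0.8)
#   pts = [m.getGlobalPosition(s=0.1 * i, ey=0) for i in range(int(10 * m.TrackLength))]
#   xs, ys = zip(*pts)
#   import matplotlib.pyplot as plt
#   plt.plot(xs, ys); plt.axis('equal'); plt.show()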
# ======================================================================================================================
# ======================================================================================================================
# ====================================== Internal utilities functions ==================================================
# ======================================================================================================================
# ======================================================================================================================
def computeAngle(point1, origin, point2):
# The orientation of this angle matches that of the coordinate system. That is why a minus sign is needed
v1 = np.array(point1) - np.array(origin)
v2 = np.array(point2) - np.array(origin)
dot = v1[0] * v2[0] + v1[1] * v2[1] # dot product between [x1, y1] and [x2, y2]
det = v1[0] * v2[1] - v1[1] * v2[0] # determinant
angle = np.arctan2(det, dot) # atan2(y, x) or atan2(sin, cos)
return angle
'''
This is used because np.sign(a) returns 0 when a=0, which is not what we want here.
'''
def sign(a):
return 1 if a >= 0 else -1
def wrap_angle(angle):
return np.mod(angle+np.pi, 2 * np.pi) - np.pi
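# Example: wrap_angle(3 * np.pi / 2) == -np.pi / 2; angles are mapped to the interval [-pi, pi).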
'''
Compute the difference of two state vectors, taking into account that the angular component wraps around.
'''
def xy_diff(x,y):
dx = x-y
if len(dx.shape) == 1:
dx[3] = wrap_angle(dx[3])
else:
dx[:,3] = wrap_angle(dx[:,3])
return dx
def unityTestChangeOfCoordinates(map, ClosedLoopData):
"""For each point in ClosedLoopData change (X, Y) into (s, ey) and back to (X, Y) to check accurancy
"""
TestResult = 1
for i in range(0, ClosedLoopData.x.shape[0]):
xdat = ClosedLoopData.x
xglobdat = ClosedLoopData.x_glob
s, ey, epsi, _ = map.getLocalPosition(x=xglobdat[i, 4], y=xglobdat[i, 5], psi=xglobdat[i, 3])
v1 = np.array([epsi, s, ey])
v2 = np.array(xdat[i, 3:6])
x,y,vangle = np.array(map.getGlobalPosition(s=v1[1], ey=v1[2],epsi=v1[0], vangle_true=xglobdat[i,3] ))
v3 = np.array([ vangle, x, y])
v4 = np.array( [wrap_angle( xglobdat[i, 3] )] + xglobdat[i, 4:6].tolist() )
# print(i)
if np.abs( wrap_angle( xglobdat[i, 3] ) - vangle ) > 0.1:
print("BAD")
raise Exception("bad angle test result")
if np.dot(v3 - v4, v3 - v4) > 0.00000001:
TestResult = 0
print("ERROR", v1, v2, v3, v4)
# pdb.set_trace()
v1 = np.array(map.getLocalPosition(xglobdat[i, 4], xglobdat[i, 5], xglobdat[i, 3]))
v2 = np.array(xdat[i, 4:6])
v3 = np.array(map.getGlobalPosition(v1[0], v1[1]))
v4 = np.array([xglobdat[i, 4], xglobdat[i, 5]])
print(np.dot(v3 - v4, v3 - v4))
# pdb.set_trace()
if TestResult == 1:
print("Change of coordinates test passed!")
def err(x, exception=True, tol=1e-5, message="Error too large!"):
er = np.mean(np.abs(x).flat)
if er > tol:
print(message)
print(x)
print(er)
if exception:
raise Exception(message)
return er
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This directory contains the exercises for week 0."""
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
def add(a, b):
""" This function shuold return the sum of a and b. I.e. if print(add(2,3)) should print '5'. """
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
def misterfy(animals):
"""
Given a list of animals like animals=["cat", "wolf", "elephant"], this function should return
a list like ["mr cat", "mr wolf", "mr elephant"] """
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
def mean_value(p_dict):
"""
Given a dictionary of the form {x: probability_of_x, ...}, compute the mean value of
x, i.e. sum_i x_i * p(x_i). The recommended way is to use a list comprehension rather than numpy.
Hint: Look at the .items() method and the built-in sum(my_list) function. """
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
def fruits_ordered(order_dict):
""" Given an order dictionary of the form {fruit_name: pounds, ...}, return a list of the different fruits in the order (i.e. the keys). """
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
class BasicFruitShop:
""" This is a simple class that represents a fruit-shop.
You instantiate it with a dictionary of prices """
def __init__(self, name, prices):
""" prices is a dictionary of the form {fruit_name: cost}. For instance
prices = {'apple': 5, 'orange': 6} """
self.name = name
self.prices = prices
def cost(self, fruit):
""" Return the cost in pounds of the fruit with name 'fruit'. It uses the self.prices variable
to get the price.
You don't need to do exception handling here. """
# TODO: 1 lines missing.
raise NotImplementedError("Return cost of fruit as a floating point number")
class OnlineFruitShop(BasicFruitShop):
def price_of_order(self, order):
"""
order = {'apple': 5, 'pear': 2, ...} where the numbers are the quantity (pounds) ordered.
Hints:
> Iterate over the order, for instance: for fruit, pounds in order.items()
> self.cost(fruit) gives you the cost per pound of a fruit
> the total is the sum of {pounds} * {cost per pound}
"""
# TODO: 1 lines missing.
raise NotImplementedError("return the total cost of the order")
def shop_smart(order, fruit_shops):
"""
order: dictionary {'apple': 3, ...} of fruits and the pounds ordered
fruit_shops: list of OnlineFruitShop instances
Hints:
> Remember each shop s has a s.price_of_order method
> Use this method to first make a list containing the cost of the order at each fruit shop
> The list has the form [cost1, cost2, ...]; find the index of the smallest value (lists have an .index method)
> return fruit_shops[lowest_index].
"""
# TODO: 2 lines missing.
raise NotImplementedError("Implement function body")
return best_shop
if __name__ == '__main__':
"This code runs when you invoke the script from the command line (but not otherwise)"
""" Quesion 1: Lists and basic data types """
print("add(2,5) function should return 7, and it returned", add(2, 5))
animals = ["cat", "giraffe", "wolf"]
print("The nice animals are", misterfy(animals))
"""
This problem represents the probabilities of a loaded die as a dictionary such that
> p(roll=3) = p_dict[3] = 0.15.
"""
p_die = {1: 0.20,
2: 0.10,
3: 0.15,
4: 0.05,
5: 0.10,
6: 0.40}
print("Mean roll of die, sum_{i=1}^6 i * p(i) =", mean_value(p_die))
order = {'apples': 1.0,
'oranges': 3.0}
print("The different fruits in the fruit-order is", fruits_ordered(order))
""" Part B: A simple class """
price1 = {"apple": 4, "pear": 8, 'orange': 10}
shop1 = BasicFruitShop("Alis Funky Fruits", price1)
price2 = {'banana': 9, "apple": 5, "pear": 7, 'orange': 11}
shop2 = BasicFruitShop("Hansen Fruit Emporium", price2)
fruit = "apple"
print("The cost of", fruit, "in", shop1.name, "is", shop1.cost(fruit))
print("The cost of", fruit, "in", shop2.name, "is", shop2.cost(fruit))
""" Part C: Class inheritance """
price_of_fruits = {'apples': 2, 'oranges': 1, 'pears': 1.5, 'mellon': 10}
shopA = OnlineFruitShop('shopA', price_of_fruits)
print("The price of the given order in shopA is", shopA.price_of_order(order))
""" Part C: Using classes """
shopB = OnlineFruitShop('shopB', {'apples': 1.0, 'oranges': 5.0})
shops = [shopA, shopB]
print("For the order", order, " the best shop is", shop_smart(order, shops).name)
order = {'apples': 3.0} # test with a new order.
print("For the order", order, " the best shop is", shop_smart(order, shops).name)
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This directory contains the exercises for week 1."""
File added
File added
File added
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""The Agent class.
References:
[Her24] Tue Herlau. Sequential decision making. (Freely available online), 2024.
"""
import typing
import itertools
import os
import sys
from collections import OrderedDict, namedtuple
import numpy as np
from tqdm import tqdm
from irlc.utils.common import load_time_series, log_time_series
from irlc.utils.irlc_plot import existing_runs
import shutil
from gymnasium import Env
from dataclasses import dataclass
class Agent:
r"""The main agent class. See (Her24, Subsection 4.4.3) for additional details.
To use the agent class, you should first create an environment. In this case we will just create an instance of the
``InventoryEnvironment`` (see (Her24, Subsection 4.2.3))
:Example:
.. runblock:: pycon
>>> from irlc import Agent # You can import directly from top-level package
>>> import numpy as np
>>> np.random.seed(42) # Fix the seed for reproducibility
>>> from irlc.ex01.inventory_environment import InventoryEnvironment
>>> env = InventoryEnvironment() # Create an instance of the environment
>>> agent = Agent(env) # Create an instance of the agent.
>>> s0, info0 = env.reset() # Always call reset to start the environment
>>> a0 = agent.pi(s0, k=0, info=info0) # Tell the agent to compute action $a_{k=0}$
>>> print(f"In state {s0=}, the agent took the action {a0=}")
"""
def __init__(self, env: Env):
"""Instantiate the Agent class.
The agent is given the openai gym environment it must interact with. This allows the agent to know what the
action and observation space is.
:param env: The openai gym ``Env`` instance the agent should interact with.
"""
self.env = env
def pi(self, s, k : int, info : typing.Optional[dict] =None):
r"""Evaluate the Agent's policy (i.e., compute the action the agent want to take) at time step ``k`` in state ``s``.
This correspond to the environment being in a state evaluating :math:`x_k`, and the function should compute the next
action the agent wish to take:
.. math::
u_k = \mu_k(x_k)
This means that ``s`` = :math:`x_k` and ``k`` = :math:`k =\{0, 1, ...\}`. The function should return an action that lies in the action-space
of the environment.
The info dictionary:
The ``info``-dictionary contains possible extra information returned from the environment, for instance when calling the ``s, info = env.reset()`` function.
The main use in this course is in control, where the dictionary contains a value ``info['time_seconds']`` (which corresponds to the simulation time :math:`t` in seconds).
We will also use the info dictionary to let the agent know certain actions are not available. This is done by setting the ``info['mask']``-key.
Note that this is only relevant for reinforcement learning, and you should see the documentation/exercises for reinforcement learning for additional details.
The default behavior of the agent is to return a random action. An example:
.. runblock:: pycon
>>> from irlc.pacman.pacman_environment import PacmanEnvironment
>>> from irlc import Agent
>>> env = PacmanEnvironment()
>>> s, info = env.reset()
>>> agent = Agent(env)
>>> agent.pi(s, k=0, info=info) # get a random action
>>> agent.pi(s, k=0) # If info is not specified, all actions are assumed permissible.
:param s: Current state the environment is in.
:param k: Current time step.
:param info: Optional information dictionary (may for instance contain an action mask, see above).
:return: The action the agent wants to take in the given state at the given time. By default the agent returns a random action
"""
if info is None or 'mask' not in info:
return self.env.action_space.sample()
else:
""" In the case where the actions available in each state differ, openAI deals with that by specifying a
``mask``-entry in the info-dictionary. The mask can then be passed on to the
env.action_space.sample-function to make sure we don't sample illegal actions. I consider this the most
difficult and annoying thing about openai gym."""
if info['mask'].max() > 1:
raise Exception("Bad mask!")
return self.env.action_space.sample(mask=info['mask'])
def train(self, s, a, r, sp, done=False, info_s=None, info_sp=None):
r"""Implement this function if the agent has to learn (be trained).
Note that you only have to implement this function from week 7 onwards -- before that, we are not interested in control methods that learn.
The agent takes a number of input arguments. You should imagine that
* ``s`` is the current state :math:`x_k`
* ``a`` is the action the agent took in state ``s``, i.e. ``a`` :math:`= u_k = \mu_k(x_k)`
* ``r`` is the reward the agent got from that action
* ``sp`` (s-plus) is the state the environment then transitioned to, i.e. ``sp`` :math:`= x_{k+1}`
* ``done`` tells the agent if the environment has stopped
* ``info_s`` is the information-dictionary returned by the environment as it transitioned to ``s``
* ``info_sp`` is the information-dictionary returned by the environment as it transitioned to ``sp``.
The following example will hopefully clarify it by showing how you would manually call the train-function once:
:Example:
.. runblock:: pycon
>>> from irlc.ex01.inventory_environment import InventoryEnvironment # import environment
>>> from irlc import Agent
>>> env = InventoryEnvironment() # Create an instance of the environment
>>> agent = Agent(env) # Create an instance of the agent.
>>> s, info_s = env.reset() # s is the current state
>>> a = agent.pi(s, k=0, info=info_s) # The agent takes an action
>>> sp, r, done, _, info_sp = env.step(a) # Environment updates
>>> agent.train(s, a, r, sp, done, info_s, info_sp) # How the training function is called
In control and dynamic programming, please recall that the reward is equal to minus the cost.
:param s: Current state :math:`x_k`
:param a: Action taken :math:`u_k`
:param r: Reward obtained by taking action :math:`u_k` in state :math:`x_k`
:param sp: The state that the environment transitioned to :math:`x_{k+1}`
:param info_s: The information dictionary corresponding to ``s`` returned by ``env.reset`` (when :math:`k=0`) and otherwise ``env.step``.
:param info_sp: The information-dictionary corresponding to ``sp`` returned by ``env.step``
:param done: Whether environment terminated when transitioning to ``sp``
:return: None
"""
pass
def __str__(self):
"""**Optional:** A unique name for this agent. Used for labels when plotting, but can be kept like this."""
return super().__str__()
def extra_stats(self) -> dict:
"""**Optional:** Implement this function if you wish to record extra information from the ``Agent`` while training.
You can safely ignore this method as it will only be used for control theory to create nicer plots """
return {}
fields = ('time', 'state', 'action', 'reward')
Trajectory = namedtuple('Trajectory', fields + ("env_info",))
# Experiment using a dataclass.
@dataclass
class Stats:
episode: int
episode_length: int
accumulated_reward: float
total_steps: int
trajectory : Trajectory = None
agent_stats : dict = None
@property
def average_reward(self):
return self.accumulated_reward / self.episode_length
# s = Stats(episode=0, episode_length=5, accumulated_reward=4, total_steps=2, trajectory=Trajectory())
def train(env,
agent=None,
experiment_name=None,
num_episodes=1,
verbose=True,
reset=True, # If True we will call env.reset() upon episode start.
max_steps=1e10,
max_runs=None,
return_trajectory=True, # Return the current trajectories as a list
resume_stats=None, # Resume stat collection from last save.
log_interval=1, # Only log every log_interval steps. Reduces size of log files.
delete_old_experiments=False, # Remove the old experiments folder. Useful while debugging a model (or to conserve disk space)
seed=None, # Attempt to set the seed of the random number generator to produce reproducible results.
):
"""This function implements the main training loop as described in (Her24, Subsection 4.4.4).
The loop will simulate the interaction between agent `agent` and the environment `env`.
The function has a lot of special functionality, so it is useful to consider the common cases. An example:
>>> stats, _ = train(env, agent, num_episodes=2)
Simulate interaction for two episodes (i.e. environment terminates two times and is reset).
`stats` will be a list of length two containing information from each run
>>> stats, trajectories = train(env, agent, num_episodes=2, return_trajectory=True)
`trajectories` will be a list of length two containing information from the two trajectories.
>>> stats, _ = train(env, agent, experiment_name='experiments/my_run', num_episodes=2)
Save `stats`, and trajectories, to a file which can easily be loaded/plotted (see course software for examples of this).
The file will be time-stamped so using several calls you can repeat the same experiment (run) many times.
>>> stats, _ = train(env, agent, experiment_name='experiments/my_run', num_episodes=2, max_runs=10)
As above, but do not perform more than 10 runs. Useful for repeated experiments.
:param env: An openai-Gym ``Env`` instance (the environment)
:param agent: An ``Agent`` instance
:param experiment_name: The outcome of this experiment will be saved in a folder with this name. This will allow you to run multiple (repeated) experiments and visualize the results in a single plot, which is very important in reinforcement learning.
:param num_episodes: Number of episodes to simulate
:param verbose: Display progress bar
:param reset: Call ``env.reset()`` before simulation start. Default is ``True``. This is only useful in very rare cases.
:param max_steps: Terminate if this many steps have elapsed (for non-terminating environments)
:param max_runs: Maximum number of repeated experiments (requires ``experiment_name``)
:param return_trajectory: Return the trajectories list (on by default; note this may consume a lot of memory in long experiments)
:param resume_stats: Resume stat collection from last run (this requires the ``experiment_name`` variable to be set)
:param log_interval: Log stats less frequently than each episode. Useful if you want to run really long experiments.
:param delete_old_experiments: If true, old saved experiments will be deleted. This is useful during debugging.
:param seed: An integer. The random number generator of the environment will be reset to this seed allowing for reproducible results.
:return: A list where each element corresponds to each (started) episode. The elements are dictionaries, and contain the statistics for that episode.
"""
from irlc import cache_write
from irlc import cache_read
saveload_model = False
# temporal_policy = None
save_stats = True
if agent is None:
print("[train] No agent was specified. Using irlc.Agent(env) (this agent selects actions at random)")
agent = Agent(env)
if delete_old_experiments and experiment_name is not None and os.path.isdir(experiment_name):
shutil.rmtree(experiment_name)
if experiment_name is not None and max_runs is not None and existing_runs(experiment_name) >= max_runs:
stats, recent = load_time_series(experiment_name=experiment_name)
if return_trajectory:
trajectories = cache_read(recent+"/trajectories.pkl")
else:
trajectories = []
return stats, trajectories
stats = []
steps = 0
ep_start = 0
resume_stats = saveload_model if resume_stats is None else resume_stats
recent = None
if resume_stats:
stats, recent = load_time_series(experiment_name=experiment_name)
if recent is not None:
ep_start, steps = stats[-1]['Episode']+1, stats[-1]['Steps']
trajectories = []
# include_metadata = len(inspect.getfullargspec(agent.train).args) >= 7
break_outer = False
with tqdm(total=num_episodes, disable=not verbose, file=sys.stdout, mininterval=int(num_episodes/100) if num_episodes>100 else None) as tq:
for i_episode in range(num_episodes):
if break_outer:
break
info_s = {}
if reset or i_episode > 0:
if seed is not None:
s, info_s = env.reset(seed=seed)
seed = None
else:
s, info_s = env.reset()
elif hasattr(env, "s"): # This is doing what, exactly? Perhaps save/load of agent?
s = env.s
elif hasattr(env, 'state'):
s = env.state
else:
s = env.model.s
# time = 0
reward = []
trajectory = Trajectory(time=[], state=[], action=[], reward=[], env_info=[])
k = 0 # initial state k.
for _ in itertools.count():
# policy is always temporal
a = agent.pi(s, k, info_s) # if temporal_policy else agent.pi(s)
k = k + 1
sp, r, terminated, truncated, info_sp = env.step(a)
done = terminated or truncated
if info_sp is not None and 'mask' in info_sp and info_sp['mask'].max() > 1:
print("bad")
agent.train(s, a, r, sp, done, info_s, info_sp)
if return_trajectory:
trajectory.time.append(np.asarray(info_s['time_seconds'] if 'time_seconds' in info_s else steps)) #np.asarray(time))
trajectory.state.append(s)
trajectory.action.append(a)
trajectory.reward.append(np.asarray(r))
trajectory.env_info.append(info_s)
reward.append(r)
steps += 1
# time += info_sp['dt'] if 'dt' in info_sp else 1
# time += 1
if done or steps >= max_steps:
trajectory.state.append(sp)
trajectory.env_info.append(info_sp)
trajectory.time.append(np.asarray(info_sp['time_seconds'] if 'time_seconds' in info_sp else steps))
break_outer = steps >= max_steps
break
s = sp
info_s = info_sp
if return_trajectory:
try:
from irlc.ex04.control_environment import ControlEnvironment
if isinstance(env, ControlEnvironment): # TODO: this is too hacky. States/actions should be lists, and subsequent methods should stack.
trajectory = Trajectory(**{field: np.stack([np.asarray(x_) for x_ in getattr(trajectory, field)]) for field in fields}, env_info=trajectory.env_info)
# else:
# trajectory = Trajectory(**{field: np.stack([np.asarray(x_) for x_ in getattr(trajectory, field)]) for field in fields}, env_info=trajectory.env_info)
except Exception as e:
pass
trajectories.append(trajectory)
if (i_episode + 1) % log_interval == 0:
stats.append({"Episode": i_episode + ep_start,
"Accumulated Reward": sum(reward),
# "Average Reward": np.mean(reward), # Not sure we need this anymore.
"Length": len(reward),
"Steps": steps, # Useful for deep learning applications. This should be kept, or week 13 will have issues.
**agent.extra_stats()})
rate = int(num_episodes / 100)
if rate > 0 and i_episode % rate == 0:
tq.set_postfix(ordered_dict=OrderedDict(list(OrderedDict(stats[-1]).items())[:5])) if len(stats) > 0 else None
tq.update()
sys.stderr.flush()
if resume_stats and save_stats and recent is not None:
os.remove(recent+"/log.txt")
if experiment_name is not None and save_stats:
path = log_time_series(experiment=experiment_name, list_obs=stats)
if return_trajectory:
cache_write(trajectories, path+"/trajectories.pkl")
print(f"Training completed. Logging {experiment_name}: '{', '.join( stats[0].keys()) }'")
for i, t in enumerate(trajectories):
from collections import defaultdict
nt = defaultdict(lambda: [])
if t.env_info is not None and t.env_info[1] is not None and "supersample" in t.env_info[1]:
for f in fields:
for k, ei in enumerate(t.env_info):
if 'supersample' not in ei:
continue
z = ei['supersample'].__getattribute__(f).T
if k == 0:
pass
else:
z = z[1:]
nt[f].append(z)
for f in fields:
nt[f] = np.concatenate([z for z in nt[f]],axis=0)
traj2 = Trajectory(**nt, env_info=[])
trajectories[i] = traj2
# for k, t in enumerate(stats):
# if k < len(trajectories):
# stats[k]['trajectory'] = trajectories[k]
# Turn this into a single episodes-list (refactor later)
return stats, trajectories
if __name__ == "__main__":
# Use the trajectories here.
from irlc.ex01.inventory_environment import InventoryEnvironment
env = InventoryEnvironment(N=10)
stats, traj = train(env, Agent(env))
print(stats)
s = Stats(episode=1, episode_length=2, accumulated_reward=4, total_steps=4, trajectory=None, agent_stats={})
print(s)
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import gymnasium
import numpy as np
from gymnasium.spaces.discrete import Discrete
from irlc.ex01.agent import Agent, train
class BobFriendEnvironment(gymnasium.Env):
def __init__(self, x0=20):
self.x0 = x0
self.action_space = Discrete(2) # Possible actions {0, 1}
def reset(self):
# TODO: 1 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
return self.s, {}
def step(self, a):
# TODO: 9 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
return s_next, reward, terminated, False, {}
class AlwaysAction_u0(Agent):
def pi(self, s, k, info=None):
"""This agent should always take action u=0."""
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
class AlwaysAction_u1(Agent):
def pi(self, s, k, info=None):
"""This agent should always take action u=1."""
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
if __name__ == "__main__":
# Part A:
env = BobFriendEnvironment()
x0, _ = env.reset()
print(f"Initial amount of money is x0 = {x0} (should be 20 kroner)")
print("Lets put it in the bank, we should end up in state x1=22 and get a reward of 2 kroner")
x1, reward, _, _, _ = env.step(0)
print("we got", x1, reward)
# Since we reset the environment, we should get the same result as before:
env.reset()
x1, reward, _, _, _ = env.step(0)
print("(once more) we got", x1, reward, "(should be the same as before)")
env.reset() # We must call reset -- the environment has possibly been changed!
print("Lets lend it to our friend -- what happens will now be random")
x1, reward, _, _, _ = env.step(1)
print("we got", x1, reward)
# Part B:
stats, _ = train(env, AlwaysAction_u0(env), num_episodes=1000)
average_u0 = np.mean([stat['Accumulated Reward'] for stat in stats])
stats, _ = train(env, AlwaysAction_u1(env), num_episodes=1000)
average_u1 = np.mean([stat['Accumulated Reward'] for stat in stats])
print(f"Average reward while taking action u=0 was {average_u0} (should be 2)")
print(f"Average reward while taking action u=1 was {average_u1} (should be 4)")
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This file contains code for the Chess Tournament problem."""
import numpy as np
from gymnasium.spaces.discrete import Discrete
from gymnasium import Env
class ChessTournament(Env):
"""The ChessTournament gymnasium-environment which simulate a chess tournament.
In the problem, a chess tournament ends when a player wins two games in a row. The results
of each game are -1, 0, 1 corresponding to a loss, draw and win for player 1. See:
https://www.youtube.com/watch?v=5UQU1oBpAic
To implement this, we define the step-function such that one episode of the environment corresponds to playing
a chess tournament to completion. Once the environment completes, it returns a reward of +1 if the player won
the tournament, and otherwise 0.
Each step therefore corresponds to playing a single game in the tournament.
To implement this, we use a state corresponding to the sequence of games in the tournament:
>>> self.s = [0, -1, 1, 0, 0, 1]
In the self.step(action)-function, we ignore the action, simulate the outcome of a single game,
and append the outcome to self.s. We then compute whether the tournament has completed and, if so,
return a reward of 1 if we won.
"""
def __init__(self, p_draw=3 / 4, p_win=2 / 3):
self.action_space = Discrete(1)
self.p_draw = p_draw
self.p_win = p_win
self.s = [] # A chess tournament is a sequence of won/lost games s = [0, -1, 1, 0, ...]
def reset(self):
"""Reset the tournament environment to begin to simulate a new tournament.
After each episode is complete, this function will reset :python:`self.s` and return the current state s and an empty dictionary.
:return:
- s - The initial state (what is it?)
- info - An empty dictionary, ``{}``
"""
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
return self.s, {}
def step(self, action):
"""Play a single game in the current tournament
The variable action is required by gymnasium but it is not used since no (player) actions occur in this problem.
The step-method should update `self.s` to be the next (new) state, compute the reward, and determine whether
the environment has terminated (:python:`done`).
:param action: This input is required by gymnasium but it is not used in this case.
:return: A tuple of the form :python:`(new_state, reward, done, False, {})`
"""
game_outcome = None # should be -1, 0, or 1 depending on outcome of single game.
## TODO: Oy veh, the following 7 lines below have been permuted. Uncomment, rearrange to the correct order and remove the error.
#-------------------------------------------------------------------------------------------------------------------------------
# else:
# else:
# game_outcome = 1
# if np.random.rand() < self.p_win:
# game_outcome = -1
# game_outcome = 0
# if np.random.rand() < self.p_draw:
raise NotImplementedError("Compute game_outcome here")
self.s.append(game_outcome)
# done = True if the tournament has ended, otherwise False. Compute it using self.s.
# TODO: 1 lines missing.
raise NotImplementedError("Compute 'done', whether the tournament has ended.")
# r = ... . Compute reward. Let r=1 if we won the tournament otherwise 0.
# TODO: 1 lines missing.
raise NotImplementedError("Compute the reward 'r' here.")
return self.s, r, done, False, {}
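# (A hedged sketch of one possible reading of the termination rule, not necessarily the intended
# solution.) Since the tournament ends when a player wins two games in a row and self.s stores the
# sequence of outcomes, the 'done' check in step() could look along the lines of
#     done = len(self.s) >= 2 and self.s[-1] == self.s[-2] != 0
# i.e. the last two outcomes are identical and neither is a draw.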
def main():
"""The main method of the chess-game problem.
This function will simulate T tournaments and estimate the average win probability for player 1 as p_win (the answer to the riddle)
as well as the average tournament length. Note the latter is a one-liner to estimate by simulation, but it would require
non-trivial computations to obtain analytically. Please see the :class:`gymnasium.Env` class for additional details.
"""
T = 5000
from irlc import train, Agent
env = ChessTournament()
# Compute stats using the train function. Simulate the tournament for a total of T episodes (T=5000, set above).
# TODO: 1 lines missing.
raise NotImplementedError("Compute stats here using train(env, ...). Use num_episodes.")
p_win = np.mean([st['Accumulated Reward'] for st in stats])
avg_length = np.mean([st['Length'] for st in stats])
print("Agent: Estimated chance I won the tournament: ", p_win)
print("Agent: Average tournament length", avg_length)
if __name__ == "__main__":
main()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
import numpy as np
from gymnasium.spaces.discrete import Discrete
from gymnasium import Env
from irlc.ex01.agent import Agent, train
class InventoryEnvironment(Env):
def __init__(self, N=2):
self.N = N # planning horizon
self.action_space = Discrete(3) # Possible actions {0, 1, 2}
self.observation_space = Discrete(3) # Possible observations {0, 1, 2}
def reset(self):
self.s = 0 # reset initial state x0=0
self.k = 0 # reset time step k=0
return self.s, {} # Return the state we reset to (and an empty dict)
def step(self, a):
w = np.random.choice(3, p=(.1, .7, .2)) # Generate random disturbance
# TODO: 5 lines missing.
raise NotImplementedError("Insert your solution and remove this error.")
return s_next, reward, terminated, False, {} # return transition information
class RandomAgent(Agent):
def pi(self, s, k, info=None):
""" Return action to take in state s at time step k """
# TODO: 1 lines missing.
raise NotImplementedError("Implement function body")
def simplified_train(env: Env, agent: Agent) -> float:
s, _ = env.reset()
J = 0 # Accumulated reward for this rollout
for k in range(1000):
## TODO: Oy veh, the following 7 lines below have been permuted. Uncomment, rearrange to the correct order and remove the error.
#-------------------------------------------------------------------------------------------------------------------------------
# if terminated or truncated:
# sp, r, terminated, truncated, metadata = env.step(a)
# a = agent.pi(s, k)
# s = sp
# J += r
# agent.train(s, a, sp, r, terminated)
# break
raise NotImplementedError("Remove this exception after the above lines have been uncommented and rearranged.")
return J
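# (A hedged reminder of the general gymnasium interaction pattern; it is not the re-ordered solution
# to the exercise above.) A single rollout typically alternates between querying the policy,
# stepping the environment and checking for termination:
#     a = policy(s)
#     s_next, r, terminated, truncated, info = env.step(a)
#     if terminated or truncated:
#         break
#     s = s_next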
def run_inventory():
env = InventoryEnvironment()
agent = RandomAgent(env)
stats, _ = train(env,agent,num_episodes=1,verbose=False) # Perform one rollout.
print("Accumulated reward of first episode", stats[0]['Accumulated Reward'])
# I recommend inspecting 'stats' in a debugger; why do you think it is a list of length 1?
stats, _ = train(env, agent, num_episodes=1000,verbose=False) # do 1000 rollouts
avg_reward = np.mean([stat['Accumulated Reward'] for stat in stats])
print("[RandomAgent class] Average cost of random policy J_pi_random(0)=", -avg_reward)
# Try to inspect stats again in a debugger here. How long is the list now?
stats, _ = train(env, Agent(env), num_episodes=1000,verbose=False) # Perform 1000 rollouts using Agent class
avg_reward = np.mean([stat['Accumulated Reward'] for stat in stats])
print("[Agent class] Average cost of random policy J_pi_random(0)=", -avg_reward)
""" Second part: Using the simplified training method. I.e. do not use train() below.
You can find some pretty strong hints about what goes on in simplified_train in the lecture slides for today. """
avg_reward_simplified_train = np.mean( [simplified_train(env, agent) for i in range(1000)])
print("[simplified train] Average cost of random policy J_pi_random(0) =", -avg_reward_simplified_train)
if __name__ == "__main__":
run_inventory()
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
from irlc.pacman.pacman_environment import PacmanEnvironment
from irlc import Agent, train, savepdf
# Maze layouts can be specified using a string.
layout = """
%%%%%%%%%%
%P.......%
%.%%%%%%.%
%.% %.%
%.% %.%
%.% %.%
%.% %.%
%.%%%%%%.%
%........%
%%%%%%%%%%
"""
# This is our first agent. Note it inherits from the Agent class. Use <ctrl>+click in PyCharm to navigate to code definitions --
# this is a very useful habit when you work with other people's code in general, and object-oriented code in particular.
class GoAroundAgent(Agent):
def pi(self, x, k, info=None):
""" Collect all dots in the maze in the smallest amount of time.
This function should return an action; check the output of the code below to see what actions you can potentially
return.
Remember Pacman only has to solve this single maze, so don't make the function general.
Hints:
- Insert a breakpoint in the function. Try to write self.env and self.env.action_space.actions in the interpreter. Where did self.env get set?
- Remember that k is the current step number.
- Ignore the info dictionary; you can probably also ignore the state x.
- The function should return a string (the actions are strings such as 'North')
"""
# TODO: 7 lines missing.
raise NotImplementedError("Implement function body")
return 'West'
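# (A hedged illustration of one common pattern, not the intended route.) Scripted agents often
# pre-compute a list of actions and index it by the step number k, for example
#     route = ['East'] * 7 + ['South'] * 7 + ['West'] * 7 + ['North'] * 7   # counts/directions are made up
#     return route[k % len(route)]
# The actual directions and counts for this maze have to be read off the layout string above.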
if __name__ == "__main__":
# Create an environment with the given layout; render_mode='human' gives a nicer (but slower)
# visualization which helps us see what Pacman does.
env = PacmanEnvironment(layout_str=layout, render_mode='human')
# This creates the GoAroundAgent instance.
agent = GoAroundAgent(env)
# Uncomment the following line to input actions using the keyboard instead of letting the agent act:
# env, agent = interactive(env, agent)
s, info = env.reset() # Reset (and start) the environment
savepdf("pacman_roundabout.pdf", env=env) # Saves a snapshot of the start layout
# The next two lines display two ways to get the available actions. The 'canonical' way using the
# env.action_space, and a way particular to Pacman by using the s.A() function on the state.
# You can read more about the functions in the state in project 1.
# print("Available actions at start:", env.action_space.actions) # This will list the available actions.
print("Alternative way of getting actions:", s.A()) # See also project description
# Simulate the agent for one episode
stats, _ = train(env, agent, num_episodes=1)
# Print your obtained score.
print("Your obtained score was", stats[0]['Accumulated Reward'])
env.close()  # When working with visualizations, call env.close() to close any windows it may have opened.
# This file may not be shared/redistributed without permission. Please read copyright notice in the git repo. If this file contains other copyright notices disregard this text.
"""This directory contains the exercises for week 2."""