Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
SINDyc And MCMC Framework
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
freba
SINDyc And MCMC Framework
Commits
a7e73c36
Commit
a7e73c36
authored
5 years ago
by
freba
Browse files
Options
Downloads
Patches
Plain Diff
Added `Documentation`.
parent
065d5b0d
Branches
Branches containing commit
No related tags found
No related merge requests found
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
Documentation.org
+4004
-0
4004 additions, 0 deletions
Documentation.org
with
4004 additions
and
0 deletions
Documentation.org
0 → 100644
+
4004
−
0
View file @
a7e73c36
#+TITLE: Documentation
* SINDYc
#+NAME: simo_IDCL_sindy_header
#+begin_src python :exports none :results none :noweb yes
""" Simulation definitions for the SIMO results.
"""
import numpy as np
Tburn, Tid, Tcl = 0, 200, 50
u = np.full((1, Tid), .5)
poly_degree = 2
if cluster == 0:
samples = 50
elif cluster == 1:
samples = 5
else:
raise NotImplementedError
IDspec = dict(Tburn=Tburn,
T=Tid,
u=u,
cut_off=[0.0005, 0.1],
neval=50,
poly_degree=poly_degree)
CLspec = dict(T=Tcl) # add controller
simspec = dict(system='SIMO', Ts=1, ID=IDspec, CL=CLspec,
cluster=cluster, samples=samples)
<<paper_simulation>>
<<paper_plotting_header>>
def unpack(msindy):
msindy_ = {}
for imodel in range(simulator.mp.nx):
msindy_[imodel] = []
for isample in range(simulator.simspec['samples']):
try:
m = msindy[isample][imodel]
if isinstance(m, pd.DataFrame):
msindy_[imodel].append(m.iloc[:, 0])
except KeyError:
pass
df = pd.concat(msindy_[imodel], axis=0)
msindy_[f'df_{imodel}'] = df
return msindy_
def plot_clusterModels(msindy_,
cluster=0,
true_coefficients=None,
filename='simo_IDCL_sindy.pdf',
rotate_xlabels=True,
xlabels=None,
kind='bar'):
def adjust_df(df,
name,
xlabels=None):
df.name = name
df = df.replace({0.0: np.nan})
df = df.drop(index=['Cn'])
ncoeff = df.index.unique().shape[0]
nsamples = int(df.shape[0]/ncoeff)
labels = np.vstack([xlabels[:ncoeff]
for n in range(nsamples)]).flatten()
idx = pd.Index(labels)
idx.name = 'Coefficients'
try:
df.index = idx
except ValueError as ve:
breakpoint()
return df
def _unpack(msindy_, xlabels):
df_0 = msindy_['df_0']
df_0 = adjust_df(df_0,
name='Model 0',
xlabels=xlabels)
df_1 = msindy_['df_1']
df_1 = adjust_df(df_1,
name='Model 1',
xlabels=xlabels)
return df_0, df_1
def unstack(df, criterion):
df = df.reset_index()
df.loc[:, 'Cluster'] = criterion
return df
if isinstance(cluster, list):
dfs_0 = []
dfs_1 = []
idx = 0
for key, _msindy_ in msindy_.items():
df_0, df_1 = _unpack(_msindy_, xlabels)
df_0 = unstack(df_0, criterion=key)
df_1 = unstack(df_1, criterion=key)
dfs_0.append(df_0)
dfs_1.append(df_1)
idx += 1
df_0 = pd.concat(dfs_0, axis=0)
df_1 = pd.concat(dfs_1, axis=0)
hue = 'Cluster'
else:
hue = None
df_0, df_1 = _unpack(msindy_, xlabels)
filepath = os.path.join(PLOTPATH, filename)
_, ax = plt.subplots(figsize=Plotter.extra_small)
if kind == 'box':
sns.boxplot(x='Coefficients',
y='Model 0',
hue=hue,
data=df_0,
ax=ax)
elif kind == 'violin':
sns.violinplot(x='Coefficients',
y='Model 0',
hue=hue,
data=df_0,
ax=ax)
elif kind == 'bar':
sns.barplot(x='Coefficients',
y='Model 0',
hue=hue,
data=df_0,
ax=ax,
estimator=np.median,
alpha=.5)
else:
raise NotImplementedError(f'Plot kind {kind} not supported.')
ax.set_xlabel('Associated model term')
plt.tight_layout()
sns.despine()
plt.savefig(filepath + '_c0' + '.pdf', transparent=True)
_, ax = plt.subplots(figsize=Plotter.extra_small)
if kind == 'box':
sns.boxplot(x='Coefficients',
y='Model 1',
hue=hue,
data=df_1,
ax=ax)
elif kind == 'violin':
sns.violinplot(x='Coefficients',
y='Model 1',
hue=hue,
data=df_1,
ax=ax)
elif kind == 'bar':
sns.barplot(x='Coefficients',
y='Model 1',
hue=hue,
data=df_1,
ax=ax,
estimator=np.median,
alpha=.5)
else:
raise NotImplementedError(f'Plot kind {kind} not supported.')
ax.set_xlabel('Associated model term')
if true_coefficients is not None:
tc = true_coefficients
index = df_0.index.unique()
for iax, ax in enumerate(axs):
for n, idx in enumerate(index):
ax.scatter(idx, tc[n, iax], marker='x', color='red')
plt.tight_layout()
sns.despine()
plt.savefig(filepath + '_c1' + '.pdf', transparent=True)
return filepath
def select_Xi_parsimony_sweep(msindy):
""" Loop over the SINDYc models in `msindy` and identify the ones
with the simplest structure. Select the coefficients as average
over the list of candidate models `cmodels`.
Args:
msindy (dict): Dictionary of SINDYc models (samples: models)
"""
dfs = []
cmodels_ = [] # Candidate model
ncoeff = 100 # Number of coefficients observed so far
# TODO Select the simplest model as candidate
# TODO Implement as function
for isample, model in msindy.items():
m = model[0].iloc[:, 0]
ncoeff_ = m[m != 0].shape[0]
if ncoeff_ <= ncoeff:
try:
m = m.drop(index=['Cn'])
except KeyError:
pass
cmodels_.append(m)
ncoeff = ncoeff_
# dfs.append(model[0].iloc[:, 0])
cmodels = []
criterion = 100
for model in cmodels_:
mask = (model != 0)
model_ = model.reset_index()
# Check parsimony: The simpler model is preferred
criterion_ = model_.index.values[mask].sum()
if criterion_ <= criterion:
cmodels.append(model)
criterion = criterion_
def yield_common_max_LL(Xistar_, coefficients=['u1', r'u0^{2}']):
""" Derive the model that maximizes the probability of the chosen
coefficients.
"""
from scipy import stats
# Replace with enumerated columns
Xistar_.columns = np.arange(0, Xistar_.shape[1])
dist = {}
for coeff in coefficients:
values = Xistar_.loc[coeff, :].values
kde = stats.gaussian_kde(values) # Instantiate `kde`
s_min, s_max = values.min(), values.max() # Obtain bounds
xs_ = np.linspace(s_min, s_max,
Xistar_.shape[1]) # Evaluation space
ys_ = kde(xs_) # Evaluate `kde`
# # TODO Find max.
# idx = np.where(ys_ == max(ys_))[0]
dist[coeff] = {'y': ys_, 'x': xs_}
LL = 0
for coeff, data in dist.items():
# Add to common likelihood
LL += data['y']
# Select index corresponding with maximal common likelihood
idx_max_LL = np.where(LL == LL.max())[0]
Xistar = Xistar_.iloc[:, idx_max_LL]
Xistar.columns = ['max_LL']
return Xistar.iloc[:, 0]
# Xistar = pd.concat(cmodels, axis=1, sort=False).mean(axis=1)
Xistar = yield_common_max_LL(pd.concat(cmodels, axis=1, sort=False))
return Xistar
#+end_src
#+NAME: simo_IDCL_c0
#+begin_src python :noweb strip-export :exports none :tangle ./simo_IDCL_c0.py :results file :return filepath
cluster=0
<<simo_IDCL_sindy_header>>
msindy = simulator.identify()
results = simulator.cosimulate_ID()
df, dfm = simulator.combine_sample_dataframes(dfs=results,
aggregation_axis=1)
data = dict(msindy=msindy, df=df, dfm=dfm, raw_results=results)
save_to_disc(data, filename='data_simo_IDCL_c0.pkl')
filepath = plot_clusterModels(unpack(msindy),
cluster=cluster,
#true_coefficients=simulator.mp.true_coefficients,
xlabels=simulator.formatted_model_coefficients,
rotate_xlabels=False,
kind='violin',
filename='simo_IDCL_sindy_c0.pdf')
#+end_src
#+NAME: simo_IDCL_c1
#+begin_src python :noweb strip-export :exports none :tangle ./simo_IDCL_c1.py :results file :return filepath
cluster=1
<<simo_IDCL_sindy_header>>
simspec = dict(system='SIMO',
Ts=1,
ID=IDspec,
CL=CLspec,
cluster=cluster,
samples=5,
# siggen=dict(signal=dict(tau=.5,
# freq_0=dict(mean=-10., sigma=.05),
# freq_1=dict(mean=-3., sigma=.5)))
)
data = read_from_disc('data_simo_IDCL_c0.pkl')
u = data['raw_results']['system'][0].filter(regex='u').values.T
d = data['raw_results']['system'][0].filter(regex='d').values.T
UD = dict(U=u, D =d)
msindy = simulator.identify(UD=UD)
results = simulator.cosimulate_ID()
df, dfm = simulator.combine_sample_dataframes(dfs=results, aggregation_axis=1)
data = dict(msindy=msindy, df=df, dfm=dfm)
save_to_disc(data, filename='data_simo_IDCL_c1.pkl')
filepath = plot_clusterModels(
unpack(msindy),
cluster=cluster,
#true_coefficients=simulator.mp.true_coefficients,
rotate_xlabels=False,
xlabels=simulator.formatted_model_coefficients,
kind='violin',
filename='simo_IDCL_sindy_c1.pdf')
#+end_src
#+NAME: simo_IDCL_cosimulate_ID_c0
#+begin_src python :results file :exports results :return filename :tangle ./simo_IDCL_cosimulate_ID_c0.py :noweb yes
import copy
import numpy as np
Tburn, Tid, Tcl = 0, 200, 50
u = np.full((2, Tid), .5)
# bug0
IDspec = dict(T=Tid,
u=u,
cut_off=[0.0005, 0.1],
neval=50,
Tburn=Tburn,
poly_degree=2)
CLspec = dict(T=Tcl) # add controller
simspec = dict(system='SIMO',
Ts=1.,
ID=IDspec,
CL=CLspec,
cluster=0,
samples=1)
IDspec_oos = copy.copy(IDspec)
IDspec_oos['ID'] = {'siggen': {'signal': {'scale': .75}}}
simspec_oos = dict(Ts=1., system='SIMO', cluster=0, ID=IDspec, CL=CLspec)
<<code_evaluate_fit>>
<<paper_plotting_header>>
<<paper_simulation>>
msindyc = simulator.identify() # Identify a SINDyc model
dfs = simulator.cosimulate_ID()
df, dfm = simulator.combine_sample_dataframes(dfs)
dfs_oos = simulator.simulate_out_of_sample()
df_oos, dfm_oos = simulator.combine_sample_dataframes(dfs_oos)
df, df_oos = df[0], df_oos[0]
dfm, dfm_oos = dfm[0], dfm_oos[0]
save_to_disc({
'df': df,
'df_oos': df_oos,
'dfs_oos': dfs_oos,
'dfm': dfm,
'dfm_oos': dfm_oos
}, 'simo_IDCL_cosimulate_ID_c0.pkl')
def rename(df0, df1, identifier='id'):
""" Rename the labels in the system response dataframe `df` and the
model response data frame `dfm` such that we can distinguish the data
from the out of sample simulations.
"""
if identifier == 'id':
df0.columns = [col + r' (system,id)' for col in df0.columns]
df1.columns = [col + r' (model,id)' for col in df1.columns]
elif identifier == 'oos':
df0.columns = [col + r' (system,oos)' for col in df0.columns]
df1.columns = [col + r' (model,oos)' for col in df1.columns]
else:
raise Exception('Identifier not supported')
return df0, df1
sns.set_palette([sns.color_palette('Blues_r')[1], sns.color_palette('Greens_r')[1]])
filename = plot_sns(dfs=dict(ts_id=dict(system=df, model=dfm),
ts_oos=dict(system=df_oos, model=dfm_oos)),
filename='simo_IDCL_cosimulate_ID_oos_c0.pdf',
show=False, save=True)
#+end_src
#+NAME: paper_one_step_prediction_surface
#+begin_src python :tangle ./paper_one_step_prediction_surface.py :exports results :results file :return filepath :noweb yes
<<paper_simulation_header>>
<<paper_simulation_class>>
<<paper_prosumerDynamics>>
<<paper_plotting_header>>
def plot_one_step_prediction_map(mp, xs, us, cluster=0, **kwargs):
""" Plot a one step prediction map of the Prosumer `mp` over the
evaluation spaces `xs` and `us`. Notice that ... TODO
"""
def wrap_mp0_c0(mp, x, p):
dx_dt_0 = mp.alpha[0, 0] * x**2 + mp.beta * p
dy_dt_0 = mp.C * dx_dt_0 + mp.rs.normal(
loc=0, size=(100, 100), scale=mp.sig_meas_noise)
dx_dt_1 = mp.alpha[1, 0] * x + mp.beta * p
dy_dt_1 = mp.C * dx_dt_1 + mp.rs.normal(
loc=0, size=(100, 100), scale=mp.sig_meas_noise)
return dx_dt_0, dy_dt_0, dx_dt_1, dy_dt_1
def wrap_mp0_c1(mp, x, p):
dx_dt_0 = mp.alpha[1, 0] * x**2 + mp.beta * p
dy_dt_0 = mp.C * dx_dt_0 + mp.rs.normal(
loc=0, size=(100, 100), scale=mp.sig_meas_noise)
return dx_dt_0, dy_dt_0
def SINDyc_testm(x, p, cluster):
""" An exemplary SINDyc test model
"""
if cluster == 0:
c_u0 = 0.
c_u1 = 0.077434
c_u0sq = -0.213128
c_u0u1 = 0.
c_u1sq = 0.
dx_dt = (c_u0 * x + c_u1 * p + c_u0sq * x**2 + c_u0u1 * x * p +
c_u1sq * p**2)
else:
data = read_from_disc('data_simo_IDCL_c0.pkl')
msindy = data['msindy']
coeff = msindy[3][0]
c_u0 = coeff.loc['u0'][0]
c_u1 = coeff.loc['u1'][0]
c_u0sq = coeff.loc['u0^{2}'][0]
c_u0u1 = coeff.loc['u0u1'][0]
c_u1sq = coeff.loc['u1^{2}'][0]
dx_dt = (c_u0 * x + c_u1 * p + c_u0sq * x**2 + c_u0u1 * x * p +
c_u1sq * p**2)
return dx_dt
def plot_surfaces(ax, xs, us, ys, ysm, cmaps, alpha):
ax.plot_surface(xs, us, ys, cmap=cmaps[0], linewidth=0, alpha=alpha)
ax.plot_surface(xs, us, ysm, cmap=cmaps[1], linewidth=0)
def format_ax(ax, ysm):
ax.set_xlim(*(-1., 1.))
ax.xaxis.set_major_locator(LinearLocator(5))
ax.xaxis.set_major_formatter(FormatStrFormatter(xformat))
ax.set_ylim(*(-1., 1.))
ax.yaxis.set_major_locator(LinearLocator(5))
ax.yaxis.set_major_formatter(FormatStrFormatter(yformat))
ax.set_zlim(*(ysm.min(), ysm.max()))
ax.zaxis.set_major_locator(LinearLocator(10))
ax.zaxis.set_major_formatter(FormatStrFormatter(zformat))
ax.set_xlabel('x') # x (state space)
ax.set_ylabel('p') # p (input space)
ax.set_zlabel('y') # Y (output space)
ax.azim = kwargs.get('azim', -55)
ax.elev = kwargs.get('elev', 5)
filepath = os.path.join(PLOTPATH, 'one_step_prediction_surface.pdf')
from matplotlib.ticker import LinearLocator, FormatStrFormatter
from mpl_toolkits.mplot3d import Axes3D
xformat = kwargs.get('xformat', '%.02f')
yformat = kwargs.get('yformat', '%.02f')
zformat = kwargs.get('zformat', '%.02f')
xs_, us_ = np.meshgrid(xs, us)
ys_0, ys0, ys_1, ys1 = wrap_mp0_c0(mp, xs_, us_)
if len(cluster) == 1:
ysm_0 = SINDyc_testm(xs_, us_, cluster=0)
fig, ax = plt.subplots(subplot_kw={'projection': '3d'},
figsize=Plotter.small)
plot_surfaces(ax, xs_, us_, ys0, ysm_0, cmaps=['Blues', 'Greens'],
alpha=.25)
format_ax(ax, ysm_0)
elif len(cluster) == 2:
ysm_1 = SINDyc_testm(xs_, us_, cluster=1)
fig, axs = plt.subplots(subplot_kw={'projection': '3d'},
ncols=2,
figsize=Plotter.normal)
plot_surfaces(ax[1], xs_, us_, ys1, ysm_1, cmaps=['Blues', 'Greens'],
alpha=.25)
for ax, ysm in zip(axs, [ys0, ys1]):
format_ax(ax, ysm)
plt.tight_layout()
plt.savefig(filepath, transparent=True)
return filepath
mp = Prosumer(samples=1)
xs = np.linspace(-1., 1., 100)
us = np.linspace(-1., 1., 100)
filepath = plot_one_step_prediction_map(mp, xs, us, cluster=[0])
#+end_src
* Markov Chain Monte Carlo
#+NAME: simo_IDCL_c0_stan
#+begin_src python :noweb strip-export :exports none :results file :tangle ./simo_IDCL_c0_stan.py :return filepath
cluster=0
<<simo_IDCL_sindy_header>>
<<code_stan_fitModel>>
<<paper_simulation_header>>
data = read_from_disc('data_simo_IDCL_c0.pkl')
msindy = data['msindy']
Xistar = select_Xi_parsimony_sweep(msindy)
print(Xistar)
df = data['df'][0]
obs = df
sigma_y_init = df.filter(regex='y').iloc[0, :].std()
stan_results = Simulator.identify_stan(obs,
Xistar,
sigma_y_init=sigma_y_init,
name='c0',
fitnewmodel=True)
var_names = {'plot_labels': [], 'labels': []}
nXi = Xistar[Xistar != 0].shape[0]
for n in range(nXi):
subscript = f'{n+2}' # Only valid for this scenario
var_names['plot_labels'].append(r'$\xi_' + subscript + r'$')
var_names['labels'].append(f'Xi[{n+1}]')
filename = 'simo_IDCL_c0_stan.pdf'
filepath = os.path.join(PLOTPATH, filename)
filepath = Plotter.plot_density(stan_results,
var_names=var_names,
var_prior=Xistar,
labels=[[
'True parameter', 'Posterior',
'SINDyc (prior mean)',
r'95% confidence interval',
'MAP'
], None],
original_model_obj=mp,
show=False,
save=True,
filename=filepath)
#+end_src
#+NAME: simo_IDCL_c0_stan_ppc
#+CAPTION: Posterior-Predictive check.
#+begin_src python :exports none :results file :tangle ./simo_IDCL_c0_stan_ppc.py :noweb yes :return filepath
import numpy as np
<<paper_plotting_header>>
import arviz as az
import pickle
sm_path = './c0_23_sm.pkl'
fit_path = './c0_23_fit.pkl'
objs = []
for path in [sm_path, fit_path]:
with open(path, 'rb') as fhandle:
objs.append(pickle.load(fhandle))
fit = objs[-1]
df = fit.to_dataframe()
stan_data = az.from_pystan(
posterior=fit,
posterior_predictive='y_hat',
# observed_data=['y', 'u'],
observed_data=['y'],
log_likelihood='log_lik',
coords={'N': np.arange(fit.data['N']), 'ny': np.arange(fit.data['ny']),
'nXi': np.arange(fit.data['nXi'])},
dims={
'Xi': ['nXi'],
'y': ['N', 'ny'],
'log_lik': ['N', 'ny'],
'y_hat': ['N', 'ny']
}
)
print(stan_data)
filepath = Plotter.plot_ppc(stan_data, filename='simo_IDCL_c0_stan_ppc.pdf',
data_pairs={'y': 'y_hat'}, num_pp_samples=200,
xlim={0: (-.5, 2.)},
show=False, save=True)
#+end_src
#+NAME: simo_IDCL_c1_stan_ppc
#+CAPTION: Posterior-Predictive check.
#+begin_src python :exports none :results file :tangle ./simo_IDCL_c1_stan_ppc.py :noweb yes :return filepath
import numpy as np
<<paper_plotting_header>>
import arviz as az
import pickle
sm_path = './c1_23_sm.pkl'
fit_path = './c1_23_fit.pkl'
objs = []
for path in [sm_path, fit_path]:
with open(path, 'rb') as fhandle:
objs.append(pickle.load(fhandle))
fit = objs[-1]
df = fit.to_dataframe()
stan_data = az.from_pystan(
posterior=fit,
posterior_predictive='y_hat',
observed_data=['y'],
log_likelihood='log_lik',
coords={'N': np.arange(fit.data['N']), 'ny': np.arange(fit.data['ny']),
'nXi': np.arange(fit.data['nXi'])},
dims={
'Xi': ['nXi'],
'y': ['N', 'ny'],
'log_lik': ['N', 'ny'],
'y_hat': ['N', 'ny']
}
)
print(stan_data)
filepath = Plotter.plot_ppc(stan_data, filename='simo_IDCL_c1_stan_ppc.pdf',
data_pairs={'y': 'y_hat'}, num_pp_samples=200,
xlim={0: (-.5, 2.)},
show=False, save=True)
#+end_src
#+NAME: simo_IDCL_stan_cosimulate_ID_c0
#+begin_src python :results file :exports results :return filename :tangle ./simo_IDCL_stan_cosimulate_ID_c0.py :noweb yes
from collections import OrderedDict
import numpy as np
Tburn, Tid, Tcl = 0, 200, 50
u = np.full((2, Tid), .1)
# bug0
IDspec = dict(T=Tid,
u=u,
cut_off=[0.0005, 0.1],
neval=50,
Tburn=Tburn,
poly_degree=2)
CLspec = dict(T=Tcl) # add controller
simspec = dict(system='SIMO', Ts=1, ID=IDspec, CL=CLspec, cluster=0, samples=5)
<<code_evaluate_fit>>
<<paper_plotting_header>>
<<paper_simulation>>
# Reuse data from initial simulation experiment for continuity
data = read_from_disc('data_simo_IDCL_c0.pkl')
u = data['raw_results']['system'][0].filter(regex='u').values.T
d = data['raw_results']['system'][0].filter(regex='d').values.T
UD = dict(U=u, D=d)
simulator.U = UD['U']
simulator.D = UD['D']
results = simulator.cosimulate_IDStan(path_predicate='./',
name='c0_23')
dfss_oos_ = results['stan']
df_oos = results['system']
dfm_oos = results['sindy']
a_dfss_oos = []
for sample, df in dfss_oos_.items():
df.loc[:, 'sample'] = sample
a_dfss_oos.append(df)
dfss_oos = pd.concat(a_dfss_oos, axis=0, sort=False)
_, ax = plt.subplots(nrows=1)
y = df_oos[0].filter(regex='y') # Process observations
y.plot(ax=ax, linestyle='-', color='r', alpha=.5)
filename = Plotter.plot_sectors(dfss_oos.loc[:, 'y'],
dfss_oos.loc[:, 'y'].groupby(dfss_oos.index),
filename=os.path.join(PLOTPATH,
'stan_response_sectors_c0.pdf'),
n_levels=10,
save=True,
show=False,
median=False,
median_linestyle='--',
legend=False,
ax=ax)
#+end_src
* Stan submodels
#+NAME: simo_IDCL_sindy_stan_code_12
#+begin_src stan :exports none :results none :tangle ./simo_IDCL_sindy_stan_code_12.stan
functions {
real[] dy_dt(real t, real[] y, real[] xi, real[] params, int[] ncupoly) {
vector[ncupoly[1]] poly;
row_vector[ncupoly[1]] T;
real ut;
real dydt;
poly = to_vector(params);
/* Excitation polynomial: excite system in-loop */
T[1] = 1;
T[2] = t;
T[3] = pow(t, 2);
T[4] = pow(t, 3);
ut = dot_product(poly, T);
/* Notice that `w=xi[1]` */
dydt = xi[2] * yt + xi[3] * ut + xi[1];
return { dydt };
}
}
data {
int<lower=0> N; // simulation horizon
real<lower=0> ts[N-1]; // Sampling instances
int<lower=0> nu; // number of system inputs
int<lower=0> ny; // number of system outputs
int<lower=0> nXi; // number of SINDy model coefficients
real z_init[1, ny]; // initial model state = `y_init`
real y[N, ny]; // state observations
real u[N-1, nu]; // control inputs
real<lower=0> sigma_x_init; // initial state associated uncertainty
real<lower=0> sigma_y_init; // initial measurement associated uncertainty
real Xistar[nXi]; // SINDyc-optimal `Xi` (absolute values)
real l_Xibounds[nXi]; // Guess on the lower bounds of `Xi`
real u_Xibounds[nXi]; // Guess on the upper bounds of `Xi`
int<lower=0> ocupolys; // Excitation polynomial degree
real params[nu, ocupolys]; // General parameter vector
}
parameters {
real<lower=l_Xibounds[1],upper=u_Xibounds[1]> Xi_1; // The Stan-optimal `Xi_1`
real<lower=l_Xibounds[2],upper=u_Xibounds[2]> Xi_2; // The Stan-optimal `Xi_2`
real w[1]; // State dynamics error
real<lower=0> sigma_Xi[nXi];
real<lower=0> sigma_w[1]; // Dynamics error variance
real<lower=0> sigma_y[N]; // Measurement error variance
}
transformed parameters {
real Xi[nXi];
real z[N, ny];
real Xi_[nXi+1]; // Augmented model (incl. dynamics error)
real results[N-1, 1];
Xi[1] = Xi_1;
Xi[2] = Xi_2;
Xi_ = append_array(w, Xi);
for (n in 1:ny){ /* Loop over outputs */
z[1, n] = z_init[1, n];
results = integrate_ode_rk45(dy_dt, z_init[, n], 0, ts, Xi_, params[n, ],
rep_array(ocupolys, 1));
z[2:, n] = results[, 1];
}
}
model {
/* Use for evaluation something along:
plt.hist(np.random.lognormal(mean=-2, sigma=1, size=100));plt.show() */
sigma_Xi[1] ~ lognormal(-1, .1);
sigma_Xi[2] ~ lognormal(-1, .1);
sigma_w ~ lognormal(log(sigma_x_init), .1); // Dynamics error variance prior
sigma_y ~ lognormal(log(sigma_y_init), .1); // Measurement error variance prior
Xi_1 ~ normal(Xistar[1], sigma_Xi[1]);
Xi_2 ~ normal(Xistar[2], sigma_Xi[2]);
w ~ normal(0., sigma_w); // Dynamics error
for (n in 1:ny) { // loop over outputs
y[, n] ~ normal(z[, n], sigma_y);
}
}
generated quantities {
real log_lik[N, ny]; // Check Log-Likelihood
real y_hat[N, ny]; // Model-based output predictions
for (n in 1:ny){ // Loop over trajectories
for (m in 1:N){ // Loop over timestamps
log_lik[m, n] = normal_lpdf(y[m, n] | z[m, n], sigma_y[m]);
y_hat[m, n] = normal_rng(z[m, n], sigma_y[m]);
}
}
}
#+end_src
#+NAME: simo_IDCL_sindy_stan_code_23
#+begin_src stan :exports none :results none :tangle ./simo_IDCL_sindy_stan_code_23.stan
functions {
real[] dy_dt(real t, real[] y, real[] xi, real[] params, int[] ncupoly) {
vector[ncupoly[1]] poly;
row_vector[ncupoly[1]] T;
real ut;
real dydt;
poly = to_vector(params);
/* Excitation polynomial: excite system in-loop */
T[1] = 1;
T[2] = t;
T[3] = pow(t, 2);
T[4] = pow(t, 3);
ut = dot_product(poly, T);
/* Notice that `w=xi[1]` */
dydt = xi[2] * ut + xi[3] * pow(y[1], 2) + xi[1];
return { dydt };
}
}
data {
int<lower=0> N; // simulation horizon
real<lower=0> ts[N-1]; // Sampling instances
int<lower=0> nu; // number of system inputs
int<lower=0> ny; // number of system outputs
int<lower=0> nXi; // number of SINDy model coefficients
real z_init[1, ny]; // initial model state = `y_init`
real y[N, ny]; // state observations
real u[N-1, nu]; // control inputs
real<lower=0> sigma_x_init; // initial state associated uncertainty
real<lower=0> sigma_y_init; // initial measurement associated uncertainty
real Xistar[nXi]; // SINDyc-optimal `Xi` (absolute values)
real l_Xibounds[nXi]; // Guess on the lower bounds of `Xi`
real u_Xibounds[nXi]; // Guess on the upper bounds of `Xi`
int<lower=0> ocupolys; // Excitation polynomial degree
real params[nu, ocupolys]; // General parameter vector
}
parameters {
real<lower=l_Xibounds[1],upper=u_Xibounds[1]> Xi_1; // The Stan-optimal `Xi_1`
real<lower=l_Xibounds[2],upper=u_Xibounds[2]> Xi_2; // The Stan-optimal `Xi_2`
real w[1]; // State dynamics error
real<lower=0> sigma_Xi[nXi];
real<lower=0> sigma_w[1]; // Dynamics error variance
real<lower=0> sigma_y[N]; // Measurement error variance
}
transformed parameters {
real Xi[nXi];
real z[N, ny];
real Xi_[nXi+1]; // Augmented model (incl. dynamics error)
real results[N-1, 1];
Xi[1] = Xi_1;
Xi[2] = Xi_2;
Xi_ = append_array(w, Xi);
for (n in 1:ny){ /* Loop over outputs */
z[1, n] = z_init[1, n];
results = integrate_ode_rk45(dy_dt, z_init[, n], 0, ts, Xi_, params[n, ],
rep_array(ocupolys, 1));
z[2:, n] = results[, 1];
}
}
model {
/* Use for evaluation something along:
plt.hist(np.random.lognormal(mean=-2, sigma=1, size=100));plt.show() */
sigma_Xi[1] ~ lognormal(-1, .1);
sigma_Xi[2] ~ lognormal(-1, .1);
sigma_w ~ lognormal(log(sigma_x_init), .1); // Dynamics error variance prior
sigma_y ~ lognormal(log(sigma_y_init), .1); // Measurement error variance prior
Xi_1 ~ normal(Xistar[1], sigma_Xi[1]);
Xi_2 ~ normal(Xistar[2], sigma_Xi[2]);
w ~ normal(0., sigma_w); // Dynamics error
for (n in 1:ny) { // loop over outputs
y[, n] ~ normal(z[, n], sigma_y);
}
}
generated quantities {
real log_lik[N, ny]; // Check Log-Likelihood
real y_hat[N, ny]; // Model-based output predictions
for (n in 1:ny){ // Loop over trajectories
for (m in 1:N){ // Loop over timestamps
log_lik[m, n] = normal_lpdf(y[m, n] | z[m, n], sigma_y[m]);
y_hat[m, n] = normal_rng(z[m, n], sigma_y[m]);
}
}
}
#+end_src
#+NAME: simo_IDCL_sindy_stan_code_34
#+begin_src stan :exports none :results none :tangle ./simo_IDCL_sindy_stan_code_34.stan
functions {
real[] dy_dt(real t, real[] y, real[] xi, real[] params, int[] ncupoly) {
vector[ncupoly[1]] poly;
row_vector[ncupoly[1]] T;
real ut;
real dydt;
poly = to_vector(params);
/* Excitation polynomial: excite system in-loop */
T[1] = 1;
T[2] = t;
T[3] = pow(t, 2);
T[4] = pow(t, 3);
ut = dot_product(poly, T);
/* Notice that `w=xi[1]` */
dydt = xi[2] * pow(y[1], 2) + xi[3] * y[1] * ut + xi[1];
return { dydt };
}
}
data {
int<lower=0> N; // simulation horizon
real<lower=0> ts[N-1]; // Sampling instances
int<lower=0> nu; // number of system inputs
int<lower=0> ny; // number of system outputs
int<lower=0> nXi; // number of SINDy model coefficients
real z_init[1, ny]; // initial model state = `y_init`
real y[N, ny]; // state observations
real u[N-1, nu]; // control inputs
real<lower=0> sigma_x_init; // initial state associated uncertainty
real<lower=0> sigma_y_init; // initial measurement associated uncertainty
real Xistar[nXi]; // SINDyc-optimal `Xi` (absolute values)
real l_Xibounds[nXi]; // Guess on the lower bounds of `Xi`
real u_Xibounds[nXi]; // Guess on the upper bounds of `Xi`
int<lower=0> ocupolys; // Excitation polynomial degree
real params[nu, ocupolys]; // General parameter vector
}
parameters {
real<lower=l_Xibounds[1],upper=u_Xibounds[1]> Xi_1; // The Stan-optimal `Xi_1`
real<lower=l_Xibounds[2],upper=u_Xibounds[2]> Xi_2; // The Stan-optimal `Xi_2`
real w[1]; // State dynamics error
real<lower=0> sigma_Xi[nXi];
real<lower=0> sigma_w[1]; // Dynamics error variance
real<lower=0> sigma_y[N]; // Measurement error variance
}
transformed parameters {
real Xi[nXi];
real z[N, ny];
real Xi_[nXi+1]; // Augmented model (incl. dynamics error)
real results[N-1, 1];
Xi[1] = Xi_1;
Xi[2] = Xi_2;
Xi_ = append_array(w, Xi);
for (n in 1:ny){ /* Loop over outputs */
z[1, n] = z_init[1, n];
results = integrate_ode_rk45(dy_dt, z_init[, n], 0, ts, Xi_, params[n, ],
rep_array(ocupolys, 1));
z[2:, n] = results[, 1];
}
}
model {
/* Use for evaluation something along:
plt.hist(np.random.lognormal(mean=-2, sigma=1, size=100));plt.show() */
sigma_Xi[1] ~ lognormal(-1, .1);
sigma_Xi[2] ~ lognormal(-1, .1);
sigma_w ~ lognormal(log(sigma_x_init), .1); // Dynamics error variance prior
sigma_y ~ lognormal(log(sigma_y_init), .1); // Measurement error variance prior
Xi_1 ~ normal(Xistar[1], sigma_Xi[1]);
Xi_2 ~ normal(Xistar[2], sigma_Xi[2]);
w ~ normal(0., sigma_w); // Dynamics error
for (n in 1:ny) { // loop over outputs
y[, n] ~ normal(z[, n], sigma_y);
}
}
generated quantities {
real log_lik[N, ny]; // Check Log-Likelihood
real y_hat[N, ny]; // Model-based output predictions
for (n in 1:ny){ // Loop over trajectories
for (m in 1:N){ // Loop over timestamps
log_lik[m, n] = normal_lpdf(y[m, n] | z[m, n], sigma_y[m]);
y_hat[m, n] = normal_rng(z[m, n], sigma_y[m]);
}
}
}
#+end_src
#+NAME: simo_IDCL_sindy_stan_code_345
#+begin_src stan :exports none :results none :tangle ./simo_IDCL_sindy_stan_code_345.stan
functions {
real[] dy_dt(real t, real[] y, real[] xi, real[] params, int[] ncupoly) {
vector[ncupoly[1]] poly;
row_vector[ncupoly[1]] T;
real ut;
real dydt;
poly = to_vector(params);
/* Excitation polynomial: excite system in-loop */
T[1] = 1;
T[2] = t;
T[3] = pow(t, 2);
T[4] = pow(t, 3);
ut = dot_product(poly, T);
/* Notice that `w=xi[1]` */
dydt = xi[2] * pow(y[1], 2) + xi[3] * y[1] * ut + xi[4] * pow(ut, 2) + xi[1];
return { dydt };
}
}
data {
int<lower=0> N; // simulation horizon
real<lower=0> ts[N-1]; // Sampling instances
int<lower=0> nu; // number of system inputs
int<lower=0> ny; // number of system outputs
int<lower=0> nXi; // number of SINDy model coefficients
real z_init[1, ny]; // initial model state = `y_init`
real y[N, ny]; // state observations
real u[N-1, nu]; // control inputs
real<lower=0> sigma_x_init; // initial state associated uncertainty
real<lower=0> sigma_y_init; // initial measurement associated uncertainty
real Xistar[nXi]; // SINDyc-optimal `Xi` (absolute values)
real l_Xibounds[nXi]; // Guess on the lower bounds of `Xi`
real u_Xibounds[nXi]; // Guess on the upper bounds of `Xi`
int<lower=0> ocupolys; // Excitation polynomial degree
real params[nu, ocupolys]; // General parameter vector
}
parameters {
real<lower=l_Xibounds[1],upper=u_Xibounds[1]> Xi_1; // The Stan-optimal `Xi_1`
real<lower=l_Xibounds[2],upper=u_Xibounds[2]> Xi_2; // The Stan-optimal `Xi_2`
real<lower=l_Xibounds[3],upper=u_Xibounds[3]> Xi_3; // The Stan-optimal `Xi_3`
real w[1]; // State dynamics error
real<lower=0> sigma_Xi[nXi];
real<lower=0> sigma_w[1]; // Dynamics error variance
real<lower=0> sigma_y[N]; // Measurement error variance
}
transformed parameters {
real Xi[nXi];
real z[N, ny];
real Xi_[nXi+1]; // Augmented model (incl. dynamics error)
real results[N-1, 1];
Xi[1] = Xi_1;
Xi[2] = Xi_2;
Xi[3] = Xi_3;
Xi_ = append_array(w, Xi);
for (n in 1:ny){ /* Loop over outputs */
z[1, n] = z_init[1, n];
results = integrate_ode_rk45(dy_dt, z_init[, n], 0, ts, Xi_, params[n, ],
rep_array(ocupolys, 1));
z[2:, n] = results[, 1];
}
}
model {
/* Use for evaluation something along:
plt.hist(np.random.lognormal(mean=-2, sigma=1, size=100));plt.show() */
sigma_Xi[1] ~ lognormal(-1, .1);
sigma_Xi[2] ~ lognormal(-1, .1);
sigma_Xi[3] ~ lognormal(-1, .1);
sigma_w ~ lognormal(log(sigma_x_init), .1); // Dynamics error variance prior
sigma_y ~ lognormal(log(sigma_y_init), .1); // Inform error scale
Xi_1 ~ normal(Xistar[1], sigma_Xi[1]);
Xi_2 ~ normal(Xistar[2], sigma_Xi[2]);
Xi_3 ~ normal(Xistar[3], sigma_Xi[3]);
w ~ normal(0., sigma_w); // Dynamics error
for (n in 1:ny) { // Loop over outputs
y[, n] ~ normal(z[, n], sigma_y);
}
}
generated quantities {
real log_lik[N, ny]; // Check Log-Likelihood
real y_hat[N, ny]; // Model-based output predictions
for (n in 1:ny){ // Loop over trajectories
for (m in 1:N){ // Loop over timestamps
log_lik[m, n] = normal_lpdf(y[m, n] | z[m, n], sigma_y[m]);
y_hat[m, n] = normal_rng(z[m, n], sigma_y[m]);
}
}
}
#+end_src
#+NAME: simo_IDCL_sindy_stan_code_124
#+begin_src stan :exports none :results none :tangle ./simo_IDCL_sindy_stan_code_124.stan
functions {
real[] dy_dt(real t, real[] y, real[] xi, real[] params, int[] ncupoly) {
vector[ncupoly[1]] poly;
row_vector[ncupoly[1]] T;
real ut;
real dydt;
poly = to_vector(params);
/* Excitation polynomial: excite system in-loop */
T[1] = 1;
T[2] = t;
T[3] = pow(t, 2);
T[4] = pow(t, 3);
ut = dot_product(poly, T);
/* Notice that `w=xi[1]` */
dydt = xi[2] * y[1] + xi[3] * ut + xi[4] * y[1] * ut + xi[1];
return { dydt };
}
}
data {
int<lower=0> N; // simulation horizon
real<lower=0> ts[N-1]; // Sampling instances
int<lower=0> nu; // number of system inputs
int<lower=0> ny; // number of system outputs
int<lower=0> nXi; // number of SINDy model coefficients
real z_init[1, ny]; // initial model state = `y_init`
real y[N, ny]; // state observations
real u[N-1, nu]; // control inputs
real<lower=0> sigma_x_init; // initial state associated uncertainty
real<lower=0> sigma_y_init; // initial measurement associated uncertainty
real Xistar[nXi]; // SINDyc-optimal `Xi` (absolute values)
real l_Xibounds[nXi]; // Guess on the lower bounds of `Xi`
real u_Xibounds[nXi]; // Guess on the upper bounds of `Xi`
int<lower=0> ocupolys; // Excitation polynomial degree
real params[nu, ocupolys]; // General parameter vector
}
parameters {
real<lower=l_Xibounds[1],upper=u_Xibounds[1]> Xi_1; // The Stan-optimal `Xi_1`
real<lower=l_Xibounds[2],upper=u_Xibounds[2]> Xi_2; // The Stan-optimal `Xi_2`
real<lower=l_Xibounds[3],upper=u_Xibounds[3]> Xi_3; // The Stan-optimal `Xi_3`
real<lower=0> sigma_Xi[nXi];
real<lower=0> sigma_w[1]; // Dynamics error variance
real<lower=0> sigma_y[N]; // TODO error scale
}
transformed parameters {
real Xi[nXi];
real z[N, ny];
real Xi_[nXi+1]; // Augmented model (incl. dynamics error)
real results[N-1, 1];
Xi[1] = Xi_1;
Xi[2] = Xi_2;
Xi[3] = Xi_3;
Xi_ = append_array(w, Xi);
for (n in 1:ny){ /* Loop over outputs */
z[1, n] = z_init[1, n];
results = integrate_ode_rk45(dy_dt, z_init[, n], 0, ts, Xi_, params[n, ],
rep_array(ocupolys, 1));
z[2:, n] = results[, 1];
}
}
model {
/* Use for evaluation something along:
plt.hist(np.random.lognormal(mean=-2, sigma=1, size=100));plt.show() */
sigma_Xi[1] ~ lognormal(-1, .1);
sigma_Xi[2] ~ lognormal(-1, .1);
sigma_Xi[3] ~ lognormal(-1, .1);
sigma_w ~ lognormal(log(sigma_x_init), .1); // Dynamics error variance prior
sigma_y ~ lognormal(log(sigma_y_init), .1); // Inform error scale
Xi_1 ~ normal(Xistar[1], sigma_Xi[1]);
Xi_2 ~ normal(Xistar[2], sigma_Xi[2]);
Xi_3 ~ normal(Xistar[3], sigma_Xi[3]);
w ~ normal(0., sigma_w); // Dynamics error
for (n in 1:ny) { // Loop over outputs
y[, n] ~ normal(z[, n], sigma_y);
}
}
generated quantities {
real log_lik[N, ny]; // Check Log-Likelihood
real y_hat[N, ny]; // Model-based output predictions
for (n in 1:ny){ // Loop over trajectories
for (m in 1:N){ // Loop over timestamps
log_lik[m, n] = normal_lpdf(y[m, n] | z[m, n], sigma_y[m]);
y_hat[m, n] = normal_rng(z[m, n], sigma_y[m]);
}
}
}
#+end_src
#+NAME: simo_IDCL_sindy_stan_code_1234
#+begin_src stan :exports none :results none :tangle ./simo_IDCL_sindy_stan_code_1234.stan
functions {
real[] dy_dt(real t, real[] y, real[] xi, real[] params, int[] ncupoly) {
vector[ncupoly[1]] poly;
row_vector[ncupoly[1]] T;
real ut;
real dydt;
poly = to_vector(params);
/* Excitation polynomial: excite system in-loop */
T[1] = 1;
T[2] = t;
T[3] = pow(t, 2);
T[4] = pow(t, 3);
ut = dot_product(poly, T);
/* Notice that `w=xi[1]` */
dydt = xi[2] * y[1] + xi[3] * ut + xi[4] * pow(y[1], 2) + xi[5] * y[1] * ut + xi[1];
return { dydt };
}
}
data {
int<lower=0> N; // simulation horizon
real<lower=0> ts[N-1]; // Sampling instances
int<lower=0> nu; // number of system inputs
int<lower=0> ny; // number of system outputs
int<lower=0> nXi; // number of SINDy model coefficients
real z_init[1, ny]; // initial model state = `y_init`
real y[N, ny]; // state observations
real u[N-1, nu]; // control inputs
real<lower=0> sigma_x_init; // initial state associated uncertainty
real<lower=0> sigma_y_init; // initial measurement associated uncertainty
real Xistar[nXi]; // SINDyc-optimal `Xi` (absolute values)
real l_Xibounds[nXi]; // Guess on the lower bounds of `Xi`
real u_Xibounds[nXi]; // Guess on the upper bounds of `Xi`
int<lower=0> ocupolys; // Excitation polynomial degree
real params[nu, ocupolys]; // General parameter vector
}
parameters {
real<lower=l_Xibounds[1],upper=u_Xibounds[1]> Xi_1; // The Stan-optimal `Xi_1`
real<lower=l_Xibounds[2],upper=u_Xibounds[2]> Xi_2; // The Stan-optimal `Xi_2`
real<lower=l_Xibounds[3],upper=u_Xibounds[3]> Xi_3; // The Stan-optimal `Xi_3`
real<lower=l_Xibounds[4],upper=u_Xibounds[4]> Xi_4; // The Stan-optimal `Xi_4`
real w[1]; // State dynamics error
real<lower=0> sigma_Xi[nXi];
real<lower=0> sigma_w[1]; // Dynamics error variance
real<lower=0> sigma_y[N]; // Measurement error variance
}
transformed parameters {
real Xi[nXi];
real z[N, ny];
real Xi_[nXi+1]; // Augmented model (incl. dynamics error)
real results[N-1, 1];
Xi[1] = Xi_1;
Xi[2] = Xi_2;
Xi[3] = Xi_3;
Xi[4] = Xi_4;
Xi_ = append_array(w, Xi);
for (n in 1:ny){ /* Loop over outputs */
z[1, n] = z_init[1, n];
results = integrate_ode_rk45(dy_dt, z_init[, n], 0, ts, Xi_, params[n, ],
rep_array(ocupolys, 1));
z[2:, n] = results[, 1];
}
}
model {
/* Use for evaluation something along:
plt.hist(np.random.lognormal(mean=-2, sigma=1, size=100));plt.show() */
sigma_Xi[1] ~ lognormal(-1, .1);
sigma_Xi[2] ~ lognormal(-1, .1);
sigma_Xi[3] ~ lognormal(-1, .1);
sigma_Xi[4] ~ lognormal(-1, .1);
sigma_w ~ lognormal(log(sigma_x_init), .1); // Dynamics error variance prior
sigma_y ~ lognormal(log(sigma_y_init), .1); // Inform error scale
Xi_1 ~ normal(Xistar[1], sigma_Xi[1]);
Xi_2 ~ normal(Xistar[2], sigma_Xi[2]);
Xi_3 ~ normal(Xistar[3], sigma_Xi[3]);
Xi_4 ~ normal(Xistar[4], sigma_Xi[4]);
w ~ normal(0., sigma_w); // Dynamics error
for (n in 1:ny) { // loop over outputs
y[, n] ~ normal(z[, n], sigma_y);
}
}
generated quantities {
real log_lik[N, ny]; // Check Log-Likelihood
real y_hat[N, ny]; // Model-based output predictions
for (n in 1:ny){ // Loop over trajectories
for (m in 1:N){ // Loop over timestamps
log_lik[m, n] = normal_lpdf(y[m, n] | z[m, n], sigma_y[m]);
y_hat[m, n] = normal_rng(z[m, n], sigma_y[m]);
}
}
}
#+end_src
* General code blocks
** Header
#+NAME: paper_simulation_header
#+begin_src python
from collections import OrderedDict
import numpy as np
import pandas as pd
import control
import control.matlab
from pySINDy.sindy import SINDy
np.random.seed(12345)
def save_to_disc(container, filename='save.pkl'):
""" Save the `container` to disc under concatenated filepath
`TEMPDIR` and `filename`.
"""
import os
import pickle
try: # Ensure `TEMPDIR` is set
TEMPDIR
except NameError:
TEMPDIR = './'
with open(os.path.join(TEMPDIR, filename), 'wb') as fhandle:
pickle.dump(container, fhandle, -1)
def read_from_disc(filename='save.pkl'):
""" Load the file 'save.pkl' under the concatenated filepath
`TEMPDIR` and `filename`.
"""
import os
import pickle
try: # Ensure `TEMPDIR` is set
TEMPDIR
except NameError:
TEMPDIR = './'
with open(os.path.join(TEMPDIR, filename), 'rb') as fhandle:
load = pickle.load(fhandle)
return load
#+end_src
** Modeling
*** Prosumer dynamics
#+NAME: paper_prosumerDynamics
#+CAPTION: Common prosumer response dynamics shared for both the SIMO as well as MIMO example.
#+begin_src python
class Prosumer:
""" Common prosumer response model
"""
def __init__(self,
mu_alpha=np.vstack([-.2, -.1]),
mu_beta=np.vstack([.07, .2]),
sig_alpha=.001,
sig_beta=.01,
samples=5,
sig_dyn_noise=.005,
sig_meas_noise=.005,
,**kwargs):
"""
"""
self.alpha = -np.abs(
np.random.normal(loc=mu_alpha, scale=sig_alpha, size=(2, samples)))
self.mu_alpha = mu_alpha
self.sig_alpha = sig_alpha
self.beta = np.abs(
np.random.normal(loc=mu_beta, scale=sig_beta, size=samples))
self.mu_beta = mu_beta
self.sig_beta = sig_beta
self.C = 1. # Output space mapping
self.x = kwargs.get('x0', np.vstack([np.zeros_like(self.alpha)]))
self.x0 = self.x.copy()
self.p = kwargs.get(
'p0',
np.vstack([np.zeros_like(self.beta),
np.zeros_like(self.beta)]))
self.p0 = self.p.copy()
self.nx, self.np = self.x.shape[0], self.p.shape[0]
self.ns = samples # Number of samples
self.nc = 2 # Number of clusters
self.dpref = self.beta.shape[0]
self.true_coefficients = self.provide_true_coefficients()
self.rs = np.random.RandomState() # Initialize random number generator
self.sig_dyn_noise = sig_dyn_noise
self.sig_meas_noise = sig_meas_noise
def provide_true_coefficients(self):
twonan = np.full((1, 2), np.nan)
u0 = np.hstack([np.nan, self.mu_alpha[1]])
u1 = self.mu_beta.T
u0sq = np.hstack([self.mu_alpha[0], np.nan])
u0u1 = twonan
u1sq = twonan
coefficients = np.vstack([twonan, u0, u1, u0sq, u0u1, u1sq])
return coefficients
def ODE_0(self, x, p):
"""
"""
try:
ddt_x_0 = self.alpha[0, :] * x[0]**2 + self.beta * p[0]
ddt_x_1 = self.alpha[1, :] * x[1] + self.beta * p[1]
except IndexError:
breakpoint()
return np.vstack([ddt_x_0, ddt_x_1])
def ODE_1(self, x, p):
"""
"""
try:
ddt_x_0 = self.alpha[0, :] * x[0]**2 + self.beta * p[0]
ddt_x_1 = self.alpha[1, :] * x[1] + self.beta * p[1]
except IndexError:
breakpoint()
return np.vstack([ddt_x_0, ddt_x_1])
def excite(self, cluster, p=np.zeros(1)):
""" Excite prosumer with price signal `p`. `p` is either scalar in the
SIMO case, or a vector in the MIMO case.
"""
if p.shape[0] != self.dpref:
p = np.tile(p, (self.dpref, 1))
self.p = p
if cluster == 0:
self.x += self.ODE_0(self.x, p) + self.rs.normal(
loc=0., scale=self.sig_dyn_noise)
elif cluster == 1:
self.x += self.ODE_1(self.x, p) + self.rs.normal(
loc=0., scale=self.sig_dyn_noise)
else:
raise NotImplementedError('Only cluster 0 and 1 covered.')
self.y = self.C * self.x + self.rs.normal(loc=0.,
scale=self.sig_meas_noise)
return self.y
#+end_src
*** SINDyc identification
#+NAME: code_identification
#+begin_src python
class Identification:
def __init__(self, data, **kwargs):
"""
"""
self.name = kwargs.get('name', 'unnamed')
self.model = SINDy(self.name)
self.data = data
def fit(self, ny, data=None, dt=1., poly_degree=2, cut_off=.1,
sparsity_threshold=.65, neval=None):
"""
"""
def iterate(ny, data, dt, poly_degree, cut_off_rng,
sparsity_threshold, **kwargs):
"""
"""
def test_sparsity(ny, msindy, sparsity_threshold):
"""
"""
y_msindy = msindy[:, :ny]
nz = np.count_nonzero(y_msindy)
nval = y_msindy.shape[0] * y_msindy.shape[1]
if (nz / nval) < sparsity_threshold:
return True
else:
return False
neval = kwargs.get('neval', cut_off_rng[0])
cut_offs = np.linspace(cut_off_rng[0], cut_off_rng[1], neval)
print(f'Identification/Evaluating {neval} cut-off increments...')
for cut_off in cut_offs:
print(f'Identification/Testing cut-off={cut_off}')
self.model.fit(data, dt, poly_degree=poly_degree,
cut_off=cut_off)
success = test_sparsity(ny, self.model.coefficients,
sparsity_threshold)
if success:
break
if success is False:
print('Identification/Failed to determine a sparse model.')
if data is None:
data = self.data
if type(cut_off) is list:
iterate(ny, data, dt, poly_degree=poly_degree,
cut_off_rng=cut_off,
sparsity_threshold=sparsity_threshold, neval=neval)
else:
self.model.fit(data.T, dt, poly_degree=poly_degree,
cut_off=cut_off)
results = pd.DataFrame(self.model.coefficients)
results.index = self.model.descriptions
results.index.name = 'Coefficients'
self.model.results = results
#+end_src
*** Stan identification
#+NAME: code_stan_fitModel
#+begin_src python :results none :exports code
import pystan
def fit_stan(data, model_code=None, model_file=None, name='StanSummary',
iter=1000, chains=4, warmup=500, thin=1,
seed=101, verbose=False, fitnewmodel=False,
control={'max_treedepth': 20}, _pickle=True) -> tuple:
""" Fit a Stan model.
Args:
model_code (str/None): Stan model in C++ code
model_file (str/None): Path to the Stan model
data (dict): Observations, keys have to match fields in `model`
name (str, optional): Filename to pickled summary dictionary
Returns:
results (tuple): Of `fit.summary()` and `fit` (result of
`pystan.StanModel.sampling`)
"""
import os # Ensure imports
if _pickle:
import pickle
try:
TEMPDIR
except UnboundLocalError: # `TEMPDIR` is not set
TEMPDIR = './'
filepath = os.path.join(TEMPDIR, name) # Path results object
def fitnew() -> tuple:
""" Fit a new Stan model with parameters provided to `fit`.
Args:
See `fit`.
Returns:
results (tuple): Of `pd.DataFrame` containing the fit-summary dictionary
entries and `fit`, the `pystan.StanModel.sampling` results.
"""
if model_code is not None:
sm = pystan.StanModel(model_code=model_code)
elif model_file is not None:
sm = pystan.StanModel(model_file)
fit = sm.sampling(data=data, iter=iter, chains=chains, warmup=warmup,
thin=thin, seed=seed, verbose=verbose, control=control)
if _pickle:
with open(filepath + '_sm.pkl', 'wb') as fhandle:
pickle.dump(sm, fhandle, -1)
with open(filepath + '_fit.pkl', 'wb') as fhandle:
pickle.dump(fit, fhandle, -1)
df = fit.to_dataframe()
df.to_csv(filepath + '.csv')
return df, fit
if fitnewmodel: # Force a new Stan model fitting run
return fitnew()
else:
# Return an available Stan model if available under
# `filepath`. Otherwise fit a new model.
try:
# Attempt to open existing model summary
if _pickle:
with open(filepath + '_sm.pkl', 'rb') as fhandle:
sm = pickle.load(fhandle)
with open(filepath + '_fit.pkl', 'rb') as fhandle:
fit = pickle.load(fhandle)
df = fit.to_dataframe()
return df, fit
else:
df = pd.DataFrame.from_csv(filepath + '.csv')
return df, None
except FileNotFoundError: # Fit a new model
df, fit = fitnew()
return df, fit
#+end_src
#+NAME: code_stan_inference
#+begin_src python :tangle ./stan_inference.py
import arviz as az
class StanInference:
""" Derive a Stan model based on a SINDy model. The latter is used as
grounds to inform the priors.
"""
# Library of candidate functions in Stan-compliant C++ snippets
Theta = {
'1': '1',
'u0': 'z[1]',
'u1': 'x_r[1]',
'u0^{2}': 'pow(z[1], 2)',
'u0u1': 'z[1] * x_r[1]',
'u1^{2}': 'pow(x_r[1], 2)',
'u0^{3}': 'pow(z[1], 3)',
'u0^{2}u1': 'pow(z[1], 2) * x_r[1]',
'u0u1^{2}': 'z[1] * pow(x_r[1], 2)',
'u1^{3}': 'pow(x_r[1], 3)'}
def __init__(self, m_SINDy, data,
cy, cu, name='SINDyStan',
iter=1000, chains=4, warmup=500, thin=1,
seed=None, verbose=False, forcefit=False,
_pickle=True, with_generated_quantities=False):
"""
"""
self.m = m_SINDy
self.data = data
# self.m_Stan = m_Stan
self.cy, self.cu = cy, cu
self.ny = len(self.cy)
self.nu = len(self.cu)
if (self.cy == [0]) and (self.cu == [1]): # SISO case
self.Theta = self.Theta
self.Y = self.m.iloc[:, 0] # The relevant output equation
else:
raise NotImplementedError
self.name = name
self.iter = iter
self.chains = chains
self.warmup = warmup
self.thin = thin
self.seed = seed
self.verbose = verbose
self.forcefit = forcefit
self.pickle = _pickle
# Path results object
try:
self.filepath = os.path.join(TEMPDIR, self.name)
except UnboundLocalError:
self.filepath = os.path.join('./', self.name)
self.bode = self._setup_ODE()
self.bfunctions = self._setup_functions()
self.bdata = self._setup_data()
self.bparameters = self._setup_parameters()
self.btransformed_parameters = self._setup_transformed_parameters()
self.bmodel = self._setup_model()
self.bgenerated_quantities = self._setup_generated_quantities()
self.model = self._concatenate_blocks(with_generated_quantities)
# print(self.model)
# breakpoint()
self.df, self._fit = self.fit()
self.az_data = self.prepare_az_data()
def _setup_ODE(self):
""" Setup the Ordinary Differential Equation in Stan.
"""
ode_head = r""" real[] dz_dt(real t, real[] z, real[] xi,
real[] x_r, int[] x_i) {
"""
ode = 'real dz_dt = '
i = 1
for label in self.Y.index:
var = self.Theta[label]
term = f'{var} * xi[{i}]'
ode += term
if i != self.Y.shape[0]:
ode += ' + '
else:
ode += ';'
i += 1
ode_foot = r""" return { dz_dt };} """
ode = ode_head + ode + ode_foot
return ode
def _setup_functions(self):
""" Setup Stan functions - TODO: Add flexibility by allowing for
additional functions.
"""
functions = f"""
functions {{
{self.bode}
}} """
return functions
def _setup_data(self):
""" TODO Implement MISO/MIMO cases (u is one-dimensional)
"""
data = f"""
data {{
int<lower=0> N; // simulation horizon
real<lower=0> ts[N]; // sampling instances
real y_init[{self.ny}]; // initial observed state
real y[N, {self.ny}]; // state observations
real u[N]; // control inputs
}} """
return data
def _setup_parameters(self):
""" TODO Generalize the parameter bounds integration
"""
Ymin = round(self.Y.min()-.1, 4)
Ymax = round(self.Y.max()+.1, 4)
parameters = f"""
parameters {{
real<lower={Ymin}, upper={Ymax}> xi[{self.Y.shape[0]}];
real z_init[{self.ny}]; // initial model state
real<lower=0> sigma[{self.ny}]; // error scale
}}"""
return parameters
def _setup_transformed_parameters(self):
"""
"""
tpar = f"""
transformed parameters {{
real z[N, 1] = integrate_ode_rk45(dz_dt, y_init, 0, ts, xi,
u, rep_array(0, 0),
1e-6, 1e-6, 1e6);
}} """
return tpar
def _setup_model(self):
pxi = ''
sigma = .1
for i, prior in enumerate(self.Y.values):
pxi += f'xi[{i+1}] ~ normal({prior}, {sigma});'
model = f"""
model {{
{pxi}
sigma ~ lognormal(-1, 1);
z_init ~ normal(0., sigma);
y_init ~ normal(z_init, sigma[1]);
y[, 1] ~ normal(z[, 1], sigma[1]);
}} """
return model
def _setup_generated_quantities(self):
gquantities = f"""
generated quantities {{
real y_init_rep[1];
real y_rep[N, 1];
y_init_rep[1] = normal_rng(z_init[1], sigma[1]);
for (n in 1:N)
y_rep[n, 1] = normal_rng(z[n, 1], sigma[1]);
}} """
return gquantities
def _concatenate_blocks(self, with_generated_quantities=False):
model = ''
blocks = ['bfunctions', 'bdata', 'bparameters',
'btransformed_parameters', 'bmodel']
if with_generated_quantities:
blocks.append('bgenerated_quantities')
for block in blocks:
model += getattr(self, block)
return model
def fit(self) -> tuple:
""" Fit a Stan model.
Args:
model (str): Stan model in C++ code
data (dict): Observations, keys have to match fields in `model`
name (str, optional): Filename to pickled summary dictionary
Returns:
results (tuple): Of `fit.summary()` and `fit` (result of
`pystan.StanModel.sampling`)
"""
import os # Ensure imports
if self.pickle:
import pickle
def to_df(sdict) -> pd.DataFrame:
""" Convert the summary dictionary `sdict` to `pandas.DataFrame`.
Returns:
df (pd.DataFrame): Containing the contents of `summary`
"""
df = pd.DataFrame(sdict['summary'],
columns=sdict['summary_colnames'],
index=sdict['summary_rownames'])
return df
def fitnew() -> tuple:
""" Fit a new Stan model with parameters provided to `fit`.
Args:
See `fit`.
Returns:
results (tuple): Of `pd.DataFrame` containing the
fit-summary dictionary entries and `fit`, the
`pystan.StanModel.sampling` results.
"""
sm = pystan.StanModel(model_code=self.model)
fit = sm.sampling(data=self.data, iter=self.iter,
chains=self.chains, warmup=self.warmup,
thin=self.thin, seed=self.seed,
verbose=self.verbose)
sdict = fit.summary()
if self.pickle:
with open(self.filepath + '_sm.pkl', 'wb') as fhandle:
pickle.dump(sm, fhandle, -1)
with open(self.filepath + '_fit.pkl', 'wb') as fhandle:
pickle.dump(fit, fhandle, -1)
df = to_df(sdict)
df.to_csv(self.filepath + '.csv')
return df, fit
if self.forcefit: # Force a new Stan model fitting run
return fitnew()
else:
# Return an available Stan model if available under
# `filepath`. Otherwise fit a new model.
try:
# Attempt to open existing model summary
if self.pickle:
with open(self.filepath + '_sm.pkl', 'rb') as fhandle:
sm = pickle.load(fhandle)
with open(self.filepath + '_fit.pkl', 'rb') as fhandle:
fit = pickle.load(fhandle)
df = to_df(fit.summary())
return df, fit
else:
df = pd.DataFrame.from_csv(self.filepath + '.csv')
return df, None
except FileNotFoundError: # Fit a new model
df, fit = fitnew()
return df, fit
def prepare_az_data(self):
try:
stan_data = az.from_pystan(
prior=self._fit,
posterior=self._fit,
posterior_predictive='z',
observed_data=['y'],
log_likelihood='log_lik',
coords={'N': np.arange(1, N+1)},
dims={
'xi': ['N'],
'z': ['N'],
'log_lik': ['N'],
'y': ['N']
}
)
except:
breakpoint()
#+end_src
*** Signal generator
#+NAME: code_signal_generator
#+begin_src python
class SignalGenerator:
def __init__(self, random=False, filter=None, **kwargs):
"""
"""
if random is True:
self.rs = {
'generator': np.random.RandomState(),
'scale': kwargs.get('scale', 1.),
'loc': kwargs.get('loc', 0.)
}
if (filter is not None):
import control
sfilt = control.c2d(
control.ss(control.tf([1.], [filter['tau'], 1.])),
filter['Ts'])
self.sfilt = sfilt
nx, nu = self.sfilt.B.shape
ny = self.sfilt.C.shape[0]
self.sfilt.y = np.zeros((ny, 1))
self.sfilt.x = np.zeros((nx, 1))
self.sfilt.u = np.zeros((nu, 1))
self.postprocess = self._apply_filter
else:
self.postprocess = lambda sig: sig
def _apply_filter(self, seq):
""" Apply the filter `self.sfilt` to the sequence `seq`.
"""
def innovate(u):
self.sfilt.u = u
self.sfilt.x = (self.sfilt.A.dot(self.sfilt.x) +
self.sfilt.B.dot(self.sfilt.u))
self.sfilt.y = self.sfilt.C.dot(self.sfilt.x)
us = np.zeros_like(seq) # System-smoothed input signal
for n, u in enumerate(seq):
innovate(u)
us[n] = self.sfilt.y
return us
def add_noise(self, seq):
""" Add random normal noise to `seq`
"""
try:
noise = self.rs['generator'].normal(loc=self.rs['loc'],
scale=self.rs['scale'],
size=seq.shape[0])
return seq + noise
except AttributeError: # No random noise generator available
return seq
def prbs(self, samples, frac=1 / 2) -> np.array:
""" Pseudo-Random Binary Number generator (PRBS)
"""
ones = np.ones(int(samples * frac))
zeros = np.zeros(int(samples * (1 - frac)))
ubar = np.hstack([zeros, ones])
np.random.shuffle(ubar)
if ubar.shape[0] < samples:
mm = samples - ubar.shape[0] # Mismatch
ubar = np.hstack([ubar, np.ones(mm)])
else:
mm = ubar.shape[0] - samples # Mismatch
ubar = ubar[:samples - mm]
return self.postprocess(self.add_noise(ubar))
def sine(self, samples, freq=.25) -> np.array:
""" Sinusoidal signal with frequency `freq`.
"""
sine = np.array([np.sin(freq * t) for t in range(samples)])
return self.postprocess(self.add_noise(sine))
def sine_prbs_filt(self,
samples,
sine_freq=.25,
sine_scale=.5,
prbs_frac=1 / 3,
prbs_scale=1.,
scale=1.):
""" Combinatorial signal consisting of (and scaled by `scale`):
- sinusoid(s)
- filtered prbs
"""
def gen_sine(sf, samples):
sine = np.array([np.sin(sf * t) for t in range(samples)])
return sine
if prbs_frac is not None:
signal = prbs_scale * self.prbs(samples, frac=prbs_frac)
else:
signal = 0.
if type(sine_freq) is list:
sines = [gen_sine(sf, samples) for sf in sine_freq]
for idx, sine in enumerate(sines):
signal += sine_scale[idx] * sine
else:
signal += sine_scale * gen_sine(sine_freq, samples)
return scale * signal[np.newaxis, :]
#+end_src
*** Evaluation
#+NAME: code_evaluate_fit
#+begin_src python :exports code :results none
def evaluate(systraj, modeltraj, method='nrmse', n=None, t=5):
""" Evaluate the model trajectory `modeltraj` against the system
trajectory `systraj` using one of the following evaluation metrics:..
- Normalized Root Mean Squared Error (nrmse)
- IAE (iae)
- Akaike's Information Criterion (AIC)
Args:
systraj (array): System trajectory / evaluation reference
modeltraj (array): Model trajectory / observed values
n (int/None): Variables in the model, required for 'AIC' and 'BIC'
t (int): Start evaluation from this sample onwards
"""
if systraj.shape[0] != modeltraj.shape[0]:
print('Trajectories do not mach in dimensions, auto-adjusting...')
if max(systraj.shape) < max(modeltraj.shape):
try:
modeltraj = modeltraj.iloc[:systraj.shape[0], :]
except:
modeltraj = modeltraj.iloc[:systraj.shape[0]]
elif max(systraj.shape) > max(modeltraj.shape):
try:
systraj = systraj.iloc[:modeltraj.shape[0], :]
except:
systraj = systraj.iloc[:modeltraj.shape[0]]
def nrmse(systraj, modeltraj) -> float:
""" Normalized Root Mean Squared Error, based on the description
given on the Matlab documentation of 'goodnessoffit'.
"""
normalize = np.linalg.norm(systraj - np.mean(systraj), 2)
cost = 1 - np.linalg.norm(systraj - modeltraj, 2) / normalize
return cost
def iae(systraj, modeltraj) -> float:
""" Returns the Integrated Absolute Error 'IAE'.
"""
return np.sum(np.abs((modeltraj - systraj)))
def aic(systraj, modeltraj, n) -> float:
""" Returns the Akaike Information Criterion (AIC) for the given
trajectories `systraj` and `modeltraj` and the model complexity `n`.
Returns:
aic_score (float): AIC score
"""
if n is None:
raise Exception(
'Pass `n` with number of parameters to `evaluate`.')
if systraj.shape != modeltraj.shape:
raise Exception(
'Both evaluation time-series have to match in dimensions.')
tsdim = systraj.shape
l = max(tsdim) # Evaluation reference
resid = np.subtract(modeltraj, systraj)
rss = float(np.sum(np.power(resid, 2)))
aic_score = 2 * n + l * np.log(rss / l)
return aic_score
def bic(systraj, modeltraj, n) -> float:
""" Returns the Bayesian Information Criterion (BIC) for the given
trajectories `systraj` and `modeltraj` and the model complexity `n`.
Returns:
bic_score (float): BIC score
"""
if n is None:
raise Exception(
'Pass `n` with number of parameters to `evaluate`.')
if systraj.shape != modeltraj.shape:
raise Exception(
'Both evaluation time-series have to match in dimensions.')
tsdim = systraj.shape
l = max(tsdim) # Evaluation reference
resid = np.subtract(modeltraj, systraj)
rss = float(np.sum(np.power(resid, 2)))
bic_score = n * np.log(l) + l * np.log(rss / l)
return bic_score
try: # Multi-dimensional recording
systraj_, modeltraj_ = systraj.iloc[t:, :], modeltraj.iloc[t:, :]
except: # Single dimensional recording
systraj_, modeltraj_ = systraj.iloc[t:], modeltraj.iloc[t:]
if method == 'nrmse':
return nrmse(systraj_, modeltraj_)
elif method == 'iae':
return iae(systraj_, modeltraj_)
elif method == 'AIC':
return aic(systraj_, modeltraj_, n)
else:
raise NotImplementedError('Requested evaluation not available.')
#+end_src
** Simulation
#+NAME: paper_simulation
#+begin_src python :noweb strip-export :exports none
<<paper_simulation_header>>
<<code_signal_generator>>
<<code_identification>>
<<paper_prosumerDynamics>>
<<paper_simulation_class>>
mp = Prosumer(samples=simspec['samples']) # Model prosumer
simulator = Simulator(simspec, mp) # Initialize simulation
#+end_src
*** System simulation
#+NAME: code_system_simulator
#+begin_src python
class SystemSimulator:
def __init__(self, Tsim, D=None, **kwargs):
"""
"""
self.Tsim = Tsim
self.D = D
self.rs = np.random.RandomState()
self.sigdx = kwargs.get('sigdx', 0)
self.sigdy = kwargs.get('sigdy', 0)
self.f_U = None # Control signal factor (TODO bug0)
def poly_map_1dim(self, X, m) -> np.array:
""" Map the pySINDy candidate function library structure to
corresponding python evaluations.
Args: TODO
Returns:
Function innovations (float)
"""
u0 = X.flatten()
resp = 0.
try:
resp += m.loc['1']
except KeyError:
pass
try:
resp += m.loc['u0'] * u0
resp += m.loc['u0^{2}'] * u0**2
resp += m.loc['u0u1'] * u0 * u1
resp += m.loc['u1^{2}'] * u1**2
except KeyError: # No higher order terms available
pass
return resp.values.reshape((1, 1))
def poly_map_2dim(self, X, m) -> np.array:
""" Map the pySINDy candidate function library structure to
corresponding python evaluations.
Args: TODO
Returns:
Function innovations (float)
"""
u0, u1 = X[0, :], X[1, :]
resp = 0.
try:
resp += m.loc['1']
except KeyError:
pass
try:
try:
resp += m.loc['u0'] * u0
resp += m.loc['u1'] * u1
resp += m.loc['u0^{2}'] * u0**2
resp += m.loc['u0u1'] * u0 * u1
resp += m.loc['u1^{2}'] * u1**2
resp += m.loc['u0^3'] * u0**3
resp += m.loc['u0^{2}u1'] * u0**2 * u1
resp += m.loc['u0u1^{2}'] * u0 * u1**2
resp += m.loc['u1^{3}'] * u1**3
except ValueError as ve:
print(ve)
except KeyError: # No higher order terms available
pass
return resp.values.reshape((2, 1))
def poly_map_3dim(self, X, m) -> np.array:
""" Map the pySINDy candidate function library structure to
corresponding python evaluations.
Args: TODO
Returns:
Function innovations (float)
"""
u0, u1, u2 = X[0, :], X[1, :], X[2, :]
resp = 0.
try:
resp += m.loc['1']
except KeyError:
pass
try:
resp += m.loc['u0'] * u0
resp += m.loc['u1'] * u1
resp += m.loc['u2'] * u2
resp += m.loc['u0^{2}'] * u0**2
resp += m.loc['u0u1'] * u0 * u1
resp += m.loc['u1^{2}'] * u1**2
resp += m.loc['u0u2'] * u0 * u2
resp += m.loc['u1u2'] * u1 * u2
resp += m.loc['u2^{2}'] * u2**2
except KeyError: # No higher order terms available
raise Exception('No higher order terms supported for now.')
return resp.values.reshape((3, 1))
def poly_map_4dim(self, X, m) -> np.array:
""" Map the pySINDy candidate function library structure to
corresponding python evaluations.
Args: TODO
Returns:
Function innovations (float)
"""
u0, u1, u2, u3 = X[0, :], X[1, :], X[2, :], X[3, :]
resp = 0.
raise NotImplementedError
try:
resp += m.loc['1']
except KeyError:
pass
try:
resp += m.loc['u0'] * u0
resp += m.loc['u1'] * u1
resp += m.loc['u2'] * u2
resp += m.loc['u3'] * u3
# resp += m.loc['u0^{2}'] * u0**2
# resp += m.loc['u0u1'] * u0 * u1
# resp += m.loc['u1^{2}'] * u1**2
# resp += m.loc['u0^{3}'] * u0**3
# resp += m.loc['u0^{2}u1'] * u0**2 * u1
# resp += m.loc['u0u1^{2}'] * u0 * u1**2
# resp += m.loc['u1^{3}'] * u1**3
except KeyError: # No higher order terms available
pass
return resp.values.reshape((4, 1))
def innovate_model(self, X, P, poly_map):
""" System model innovation.
"""
# return P.loc['Cn', 0] * poly_map(X, P)
return poly_map(X, P)
def simulate_model(self, model, X0, U=None, T=None,
sigw=0, sigy=0.):
""" Simulate the system `model` given the initial state `X0`, the input
sequence `U` and the ... TODO
"""
def zoh(T, U, s, u) -> np.array:
""" Retrieve the input `U` that corresponds to the timestep `T` given the
samples `s`. If no update occurs, return the present input `u` (zero-order
hold behavior).
Args:
T (array): Time-steps
U (array): System inputs
s (float): Current sample
u (array): Present system input
Returns:
u (array): Potentially updated system input
"""
umap = np.where(T == s)
try:
if umap[0].shape[0] == 0:
pass # No change to the input `u`
else: # Retrieve corresponding input
try:
u = U.T[umap].reshape((1, 1)) # Single Input case
except ValueError:
u = np.squeeze(U[:, umap], axis=1)
except IndexError:
breakpoint()
return u
try:
results = model.results
except AttributeError:
results = model
if results.shape[1] == 1:
poly_map = self.poly_map_1dim
elif results.shape[1] == 2:
poly_map = self.poly_map_2dim
elif results.shape[1] == 3:
poly_map = self.poly_map_3dim
elif results.shape[1] == 4:
poly_map = self.poly_map_4dim
else:
raise NotImplementedError(
'No other model structures yet supported.')
if U is not None: # Retrieve initial input
try:
u = np.array([U[:, 0]]).T
except IndexError:
u = np.array([[U[0]]])
forced = True
else:
forced = False
xout = x = X0 # Initial state
try:
uout = u # Initial control input
except UnboundLocalError:
uout = u = np.nan
T = T[:-1] # Initial control is given at sample 1, adjusting...
samples = np.arange(max(T)) # Equi-spaced samples given `T`
for s in samples:
if forced is True:
u = zoh(T, U, s, u)
x[-u.shape[0]:] = u
x += (self.innovate_model(x, results, poly_map) +
self.rs.normal(loc=0., scale=sigw)) # State innovation
y = x + self.rs.normal(loc=0., scale=sigy) # Measurement
try:
xout = np.hstack([xout, y])
except ValueError:
xout = np.hstack([xout, np.array([[y]])])
try:
uout = np.hstack([uout, u])
except ValueError:
uout = np.hstack([uout, np.array([[u]])])
data = {f'x_{idx}': xout[idx, :] for idx in range(xout.shape[0])}
try:
try:
data.update(
{f'u_{idx}': uout[idx, :]
for idx in range(uout.shape[0])})
except IndexError:
data.update({'u': U})
except AttributeError: # No forcing
pass
df = pd.DataFrame(data, index=T)
return df
def simulate_ol(self, sys, U, T, X0) -> tuple:
""" Open-loop system simulation.
"""
return control.matlab.lsim(sys, U, T, X0)
def simulate_cl(self, sys, U, T, X0) -> tuple:
""" Closed-loop system simulation.
"""
def innovate(sys) -> tuple:
""" Standard State Space system innovation.
"""
try:
sys.x = (sys.A.dot(sys.x) + sys.B.dot(sys.u) +
sys.G.dot(sys.d) + self.rs.normal(scale=self.sigdx))
sys.y = sys.C.dot(sys.x) + self.rs.normal(scale=self.sigdy)
except ValueError as ve:
print(f'Backtrace: {ve}')
breakpoint()
return sys.y, sys.x
# Initial control input
try:
if U.ustar: # Apply `U` in case optimal
sys.u = self.f_U * U.ustar
else:
sys.u = np.zeros((sys.nu, 1))
except AttributeError: # No controller
sys.u = U[0:1].reshape((1, 1))
yout, xout = np.empty((sys.y.shape[0], 0)), np.empty(
(sys.x.shape[0], 0))
uout, dout = np.empty((sys.u.shape[0], 0)), np.empty(
(sys.d.shape[0], 0))
for t in T[1:]:
try:
try: # Multiple disturbance sequence
sys.d = self.D[:, t].reshape((sys.nd, 1))
except IndexError: # Single disturbance sequence
sys.d = self.D[t].reshape((sys.nd, 1))
except TypeError: # No disturbance sequence
pass
# Innovate system and update control model fields
y, x = innovate(sys)
try:
U.ss.y, U.ss.x = y, x
sys.u = self.f_U * U.optimize(sys.y)
except TypeError: # No controller
try:
sys.u = U[t]
except:
breakpoint()
try:
yout = np.hstack([yout, sys.y])
xout = np.hstack([xout, sys.x])
uout = np.hstack([uout, sys.u])
dout = np.hstack([dout, sys.d])
except ValueError:
breakpoint()
return (np.array(yout), T[:-1], np.array(xout), np.array(uout),
np.array(dout))
def simulate(self,
sys,
X0,
U=0.0,
T=None,
invert_U=False,
force_controller=False):
""" Simulate the system `sys` response given its initial state `X0`,
the input sequence `U` valid at corresponding timesteps `T`.
"""
if invert_U is True: # Invert control feedback signal conditionally
self.f_U = -1
else:
self.f_U = 1
try:
try:
if isinstance(U, Controller) or force_controlled:
# Closed-loop simulation: Replaces controller `U`
yout, T, xout, U, D = self.simulate_cl(sys, U, T, X0)
else:
yout, T, xout = self.simulate_ol(sys, U, T, X0)
yout = yout.T
xout = xout.T
except NameError:
yout, T, xout = self.simulate_ol(sys, U, T, X0)
yout = yout.T
xout = xout.T
except NameError:
yout, T, xout = self.simulate_ol(sys, U, T, X0)
yout = yout.T
xout = xout.T
data = {f'y_{idx}': yout[idx, :] for idx in range(yout.shape[0])}
data.update({f'x_{idx}': xout[idx, :] for idx in range(xout.shape[0])})
try:
data.update({f'u_{idx}': U[idx, :] for idx in range(U.shape[0])})
except IndexError:
data.update({'u': U})
try:
try:
data.update(
{f'u_{idx}': U[idx, :]
for idx in range(U.shape[0])})
except IndexError:
data.update({'u': U})
except UnboundLocalError: # No disturbance input
pass
df = pd.DataFrame(data, index=T)
return df
#+end_src
*** Simulator class
#+NAME: paper_simulation_class
#+begin_src python :noweb strip-export :exports none
<<code_system_simulator>>
class Simulator(SystemSimulator):
"""
"""
name = 'SINDyStan'
map_model = {
'Xi[0]': 'u0',
'Xi[1]': 'u1',
'Xi[2]': 'u0^{2}',
'Xi[3]': 'u0u1',
'Xi[4]': 'u1^{2}',
'Xi[5]': 'u0^{3}',
'Xi[6]': 'u0^{2}u1'
}
formatted_model_coefficients = [
r'$\xi_1$', r'$\xi_2$', r'$\xi_3$', r'$\xi_4$', r'$\xi_5$', r'$\xi_6$',
r'$\xi_7$'
]
def __init__(self, simspec, mp):
""" Args:
simspec (dict): Simulation specification
mp (obj): Prosumer dynamics model
"""
self.simspec = simspec
self.mp = self.model_prosumer = mp
try:
self.identifier = Identification(data=None, name=Simulator.name)
super().__init__(Tsim=simspec['ID']['T'])
except:
self.identifier = None
super().__init__(Tsim=None)
self.rs = np.random.RandomState() # Initialize random generator
def combine_sample_dataframes(self, dfs, aggregation_axis=1):
""" TODO Understand and document
"""
def combine_system(df, aggregation_axis):
""" TODO Understand and document
"""
# Model specific dataframes
dfm = {m: [] for m in range(self.mp.nx)}
for isample, sample in df.items():
for imodel, aggregate in dfm.items():
response = sample.loc[:, f'y_{imodel}']
excitation = (sample.loc[:, f'u_{imodel}'] +
sample.loc[:, f'd_{imodel}'])
y_newname = f'y_({isample}, {imodel})'
try:
response = response.rename({f'y_{imodel}': y_newname},
axis=1)
except ValueError: # Sample is of type `pd.Series`
response.name = y_newname
u_newname = f'u_({isample}, {imodel})'
try:
excitation = excitation.rename(
{f'y_{imodel}': u_newname}, axis=1)
excitation.name = u_newname
except ValueError: # Sample is of type `pd.Series`
excitation.name = u_newname
# Aggregate response-excitation couples (reversed order
# such that first column is output)
aggregate.append(pd.concat([response, excitation], axis=1))
for imodel, aggregates in dfm.items():
dfm[imodel] = pd.concat(aggregates, axis=1)
return dfm
def combine_sindy(df, aggregation_axis):
""" TODO Understand and document
"""
# Model specific dataframes
dfm = {m: [] for m in range(self.mp.nx)}
# Loop over samples
try:
for isample, sample in df.items():
# Loop over models
for imodel, model in sample.items():
# Drop excessive columns
model = model.drop(['x_1'], axis=1)
if aggregation_axis == 1: # Aggregate over columns
model = model.rename(
{
'y': f'y_({isample}, {imodel})',
'u_0': f'u_({isample}, {imodel})'
},
axis=1)
dfm[imodel].append(model)
except AttributeError:
pass
for imodel, model in dfm.items():
try:
dfm[imodel] = pd.concat(dfm[imodel], axis=1)
except ValueError:
dfm[imodel] = dfm[imodel]
return dfm
def combine_stan(df, aggregation_axis):
""" Combine all posterior sample trajectories into a large dataframe
with sample column as means to identification.
"""
dfs = []
for sample, data in df.items():
trajectory = data['trajectory']
trajectory = trajectory.drop(columns=['x_1', 'u_0'])
trajectory = trajectory.rename(columns={'x_0': 'y'})
trajectory.loc[:, 'Sample'] = sample
dfs.append(trajectory)
dfms = pd.concat(dfs,
axis=0) # Combine all stan timeseries vertically
return dfms
aggregation_axis = 1
df = combine_system(dfs['system'], aggregation_axis)
dfm = combine_sindy(dfs['sindy'], aggregation_axis)
try:
dfms = combine_stan(dfs['stan'], aggregation_axis)
return df, dfm, dfms
except KeyError: # No `stan` recording available
return df, dfm
def temporalaggregates_to_structure(self,
temporalaggregates,
structure=np.array,
name='System'):
""" TODO Understand and document
"""
def loop_over_tempdict(tempdict) -> dict:
""" Loop over the dictionary of temporal entries `tempdict` and
aggregate the contained response data into a dictionary `responses`
containing the sample-based response data.
"""
responses = {
sample: np.empty((self.mp.nx, 0))
for sample in range(self.mp.ns)
}
for t, response in tempdict.items():
for im, m in enumerate(response.T):
responses[im] = np.hstack(
[responses[im],
m.reshape((self.mp.nx, 1))])
return responses
def extract_arrays(responses) -> np.array:
""" Obtain arrays as data containers.
Args:
responses (dict): Response dictionary with temporal aggregates
Returns:
arrs (array): Array of collated temporal responses
"""
arrs = {m: None for m in range(self.mp.ns)}
for m, response in responses.items():
arrs[m] = response
return arrs
def extract_generate_dfs(responses, name) -> list:
"""
Args:
responses (dict): Response dictionary with temporal aggregates
Returns:
dfs (list): List of temporal responses as DataFrames
"""
dfs = {m: None for m in range(self.mp.ns)}
for m, response in responses.items():
df = pd.DataFrame(response.T)
df.index.name = 'Timesteps'
df.columns.name = f'{name} IO'
df.columns = ['y_0', 'y_1']
dfs[m] = df
return dfs
# Obtain aggregated responses in wide-format
responses = loop_over_tempdict(temporalaggregates)
# Obtain collated data structures given the structure spec. `structure`
if structure == np.array:
arrs = extract_arrays(responses)
return arrs
elif structure == pd.DataFrame:
dfs = extract_generate_dfs(responses, name=name)
return dfs
def loop_and_simulate_model(self, dfs, x0, U, T):
""" Loop over models and clusters and simulate them.
Args:
dfs (dict): Dictionary with the fields
`sindy: {isample: {imodel}}`.
x0 (array): Initial state (stacked state and input)
U (array): System excitation (both controlled and uncontrolled)
T (arrray): Timestamps for the simulation horizon
"""
for isample, sample in self.msindyc.items():
# Loop over the two models
for imodel, model in sample.items():
# Model data
df_m = self.simulate_model(
model,
# np.vstack([x0.copy()[imodel, isample:isample+1],
# np.zeros(1)]),
np.zeros((2, 1)),
U=U[imodel, :],
T=T)
# df_m = df_m.rename({'x_0': 'y'}, axis=1)
try:
# df_m.loc[:, 'y'] = model.loc['Cn', 0] * df_m.loc[:, 'x_0']
df_m.loc[:, 'y'] = df_m.loc[:, 'x_0']
except KeyError:
breakpoint()
dfs['sindy'][isample][imodel] = df_m
return dfs
def _prepare_simulation_containers(self):
""" Prepare simulation containers: `dfs` contains fields for the
system response data `system` and the model responses `sindy`.
"""
dfs = {
'sindy': {
isample: {imodel: None
for imodel in range(self.mp.nx)}
for isample in range(self.mp.ns)
},
'system': None
}
# Retrieve system response data
df_sys = self.temporalaggregates_to_structure(self.Y,
structure=pd.DataFrame,
name='System')
dfs['system'] = df_sys
# Add system input recordings to the system response data `df_sys`
for isample, data in df_sys.items():
for imodel in range(self.mp.nx):
data.loc[:, f'u_{imodel}'] = self.U[imodel, :]
data.loc[:, f'd_{imodel}'] = self.D[imodel, :]
return dfs
def cosimulate_ID(self):
""" Perform a cosimulation experiment in-sample or out-of-sample
given the ID data and the identified SINDy models.
"""
dfs = self._prepare_simulation_containers()
dfs = self.loop_and_simulate_model(dfs,
x0=self.mp.x0,
U=(self.uid + self.did),
T=self.Tid)
return dfs
def simulate_out_of_sample(self, Tsim=200, UD=None, **kwargs):
""" Perform a cosimulation experiment in-sample or out-of-sample
given the ID data and the identified SINDy models.
"""
if UD is None: # No excitation sequence provided
siggen = SignalGenerator(
filter=dict(tau=2.5, Ts=self.simspec['Ts']))
try:
self.U # If `U` is not available, carry out `ID` prep.
except AttributeError:
self.ID(**self._ID())
sigc = self.U * .5
disturbance = siggen.sine_prbs_filt(samples=Tsim,
sine_freq=[
self.rs.lognormal(
mean=-5., sigma=.05),
self.rs.lognormal(mean=-3,
sigma=.5)
],
sine_scale=[1., 1.],
prbs_scale=.001,
scale=.25)
disturbance = np.tile(disturbance, (self.mp.np, 1))
self.U = sigc
self.D = disturbance
else:
try:
self.U = UD['U']
self.D = UD['D']
except KeyError as ke:
raise Exception(
'Provide excitation `UD` as dictionary with keys `U` and `D`'
) from ke
self.mp.x = self.mp.x0.copy() # Reset the prosumer state
data = self.simulate(self.Tid, self.mp, self.D, self.U)
self.Y = data['X']
dfs = self._prepare_simulation_containers()
if kwargs.get('nsim') is not None:
# Only consider `nsim` SINDyc models
self.msindyc = {
n: self.msindyc[n]
for n in range(kwargs.get('nsim'))
}
dfs = self.loop_and_simulate_model(dfs,
x0=np.zeros((2, 1)),
U=self.U + self.D,
T=self.Tid)
return dfs
def cosimulate_IDStan(self,
path_predicate,
name,
Xilabels={
0: 'Xi[0]',
1: 'Xi[1]',
2: 'Xi[2]',
3: 'Xi[3]',
4: 'Xi[4]',
5: 'Xi[5]'
},
ci=.05,
ndraws=100,
diagnostic_plots=False,
use_fit=True):
""" Perform a cosimulation out of sample using the Stan model `fit`.
"""
if use_fit is True:
try:
sm = read_from_disc(
f'{os.path.join(path_predicate, name)}_sm.pkl')
fit = read_from_disc(
f'{os.path.join(path_predicate, name)}_fit.pkl')
except FileNotFoundError:
Xistan = read_from_disc(
os.path.join(path_predicate, f'Xistan_{name}.pkl'))
else:
try:
Xistan = read_from_disc(
os.path.join(path_predicate, f'Xistan_{name}.pkl'))
except FileNotFoundError:
sm = read_from_disc(
f'{os.path.join(path_predicate, name)}_sm.pkl')
fit = read_from_disc(
f'{os.path.join(path_predicate, name)}_fit.pkl')
def evaluate_parameter_likelihood(fit_df, Xilabels, ci):
""" Determine the most probable parameters and lower and
upper bounds associated with the confidence interval.
"""
from scipy import stats
# Xistan = {'lower': {}, 'max_LL': {}, 'upper': {}}
Xistan = {'lower': [], 'max_LL': [], 'upper': []}
xspaces, kdes = {}, {}
for idx_Xi, label in Xilabels.items():
if label is False:
LL_lower = 0
max_LL = 0
LL_upper = 0
xspaces[label] = None
kdes[label] = None
else:
try:
series = fit_df.loc[:, label]
kde = stats.gaussian_kde(series) # Obtain KDE
# Evaluation space
xs = np.linspace(series.min(), series.max(), 1000)
# Evaluate density over evaluation space
density = kde(xs)
max_LL = xs[np.where(density == np.max(density))]
ci_ = ci * np.abs(max(xs) - min(xs))
LL_lower = max_LL - ci_
LL_upper = max_LL + ci_
xspaces[label] = xs
kdes[label] = kde
except KeyError: # Parameter density not available
LL_lower = 0
max_LL = 0
LL_upper = 0
xspaces[label] = None
kdes[label] = None
Xistan['lower'].append(LL_lower)
Xistan['max_LL'].append(max_LL)
Xistan['upper'].append(LL_upper)
data = {
'Xi': Xistan,
'densities': {
'xspace': xspaces,
'yspace': kdes
}
}
self.save_inferred_stan_densities(data, name)
for label, parameters in Xistan.items():
coefficients = np.hstack(parameters)
df = pd.DataFrame(
{
0: coefficients,
1: np.full_like(coefficients, np.nan)
},
index=['1', 'u0', 'u1', 'u0^{2}', 'u0u1', 'u1^{2}'])
Xistan[label] = df
return Xistan, xspaces, kdes
def draws_from_parameter_likelihood(fit_df,
Xilabels,
path_predicate,
name,
ci,
ndraws,
xres=1000):
""" Simulate draws from the parameter likelihood in `fit`.
Obtain only the stable parameterizations and collect the
corresponding trajectories.
TODO
- Merge inner code structs with `evaluate_parameter_likelihood`
"""
from scipy import stats
Xistan = {}
for idx_Xi, label in Xilabels.items():
if label is False:
LL_lower = 0
max_LL = 0
LL_upper = 0
xspaces[label] = None
kdes[label] = None
else:
try:
series = fit_df.loc[:, label]
kde = stats.gaussian_kde(series) # Obtain KDE
# Evaluation space
xs = np.linspace(series.min(), series.max(), xres)
# Evaluate density over evaluation space
density = kde(xs)
# Find the point of maximal Likelihood
max_LL = xs[np.where(density == np.max(density))]
# Find the overall parameter space
delta = max(xs) - min(xs)
# Find the evaluation delta given rel. confidence `ci`
delta_ci = delta * ci
# Obtain lower and upper parameter spaces in confidence range
LL_lower = max_LL - delta_ci
LL_upper = max_LL + delta_ci
# Obtain `ndraws` uniform draws in the region
# `LL_upper - LL_lower`
draws = np.random.uniform(low=LL_lower,
high=LL_upper,
size=ndraws)
Xistan[label] = draws
except KeyError: # Parameter density not available
Xistan[label] = np.zeros(0)
self.save_inferred_stan_densities(
Xistan, os.path.join(path_predicate, f'Xistan_{name}'))
return Xistan
def simulate_draws(Xistan, Xilabels, ndraws, sigw, sigy):
"""Simulate various trajectories considering random draws from `Xistan`,
rejecting the instable modes. Collect only stable trajectories and
associated modes.
TODO
Args:
Xistan
Xilabels
ndraws ():
sigy (float):
"""
def random_draw_from(Xistan, Xilabels, ndraws):
"""Randomly obtain candidate parameters `Xi` in the data `Xistan`.
"""
Xi = {} # Collect all variations in `Xi`
# Number of parameters in full model
nparam_fullmodel = len(Simulator.map_model)
for n in range(ndraws):
Xi_ = OrderedDict()
for idx in range(nparam_fullmodel):
label = f'Xi[{idx}]'
try: # Check if model contains term associated with `idx`
Xistan_ = Xistan[Xilabels[idx]]
try:
xi_ = np.random.choice(Xistan_)
except ValueError:
xi_ = 0.
except KeyError: # Model term is zero
Xistan_ = 0.
xi_ = 0.
coeff = Simulator.map_model[label]
Xi_[coeff] = xi_
Xi[n] = pd.DataFrame(Xi_, columns=Xi_.keys(), index=[0]).T
# Set the control model to empty by filling with zeros
Xi[n].loc[:, 1] = np.zeros(len(Xi_))
return Xi
def is_stable(trajectory, thresholds=[-1e2, 1e2]):
""" Check whether the `trajectory` is stable.
"""
crit0 = not np.any(trajectory.isna())
crit1 = np.any(trajectory < thresholds[1])
crit2 = np.any(thresholds[0] < trajectory)
flag0 = (crit0 is True)
flag1 = (bool(crit1) is True)
flag2 = (bool(crit2) is True)
if flag0 and flag1 and flag2:
return True
else:
print('System is unstable!')
return False
def test_random_draws_from_canonical_sindyc(ndraws=100):
cmsindyc = self.msindyc[0][0] # Canonical SINDyc model
cmsindyc.iloc[:, 1] = 0
Xi = dict()
for n in range(ndraws):
xi_ = pd.concat(
[(cmsindyc[cmsindyc != 0].iloc[:, 0] +
np.random.randn(6) * .05**2), cmsindyc.iloc[:, 1]],
axis=1)
xi_ = xi_.fillna(0)
Xi[n] = xi_
return Xi
stan_results = {} # Collect trajectories and associated modes
Xi = random_draw_from(Xistan, Xilabels, ndraws)
for n, Xi_ in Xi.items():
trajectory = self.simulate_model(
Xi_,
X0=np.zeros((2, 1)), # Initial state
U=(self.U + self.D)[0, :],
T=self.Tid,
sigw=sigw,
sigy=sigy)
trajectory.loc[:, 'y'] = trajectory.loc[:, 'x_0']
if is_stable(trajectory):
stan_results[n] = dict(Xi=Xi_, trajectory=trajectory)
else: # Reject the draw
pass
return stan_results
fit_df = fit.to_dataframe() # Cast as `pd.DataFrame`
if (name == 'c0_12') or (name == 'c1_12'):
# Map indices in `simulator.map_model` to the coefficients in `fit_df`
Xilabels = {0: 'Xi[1]', 1: 'Xi[2]'}
elif (name == 'c0_23') or (name == 'c1_23'):
Xilabels = {1: 'Xi[1]', 2: 'Xi[2]'}
elif (name == 'c0_124') or (name == 'c1_124'):
Xilabels = {0: 'Xi[1]', 1: 'Xi[2]', 3: 'Xi[3]'}
elif (name == 'c0_1234') or (name == 'c1_1234'):
Xilabels = {0: 'Xi[1]', 1: 'Xi[2]', 2: 'Xi[3]', 3: 'Xi[4]'}
else:
raise NotImplementedError(
'Other model formulations not yet supported.')
try:
Xistan
except UnboundLocalError: # `Xistan` is not available
Xistan = draws_from_parameter_likelihood(fit_df,
Xilabels,
path_predicate,
name,
ci=ci,
ndraws=ndraws,
xres=1000)
self.identify()
dfs_oos = self.simulate_out_of_sample() # System and SINDyc
df_oos, dfm_oos = self.combine_sample_dataframes(dfs_oos)
# Stan
dfss_oos_ = simulate_draws(
Xistan,
Xilabels,
ndraws,
sigw=fit_df.filter(regex='sigma_w').mean().mean(),
sigy=fit_df.filter(regex='sigma_y').mean().mean())
dfss_oos = dict()
for key, value in dfss_oos_.items():
dfss_oos[key] = value['trajectory']
results = {'stan': dfss_oos, 'system': df_oos, 'sindy': dfm_oos}
return results
def _ID(self):
""" Prepare identification-phase related code.
"""
self.Tburn = np.arange(0, self.simspec['ID']['Tburn'],
dtype=int) # Burn-horizon
self.Tid = np.arange(0, self.simspec['ID']['T'],
dtype=int) # ID-horizon
siggen = SignalGenerator(
random=self.simspec.get('siggen', {}).get('random',
{}).get('random'),
scale=self.simspec.get('siggen', {}).get('random',
{}).get('scale'),
filter=dict(tau=self.simspec.get('siggen',
{}).get('signal',
{}).get('tau', 5.),
Ts=self.simspec['Ts']))
freq_0_m = -4
freq_0_s = .05
freq_1_m = -1
freq_1_s = .05
try:
freq_0_m = self.simspec['siggen']['signal']['freq_0']['mean']
freq_0_s = self.simspec['siggen']['signal']['freq_0']['sigma']
freq_1_m = self.simspec['siggen']['signal']['freq_1']['mean']
freq_1_s = self.simspec['siggen']['signal']['freq_1']['sigma']
except KeyError:
pass
freq_0 = self.rs.lognormal(mean=freq_0_m, sigma=freq_0_s)
freq_1 = self.rs.lognormal(mean=freq_1_m, sigma=freq_1_s)
self.did = siggen.sine_prbs_filt(
samples=self.simspec['ID']['T'],
sine_freq=[freq_0, freq_1],
sine_scale=[
self.simspec['ID'].get('siggen',
{}).get('signal',
{}).get('sine_scale', .25), .2
],
prbs_frac=None,
prbs_scale=.1,
scale=self.simspec['ID'].get('siggen',
{}).get('signal',
{}).get('scale', 1.))
self.uid = self.simspec['ID']['u']
if self.did.shape[0] != self.mp.np:
self.did = np.tile(self.did, (self.mp.np, 1))
if self.uid.shape[0] != self.mp.np:
self.uid = np.tile(self.uid, (self.mp.np, 1))
return dict(Tburn=self.Tburn, Tid=self.Tid, d=self.did, u=self.uid)
def ID(self, **parameters):
""" Identification-phase of the simulation. Both disturbance signal
`d` and control signal `u` are applied to the system simultaneously.
At least one of `d` and `u` consequently has to be provided.
Notice:
In the controlled phase, the control signal `u` shall be more or less
constant such as to allow for the system to be perturbed by the
identification signal `d`.
"""
Tburn, Tid = parameters['Tburn'], parameters['Tid']
d, u = parameters['d'], parameters['u']
if (d is None) and (u is None):
raise TypeError('Provide at least one argument to `ID`')
def burnin(Tburn, mp, u_burn):
""" Perform burnin-excitation in order to stabilize the
system around its reference excitation.
"""
for t in Tburn:
mp.excite(cluster=self.simspec['cluster'], p=u_burn)
return mp
burnin(Tburn, self.mp, u_burn=u[:, 0])
data = self.simulate(Tid, self.mp, d, u)
self.Y = data['X']
self.U = u
self.D = d
return data
def simulate(self, Tid, mp, d, u):
"""
"""
P = np.empty((mp.nx, 0))
X = {}
for t in Tid:
p = d[:, t:t + 1] + u[:, t:t + 1]
x = mp.excite(cluster=self.simspec['cluster'], p=p)
P = np.hstack([P, p])
X[t] = x.copy()
return dict(P=P, X=X)
def CL(self):
""" Controlled-phase of the simulation.
"""
raise NotImplementedError
def _identify_SINDy(self, data):
""" Adjust the `data` such that we only identify a model given
meaningful data (such as, only use the single excitation sequence
in the SIMO case).
"""
P = data['P'][0, :][np.newaxis, :]
X = self.temporalaggregates_to_structure(data['X'],
structure=pd.DataFrame)
data['X'] = X
return data
def identify_SINDy(self, data):
""" Identify a SINDy model given `data`. `data` contains response data
recorded over several samples `self.mp.ns`.
"""
# Identify one SINDyc model per sample `self.mp.ns`
self.msindyc = {s: {} for s in range(self.mp.ns)}
X = data['X'] # Unpack response data
for sample, response in X.items():
for imodel in range(self.mp.nx): # Loop over response models
data_ = {
'Y': response.iloc[:, imodel].values,
'P': data['P'][imodel, :]
}
if np.isnan(data_['Y']).any():
print('Instable model!')
break
elif np.isinf(data_['Y']).any():
print('Instable model!')
break
else:
combineddata = np.vstack([data_['Y'], data_['P']])
ny = 1 # Number of effective SINDyc outputs
self.identifier.fit(
ny,
combineddata,
dt=self.simspec['Ts'],
poly_degree=self.simspec['ID'].get('poly_degree', 3),
cut_off=self.simspec['ID']['cut_off'],
neval=self.simspec['ID'].get('neval', 10),
sparsity_threshold=self.simspec['ID'].get(
'sparsity_threshold', .65))
self.msindyc[sample][imodel] = (
self.identifier.model.results)
self.msindyc = self.normalize(self.msindyc)
return self.msindyc
def normalize(self, msindyc, steps=5, gain=.1) -> pd.DataFrame:
""" Normalize the SINDyc model `msindyc` on the associated
input-output signals `py_msindyc`. NOTE this is WIP
"""
m = msindyc[0][0]
# Y, P = py_msindyc[0, :], py_msindyc[1, :]
# msindyc.in_out_power = Y.mean() / P.mean()
# msindyc.loc['Cn', :] = [Y.mean(), P.mean()]
dfs = self.step_response(gain=gain)
df = dfs['sindy'][0] # NOTE Unclear indexing
# df[0].plot();plt.show()
msindyc_ = msindyc[0] # NOTE Unclear indexing
for imodel in df.keys():
ysteady = df[imodel]['y'].iloc[-steps:].mean()
if ysteady is np.nan:
raise Exception('Model is unstable!')
else: # Stable model
# NEXT Find a linear or nonlinear mapping for adjusting
# the output space. As this is a nonlinear system,
# most likely this will be nonlinear as well.
# cfac = gain/ysteady
# msindyc_[imodel].loc['Cn', :] = 1./cfac
msindyc_[imodel].loc['Cn', :] = 1. + (0.5 * 0.1)
# TODO py_msindyc = ?
msindyc[0] = msindyc_
return msindyc
@classmethod
def identify_stan(self, df, Xistar, sigma_y_init, name, fitnewmodel=True):
""" Fit a Stan model given the single trajectory dataframe `df`
and the identified SINDy model `Xistar` (such that it provides
the coefficients for a linear model `ax + b`, where `b` is
the first coefficient in `Xistar`).
"""
def yield_Xibounds(Xistar, sig_Xi=None):
"""
"""
if sig_Xi is None:
sig_Xi = np.full_like(Xistar, .2)
l_Xibounds, u_Xibounds = (np.empty_like(Xistar),
np.empty_like(Xistar))
for n, Xi in enumerate(Xistar):
if Xi > 0:
l_Xibounds[n] = 0
u_Xibounds[n] = Xi + sig_Xi[n]
elif Xi == 0: # Relax for now
l_Xibounds[n] = -10.
u_Xibounds[n] = 10.
else:
u_Xibounds[n] = 0
l_Xibounds[n] = Xi - sig_Xi[n]
return l_Xibounds, u_Xibounds
def yield_signs(Xistar):
"""
"""
signs_Xistar = np.empty_like(Xistar, dtype=int)
for n, Xi in enumerate(Xistar):
if Xi < 0:
signs_Xistar[n] = -1
else:
signs_Xistar[n] = 1
return signs_Xistar
def fit_excitation(df, deg=12):
""" Fit the excitation signal `u` using polynomial regression.
Args:
df (pd.DataFrame): Pandas DataFrame
deg (int): Polynomial degree (defaults=12)
Returns:
poly (array): Polynomial coefficients, *lowest* power first
"""
def plot(fits, x, y):
_, ax = plt.subplots()
ax.plot(x, y, '-')
for idx, fit in enumerate(fits):
epoly = np.poly1d(fit)
ys = epoly(x)
ax.plot(x, ys, '--', label=f'{idx}')
plt.show()
polys = []
columns = df.filter(regex='u').columns
for column in columns:
signal = df.loc[:, column].values.flatten()
poly = np.polyfit(df.index.values, signal, deg=deg)
# plot([poly], df.index.values, signal)
polys.append(poly[::-1])
polys = np.vstack(polys)
return polys
def adjust_problem_size(Xistar):
""" Yield a boolean mask `mask` that adjusts the problem size
"""
mask = np.where(Xistar != 0)[0]
return mask
l_Xibounds, u_Xibounds = yield_Xibounds(Xistar)
signs_Xistar = yield_signs(Xistar)
Xi_0 = 0.
cupolys = fit_excitation(df, deg=3)
ocupolys = cupolys.shape[1] # Order of polynomials
params = cupolys
mask = adjust_problem_size(Xistar)
identifier = ''
for m in mask:
identifier += str(m + 1)
model_file = './simo_IDCL_sindy_stan_code_{}.stan'.format(
identifier)
print('Using the following model file:')
print(model_file)
Xistar = Xistar[mask]
print(f'Effectively active terms: {Xistar.values}')
data = dict(
N=df.shape[0],
ts=np.arange(1, df.shape[0]),
# ts=np.ones(1), # Sampling rate
nu=min(df.filter(regex='u').shape),
ny=min(df.filter(regex='y').shape),
nXi=max(Xistar.shape),
y_init=df.filter(regex='y').iloc[0, :].values,
z_init=df.filter(regex='y').iloc[0, :].values[np.newaxis],
y=df.filter(regex='y').values,
u=df.filter(regex='u').values[1:, :],
sigma_x_init=.005,
sigma_y_init=sigma_y_init + .1,
Xistar=Xistar,
l_Xibounds=l_Xibounds[mask],
u_Xibounds=u_Xibounds[mask],
params=params,
ocupolys=ocupolys,
cupolys=cupolys,
signs_Xistar=signs_Xistar[mask])
results = {}
name_id = name + '_' + str(identifier)
if fitnewmodel:
df, fit = fit_stan(data,
model_code=None,
model_file=model_file,
name=name_id,
fitnewmodel=fitnewmodel)
results['df'] = df
results['fit'] = fit
else:
try:
data = self.load_inferred_stan_densities(name_id)
results['densities'] = data
except FileNotFoundError: # Obtain `fit` in case model is not available
# through pickling
df, fit = fit_stan(data,
model_file,
name=name_id,
fitnewmodel=fitnewmodel)
results['df'] = df
results['fit'] = fit
return results
@classmethod
def retrieve_Xistar(self, msindyc, model, diagnostic=False):
""" Obtain an most-likely `Xi` by examining the likelihood functions
"""
from scipy import stats
dfs = []
for isample, sindy in msindyc.items():
try:
dfs.append(sindy[model].iloc[:, cluster])
except np.linalg.LinAlgError:
dfs.append(sindy[model][cluster])
try:
dfs = pd.concat(dfs, axis=1)
except TypeError:
breakpoint()
canonical = {} # Canonical coefficients
for coefficient in dfs.index:
mins, maxs = dfs.loc[coefficient].min(), dfs.loc[coefficient].max()
xs = np.linspace(mins, maxs, 2000)
try:
kde = stats.gaussian_kde(xs)
max_LL = xs[np.where(kde(xs) == np.max(kde(xs)))]
canonical[coefficient] = np.max(max_LL)
except np.linalg.LinAlgError: # Coefficient unused in model
canonical[coefficient] = 0.
if diagnostic:
plt.plot(xs, kde(xs))
plt.show()
return np.array(list(canonical.values()))
def identify(self, UD=None):
""" Identify a SINDyc model using the class-methods `ID` and
`identify_SINDy`, using inputs provided by respective private methods.
"""
ID_params = self._ID()
if UD is not None:
U, D = UD['U'], UD['D'] # Unpack excitation sequences
# Update the excitation sequences
ID_params['u'] = U
ID_params['d'] = D
self.uid = U
self.did = D
data = self.ID(**ID_params)
sindy = self.identify_SINDy(self._identify_SINDy(data))
return sindy
@classmethod
def save_inferred_stan_densities(self, data, name):
import pickle
if not '.pkl' in name: # NOTE Dirty implementation
name += '.pkl'
with open(os.path.join('./', name), 'wb',
-1) as fhandle:
pickle.dump(data, fhandle)
@classmethod
def load_inferred_stan_densities(self, name):
import pickle
if not '.pkl' in name: # NOTE Dirty implementation
name += '.pkl'
with open(os.path.join('./', name), 'rb',
-1) as fhandle:
data = pickle.load(fhandle)
return data
def simulate_CL(self):
""" TODO
"""
raise NotImplementedError
def _step_response(self, dfs, T, gain, x0=np.zeros((2, 1))):
""" Excite the models with a step.
"""
dfs = self._prepare_simulation_containers()
U = np.hstack([np.zeros(1), np.full(T.shape[0] - 1, gain)])
for isample, sample in self.msindyc.items():
# Loop over the two models
for imodel, model in sample.items():
# Model data
df_m = self.simulate_model(
model,
# NOTE Dirty implementation, due to that
# `simulate_model` asks for SINDy-related
# initial model state, consisting of a
# stacking of state and control input,
# we here use `x0` only partly right!
x0.copy(),
U=U,
T=T)
df_m = df_m.rename({'x_0': 'y'}, axis=1)
dfs['sindy'][isample][imodel] = df_m
return dfs
def step_response(self, gain=1.):
""" Excite the identified models with a unit step.
"""
dfs = self._prepare_simulation_containers()
dfs = self._step_response(dfs, T=self.Tid, gain=gain)
return dfs
#+end_src
** Plotting
#+NAME: paper_plotting_header
#+begin_src python
import os
import seaborn as sns
import matplotlib.pyplot as plt
palette='Set2'
sns.set(palette=palette, style='ticks', context='paper')
DATAPATH = './'
DUMPPATH = './'
PLOTPATH = './Illustrations/'
class Plotter:
linestyles = ['-', '--', '-.', ':']
extra_small = (6, 3)
small = (6, 4)
normal = (8, 6)
large = (10, 8)
@classmethod
def annotate(self, ax, annotation):
"""
"""
def set_text(ax, text):
ax.text(1,
1.01,
text,
fontsize=6,
ha="right",
va="bottom",
transform=ax.transAxes)
if annotation.get('metric') is not None:
metric = annotation['metric']
text = ''
if isinstance(metric, list):
nmetric = len(metric)
for n, annotation in enumerate(metric):
name, value = annotation
if n == nmetric: # Reached end of metric list
text += f'{name.upper()}: {value}'
else:
text += f'{name.upper()}: {value}, '
else:
name, value = metric
text = f'{name.upper()}: {value}'
try:
set_text(ax, text)
except UnboundLocalError:
print('Failed to annotate axis.')
@classmethod
def postprocess(self, axs, **kwargs):
ax_labels = kwargs.get('labels')
if ax_labels is not None:
if isinstance(ax_labels, list):
for ax, labels in zip(axs, ax_labels):
if labels is not None:
ax.legend(labels)
else:
ax.legend().remove()
else:
for ax in axs:
ax.legend()
else:
for ax in axs:
ax.legend().remove()
metric = kwargs.get('metric')
if metric is not None:
self.annotate(axs[0], {'metric': metric})
hlines = kwargs.get('hlines')
if hlines is not None:
for iax, options in hlines.items():
val = options['val']
xmin, xmax = options['xmin'], options['xmax']
axs[iax].hlines(val, xmin, xmax)
xlim = kwargs.get('xlim')
if xlim is not None:
for iax, limits in xlim.items():
axs[iax].set_xlim(limits)
ylim = kwargs.get('ylim')
if ylim is not None:
for iax, limits in ylim.items():
axs[iax].set_ylim(limits)
plt.tight_layout()
sns.despine()
@classmethod
def generate_filepath(self, filename):
return os.path.join(PLOTPATH, filename)
@classmethod
def savefig(self, filename):
filepath = self.generate_filepath(filename)
plt.savefig(filepath, transparent=True)
return filepath
@classmethod
def plot_y(self, dfs, filename, **kwargs):
_, ax = plt.subplots(figsize=kwargs.get('figsize', (8, 4)))
for linestyle, df in zip(Plotter.linestyles, dfs):
if kwargs.get('filter', True) is True:
df.filter(regex='y').plot(ax=ax, linestyle=linestyle)
else:
df.plot(ax=ax, linestyle=linestyle)
ax.set_xlabel('Timsteps')
ax.set_ylabel('Output space')
ax.legend()
self.postprocess([ax], **kwargs)
return self.savefig(filename)
@classmethod
def plot_yu(self,
dfs,
filename,
show=False,
save=True,
df_P=None,
forcesingle_u=False,
bw=True,
axs=None,
,**kwargs):
gridspec_kw = kwargs.get('gridspec_kw', None)
if axs is None:
_, axs = plt.subplots(nrows=2,
sharex=True,
gridspec_kw=gridspec_kw,
figsize=kwargs.get('figsize', (8, 6)))
if df_P is not None: # Plot uncertainty range
lower = df_P['lower']
upper = df_P['upper']
label = df_P.get('label')
axs[0].fill_between(dfs[-1].index,
lower,
upper,
color='grey',
alpha=.05,
label=label)
if bw:
color = 'k' # Use Black-And-White plotting
else:
color = None
for linestyle, df in zip(Plotter.linestyles, dfs):
if kwargs.get('filter', True) is True:
try:
df.filter(regex='y').plot(ax=axs[0],
linestyle=linestyle,
color=color)
except TypeError:
try:
print(f'Failed to plot {df.name}')
except AttributeError:
print('Failed to plot unnamed dataframe')
else:
df.plot(ax=axs[0], linestyle=linestyle, color=color)
axs[0].set_ylabel('Output space')
if forcesingle_u:
dfs[0].filter(regex='u').plot(ax=axs[1], color=color)
else:
for linestyle, df in zip(Plotter.linestyles, dfs):
df.filter(regex='u').plot(ax=axs[1],
linestyle=linestyle,
color=color)
axs[1].set_ylabel('Input space')
axs[1].set_xlabel('Timesteps')
self.postprocess(axs, **kwargs)
if show:
plt.show()
if save:
return self.savefig(filename)
@classmethod
def plot3D(self,
ode_lin,
ode_nl,
xmin,
xmax,
umin,
umax,
zlim=(-1, 1),
step=.1,
show=True,
save=False,
fig=None,
ax=None,
filename='plot3D.pdf',
,**kwargs):
from matplotlib.ticker import LinearLocator, FormatStrFormatter
if (ax is None) or (fig is None):
from mpl_toolkits.mplot3d import Axes3D
fig, ax = plt.subplots(subplot_kw={'projection': '3d'})
X = np.arange(xmin, xmax, step)[:, np.newaxis]
U = np.arange(umin, umax, step)[:, np.newaxis]
X, U = np.meshgrid(X, U)
surfl = ax.plot_wireframe(X,
U,
ode_lin(X, U),
color='k',
alpha=.25,
label='Linearized function')
surfnl = ax.plot_surface(X,
U,
ode_nl(X, U),
cmap='viridis',
linewidth=0,
label='Nonlinear function')
surfnl._facecolors2d = surfnl._facecolors3d
surfnl._edgecolors2d = surfnl._edgecolors3d
xformat = kwargs.get('xformat', '%.02f')
ax.xaxis.set_major_locator(LinearLocator(5))
ax.xaxis.set_major_formatter(FormatStrFormatter(xformat))
yformat = kwargs.get('yformat', '%.02f')
ax.yaxis.set_major_locator(LinearLocator(5))
ax.yaxis.set_major_formatter(FormatStrFormatter(yformat))
zformat = kwargs.get('zformat', '%.02f')
ax.set_zlim(*zlim)
ax.zaxis.set_major_locator(LinearLocator(10))
ax.zaxis.set_major_formatter(FormatStrFormatter(zformat))
if fig is not None:
if kwargs.get('colorbar', True):
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
axins1 = inset_axes(ax,
width="5%",
height="50%",
loc='center left')
fig.colorbar(surfnl, cax=axins1)
axins1.yaxis.set_ticks_position('left')
ax.set_xlabel('x') # x (state space)
ax.set_ylabel('u') # u (input space)
ax.set_zlabel('y') # Y (output space)
ax.azim = kwargs.get('azim', -33)
ax.elev = kwargs.get('elev', 13)
ax.legend()
if ax is None:
if show:
plt.show()
return None
if save:
filepath = os.path.join(PLOTPATH, filename)
plt.savefig(filepath, transparent=True)
return filepath
else:
return ax
@classmethod
def plot_density(self,
stan_results,
var_names,
filename,
resolution=2000,
var_prior=None,
mark_max_LL=True,
mark_LL_ci=True,
ci=[.05],
original_model_obj=None,
show=False,
save=True,
var_labels=None,
,**kwargs):
""" Plot the posterior density of the variables specified in
`var_names`.
"""
var_prior_ = var_prior[var_prior != 0]
from scipy import stats
try: # Use `stan_results` to retrieve input data
densities = stan_results['densities']
use_recordings = False
except KeyError: # Try to obtain the distributions
try: # Try obtain `df` directly
fit = stan_results['fit']
df = fit.to_dataframe()
use_recordings = True
except KeyError: # Try to obtain `fit` instance
df = stan_results['df']
use_recordings = True
def obtain_via_recordings(df, name, idx):
""" Obtain the plotting input via recordings
"""
series = df.loc[:, name]
if var_labels is not None: # Obtain plot label
name = var_labels[idx]
else:
name = series.name
kde = stats.gaussian_kde(series) # Instantiate `kde`
s_min, s_max = series.min(), series.max() # Obtain bounds
xs = np.linspace(s_min, s_max, resolution) # Evaluation space
ys = kde(xs) # Evaluate `kde`
return xs, ys, kde, series
def obtain_via_densities(densities, name):
""" Obtain the plotting input via densities
"""
densities_ = densities['densities']
xs, kde = densities_['xspace'][name], densities_['yspace'][name]
ys = kde(xs)
return xs, ys, kde, None
def obtain_original_density(size=1000):
""" Obtain the original parametric density.
"""
# First moment of original system parameters
mus = [mp.mu_alpha[0], mp.mu_beta[0]]
# Second moment of original system parameter
sigs = [mp.sig_alpha, mp.sig_beta]
kdes, xs, ys = [], [], []
for mu, sig in zip(mus, sigs):
values = np.random.normal(loc=mu, scale=sig, size=resolution)
kde = stats.gaussian_kde(values) # Instantiate `kde`
s_min, s_max = values.min(), values.max() # Obtain bounds
xs_ = np.linspace(s_min, s_max, resolution) # Evaluation space
ys_ = kde(xs_) # Evaluate `kde`
kdes.append(kde)
xs.append(xs_)
ys.append(ys_)
return xs[::-1], ys[::-1], kdes[::-1], None
if original_model_obj is not None:
o_xs, o_ys, o_kde, _ = obtain_original_density()
nrows = len(var_names['labels'])
if nrows == 1:
figsize = Plotter.extra_small
elif nrows == 2:
figsize = Plotter.small
elif nrows == 3:
figsize = Plotter.normal
else:
figsize = Plotter.large
_, axs = plt.subplots(nrows=nrows, figsize=figsize)
for idx, name in enumerate(var_names['labels']):
# Obtain `xs` and `ys` evaluation spaces
if use_recordings:
xs, ys, kde, series = obtain_via_recordings(
df, name, int(name[3]))
else:
xs, ys, kde, series = obtain_via_densities(densities, name)
# Obtain reference minimum for logarithmic axis scaling
minval = min(ys.min(), o_ys[idx].min())
if mark_max_LL:
max_LL = xs[np.where(ys == np.max(ys))]
if var_prior is not None: # Plot prior mean
axs[idx].vlines(var_prior_.iloc[idx],
minval, max(o_ys[idx].max(), kde(max_LL)),
linestyle=':')
if mark_LL_ci:
def mark_ci(n, xs, ci_, max_LL):
"""
"""
dci = ci_ * np.abs(max(xs) - np.min(xs))
LL_lower = max_LL - dci
LL_upper = max_LL + dci
alpha = (1 + n) * .25
xs_ = xs[(LL_lower < xs) & (xs < LL_upper)]
axs[idx].fill_between(xs_,
kde(xs_),
np.ones_like(xs_) * minval,
color='grey',
alpha=alpha)
if isinstance(ci, list):
for n, ci_ in enumerate(ci):
mark_ci(n, xs, ci_, max_LL)
else:
mark_ci(minval, xs, ci, max_LL)
try:
axs[idx].scatter(max_LL, kde(max_LL))
except UnboundLocalError:
pass
axs[idx].plot(o_xs[idx], o_ys[idx], color='k')
axs[idx].plot(xs, ys, color='grey', linestyle='--')
axs[idx].set_xlabel(var_names['plot_labels'][idx])
self.postprocess(axs, **kwargs)
axs[-1].set_yscale('log')
if show:
plt.show()
if save:
return self.savefig(filename)
@classmethod
def plot_ppc(self,
data,
filename,
data_pairs={'y': 'y_hat'},
num_pp_samples=None,
show=False,
save=True,
,**kwargs):
""" Posterior Predictive Check
"""
import arviz as az
az.style.use('arviz-white')
ax = az.plot_ppc(stan_data,
data_pairs={'y': 'y_hat'},
num_pp_samples=num_pp_samples,
figsize=Plotter.extra_small,
legend=False)
try:
handles, labels = ax.get_legend_handles_labels()
except AttributeError:
handles, labels = ax[0].get_legend_handles_labels()
ax = ax[0]
ax.set_xlim((-0.3, 1.0))
labels = [label.replace('y_hat', r'$\hat{y}$') for label in labels]
ax.legend(handles, labels, fontsize=12, loc='upper left',
frameon=True, fancybox=True, framealpha=.75)
ax.set_xlabel(r'$y/\hat{y}$')
sns.despine()
plt.tight_layout()
if show:
plt.show()
if save:
return self.savefig(filename)
@classmethod
def plot_draws(self, dfs, P, filename='draws.pdf', ax=None):
if ax is None:
_, axs = plt.subplots(nrows=2,
sharex=True,
gridspec_kw={'height_ratios': [1., .5]},
figsize=Plotter.normal)
else:
axs = [ax]
raise NotImplementedError
@classmethod
def plot_sectors(self,
df,
grouped,
filename=None,
q_lower=.1,
q_upper=.9,
n_levels=20,
save=True,
show=False,
xlabel='Timestamps',
ylabel='Y',
ax=None,
returnHandle=False,
,**kwargs):
""" Uncertainty sector plot.
Args:
df (pd.DataFrame): Full pandas DataFrame
grouped (<pd.DataFrame.groupby>): df grouped by chosen criterion
filename (str): Full filename (including type specifier)
"""
import matplotlib as mpl
quantspace = np.linspace(q_lower, q_upper, 2 * n_levels + 1)
levels = []
for idx in range(int((quantspace.shape[0] + 1) / 2) - 1):
# Lower quantile
ql = quantspace[idx]
# Upper quantile
qu = quantspace[-idx - 1]
# DataFrame/Series corresponding to `ql`
lower = grouped.quantile(ql)
lower.name = ql
# DataFrame/Series corresponding to `qu`
upper = grouped.quantile(qu)
upper.name = qu
levels.append((lower, upper))
cmap = plt.get_cmap(kwargs.get('cmap', 'Blues'), n_levels)
if ax is None:
# Here we initialize a new plot container (otherwise we re-use)
_, ax = plt.subplots(figsize=(6, 4))
for idx, level in enumerate(levels):
try:
ax.fill_between(
grouped.median().index,
# df.index,
level[0].values.flatten(),
level[1].values.flatten(),
color=cmap(idx),
alpha=kwargs.get('alpha', 1.))
except:
try:
ax.fill_between(
# taxis,
df.index[0:level[0].values.shape[0]],
level[0].values.flatten(),
level[1].values.flatten(),
color=cmap(idx),
alpha=kwargs.get('alpha', 1.))
except:
breakpoint()
if kwargs.get('median', False) is True:
ax.plot(grouped.median().index,
grouped.median(),
linestyle=kwargs.get('median_linestyle', '--'),
color=cmap(kwargs.get('median_pcmap', .5)))
maxquant = levels[-1]
# Derive the color space by examination of the quantile difference
vmin = quantspace.min()
vmax = quantspace.max()
norm = mpl.colors.Normalize(vmin=vmin, vmax=vmax)
sm = plt.cm.ScalarMappable(cmap=cmap.reversed(), norm=norm)
sm.set_array([])
if kwargs.get('colorbar', True) is True:
plt.colorbar(sm,
label=kwargs.get('colorbar_label',
'Quantile range around median'))
if kwargs.get('postprocess', True) is True:
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
plt.tight_layout()
sns.despine()
if kwargs.get('legend', False) is False:
ax.legend().remove()
if returnHandle is False:
if save is True:
plt.savefig(filename, transparent=True)
return filename
else:
return ax, sm
if show is True:
plt.show()
else:
return ax, sm
def plot_sns(dfs, filename, show=False, save=True):
""" Cosimulation and out of sample plot.
"""
filepath = os.path.join(PLOTPATH, filename)
def concat(dfs):
keys = ['ts_id', 'ts_oos']
labels = ['ID', 'OOS']
df_updated = []
for key, label in zip(keys, labels):
ts = dfs[key]
sys = ts['system']
model = ts['model']
df_relabeled = []
for df, timeseries in zip([sys, model], ['system', 'model']):
df = pd.concat([df.filter(regex='y'),
df.filter(regex='u')], axis=1)
df.columns = ['y', 'u']
df.loc[:, 'time series'] = timeseries
df_relabeled.append(df)
df_ = pd.concat(df_relabeled, axis=0)
df_.loc[:, 'experiment'] = label
df_updated.append(df_)
df = pd.concat(df_updated, axis=0)
df.index.name = 'Timesteps'
df = df.reset_index()
return df
df = concat(dfs)
_, axs = plt.subplots(nrows=2, figsize=Plotter.small,
sharex=True)
sns.lineplot(x='Timesteps', y='y', style='time series', hue='experiment', data=df,
ax=axs[0])
axs[0].legend(loc='lower right')
axs[0].set_ylabel('Y')
sns.lineplot(x='Timesteps', y='u', hue='experiment', data=df, ax=axs[1])
axs[1].legend(loc='lower right')
axs[1].set_ylabel('P')
axs[1].set_xlabel('Timesteps')
sns.despine()
plt.tight_layout()
if show:
plt.show()
if save:
plt.savefig(filepath, transparent=True)
return filepath
#+end_src
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment