import webbrowser
from typing import Optional, Union, get_args
import pandas as pd
from agentlib.core.agent import AgentConfig
from agentlib_mpc.utils import TIME_CONVERSION, TimeConversionTypes
from agentlib_mpc.utils.analysis import mpc_at_time_step
from agentlib_mpc.utils.plotting.interactive import get_port
from dash import Dash, Input, Output, ctx, dcc, html
from plotly import graph_objects as go
from pydantic import FilePath
import agentlib_flexquant.data_structures.flex_results as flex_results
import agentlib_flexquant.data_structures.globals as glbs
from agentlib_flexquant.data_structures.flex_kpis import FlexibilityKPIs
from agentlib_flexquant.data_structures.flex_offer import OfferStatus
from agentlib_flexquant.data_structures.flexquant import FlexQuantConfig
[docs]class CustomBound:
"""Dataclass to let the user define custom bounds for the mpc variables.
for_variable -- The name of the variable to plot the bounds into
lower bound -- The lower bound of the variable as the name of the lower bound variable
in the MPC
upper bound -- The upper bound of the variable as the name of the upper bound variable
in the MPC
"""
for_variable: str
lower_bound: Optional[str]
upper_bound: Optional[str]
def __init__(
self,
for_variable: str,
lb_name: Optional[str] = None,
ub_name: Optional[str] = None,
):
self.for_variable = for_variable
self.lower_bound = lb_name
self.upper_bound = ub_name
[docs]class Dashboard(flex_results.Results):
"""Class for the dashboard of flexquant"""
# Constants for plotting variables
MPC_ITERATIONS: str = "stats_iter_count"
# Label for the positive and negative flexibilities
label_positive: str = "positive"
label_negative: str = "negative"
# Keys for line properties
bounds_key: str = "bounds"
characteristic_times_current_key: str = "characteristic_times_current"
characteristic_times_accepted_key: str = "characteristic_times_accepted"
# Custom settings
custom_bounds: list[CustomBound] = []
def __init__(
self,
flex_config: Optional[Union[str, FilePath, FlexQuantConfig]] = None,
simulator_agent_config: Optional[Union[str, FilePath, AgentConfig]] = None,
generated_flex_files_base_path: Optional[Union[str, FilePath]] = None,
results: Union[str, FilePath, dict[str, dict[str, pd.DataFrame]]] = None,
to_timescale: TimeConversionTypes = "hours",
port: int = None,
):
super().__init__(
flex_config=flex_config,
simulator_agent_config=simulator_agent_config,
generated_flex_files_base_path=generated_flex_files_base_path,
results=results,
to_timescale=to_timescale,
)
self.port = port
self.current_timescale_input = self.current_timescale_of_data
# remove FMU from dataclass to enable pickling/multiprocessing of Dashboard
if hasattr(self.simulator_module_config, "model"):
self.simulator_module_config.model_config["frozen"] = False
delattr(self.simulator_module_config, "model")
# Define line properties
self.LINE_PROPERTIES: dict = {
self.simulator_agent_config.id: {"color": "black",},
self.baseline_agent_config.id: {"color": "black",},
self.neg_flex_agent_config.id: {"color": "red",},
self.pos_flex_agent_config.id: {"color": "blue",},
self.bounds_key: {"color": "grey",},
self.characteristic_times_current_key: {"color": "grey", "dash": "dash",},
self.characteristic_times_accepted_key: {"color": "yellow",},
}
# KPIS
kpis_pos = FlexibilityKPIs(direction="positive")
self.kpi_names_pos = kpis_pos.get_name_dict()
kpis_neg = FlexibilityKPIs(direction="negative")
self.kpi_names_neg = kpis_neg.get_name_dict()
# Get variables for plotting
# MPC stats
self.plotting_variables = [self.MPC_ITERATIONS]
# MPC and sim variables
self.intersection_mpcs_sim = self.get_intersection_mpcs_sim()
self.plotting_variables.extend(
[key for key in self.intersection_mpcs_sim.keys()]
)
# Flexibility kpis
self.plotting_variables.append(kpis_pos.energy_flex.name)
self.plotting_variables.append(kpis_pos.costs.name)
# for kpi in kpis_pos.get_kpi_dict(direction_name=False).values():
# if not isinstance(kpi.value, pd.Series):
# self.plotting_variables.append(kpi.name)
def _plot_mpc_stats(self, fig: go.Figure, variable: str) -> go.Figure:
""" plot the statics of the baseline and shadow mpcs.
Args:
fig: the figure to be updated
variable: the statics variable to be plotted
Returns:
The updated figure
"""
fig.add_trace(
go.Scatter(
name=self.baseline_agent_config.id,
x=self.df_baseline_stats.index,
y=self.df_baseline_stats[variable],
mode="markers",
line=self.LINE_PROPERTIES[self.baseline_agent_config.id],
)
)
fig.add_trace(
go.Scatter(
name=self.pos_flex_agent_config.id,
x=self.df_pos_flex_stats.index,
y=self.df_pos_flex_stats[variable],
mode="markers",
line=self.LINE_PROPERTIES[self.pos_flex_agent_config.id],
)
)
fig.add_trace(
go.Scatter(
name=self.neg_flex_agent_config.id,
x=self.df_neg_flex_stats.index,
y=self.df_neg_flex_stats[variable],
mode="markers",
line=self.LINE_PROPERTIES[self.neg_flex_agent_config.id],
)
)
return fig
def _plot_one_mpc_variable(
self, fig: go.Figure, variable: str, time_step: float
) -> go.Figure:
"""Plot the mpc series for the specified variable at the specified time step.
Args:
fig: the figure to be updated
variable: the variable to be plotted
time_step: the time step to be plotted
Returns:
The updated figure
"""
# Get the mpc data for the plot
series_neg = mpc_at_time_step(
data=self.df_neg_flex,
time_step=time_step,
variable=self.intersection_mpcs_sim[variable][
self.neg_flex_module_config.module_id
],
index_offset=False,
)
series_pos = mpc_at_time_step(
data=self.df_pos_flex,
time_step=time_step,
variable=self.intersection_mpcs_sim[variable][
self.pos_flex_module_config.module_id
],
index_offset=False,
)
series_bas = mpc_at_time_step(
data=self.df_baseline,
time_step=time_step,
variable=self.intersection_mpcs_sim[variable][
self.baseline_module_config.module_id
],
index_offset=False,
)
def _add_step_to_data(s: pd.Series) -> pd.Series:
""" shift the index of the series """
s_concat = s.copy().shift(periods=1)
s_concat.index = s.index - 0.01 * (s.index[1] - s.index[0])
for ind, val in s_concat.items():
s[ind] = val
s.sort_index(inplace=True)
return s
# Manage nans
for series in [series_neg, series_pos, series_bas]:
if variable in [
control.name for control in self.baseline_module_config.controls
]:
series.dropna(inplace=True)
series = _add_step_to_data(s=series)
series.dropna(inplace=True)
# Plot the data
try:
df_sim = self.df_simulation[
self.intersection_mpcs_sim[variable][
self.simulator_module_config.module_id
]
]
fig.add_trace(
go.Scatter(
name=self.simulator_agent_config.id,
x=df_sim.index,
y=df_sim,
mode="lines",
line=self.LINE_PROPERTIES[self.simulator_agent_config.id],
zorder=2,
)
)
except KeyError:
# E.g. when the simulator variable name was not found from the intersection
pass
fig.add_trace(
go.Scatter(
name=self.baseline_agent_config.id,
x=series_bas.index,
y=series_bas,
mode="lines",
line=self.LINE_PROPERTIES[self.baseline_agent_config.id]
| {"dash": "dash"},
zorder=3,
)
)
fig.add_trace(
go.Scatter(
name=self.neg_flex_agent_config.id,
x=series_neg.index,
y=series_neg,
mode="lines",
line=self.LINE_PROPERTIES[self.neg_flex_agent_config.id]
| {"dash": "dash"},
zorder=4,
)
)
fig.add_trace(
go.Scatter(
name=self.pos_flex_agent_config.id,
x=series_pos.index,
y=series_pos,
mode="lines",
line=self.LINE_PROPERTIES[self.pos_flex_agent_config.id]
| {"dash": "dash"},
zorder=4,
)
)
# Get the data for the bounds
def _get_mpc_series(var_type: str, var_name: str) -> pd.Series:
return self.df_baseline[(var_type, var_name)].xs(0, level=1)
def _get_bound(var_name: str) -> Optional[pd.Series]:
if var_name in self.df_baseline.columns.get_level_values(1):
try:
bound = _get_mpc_series(var_type="variable", var_name=var_name)
except KeyError:
bound = _get_mpc_series(var_type="parameter", var_name=var_name)
else:
bound = None
return bound
df_lb = None
df_ub = None
for custom_bound in self.custom_bounds:
if variable == custom_bound.for_variable:
df_lb = _get_bound(custom_bound.lower_bound)
df_ub = _get_bound(custom_bound.upper_bound)
if variable in [
control.name for control in self.baseline_module_config.controls
]:
df_lb = _get_mpc_series(var_type="lower", var_name=variable)
df_ub = _get_mpc_series(var_type="upper", var_name=variable)
# Plot bounds
if df_lb is not None:
fig.add_trace(
go.Scatter(
name="Lower bound",
x=df_lb.index,
y=df_lb,
mode="lines",
line=self.LINE_PROPERTIES[self.bounds_key],
zorder=1,
)
)
if df_ub is not None:
fig.add_trace(
go.Scatter(
name="Upper bound",
x=df_ub.index,
y=df_ub,
mode="lines",
line=self.LINE_PROPERTIES[self.bounds_key],
zorder=1,
)
)
return fig
def _plot_flexibility_kpi(self, fig: go.Figure, variable: str) -> go.Figure:
"""Plot the flexibility kpi.
Args:
fig: the figure to be updated
variable: the kpi variable to be plotted
Returns:
The updated figure
"""
df_ind = self.df_indicator.xs(0, level=1)
# if the variable only has NaN, don't plot
if df_ind[self.kpi_names_pos[variable]].isna().all():
return
fig.add_trace(
go.Scatter(
name=self.label_positive,
x=df_ind.index,
y=df_ind[self.kpi_names_pos[variable]],
mode="lines+markers",
line=self.LINE_PROPERTIES[self.pos_flex_agent_config.id],
)
)
fig.add_trace(
go.Scatter(
name=self.label_negative,
x=df_ind.index,
y=df_ind[self.kpi_names_neg[variable]],
mode="lines+markers",
line=self.LINE_PROPERTIES[self.neg_flex_agent_config.id],
)
)
return fig
def _plot_market_results(self, fig: go.Figure, variable: str) -> go.Figure:
"""Plot the market results.
Args:
fig: the figure to be updated
variable: the variable to be plotted
Returns:
The updated figure
"""
df_flex_market_index = self.df_market.index.droplevel("time")
if variable in self.df_market.columns:
fig.add_trace(
go.Scatter(
x=df_flex_market_index,
y=self.df_market[variable],
mode="lines+markers",
line=self.LINE_PROPERTIES[self.pos_flex_agent_config.id],
)
)
else:
pos_var = f"pos_{variable}"
neg_var = f"neg_{variable}"
fig.add_trace(
go.Scatter(
name=self.label_positive,
x=df_flex_market_index,
y=self.df_market[pos_var],
mode="lines+markers",
line=self.LINE_PROPERTIES[self.pos_flex_agent_config.id],
)
)
fig.add_trace(
go.Scatter(
name=self.label_negative,
x=df_flex_market_index,
y=self.df_market[neg_var],
mode="lines+markers",
line=self.LINE_PROPERTIES[self.neg_flex_agent_config.id],
)
)
return fig
def _get_characteristic_times(self, at_time_step: float) -> (float, float, float):
"""Get the characteristic times.
Args:
at_time_step: the time at which we want to get the characteristic times
Returns:
market_time, prep_time and flex_event_duration
"""
df_characteristic_times = self.df_indicator.xs(0, level="time")
rel_market_time = (
df_characteristic_times.loc[at_time_step, glbs.MARKET_TIME]
/ TIME_CONVERSION[self.current_timescale_of_data]
)
rel_prep_time = (
df_characteristic_times.loc[at_time_step, glbs.PREP_TIME]
/ TIME_CONVERSION[self.current_timescale_of_data]
)
flex_event_duration = (
df_characteristic_times.loc[at_time_step, glbs.FLEX_EVENT_DURATION]
/ TIME_CONVERSION[self.current_timescale_of_data]
)
return rel_market_time, rel_prep_time, flex_event_duration
def _mark_time(
self, fig: go.Figure, at_time_step: float, line_prop: dict
) -> go.Figure:
fig.add_vline(x=at_time_step, line=line_prop, layer="below")
return fig
def _mark_characteristic_times(
self, fig: go.Figure, offer_time: Union[float, int] = 0, line_prop: dict = None
) -> go.Figure:
"""Add markers of the characteristic times to the plot for a time step.
Args:
fig: the figure to plot the results into
offer_time: When to show the markers
line_prop: the graphic properties of the lines as in plotly
Returns:
The updated figure
"""
if line_prop is None:
line_prop = self.LINE_PROPERTIES[self.characteristic_times_current_key]
try:
(
rel_market_time,
rel_prep_time,
flex_event_duration,
) = self._get_characteristic_times(offer_time)
self._mark_time(fig=fig, at_time_step=offer_time, line_prop=line_prop)
self._mark_time(
fig=fig,
at_time_step=offer_time + rel_market_time,
line_prop=line_prop,
)
self._mark_time(
fig=fig,
at_time_step=offer_time + rel_prep_time + rel_market_time,
line_prop=line_prop,
)
self._mark_time(
fig=fig,
at_time_step=offer_time
+ rel_prep_time
+ rel_market_time
+ flex_event_duration,
line_prop=line_prop,
)
except KeyError:
pass # No data of characteristic times available, e.g. if offer accepted
return fig
def _mark_characteristic_times_of_accepted_offers(self, fig: go.Figure) -> go.Figure:
"""Add markers of the characteristic times for accepted offers to the plot."""
if self.df_market is not None:
if (
self.df_market["status"]
.isin(
[
OfferStatus.ACCEPTED_NEGATIVE.value,
OfferStatus.ACCEPTED_POSITIVE.value,
]
)
.any()
):
df_accepted_offers = self.df_market["status"].str.contains(
pat="OfferStatus.accepted"
)
for i in df_accepted_offers.index.to_list():
if df_accepted_offers[i]:
fig = self._mark_characteristic_times(
fig=fig,
offer_time=i[0],
line_prop=self.LINE_PROPERTIES[
self.characteristic_times_accepted_key
],
)
return fig
def _create_plot(
self,
variable: str,
at_time_step: float,
show_accepted_characteristic_times: bool = True,
show_current_characteristic_times: bool = True,
zoom_to_offer_window: bool = False,
zoom_to_prediction_interval: bool = False,
) -> go.Figure:
"""Create a plot for one variable
Args:
variable: the variable to plot
at_time_step: the time_step to show the mpc predictions
and the characteristic times
show_accepted_characteristic_times: whether to show
the accepted characteristic times
show_current_characteristic_times: whether to show
the current characteristic times
zoom_to_offer_window: whether to zoom to offer window
zoom_to_prediction_interval: wether to zoom to prediction interval
Returns:
The created figure
"""
# Create the figure
fig = go.Figure()
self._mark_time(fig=fig, at_time_step=at_time_step, line_prop={"color": "green"})
if show_accepted_characteristic_times:
self._mark_characteristic_times_of_accepted_offers(fig=fig)
# Plot variable
if variable in self.df_baseline_stats.columns:
self._plot_mpc_stats(fig=fig, variable=variable)
elif variable in self.intersection_mpcs_sim.keys():
self._plot_one_mpc_variable(
fig=fig, variable=variable, time_step=at_time_step
)
if show_current_characteristic_times:
self._mark_characteristic_times(fig=fig, offer_time=at_time_step)
elif any(variable in label for label in self.df_indicator.columns):
self._plot_flexibility_kpi(fig=fig, variable=variable)
elif any(variable in label for label in self.df_market.columns):
self._plot_market_results(fig=fig, variable=variable)
else:
raise ValueError(f"No plotting function found for variable {variable}")
# Set layout
if zoom_to_offer_window:
try:
(
rel_market_time,
rel_prep_time,
flex_event_duration,
) = self._get_characteristic_times(at_time_step)
ts = (
self.baseline_module_config.time_step
/ TIME_CONVERSION[self.current_timescale_of_data]
)
xlim_left = at_time_step
xlim_right = (
at_time_step
+ rel_market_time
+ rel_prep_time
+ flex_event_duration
+ 4 * ts
)
except KeyError:
# No data of characteristic times available for this time step,
# fall back to default zoom
xlim_left = self.df_simulation.index[0]
xlim_right = (
self.df_simulation.index[-1] + self.df_baseline.index[-1][-1]
)
elif zoom_to_prediction_interval:
xlim_left = at_time_step
xlim_right = at_time_step + self.df_baseline.index[-1][-1]
else:
xlim_left = self.df_simulation.index[0]
xlim_right = (
self.df_simulation.index[-1] + self.df_baseline.index[-1][-1]
)
fig.update_layout(
yaxis_title=variable,
xaxis_title=f"Time in {self.current_timescale_of_data}",
xaxis_range=[xlim_left, xlim_right],
height=350,
margin=dict(t=20, b=20),
)
fig.update_xaxes(
dtick=round(self.baseline_module_config.prediction_horizon / 6)
* self.baseline_module_config.time_step
/ TIME_CONVERSION[self.current_timescale_of_data]
)
fig.update_yaxes(tickformat="~r")
return fig
def _create_layout(self) -> list:
"""Create the layout for the Dash app.
Returns:
The layout as a list of Dash components.
"""
return [
html.H1("Results"),
html.H3("Settings"),
html.Div(
children=[
html.Code(f"{option}: {setting}, ")
for option, setting in self.baseline_module_config.optimization_backend[
"discretization_options"
].items()
]
),
# Options
html.Div(
children=[
html.H3("Options"),
html.Div(
children=[
dcc.Checklist(
id="accepted_characteristic_times",
options=[
{
"label": "Show characteristic times (accepted)",
"value": True,
}
],
value=[True],
style={
"display": "inline-block",
"padding-right": "10px",
},
),
dcc.Checklist(
id="current_characteristic_times",
options=[
{
"label": "Show characteristic times (current)",
"value": True,
}
],
value=[True],
style={
"display": "inline-block",
"padding-right": "10px",
},
),
],
),
html.Div(
children=[
dcc.Checklist(
id="zoom_to_offer_window",
options=[
{
"label": "Zoom to flexibility offer window",
"value": False,
}
],
style={
"display": "inline-block",
"padding-right": "10px",
},
),
dcc.Checklist(
id="zoom_to_prediction_interval",
options=[
{
"label": "Zoom to mpc prediction interval",
"value": False,
}
],
style={"display": "inline-block"},
),
],
),
# Time input
html.Div(
children=[
html.H3(
children=f"Time:",
style={
"display": "inline-block",
"padding-right": "10px",
},
),
dcc.Input(
id="time_typing",
type="number",
min=0,
max=1,
value=0, # will be updated in the callback
style={"display": "inline-block"},
),
dcc.Dropdown(
id="time_unit",
options=get_args(TimeConversionTypes),
value=self.current_timescale_input,
style={
"display": "inline-block",
"verticalAlign": "middle",
"padding-left": "10px",
"width": "100px",
},
),
],
),
dcc.Slider(
id="time_slider",
min=0,
max=1,
value=0, # will be updated in the callback
tooltip={"placement": "bottom", "always_visible": True},
marks=None,
updatemode="drag",
),
],
style={
"width": "88%",
"padding-left": "0%",
"padding-right": "12%",
# Make the options sticky to the top of the page
"position": "sticky",
"top": "0",
"overflow-y": "visible",
"z-index": "100",
"background-color": "white",
},
),
# Container for the graphs, will be updated in the callback
html.Div(id="graphs_container_variables", children=[]),
]
def _register_callbacks(self, app: Dash) -> None:
"""Register all callbacks for the Dash app.
Args:
app: The Dash application instance to register callbacks on.
"""
# Callbacks
# Update the time value or the time unit
@app.callback(
Output(component_id="time_slider", component_property="value"),
Output(component_id="time_slider", component_property="min"),
Output(component_id="time_slider", component_property="max"),
Output(component_id="time_slider", component_property="step"),
Output(component_id="time_typing", component_property="value"),
Output(component_id="time_typing", component_property="min"),
Output(component_id="time_typing", component_property="max"),
Output(component_id="time_typing", component_property="step"),
Input(component_id="time_typing", component_property="value"),
Input(component_id="time_slider", component_property="value"),
Input(component_id="time_unit", component_property="value"),
)
def update_time_index_of_input(
time_typing: float, time_slider: float, time_unit: TimeConversionTypes
) -> (int, float, float, float, int, float, float, float):
# get trigger id
trigger_id = ctx.triggered[0]["prop_id"].split(".")[0]
# Get the value for the sliders
if trigger_id == "time_slider":
value = time_slider
elif trigger_id == "time_unit":
value = (
time_typing
* TIME_CONVERSION[self.current_timescale_input]
/ TIME_CONVERSION[time_unit]
)
else:
value = time_typing
# Convert the index to the given time unit if necessary
if trigger_id == "time_unit":
self.convert_timescale_of_dataframe_index(to_timescale=time_unit)
# Get the index for the slider types
times = self.df_baseline.index.get_level_values(0).unique()
minimum = times[0]
maximum = times[-1]
step = times[1] - times[0]
self.current_timescale_input = time_unit
return (value, minimum, maximum, step, value, minimum, maximum, step)
# Update the graphs
@app.callback(
Output(
component_id="graphs_container_variables", component_property="children"
),
Input(component_id="time_typing", component_property="value"),
Input(
component_id="accepted_characteristic_times", component_property="value"
),
Input(
component_id="current_characteristic_times", component_property="value"
),
Input(component_id="zoom_to_offer_window", component_property="value"),
Input(
component_id="zoom_to_prediction_interval", component_property="value"
),
)
def update_graph(
at_time_step: float,
show_accepted_characteristic_times: bool,
show_current_characteristic_times: bool,
zoom_to_offer_window: bool,
zoom_to_prediction_interval: bool,
) -> list[dcc.Graph]:
"""Update all graphs based on the options and slider values"""
figs = []
for variable in self.plotting_variables:
fig = self._create_plot(
variable=variable,
at_time_step=at_time_step,
show_accepted_characteristic_times=show_accepted_characteristic_times,
show_current_characteristic_times=show_current_characteristic_times,
zoom_to_offer_window=zoom_to_offer_window,
zoom_to_prediction_interval=zoom_to_prediction_interval,
)
figs.append(dcc.Graph(id=f"graph_{variable}", figure=fig))
return figs
[docs] def create_app(
self, custom_bounds: Union[CustomBound, list[CustomBound]] = None
) -> Dash:
"""Create and return the Dash app without running it.
This method sets up the entire app layout and callbacks but doesn't
block by calling app.run(). This is useful for testing.
Args:
custom_bounds: optional arguments to show the comfort bounds
Returns:
The configured Dash application instance.
"""
if custom_bounds is None:
self.custom_bounds = []
elif isinstance(custom_bounds, CustomBound):
self.custom_bounds = [custom_bounds]
else:
self.custom_bounds = custom_bounds
# Create the app
app = Dash(__name__ + "_flexibility", title="Flexibility Results")
app.layout = self._create_layout()
# Register callbacks
self._register_callbacks(app)
return app
[docs] def show(self, custom_bounds: Union[CustomBound, list[CustomBound]] = None):
"""Show the dashboard in a web browser containing:
-- Statistics of the MPCs solver
-- The states, controls, and the power variable of the MPCs and the simulator
-- KPIs of the flexibility quantification
-- Markings of the characteristic flexibility times
Args:
custom_bounds: optional arguments to show the comfort bounds
"""
app = self.create_app(custom_bounds=custom_bounds)
# Run the app
if self.port:
port = self.port
else:
port = get_port()
webbrowser.open_new_tab(f"http://localhost:{port}")
app.run(debug=False, port=port)