import warnings
from ast import literal_eval
import datetime
from pathlib import Path
from typing import NewType, Literal, Union, Optional, Iterable
import pandas as pd
from pandas.api.types import is_float_dtype
import numpy as np
from agentlib_mpc.data_structures import mpc_datamodels
from agentlib_mpc.utils import TimeConversionTypes, TIME_CONVERSION
SimulationTime = NewType("SimulationTime", float)
[docs]def load_admm(file: Union[Path, str]) -> pd.DataFrame:
return load_mpc(file)
[docs]def load_mpc(file: Union[Path, str]) -> pd.DataFrame:
df = pd.read_csv(file, index_col=[0], header=[0, 1])
new_ind = [literal_eval(i) for i in df.index]
df.index = pd.MultiIndex.from_tuples(new_ind)
return df
[docs]def load_mpc_stats(results_file: Union[str, Path]) -> Optional[pd.DataFrame]:
stats_file = mpc_datamodels.stats_path(results_file)
try:
df = pd.read_csv(stats_file, index_col=0)
except Exception:
return None
if is_float_dtype(df.index):
return df
new_ind = [literal_eval(i) for i in df.index]
df.index = pd.MultiIndex.from_tuples(new_ind)
return df
[docs]def load_sim(file: Path, causality=None) -> pd.DataFrame:
df = pd.read_csv(file, header=[0, 1, 2], index_col=0)
if causality:
df = df[causality]
return df.droplevel(level=1, axis=1)
return df.droplevel(level=2, axis=1).droplevel(level=0, axis=1)
[docs]def convert_multi_index(
data: pd.DataFrame, convert_to: Union[TimeConversionTypes, Literal["datetime"]]
):
"""Converts an index of an MPC or ADMM results Dataframe to a different unit,
assuming it is passed in seconds."""
# last = data.index.nlevels - 1 # should be 1 for mpc, 2 for admm
outer = convert_index(convert_to, data.index.unique(0))
return data.set_index(
data.index.set_levels(outer, level=0)
) # .set_levels(inner, level=last)
[docs]def convert_index(
convert_to: Union[TimeConversionTypes, Literal["datetime"]], index: pd.Index
):
"""
Converts an index from seconds to datetime or another unit
Args:
convert_to: unit, e.g. minutes, hours, datetime
index: pandas index object
Returns:
"""
if convert_to == "datetime":
return pd.to_datetime(index.astype(int), unit="s")
else:
return index / TIME_CONVERSION[convert_to]
[docs]def mpc_at_time_step(
data: pd.DataFrame,
time_step: float,
variable=None,
variable_type="variable",
index_offset: Union[float, Literal["auto"], bool] = True,
) -> pd.DataFrame:
"""
Gets the results of an optimization at a time step.
Args:
data: The multi-indexed results data frame from the mpc
time_step: The time step from which results should be shown.
If no exact match, shows closest.
variable: If specified, only returns results
with regard to a certain variable.
variable_type: The type of the variable provided (parameter, variable, lower, ...)
index_offset: Determines how the index will be updated when loading the data.
The offset will be subtracted from the time-index. This is useful for results
of realtime systems, where the time value with be a unix time stamp and we want
to cut the number down to something understandable. For example, if the time
index (level 0 of the input Dataframe) is [105, 115, 125] and we give an
index_offset of 100, the data will be handled as if the index was [5, 15, 25].
If "auto" or True is provided as an argument, the index will be modified to
start at 0. If 0 or False are provided, no modifications will be made.
Returns:
pd.DataFrame: A single-indexed Dataframe of the optimization results
at the specified time step. If variable is not specified,
returns all variables with a double column index, if it
is specified returns only values and/or bounds with
single indexed columns.
"""
# get the closest matching (outer) index matching the requested time step
data = perform_index_update(data, index_offset, admm=False)
outer_index = data.index.get_level_values(0)
idx = np.searchsorted(outer_index, time_step, side="left")
if idx > 0 and (
idx == len(outer_index)
or np.fabs(time_step - outer_index[idx - 1])
< np.fabs(time_step - outer_index[idx])
):
closest = outer_index[idx - 1]
else:
closest = outer_index[idx]
# select the data at this index and increment the inner index
if variable:
data_at_ts = data[variable_type][variable].loc[closest]
else:
data_at_ts = data.loc[closest]
data_at_ts = data_at_ts.copy()
data_at_ts.index = data_at_ts.index + closest
return data_at_ts
[docs]def admm_at_time_step(
data: Union[pd.DataFrame, pd.Series],
time_step: float = None,
variable=None,
iteration: float = -1,
index_offset: Union[float, Literal["auto"], bool] = True,
convert_to: TimeConversionTypes = "seconds",
) -> pd.DataFrame:
"""
Gets the results of an optimization at a time step.
Args:
index_offset: Determines how the index will be updated when loading the data.
The offset will be subtracted from the time-index. This is useful for results
of realtime systems, where the time value with be a unix time stamp and we want
to cut the number down to something understandable. For example, if the time
index (level 0 of the input Dataframe) is [105, 115, 125] and we give an
index_offset of 100, the data will be handled as if the index was [5, 15, 25].
If "auto" or True is provided as an argument, the index will be modified to
start at 0. If 0 or False are provided, no modifications will be made.
data: The multi-indexed results data frame from the mpc
time_step: The time step from which results should be shown.
If no exact match, shows closest.
variable: If specified, only returns results
with regard to a certain variable.
iteration: Specifies, from which inner ADMM iteration data should be
from. If negative, counts from last iteration. Default -1.
convert_to: Whether the data should be converted to datetime, minutes etc.
Returns:
A single-indexed Dataframe of the optimization results
at the specified time step. If variable is not specified,
returns all variables with a double column index, if it
is specified returns only values and/or bounds with
single indexed columns.
"""
# get the closest matching (outer) index matching the requested time step
data = convert_multi_index(data, convert_to=convert_to)
if not convert_to == "datetime":
data = perform_index_update(data, index_offset, admm=True)
outer_index = data.index.get_level_values(0)
if time_step is None:
time_step = 0 if not convert_to == "datetime" else datetime.datetime.now()
idx = np.searchsorted(outer_index, time_step, side="left")
if idx > 0 and (
idx == len(outer_index)
or np.fabs(time_step - outer_index[idx - 1])
< np.fabs(time_step - outer_index[idx])
):
closest = outer_index[idx - 1]
else:
closest = outer_index[idx]
data_at_ts = data.loc[closest]
# if iteration provided is negative we count backwards (like list indexing)
if iteration < 0:
number_of_admm_iterations = data_at_ts.index.get_level_values(0).max()
iteration = number_of_admm_iterations + 1 + iteration
# select the data at this index and increment the inner index
if variable:
data_at_it = data_at_ts.xs(variable, axis=1, level="variable").loc[iteration]
else:
data_at_it = data_at_ts.loc[iteration]
data_at_it = data_at_it.copy()
if convert_to == "datetime":
index = convert_index(convert_to, data_at_it.index + closest.value // 1e9)
else:
index = convert_index(convert_to, data_at_it.index) + closest
data_at_it.index = index
return data_at_it
[docs]def get_number_of_iterations(data: pd.DataFrame) -> dict[SimulationTime, int]:
"""Returns the number of iterations at each time instance of the ADMM simulation."""
ind_full = data.index
ind = ind_full.droplevel(2).drop_duplicates()
time_stamps = ind.droplevel(1).drop_duplicates()
result = {}
for t in time_stamps:
_slice = ind.get_loc(t)
result[SimulationTime(t)] = len(ind[_slice])
return result
[docs]def get_time_steps(data: pd.DataFrame) -> Iterable[float]:
"""Returns the time steps at which an MPC step was performed."""
return sorted(set(data.index.get_level_values(0)))
[docs]def first_vals_at_trajectory_index(data: Union[pd.DataFrame, pd.Series]):
"""Gets the first values at each time step of a results trajectory."""
time_steps = get_time_steps(data)
first_vals = pd.Series(
{time_step: data.loc[time_step].iloc[0] for time_step in time_steps}
)
if np.nan in first_vals:
warnings.warn(
"Nan detected in first values. You may need to select the "
"correct column of the DataFrame and drop NaN before."
)
return first_vals
[docs]def last_vals_at_trajectory_index(data: Union[pd.DataFrame, pd.Series]):
"""Gets the last values at each time step of a results trajectory."""
time_steps = get_time_steps(data)
# -1 covers for parameters (only one entry) and states (-horizon until 0)
last_vals = pd.Series(
{time_step: data.at[time_step].iloc[-1] for time_step in time_steps}
)
if np.nan in last_vals:
warnings.warn(
"Nan detected in first values. You may need to select the "
"correct column of the DataFrame and drop NaN before."
)
return last_vals