Source code for agentlib_flexquant.optimization_backends.constrained_cia

import pydantic
import numpy as np
from agentlib.core.errors import OptionalDependencyError
from agentlib_mpc.optimization_backends.casadi_.minlp_cia import CasADiCIABackend
from agentlib_mpc.optimization_backends.casadi_.core.casadi_backend import CasadiBackendConfig
from agentlib_mpc.data_structures.mpc_datamodels import MINLPVariableReference, MPCVariable
from agentlib_flexquant.data_structures.globals import full_trajectory_suffix
from agentlib_mpc.optimization_backends.casadi_.core.discretization import Results

try:
    import pycombina
except ImportError:
    raise OptionalDependencyError(
        used_object="Pycombina",
        dependency_install=".\ after cloning pycombina. Instructions: "
        "https://pycombina.readthedocs.io/en/latest/install.html#",
    )


[docs]class ConstrainedCIABackendConfig(CasadiBackendConfig): market_time: int = pydantic.Field( default=900, ge=0, unit="s", description="Time for market interaction", ) use_rounding: bool = pydantic.Field( default=False, description="If True, CIA is skipped and plain rounding is used.", ) full_controls_dict: dict = pydantic.Field( default={}, description="Holds a key value pair for each full control of the Baseline", )
[docs] class Config: # Explicitly set this to allow additional fields in the derived class extra = "forbid"
[docs]class ConstrainedCasADiCIABackend(CasADiCIABackend): var_ref: MINLPVariableReference config_type = ConstrainedCIABackendConfig def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] def solve(self, now: float, current_vars: dict[str, MPCVariable]) -> Results: # collect and format inputs mpc_inputs = self._get_current_mpc_inputs(agent_variables=current_vars, now=now) # solve NLP with relaxed binaries relaxed_results = self.discretization.solve(mpc_inputs) if self.config.use_rounding: b_rel = [relaxed_results[var] for var in self.var_ref.binary_controls] b_rel_np = np.transpose(np.vstack(b_rel)) # List to collect all rounded/overwritten binary arrays binary_arrays = [] # constrain shadow MPCs to values of baseline for time<market_time for bin_con, bin_rel in zip(self.var_ref.binary_controls, b_rel_np): cons = self.get_baseline_binary_solution(bin_con) # round binaries binary_array = np.round(bin_rel) if cons is not None: # Determine how many sample times are before the market time sample_time = self.config.discretization_options.time_step market_time = self.config.market_time num_samples_before_market = int(market_time / sample_time) # Overwrite the market time entries with baseline values for i in range(num_samples_before_market): time_point = i * sample_time if time_point in cons.index: binary_array[i] = cons.loc[time_point] binary_arrays.append(binary_array) # Stack all binary arrays back together (transpose to match expected shape) binary_array = np.transpose(np.vstack(binary_arrays)) else: relaxed_binary_array = self.make_binary_array(full_results=relaxed_results) binary_array = self.do_pycombina(b_rel=relaxed_binary_array) mpc_inputs_new = self.constrain_binary_inputs( mpc_inputs_old=mpc_inputs, binary_array=binary_array, ) # solve NLP with fixed binaries full_results_final = self.discretization.solve(mpc_inputs_new) self.save_rel_result_df(relaxed_results, now=now) self.save_result_df(full_results_final, now=now) return full_results_final
[docs] def do_pycombina(self, b_rel: np.array) -> np.array: grid = self.discretization.grid(self.system.binary_controls).copy() grid.append(grid[-1] + self.config.discretization_options.time_step) binapprox = pycombina.BinApprox( t=grid, b_rel=b_rel, ) # constrain shadow MPCs to values of baseline for time<market_time for bin_con in self.var_ref.binary_controls: cons = self.get_baseline_binary_solution(bin_con) if cons is not None: last_idx = 0 for idx, value in cons.items(): # constrain every timestep before market_time # with values of baseline binapprox.set_valid_controls_for_interval( (last_idx, idx), [value, 1 - value] ) last_idx = idx bnb = pycombina.CombinaBnB(binapprox) bnb.solve( use_warm_start=False, max_cpu_time=15, verbosity=0, ) b_bin = binapprox.b_bin # if there is only one mode, we created a dummy mode which we remove now if len(self.var_ref.binary_controls) == 1: b_bin = b_bin[0, :].reshape(1, -1) return b_bin
[docs] def get_baseline_binary_solution(self, bin_con): # check for baseline or shadow MPC if not self.config.full_controls_dict: # if baseline, return return None name = bin_con + full_trajectory_suffix # if shadow MPC, get current value send by baseline if self.config.full_controls_dict[name] is not None: cons = self.config.full_controls_dict[name] # the index of constraints starts at the absolute current environment # time, while the market time is relative time on mpc horizon cons.index -= cons.index[0] # get the constraints in the market time cons = cons[cons.index <= self.config.market_time] return cons