Coverage for agentlib_flexquant/optimization_backends/constrained_cia.py: 95%
41 statements
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2025-10-20 14:09 +0000
1import pydantic
2import numpy as np
3from agentlib.core.errors import OptionalDependencyError
4from agentlib_mpc.optimization_backends.casadi_.minlp_cia import CasADiCIABackend
5from agentlib_mpc.optimization_backends.casadi_.core.casadi_backend import CasadiBackendConfig
6from agentlib_mpc.data_structures.mpc_datamodels import MINLPVariableReference
7from agentlib_flexquant.data_structures.globals import full_trajectory_suffix
# pycombina is an optional dependency; fail with an installation hint when it
# cannot be imported.
try:
    import pycombina
except ImportError as err:
    # "\\ " spells out the backslash explicitly. The resulting text is the same
    # as with the unrecognised escape "\ ", but without the invalid-escape
    # warning newer Python versions emit.
    raise OptionalDependencyError(
        used_object="Pycombina",
        dependency_install=".\\ after cloning pycombina. Instructions: "
        "https://pycombina.readthedocs.io/en/latest/install.html#",
    ) from err
class ConstrainedCIABackendConfig(CasadiBackendConfig):
    """Backend configuration extended by a market interaction time."""

    # Non-negative integer, declared in seconds, defaulting to 900.
    market_time: int = pydantic.Field(
        description="Time for market interaction",
        unit="s",
        ge=0,
        default=900,
    )

    class Config:
        # "forbid" tells pydantic to reject fields that are not declared on the
        # model.
        # NOTE(review): if additional fields in derived classes were meant to
        # be accepted, this setting would have to be "allow" -- confirm intent.
        extra = "forbid"
class ConstrainedCasADiCIABackend(CasADiCIABackend):
    """CIA backend whose binary approximation can be pinned to a trajectory.

    For every binary control that has a matching full-trajectory model input
    with a value, the pycombina problem is restricted to that trajectory for
    all time steps up to ``config.market_time``.
    """

    var_ref: MINLPVariableReference
    config_type = ConstrainedCIABackendConfig

    def do_pycombina(self, b_rel: np.ndarray) -> np.ndarray:
        """Solve the binary approximation problem with pycombina.

        Args:
            b_rel: Relaxed binary values, handed to ``pycombina.BinApprox``
                unchanged.

        Returns:
            The binary solution ``b_bin`` of the approximation problem. When
            there is exactly one binary control, only the first row is
            returned, reshaped to ``(1, -1)``.
        """
        # pycombina needs one more time point than intervals, so the grid is
        # extended by one time step on a copy.
        grid = self.discretization.grid(self.system.binary_controls).copy()
        grid.append(grid[-1] + self.config.discretization_options.time_step)

        binapprox = pycombina.BinApprox(t=grid, b_rel=b_rel)

        # Constrain shadow MPCs to the values of the baseline for
        # time < market_time.
        input_names = self.model.get_input_names()
        for bin_con in self.var_ref.binary_controls:
            trajectory_name = bin_con + full_trajectory_suffix

            # Without a full-trajectory input (baseline MPC) there is nothing
            # to constrain.
            if trajectory_name not in input_names:
                continue

            # Shadow MPC: use the trajectory currently sent by the baseline,
            # if any. An empty trajectory carries no constraints either.
            trajectory = self.model.get_input(trajectory_name).value
            if trajectory is None or trajectory.empty:
                continue

            # The trajectory index starts at the absolute current environment
            # time, while market_time is relative to the MPC horizon. Shift a
            # copy so the value stored on the model input stays untouched.
            cons = trajectory.copy()
            cons.index = cons.index - cons.index[0]

            # Keep only the part of the trajectory inside the market time.
            cons = cons[cons.index <= self.config.market_time]

            # Constrain every time step before market_time with the values of
            # the baseline.
            # NOTE(review): after the shift the first index is 0, so the first
            # pair is (0, 0) -- confirm pycombina tolerates an empty interval.
            # NOTE(review): [value, 1 - value] presumes two modes (the control
            # plus the dummy mode mentioned below) -- confirm for several
            # binary controls.
            last_idx = 0
            for idx, value in cons.items():
                binapprox.set_valid_controls_for_interval(
                    (last_idx, idx), [value, 1 - value]
                )
                last_idx = idx

        bnb = pycombina.CombinaBnB(binapprox)
        bnb.solve(
            use_warm_start=False,
            max_cpu_time=15,
            verbosity=0,
        )
        b_bin = binapprox.b_bin

        # If there is only one mode, we created a dummy mode which we remove
        # now.
        if len(self.var_ref.binary_controls) == 1:
            b_bin = b_bin[0, :].reshape(1, -1)

        return b_bin