Coverage for agentlib_flexquant/optimization_backends/constrained_cia.py: 95%

41 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-10-20 14:09 +0000

1import pydantic 

2import numpy as np 

3from agentlib.core.errors import OptionalDependencyError 

4from agentlib_mpc.optimization_backends.casadi_.minlp_cia import CasADiCIABackend 

5from agentlib_mpc.optimization_backends.casadi_.core.casadi_backend import CasadiBackendConfig 

6from agentlib_mpc.data_structures.mpc_datamodels import MINLPVariableReference 

7from agentlib_flexquant.data_structures.globals import full_trajectory_suffix 

8 

# pycombina is an optional dependency: turn a missing installation into an
# OptionalDependencyError that carries install instructions.
try:
    import pycombina
except ImportError as err:
    raise OptionalDependencyError(
        used_object="Pycombina",
        # The backslash is escaped explicitly; a bare "\ " is an invalid escape
        # sequence that Python warns about. The runtime text is unchanged.
        dependency_install=".\\ after cloning pycombina. Instructions: "
        "https://pycombina.readthedocs.io/en/latest/install.html#",
    ) from err

17 

18 

# Backend configuration that extends CasadiBackendConfig with a market time.
class ConstrainedCIABackendConfig(CasadiBackendConfig):
    # Non-negative time in seconds (default 900); do_pycombina only constrains
    # trajectory points whose relative time is <= this value.
    market_time: int = pydantic.Field(
        description="Time for market interaction",
        unit="s",
        ge=0,
        default=900,
    )

    class Config:
        # "forbid" makes pydantic reject keys that are not declared as fields,
        # so only the fields of this class and its parents are accepted.
        extra = "forbid"

30 

31 

class ConstrainedCasADiCIABackend(CasADiCIABackend):
    """CIA backend whose pycombina step can be pinned to a received trajectory.

    For every binary control that has a model input named
    ``<control> + full_trajectory_suffix`` holding a value, the entries of that
    trajectory up to ``config.market_time`` are handed to pycombina as the only
    valid controls. Controls without such an input, or whose input has no
    value yet, are left unconstrained.
    """

    var_ref: MINLPVariableReference
    config_type = ConstrainedCIABackendConfig

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def do_pycombina(self, b_rel: np.ndarray) -> np.ndarray:
        """Compute a binary approximation of ``b_rel`` with pycombina.

        Args:
            b_rel: Relaxed binary controls, passed to ``pycombina.BinApprox``
                unchanged.

        Returns:
            ``b_bin`` as computed by pycombina's branch-and-bound solver. If
            there is exactly one binary control, only the first row is
            returned, reshaped to ``(1, n)``.
        """
        # Extend the binary-control grid by one time step past its last point.
        grid = self.discretization.grid(self.system.binary_controls).copy()
        grid.append(grid[-1] + self.config.discretization_options.time_step)

        binapprox = pycombina.BinApprox(t=grid, b_rel=b_rel)

        # constrain shadow MPCs to values of baseline for time <= market_time
        input_names = self.model.get_input_names()
        market_time = self.config.market_time
        for bin_con in self.var_ref.binary_controls:
            trajectory_name = bin_con + full_trajectory_suffix
            # baseline MPC: there is no full-trajectory input for this control
            if trajectory_name not in input_names:
                continue
            # shadow MPC: use the current value sent by the baseline, if any
            trajectory = self.model.get_input(trajectory_name).value
            if trajectory is None:
                continue

            # The trajectory index starts at the absolute current environment
            # time, while market_time is relative to the MPC horizon. Shift a
            # copy: re-assigning the index on the received object itself would
            # also change the value held by the model input.
            relative = trajectory.copy()
            relative.index = trajectory.index - trajectory.index[0]
            # keep only the constraints within the market time
            relative = relative[relative.index <= market_time]

            # constrain every timestep up to market_time with baseline values
            # NOTE(review): the shifted index starts at 0, so the first
            # interval is (0, 0) - confirm the intended pairing of values and
            # intervals. The two-entry list [value, 1 - value] presumably
            # relies on exactly two modes (control plus dummy) - TODO confirm.
            interval_start = 0
            for interval_end, value in relative.items():
                binapprox.set_valid_controls_for_interval(
                    (interval_start, interval_end), [value, 1 - value]
                )
                interval_start = interval_end

        bnb = pycombina.CombinaBnB(binapprox)
        bnb.solve(
            use_warm_start=False,
            max_cpu_time=15,
            verbosity=0,
        )
        b_bin = binapprox.b_bin

        # if there is only one mode, we created a dummy mode which we remove now
        if len(self.var_ref.binary_controls) == 1:
            b_bin = b_bin[0, :].reshape(1, -1)

        return b_bin