Source code for rtc_tools_meta.meta_optimization_problem

import logging
from typing import Dict, List, Tuple, Union

import casadi as ca
import numpy as np

from rtctools.data import pi
from rtctools.optimization.optimization_problem import OptimizationProblem
from rtctools.optimization.pi_mixin import PIMixin

logger = logging.getLogger("rtctools")


[docs]class MetaOptimizationProblem(OptimizationProblem): """An optimization problem of aggregated submodels and a global objective Use this class to define a meta optimization problem. A meta problem consists of ``submodels``, which interact via ``variable_mappings``. To optimize the meta model, call the ``optimize()`` method. This will: 1. Optimize aggregated model 2. Impose interface constraints on submodels 3. Optimize submodels Results will be located in their respective submodel ``output`` directories. """ @property def meta_level(self): """String identifying the meta model""" return self.__class__.__name__ @property def submodels(self) -> Dict[str, OptimizationProblem]: """Dictionary of submodels This variable is where we store the submodel **instances**. The models are indexed by a unique short name. For example: .. code-block:: python # Meta Problem Optimizing ModelA and ModelB submodels = { "ModelA": collect_model_instance('ModelA', path_to_a), "ModelB": collect_model_instance('ModelB', path_to_b), } """ return {} @property def variable_mappings(self) -> List[Tuple[str, str, str]]: """Mappings that constrain variables in separate submodels to be equal To define how submodels interact with each other, we set ``variable_mappings`` equal to a list of 3-tuples. The 3-tuples have the structure (var1, var2, sign). For example: .. code-block:: python # Meta Problem Optimizing ModelA and ModelB variable_mappings = [ ( "ModelA::some_variable", "ModelB::another_variable", "-", # Either "-" (if vars are equal) or "+" (if they sum to 0) ) ] """ return []
[docs] def pop_submodel(self, variable: str) -> Tuple[str, str]: """Function to split submodel name from variable The submodel a variable is stored in is indicated by preceding that variable's name with the model name, seperated by ``::``. For example, .. code-block:: python >>> self.pop_submodel("submodel_name::variable_name") ("submodel_name", "variable_name") >>> self.pop_submodel("submodelA_name::submodelB_name::variable_name") ("submodelA_name", "submodelB_name::variable_name") # Nested models :param variable: Submodel name and variable name in a single string :return: Submodel name and variable name as separate strings :raises: AssertionError, ValueError """ try: split_index = variable.index("::") except ValueError: raise ValueError( f"Metamodel {self.meta_level} cannot find submodel name in {variable}" ) submodel = variable[:split_index] assert ( submodel in self.submodels ), f"Metamodel {self.meta_level} has no submodel {submodel}" return submodel, variable[split_index + 2 :]
[docs] def pre(self): """Execute pre() method of all submodels""" for model in self.submodels.values(): model.pre()
def _update_meta_level(self, meta_level: str): """ Set meta run level on submodels :param meta_level: String identifying a meta model """ logger.debug( f"MetaOptimizationProblem: {self.meta_level} " f"activating run level {meta_level}" ) for model in self.submodels.values(): model._update_meta_level(meta_level)
[docs] def post(self): """Execute post() method of all submodels""" for model in self.submodels.values(): model.post()
[docs] def transcribe(self): """Transcribe all submodels and aggregate them into a single nlp problem""" # Transcribe and aggregate subproblems # Store list of tuples: (discrete, lbx, ubx, lbg, ubg, x0, nlp) transcribed_output = [model.transcribe() for model in self.submodels.values()] assert transcribed_output, "No submodels found" transcribed_output_iter = zip(*transcribed_output) # Construct expressions for constraints to make model interfaces consistant extra_constraints = [] for mapping in self.variable_mappings: states_0 = self.state_vector(mapping[0]) states_1 = self.state_vector(mapping[1]) operation = mapping[2] if operation == "-": extra_constraints.append(states_0 - states_1) elif operation == "+": extra_constraints.append(states_0 + states_1) extra_constraints = ca.vertcat(*extra_constraints) # Merge the submodels into a single nlp problem discrete = np.concatenate(next(transcribed_output_iter), axis=None) lbx = ca.vertcat(*next(transcribed_output_iter)) ubx = ca.vertcat(*next(transcribed_output_iter)) lbg = ca.vertcat(*[ca.veccat(*bg) for bg in next(transcribed_output_iter)]) ubg = ca.vertcat(*[ca.veccat(*bg) for bg in next(transcribed_output_iter)]) x0 = ca.vertcat(*next(transcribed_output_iter)) sub_nlp = next(transcribed_output_iter) nlp = { "x": ca.vertcat(*[snlp["x"] for snlp in sub_nlp]), "g": ca.vertcat(*[snlp["g"] for snlp in sub_nlp]), } # Add the meta objective nlp["f"] = self.meta_objective() # Save the start indices so we know which model the results belong to self.__submodel_start_indices = np.cumsum( [0] + [snlp["x"].size1() for snlp in sub_nlp], dtype=np.int ) # Append constraints to make model interfaces consistant nlp["g"] = ca.vertcat(nlp["g"], extra_constraints) lbg = ca.vertcat(lbg, ca.DM(np.zeros(extra_constraints.size1()))) ubg = ca.vertcat(ubg, ca.DM(np.zeros(extra_constraints.size1()))) return discrete, lbx, ubx, [lbg], [ubg], x0, nlp
[docs] def optimize( self, preprocessing=True, postprocessing=True, log_solver_failure_as_error=True ): logger.info("MetaOptimizationProblem: Entering optimize()") # Set meta level on all subproblems self._update_meta_level(self.meta_level) # Do any/all preprocessing if preprocessing: self.pre() # Run meta optimization problem success = super().optimize(preprocessing=False, postprocessing=False) if success: logger.log( logging.INFO, "MetaOptimizationProblem: Solver succeeded with status {}".format( self.solver_stats["return_status"] ), ) else: logger.log( logging.INFO, "MetaOptimizationProblem: Solver failed with status {}".format( self.solver_stats["return_status"] ), ) # Done logger.info("MetaOptimizationProblem: Done with Optimization") # Extract results and fix submodel boundary conditions logger.info("MetaOptimizationProblem: Fixing Subproblem Interfaces") self._update_meta_solver_output(self.solver_output) self._fix_interface_variables(self.interface_variables) # Run all submodels # TODO: Make this optional? (e.g. write fixed variable dict to pickle?) for model_id, model in self.submodels.items(): logger.info(f"MetaOptimizationProblem: running submodel {model_id}") submodel_success = model.optimize(preprocessing=False, postprocessing=False) if not submodel_success: raise Exception(f"{model_id} failed.") # Do any/all postprocessing if postprocessing: self.post() return success
@property def interface_variables(self) -> Tuple[str]: """Tuple of interface variables *Note: This property is extracted from* ``self.variable_mappings`` """ local_interface_vars = [] for mapping in self.variable_mappings: local_interface_vars.append(mapping[0]) local_interface_vars.append(mapping[1]) return tuple(local_interface_vars)
[docs] def meta_objective(self) -> ca.MX: """Symbolic expression to be minimized For example, here is a quadratic minimization objective: .. code-block:: python def meta_objective(self): return ca.sum1(self.states_in("ModelA::var_name")) ** 2.0 """ return ca.MX(0)
def _fix_interface_variables(self, interface_variables: Tuple[str]): """Fixes interface variables of submodels Force subproblems to take the meta-optimal values of boundary conditions between submodels so that the submodels can be run independently :param interface_variables: Tuple of strings that map to submodels and variable names """ logger.debug(f"MetaOptimizationProblem: fixing variables {interface_variables}") interface_variables = tuple( self.pop_submodel(variable) for variable in interface_variables ) for model in dict(interface_variables): self.submodels[model]._fix_interface_variables( tuple(int_var for mod, int_var in interface_variables if mod == model) ) def _update_meta_solver_output(self, meta_solver_output: np.ndarray): """Records results from a meta optimization Takes the solver output from a meta optimization and redistributes it to the correct submodels :param meta_solver_output: 1D numpy array of results calculated by self.optimize() """ # Test for missing __submodel_start_indices, and calculate it if missing try: submodel_start_indices = self.__submodel_start_indices except AttributeError: _transcribe = self.transcribe() submodel_start_indices = self.__submodel_start_indices # Recursively apply meta solver output on submodels for i, model in enumerate(self.submodels.values()): start = submodel_start_indices[i] end = submodel_start_indices[i + 1] model._update_meta_solver_output(meta_solver_output[start:end].copy()) def state_vector(self, variable: str, *args, **kwargs) -> ca.MX: submodel, variable = self.pop_submodel(variable) return self.submodels[submodel].state_vector(variable, *args, **kwargs) def states_in(self, variable: str, *args, **kwargs) -> ca.MX: submodel, variable = self.pop_submodel(variable) return self.submodels[submodel].states_in(variable, *args, **kwargs) def state_at(self, variable: str, *args, **kwargs) -> ca.MX: submodel, variable = self.pop_submodel(variable) return self.submodels[submodel].state_at(variable, *args, **kwargs) def integral(self, variable, *args, **kwargs): submodel, variable = self.pop_submodel(variable) return self.submodels[submodel].integral(variable, *args, **kwargs) def control_at(self, variable, *args, **kwargs): submodel, variable = self.pop_submodel(variable) return self.submodels[submodel].control_at(variable, *args, **kwargs) def der_at(self, variable, *args, **kwargs): submodel, variable = self.pop_submodel(variable) return self.submodels[submodel].der_at(variable, *args, **kwargs) def algebraic_states(self, *args, **kwargs): raise NotImplementedError def alias_relation(self, *args, **kwargs): raise NotImplementedError def controls(self, *args, **kwargs): raise NotImplementedError def dae_residual(self, *args, **kwargs): raise NotImplementedError def dae_variables(self, *args, **kwargs): raise NotImplementedError def der(self, *args, **kwargs): raise NotImplementedError def differentiated_states(self, *args, **kwargs): raise NotImplementedError def discretize_controls(self, *args, **kwargs): raise NotImplementedError def discretize_states(self, *args, **kwargs): raise NotImplementedError def extra_variable(self, *args, **kwargs): raise NotImplementedError def extract_controls(self, *args, **kwargs): raise NotImplementedError def extract_results(self, *args, **kwargs): raise NotImplementedError def extract_states(self, *args, **kwargs): raise NotImplementedError def solver_input(self, *args, **kwargs): raise NotImplementedError def variable(self, *args, **kwargs): raise NotImplementedError