Source code for rtc_tools_meta.meta_optimization_problem
import logging
from typing import Dict, List, Tuple, Union
import casadi as ca
import numpy as np
from rtctools.data import pi
from rtctools.optimization.optimization_problem import OptimizationProblem
from rtctools.optimization.pi_mixin import PIMixin
logger = logging.getLogger("rtctools")
[docs]class MetaOptimizationProblem(OptimizationProblem):
"""An optimization problem of aggregated submodels and a global objective
Use this class to define a meta optimization problem. A meta problem
consists of ``submodels``, which interact via ``variable_mappings``.
To optimize the meta model, call the ``optimize()`` method. This will:
1. Optimize aggregated model
2. Impose interface constraints on submodels
3. Optimize submodels
Results will be located in their respective submodel ``output`` directories.
"""
@property
def meta_level(self):
"""String identifying the meta model"""
return self.__class__.__name__
@property
def submodels(self) -> Dict[str, OptimizationProblem]:
"""Dictionary of submodels
This variable is where we store the submodel **instances**. The models
are indexed by a unique short name. For example:
.. code-block:: python
# Meta Problem Optimizing ModelA and ModelB
submodels = {
"ModelA": collect_model_instance('ModelA', path_to_a),
"ModelB": collect_model_instance('ModelB', path_to_b),
}
"""
return {}
@property
def variable_mappings(self) -> List[Tuple[str, str, str]]:
"""Mappings that constrain variables in separate submodels to be equal
To define how submodels interact with each other, we set
``variable_mappings`` equal to a list of 3-tuples. The 3-tuples
have the structure (var1, var2, sign). For example:
.. code-block:: python
# Meta Problem Optimizing ModelA and ModelB
variable_mappings = [
(
"ModelA::some_variable",
"ModelB::another_variable",
"-", # Either "-" (if vars are equal) or "+" (if they sum to 0)
)
]
"""
return []
[docs] def pop_submodel(self, variable: str) -> Tuple[str, str]:
"""Function to split submodel name from variable
The submodel a variable is stored in is indicated by preceding that
variable's name with the model name, seperated by ``::``. For example,
.. code-block:: python
>>> self.pop_submodel("submodel_name::variable_name")
("submodel_name", "variable_name")
>>> self.pop_submodel("submodelA_name::submodelB_name::variable_name")
("submodelA_name", "submodelB_name::variable_name") # Nested models
:param variable: Submodel name and variable name in a single string
:return: Submodel name and variable name as separate strings
:raises: AssertionError, ValueError
"""
try:
split_index = variable.index("::")
except ValueError:
raise ValueError(
f"Metamodel {self.meta_level} cannot find submodel name in {variable}"
)
submodel = variable[:split_index]
assert (
submodel in self.submodels
), f"Metamodel {self.meta_level} has no submodel {submodel}"
return submodel, variable[split_index + 2 :]
[docs] def pre(self):
"""Execute pre() method of all submodels"""
for model in self.submodels.values():
model.pre()
def _update_meta_level(self, meta_level: str):
"""
Set meta run level on submodels
:param meta_level: String identifying a meta model
"""
logger.debug(
f"MetaOptimizationProblem: {self.meta_level} "
f"activating run level {meta_level}"
)
for model in self.submodels.values():
model._update_meta_level(meta_level)
[docs] def post(self):
"""Execute post() method of all submodels"""
for model in self.submodels.values():
model.post()
[docs] def transcribe(self):
"""Transcribe all submodels and aggregate them into a single nlp problem"""
# Transcribe and aggregate subproblems
# Store list of tuples: (discrete, lbx, ubx, lbg, ubg, x0, nlp)
transcribed_output = [model.transcribe() for model in self.submodels.values()]
assert transcribed_output, "No submodels found"
transcribed_output_iter = zip(*transcribed_output)
# Construct expressions for constraints to make model interfaces consistant
extra_constraints = []
for mapping in self.variable_mappings:
states_0 = self.state_vector(mapping[0])
states_1 = self.state_vector(mapping[1])
operation = mapping[2]
if operation == "-":
extra_constraints.append(states_0 - states_1)
elif operation == "+":
extra_constraints.append(states_0 + states_1)
extra_constraints = ca.vertcat(*extra_constraints)
# Merge the submodels into a single nlp problem
discrete = np.concatenate(next(transcribed_output_iter), axis=None)
lbx = ca.vertcat(*next(transcribed_output_iter))
ubx = ca.vertcat(*next(transcribed_output_iter))
lbg = ca.vertcat(*[ca.veccat(*bg) for bg in next(transcribed_output_iter)])
ubg = ca.vertcat(*[ca.veccat(*bg) for bg in next(transcribed_output_iter)])
x0 = ca.vertcat(*next(transcribed_output_iter))
sub_nlp = next(transcribed_output_iter)
nlp = {
"x": ca.vertcat(*[snlp["x"] for snlp in sub_nlp]),
"g": ca.vertcat(*[snlp["g"] for snlp in sub_nlp]),
}
# Add the meta objective
nlp["f"] = self.meta_objective()
# Save the start indices so we know which model the results belong to
self.__submodel_start_indices = np.cumsum(
[0] + [snlp["x"].size1() for snlp in sub_nlp], dtype=np.int
)
# Append constraints to make model interfaces consistant
nlp["g"] = ca.vertcat(nlp["g"], extra_constraints)
lbg = ca.vertcat(lbg, ca.DM(np.zeros(extra_constraints.size1())))
ubg = ca.vertcat(ubg, ca.DM(np.zeros(extra_constraints.size1())))
return discrete, lbx, ubx, [lbg], [ubg], x0, nlp
[docs] def optimize(
self, preprocessing=True, postprocessing=True, log_solver_failure_as_error=True
):
logger.info("MetaOptimizationProblem: Entering optimize()")
# Set meta level on all subproblems
self._update_meta_level(self.meta_level)
# Do any/all preprocessing
if preprocessing:
self.pre()
# Run meta optimization problem
success = super().optimize(preprocessing=False, postprocessing=False)
if success:
logger.log(
logging.INFO,
"MetaOptimizationProblem: Solver succeeded with status {}".format(
self.solver_stats["return_status"]
),
)
else:
logger.log(
logging.INFO,
"MetaOptimizationProblem: Solver failed with status {}".format(
self.solver_stats["return_status"]
),
)
# Done
logger.info("MetaOptimizationProblem: Done with Optimization")
# Extract results and fix submodel boundary conditions
logger.info("MetaOptimizationProblem: Fixing Subproblem Interfaces")
self._update_meta_solver_output(self.solver_output)
self._fix_interface_variables(self.interface_variables)
# Run all submodels
# TODO: Make this optional? (e.g. write fixed variable dict to pickle?)
for model_id, model in self.submodels.items():
logger.info(f"MetaOptimizationProblem: running submodel {model_id}")
submodel_success = model.optimize(preprocessing=False, postprocessing=False)
if not submodel_success:
raise Exception(f"{model_id} failed.")
# Do any/all postprocessing
if postprocessing:
self.post()
return success
@property
def interface_variables(self) -> Tuple[str]:
"""Tuple of interface variables
*Note: This property is extracted from* ``self.variable_mappings``
"""
local_interface_vars = []
for mapping in self.variable_mappings:
local_interface_vars.append(mapping[0])
local_interface_vars.append(mapping[1])
return tuple(local_interface_vars)
[docs] def meta_objective(self) -> ca.MX:
"""Symbolic expression to be minimized
For example, here is a quadratic minimization objective:
.. code-block:: python
def meta_objective(self):
return ca.sum1(self.states_in("ModelA::var_name")) ** 2.0
"""
return ca.MX(0)
def _fix_interface_variables(self, interface_variables: Tuple[str]):
"""Fixes interface variables of submodels
Force subproblems to take the meta-optimal values of boundary conditions
between submodels so that the submodels can be run independently
:param interface_variables: Tuple of strings that map to submodels and
variable names
"""
logger.debug(f"MetaOptimizationProblem: fixing variables {interface_variables}")
interface_variables = tuple(
self.pop_submodel(variable) for variable in interface_variables
)
for model in dict(interface_variables):
self.submodels[model]._fix_interface_variables(
tuple(int_var for mod, int_var in interface_variables if mod == model)
)
def _update_meta_solver_output(self, meta_solver_output: np.ndarray):
"""Records results from a meta optimization
Takes the solver output from a meta optimization and redistributes it to
the correct submodels
:param meta_solver_output: 1D numpy array of results calculated by self.optimize()
"""
# Test for missing __submodel_start_indices, and calculate it if missing
try:
submodel_start_indices = self.__submodel_start_indices
except AttributeError:
_transcribe = self.transcribe()
submodel_start_indices = self.__submodel_start_indices
# Recursively apply meta solver output on submodels
for i, model in enumerate(self.submodels.values()):
start = submodel_start_indices[i]
end = submodel_start_indices[i + 1]
model._update_meta_solver_output(meta_solver_output[start:end].copy())
def state_vector(self, variable: str, *args, **kwargs) -> ca.MX:
submodel, variable = self.pop_submodel(variable)
return self.submodels[submodel].state_vector(variable, *args, **kwargs)
def states_in(self, variable: str, *args, **kwargs) -> ca.MX:
submodel, variable = self.pop_submodel(variable)
return self.submodels[submodel].states_in(variable, *args, **kwargs)
def state_at(self, variable: str, *args, **kwargs) -> ca.MX:
submodel, variable = self.pop_submodel(variable)
return self.submodels[submodel].state_at(variable, *args, **kwargs)
def integral(self, variable, *args, **kwargs):
submodel, variable = self.pop_submodel(variable)
return self.submodels[submodel].integral(variable, *args, **kwargs)
def control_at(self, variable, *args, **kwargs):
submodel, variable = self.pop_submodel(variable)
return self.submodels[submodel].control_at(variable, *args, **kwargs)
def der_at(self, variable, *args, **kwargs):
submodel, variable = self.pop_submodel(variable)
return self.submodels[submodel].der_at(variable, *args, **kwargs)
def algebraic_states(self, *args, **kwargs):
raise NotImplementedError
def alias_relation(self, *args, **kwargs):
raise NotImplementedError
def controls(self, *args, **kwargs):
raise NotImplementedError
def dae_residual(self, *args, **kwargs):
raise NotImplementedError
def dae_variables(self, *args, **kwargs):
raise NotImplementedError
def der(self, *args, **kwargs):
raise NotImplementedError
def differentiated_states(self, *args, **kwargs):
raise NotImplementedError
def discretize_controls(self, *args, **kwargs):
raise NotImplementedError
def discretize_states(self, *args, **kwargs):
raise NotImplementedError
def extra_variable(self, *args, **kwargs):
raise NotImplementedError
def extract_controls(self, *args, **kwargs):
raise NotImplementedError
def extract_results(self, *args, **kwargs):
raise NotImplementedError
def extract_states(self, *args, **kwargs):
raise NotImplementedError
def solver_input(self, *args, **kwargs):
raise NotImplementedError
def variable(self, *args, **kwargs):
raise NotImplementedError