"""The main way to define and interact with a Pycollo optimal control problem.
This module contains the main class that the user will interact with to define
and run their optimal control problem when working with Pycollo. Terminolgy is
loosely defined in accordance with "Betts, JT (2010). Practical Methods for
Optimal Control and Estimiation Using Nonlinear Programming (Second Edition)".
See the ``Notes`` section for a full list of symbols used.
Notes:
------
* t: independent parameter (time).
* x = [y, u, q, t0, tf, s]: vector of problem variables.
* y: vector state variables (which are functions of time).
* u: vector control variables (which are functions of time).
* q: vector of integral constraints.
* t0: the initial time of the (single) phase.
* tf: the final time of the (single) phase.
* s: vector of static parameter variables (which are phase-independent).
* J: objective function.
* g: gradient of the objective function w.r.t. x.
* L: Lagrangian of the objective function and constraints.
* H: Hessian of the Lagrangian.
* c = [zeta, gamma, rho, beta]: vector of constraints.
* zeta: vector of defect constraints.
* gamma: vector of path constraints.
* rho: vector of integral constraints.
* beta: vector of endpoint constraints.
* G: Jacobian of the constaints.
* n = len(x): number of free variables
* m = len(c): number of constraints
"""
from typing import AnyStr, Iterable, Tuple
import numpy as np
# from ordered_set import OrderedSet
import sympy as sym
from .backend import BACKENDS
from .bounds import EndpointBounds
from .guess import EndpointGuess
from .phase import Phase
from .scaling import EndpointScaling
from .settings import Settings
from .typing import OptionalSymsType
from .utils import check_sym_name_clash, console_out, format_as_data_container
__all__ = ["OptimalControlProblem"]
[docs]
class OptimalControlProblem():
"""The main class for Pycollo optimal control problems"""
[docs]
def __init__(self,
name,
parameter_variables=None,
*,
bounds=None,
guess=None,
scaling=None,
endpoint_constraints=None,
objective_function=None,
settings=None,
auxiliary_data=None,
):
"""Initialise the optimal control problem with user-passed objects.
Args:
phases (:obj:`Iterable` of :class:`~.Phase`, optional): Phases to be
associated with the optimal control problem at initialisation.
Defaults to None.
parameter_variables ()
"""
self.name = name
self.settings = settings
self._is_initialised = False
self._forward_dynamics = False
self._s_var_user = ()
self._b_con_user = ()
self._phases = ()
self.parameter_variables = parameter_variables
self.endpoint_constraints = endpoint_constraints
self.objective_function = objective_function
self.auxiliary_data = dict(auxiliary_data) if auxiliary_data else {}
self.scaling = scaling
self.bounds = bounds
self.guess = guess
@property
def name(self) -> str:
"""The name associated with the optimal control problem. For setter
behaviour, the supplied `name` is cast to a str.
The name is not strictly needed, however it improves the usefulness of
Pycollo console output. This is particularly useful in cases where the
user may wish to instantiate multiple :obj:`OptimalControlProblem`
objects within a single script, or instantiates other Pycollo objects
without providing a valid :class:`~.optimal_control_problem` argument for them
to be linked to at initialisation.
"""
return self._name
@name.setter
def name(self, name: AnyStr):
self._name = str(name)
@property
def phases(self) -> Tuple[Phase, ...]:
"""A tuple of all phases associated with the optimal control problem.
:meth:`~.phase_number` are integers beginning at 1 and are
ordered corresponding to the order that they were added to the optimal
control problem. As Python uses zero-based indexing the phase numbers
do not directly map to the indexes of phases within :class:`~.phases`.
Phases are however ordered sequentially corresponding to the
cronological order they were added to the optimal control problem.
"""
return self._phases
[docs]
def add_phase(self, phase: Iterable[Phase]) -> Phase:
"""Add an already instantiated :class:`~.Phase` to this optimal control problem.
This method is needed as :class:`~.phases` is read only ("private") and
therefore users cannot manually add :class:`~.Phase` objects to an optimal
control problem. :class:`~.phases` is required to be read only as it is an
iterable of :class:`~.Phase` objects and must be protected from accidental errors
introduced by user interacting with it incorrectly.
Args:
phase (Phase): The phase to be added to the optimal control problem
Returns:
the phase that has been added. It is the same
"""
phase.optimal_control_problem = self
return self.phases[-1]
[docs]
def add_phases(self, phases: Iterable[Phase]) -> Tuple[Phase, ...]:
"""Associate multiple already instantiated :class:`~.Phase` objects.
This is a convinience method to allow the user to add multiple :class:`~.Phase`
objects to the optimal control problem in one go.
"""
return tuple(self.add_phase(phase) for phase in phases)
[docs]
def new_phase(self,
name: str,
state_variables: OptionalSymsType = None,
control_variables: OptionalSymsType = None) -> Phase:
"""Create a new :obj:`~.Phase` and add to this optimal control problem.
Provides the same behaviour as manually creating a :obj:`.~Phase` called
`phase` and calling :meth:`~.add_phase`.
"""
new_phase = Phase(name, optimal_control_problem=self,
state_variables=state_variables,
control_variables=control_variables)
return new_phase
def new_phase_like(self, phase_for_copying: Phase, name: str, **kwargs):
return phase_for_copying.create_new_copy(name, **kwargs)
[docs]
def new_phases_like(self,
phase_for_copying: Phase,
number: int,
names: Iterable[str],
**kwargs) -> Tuple[Phase, ...]:
"""Creates multiple new phases like an already instantiated phase.
For a list of key word arguments and default values see the docstring
for the :meth:`~.new_phase_like` method.
Returns:
The newly instantiated and associated phases.
Raises:
ValueError: If the same number of names are not supplied as the
number of specified new phases.
"""
if len(names) != int(number):
msg = ("Must supply a name for each new phase.")
raise ValueError(msg)
new_phases = (self.new_phase_like(phase_for_copying, name, **kwargs)
for name in names)
return new_phases
@property
def number_phases(self) -> int:
"""Number of phases associated with this optimal control problem."""
return len(self.phases)
@property
def time_symbol(self):
"""
Raises:
NotImplementedError: Whenever called to inform the user that these
types of problem are not currently supported.
"""
msg = ("Pycollo do not currently support dynamic, path or integral "
"constraints that are explicit functions of continuous time.")
raise NotImplementedError(msg)
@property
def parameter_variables(self):
return self._s_var_user
@parameter_variables.setter
def parameter_variables(self, s_vars):
self._s_var_user = format_as_data_container("ParameterVariables", s_vars)
_ = check_sym_name_clash(self._s_var_user)
@property
def number_parameter_variables(self):
len(self._s_var_user)
@property
def endpoint_constraints(self):
return self._b_con_user
@endpoint_constraints.setter
def endpoint_constraints(self, b_cons):
self._b_con_user = format_as_data_container(
"EndpointConstraints",
b_cons,
use_named=False,
)
@property
def number_endpoint_constraints(self):
return len(self._b_con_user)
@property
def objective_function(self):
return self._J_user
@objective_function.setter
def objective_function(self, J):
self._J_user = sym.sympify(J)
# self._forward_dynamics = True if self._J_user == 1 else False
@property
def auxiliary_data(self):
return self._aux_data_user
@auxiliary_data.setter
def auxiliary_data(self, aux_data):
self._aux_data_user = dict(aux_data)
@property
def bounds(self):
return self._bounds
@bounds.setter
def bounds(self, bounds):
if bounds is None:
self._bounds = EndpointBounds(optimal_control_problem=self)
else:
self._bounds = bounds
self._bounds._ocp = self
@property
def guess(self):
return self._guess
@guess.setter
def guess(self, guess):
if guess is None:
self._guess = EndpointGuess(optimal_control_problem=self)
else:
self._guess = guess
self._guess.optimal_control_problem = self
@property
def scaling(self):
return self._scaling
@scaling.setter
def scaling(self, scaling):
if scaling is None:
self._scaling = EndpointScaling(optimal_control_problem=self)
else:
self._scaling = scaling
self._scaling._ocp = self
@property
def mesh_iterations(self):
return self._mesh_iterations
@property
def num_mesh_iterations(self):
return len(self._backend.mesh_iterations)
@property
def settings(self):
return self._settings
@settings.setter
def settings(self, settings):
if settings is None:
self._settings = Settings(optimal_control_problem=self)
else:
self._settings = settings
self._settings._ocp = self
@property
def solution(self):
return self._backend.mesh_iterations[-1].solution
[docs]
def initialise(self):
"""Initialise the optimal control problem before solving.
The initialisation of the optimal control problem involves the
following stages:
* 1. Check that for each phase there are the same number of state variables as there are state equations.
* 2. Check that for each phase the user-supplied bounds are permissible, and check point bounds on optimal control problem.
* 3. Process bounds that need processing.
"""
self._console_out_initialisation_message()
self._check_variables_and_equations()
self._initialise_backend()
self._check_problem_and_phase_bounds()
self._initialise_scaling()
self._initialise_quadrature()
self._postprocess_backend()
self._initialise_initial_mesh()
self._check_initial_guess()
self._initialise_first_mesh_iteration()
self._is_initialised = True
def _console_out_initialisation_message(self):
msg = "Initialising optimal control problem."
console_out(msg, heading=True)
def _check_variables_and_equations(self):
for phase in self.phases:
phase._check_variables_and_equations()
msg = "Phase variables and equations checked."
console_out(msg)
def _initialise_backend(self):
self._backend = BACKENDS.dispatcher[self.settings.backend](self)
msg = "Backend initialised."
console_out(msg)
def _check_problem_and_phase_bounds(self):
self._backend.create_bounds()
msg = "Bounds checked."
console_out(msg)
def _initialise_scaling(self):
self._backend.create_scaling()
msg = "Problem scaling initialised."
console_out(msg)
def _initialise_quadrature(self):
self._backend.create_quadrature()
msg = "Quadrature scheme initialised."
console_out(msg)
def _check_initial_guess(self):
self._backend.create_guess()
msg = "Initial guess checked."
console_out(msg)
def _postprocess_backend(self):
self._backend.postprocess_problem_backend()
msg = "Backend postprocessing complete."
console_out(msg)
def _initialise_initial_mesh(self):
self._backend.create_initial_mesh()
msg = "Initial mesh created."
console_out(msg)
def _initialise_first_mesh_iteration(self):
self._backend.create_mesh_iterations()
[docs]
def solve(self, display_progress=False):
"""Solve the optimal control problem.
If the initialisation flag is not set to True then the initialisation
method is called to initialise the optimal control problem.
Parameters:
display_progress : bool
Option for whether progress updates should be outputted to the
console during solving. Defaults to False.
"""
self._check_if_initialisation_required_before_solve()
self.mesh_tolerance_met = False
self._set_solve_options(display_progress)
tolerances_met = False
while not tolerances_met:
tolerances_met = self._solve_iteration()
self._final_output()
def _check_if_initialisation_required_before_solve(self):
"""Initialise the optimal control problem before solve if required."""
if not self._is_initialised:
self.initialise()
def _solve_iteration(self):
"""Solve a single mesh iteration.
Return:
bool
True is mesh tolerance is met or if maximum number of mesh
iterations has been reached.
"""
def tolerances_met(mesh_tolerance_met, mesh_iterations_met):
return (mesh_iterations_met or mesh_tolerance_met)
if self._backend.mesh_iterations[-1].solved:
_ = self._backend.new_mesh_iteration(self._next_iteration_mesh,
self._next_iteration_guess)
result = self._backend.mesh_iterations[-1].solve()
self._next_iteration_mesh = result.next_iteration_mesh
self._next_iteration_guess = result.next_iteration_guess
if result.mesh_tolerance_met:
self.mesh_tolerance_met = True
msg = (f"Mesh tolerance met in mesh iteration "
f"{self.num_mesh_iterations}.\n")
print(msg)
if self.num_mesh_iterations >= self.settings.max_mesh_iterations:
mesh_iterations_met = True
if not self.mesh_tolerance_met:
msg = ("Maximum number of mesh iterations reached. Pycollo "
"exiting before mesh tolerance met.\n")
print(msg)
else:
mesh_iterations_met = False
return tolerances_met(result.mesh_tolerance_met, mesh_iterations_met)
def _set_solve_options(self, display_progress):
self._display_progress = display_progress
# def _initialise(self):
# self._check_user_supplied_bounds()
# self._generate_scaling()
# self._generate_expression_graph()
# self._generate_quadrature()
# self._compile_numba_functions()
# self._check_user_supplied_initial_guess()
# # Initialise the initial mesh iterations
# self._mesh_iterations[0]._initialise_iteration(self.initial_guess)
# ocp_initialisation_time_stop = timer()
# self._ocp_initialisation_time = (ocp_initialisation_time_stop
# - ocp_initialisation_time_start)
# self._is_initialised = True
# def solve_old(self, display_progress=False):
# """Solve the optimal control problem.
# If the initialisation flag is not set to True then the initialisation
# method is called to initialise the optimal control problem.
# Parameters:
# -----------
# display_progress : bool
# Option for whether progress updates should be outputted to the
# console during solving. Defaults to False.
# """
# self._set_solve_options(display_progress)
# self._check_if_initialisation_required_before_solve()
# # Solve the transcribed NLP on the initial mesh
# new_iteration_mesh, new_iteration_guess = self._mesh_iterations[0]._solve()
# mesh_iterations_met = self._settings.max_mesh_iterations == 1
# if new_iteration_mesh is None:
# mesh_tolerance_met = True
# else:
# mesh_tolerance_met = False
# while not mesh_iterations_met and not mesh_tolerance_met:
# new_iteration = Iteration(optimal_control_problem=self, iteration_number=self.num_mesh_iterations+1, mesh=new_iteration_mesh)
# self._mesh_iterations.append(new_iteration)
# self._mesh_iterations[-1]._initialise_iteration(new_iteration_guess)
# new_iteration_mesh, new_iteration_guess = self._mesh_iterations[-1]._solve()
# if new_iteration_mesh is None:
# mesh_tolerance_met = True
# print(f'Mesh tolerance met in mesh iteration {len(self._mesh_iterations)}.\n')
# elif self.num_mesh_iterations >= self._settings.max_mesh_iterations:
# mesh_iterations_met = True
# print(f'Maximum number of mesh iterations reached. pycollo exiting before mesh tolerance met.\n')
# _ = self._final_output()
def _set_solve_options(self, display_progress):
self._display_progress = display_progress
def _check_if_initialisation_required_before_solve(self):
if not self._is_initialised:
self.initialise()
def _final_output(self):
def solution_results():
J_msg = (f'Final Objective Function Evaluation: {self._backend.mesh_iterations[-1].solution.objective:.4f}\n')
print(J_msg)
def mesh_results():
section_msg = (f'Final Number of Mesh Sections: {self.mesh_iterations[-1]._mesh._K}')
node_msg = (f'Final Number of Collocation Nodes: {self.mesh_iterations[-1]._mesh._N}\n')
print(section_msg)
print(node_msg)
def time_results():
ocp_init_time_msg = (f'Total OCP Initialisation Time: {self._ocp_initialisation_time:.4f} s')
print(ocp_init_time_msg)
self._iteration_initialisation_time = np.sum(np.array(
[iteration._initialisation_time for iteration in self._mesh_iterations]))
iter_init_time_msg = (f'Total Iteration Initialisation Time: {self._iteration_initialisation_time:.4f} s')
print(iter_init_time_msg)
self._nlp_time = np.sum(
np.array([iteration._nlp_time for iteration in self._mesh_iterations]))
nlp_time_msg = (f'Total NLP Solver Time: {self._nlp_time:.4f} s')
print(nlp_time_msg)
self._process_results_time = np.sum(np.array(
[iteration._process_results_time for iteration in self._mesh_iterations]))
process_results_time_msg = (f'Total Mesh Refinement Time: {self._process_results_time:.4f} s')
print(process_results_time_msg)
total_time_msg = (f'\nTotal Time: {self._ocp_initialisation_time + self._iteration_initialisation_time + self._nlp_time + self._process_results_time:.4f} s')
print(total_time_msg)
print('\n\n')
solved_msg = ('Optimal control problem sucessfully solved.')
console_out(solved_msg, heading=True)
solution_results()
if False:
mesh_results()
time_results()
[docs]
def __str__(self):
"""Returns name of OCP"""
return self.name
[docs]
def __repr__(self):
return f"OptimalControlProblem('{self.name}')"
[docs]
def kill():
print('\n\n')
raise ValueError
[docs]
def cout(*args):
print('\n\n')
for arg in args:
print(f'{arg}\n')