Source code for pysolvegn.solver

"""
pysolve-gn - Robust Gauss-Newton Least Squares Solver.
Copyright (C) 2026 Artezaru, artezaru.github@proton.me

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

from typing import Optional, Sequence, Union, Tuple, Callable, Dict, List, Any
from numbers import Real, Integral
from numpy.typing import ArrayLike

import numpy
import scipy
import time

from .implemented import (
    _IMPLEMENTED_HISTORY_DETAILS,
)

from .rho_functions import (
    _build_tilde_R_and_tilde_J,
)

from .study_optimization import study_jacobian
from .term import Term


[docs] def solve( terms: Sequence[Term], x0: ArrayLike, *, max_iteration: Optional[Integral] = None, max_time: Optional[Real] = None, ftol: Optional[Real] = None, xtol: Optional[Real] = None, gtol: Optional[Real] = None, atol: Optional[Real] = None, ptol: Optional[Real] = None, callback_func: Optional[Callable[[Dict], bool]] = None, update_func: Optional[Callable[[numpy.ndarray, numpy.ndarray], ArrayLike]] = None, verbosity: Integral = 0, naninf: bool = True, history: bool = False, history_details: Optional[Union[str, Sequence[str]]] = None, ) -> Tuple[numpy.ndarray, Optional[List[Dict]]]: r""" Function to solve the least squares problem using the Gauss-Newton method with robust cost functions. The function accepts multiple terms in the least squares problem, each with its own residual function, Jacobian function, weight, and loss function solving the following optimization problem : .. math:: \min_{\mathbf{p}} \frac{1}{2} \sum_{i} w_i \sum_j \rho_i\left(\| \mathbf{r}_{i,j}(\mathbf{p}) \|^2\right) where :math:`w_i` is a weight for each sub least squares problem, and :math:`\rho_i` is a robust cost function for each sub least squares problem. .. seealso:: For more details on notations for the optimization problem and the algorithm, please refer to the mathematical section of the documentation. The cost of each term defined by the residuals is estimated as : .. math:: \frac{1}{2} \sum_j \rho_i\left(\| \mathbf{R}_{i,j}(\mathbf{p}) \|^2\right) .. important:: For terms defined directly by a Hessian and gradient, only a local quadratic model of the cost is available. Since this model is defined up to an arbitrary constant, an absolute cost value cannot be uniquely determined. For reporting consistency, the cost contribution of these terms is set to 0. Parameters ---------- x0: ArrayLike The initial guess for the parameters of the optimization with shape (n_parameters,). The array will not be modified by this function, a copy of the parameters will be used for the optimization process. max_iteration: Optional[Integral], optional (default=None) Stop criterion by the number of iterations. The optimization process is stopped when the number of iterations exceeds ``max_iteration``. If None, no limit on the number of iterations is considered. max_time: Optional[Real], optional (default=None) Stop criterion by the elapsed time of optimization. The optimization process is stopped when the time elapsed since the beginning of the optimization exceeds ``max_time`` seconds. If None, no limit on the computation time is considered. ftol: Optional[Real], optional (default=None) Stop criterion by the change of the cost function value. The optimization process is stopped when ``dF < ftol * F`` where F is the cost function value and dF is the change of the cost function value between two iterations. If None, this criterion is not considered. xtol: Optional[Real], optional (default=None) Stop criterion by the change of the parameters. The optimization process is stopped when ``||dp|| < xtol * (xtol + ||p||)`` where p is the parameters and dp is the change of the parameters between two iterations. If None, this criterion is not considered. gtol: Optional[Real], optional (default=None) Stop criterion by the optimality value. The optimization process is stopped when the optimality verifies ``norm(g, ord=numpy.inf) < gtol`` where g is the scaled second term. If None, this criterion is not considered. atol: Optional[Real], optional (default=None) Stop criterion by the absolute cost function value. The optimization process is stopped when ``F < atol`` where F is the cost function value. If None, this criterion is not considered. ptol: Optional[Real], optional (default=None) Stop criterion by the change of the parameters value. The optimization process is stopped when ``norm(dp, ord=numpy.inf) < ptol`` where dp is the change of the parameters. If None, this criterion is not considered. callback_func: Optional[Callable[[Dict], bool]], optional (default=None) A callback function that is called at the end of each iteration of the optimization process. The callback function should take a dictionary similar to the one returned in the history of the optimization process as input. If the return value of the callback function is True (or None), the optimization process will continue. If the return value is False, the optimization process will be stopped. If None, no callback function is used. update_func: Optional[Callable[[numpy.ndarray, numpy.ndarray], ArrayLike]], optional (default=None) A function that is called at the end of each iteration of the optimization process to compute the parameters for the next iteration. The function should take the current parameters and update ``(p, dp)`` as inputs and return the parameters with shape (n_parameters,) to use for the next iteration. If None, the parameters for the next iteration will be computed as ``p + dp`` where p is the current parameters and dp is the update computed by solving the linear system. verbosity : Integral, optional (default=0) The level of verbosity for logging the optimization process. 0: No logging 1: Log only the final results of the optimization process. 2: Log the results at each iteration of the optimization process. 3: Details logging for debugging purposes. naninf: bool, optional (default=True) If True, the optimization process will be stopped if any NaN or Inf value is encountered in the cost function value or parameters during the optimization process. If False, the optimization process will continue. history: bool (optional, default=False) If True, the function will also return a tuple containing the history of the optimization process. history_details: Optional[Union[str, Sequence[str]]], optional (default=None) The details to include in the history of the optimization process. Can be a single string or a sequence of strings. See the Notes section for the available details. By default, the history will include : "iteration", "elapsed_time", "parameters", "delta_params", "delta_cost", "cost", and "optimality" details. For a specified list of details, only the details in the list below will be included in the history. Returns ------- parameters: numpy.ndarray The parameters that minimize the least squares problem with shape (n_parameters,). history: List[Dict], optional Only returned if ``history`` is True. A list containing the history of the optimization process. Each element of the list is a dictionary containing the keys described below. Notes ----- The history contains the following keys (if requested in ``history_details``): - "iteration": Integer representing the iteration number. - "elapsed_time": Float representing the time elapsed since the beginning of the optimization process in seconds. - "parameters": Numpy array representing the parameters at the current iteration. - "delta_params": Numpy array representing the change of parameters between the current and previous iteration (Only for iterations > 0). - "costs": List of floats representing the cost function value for each term in the least squares problem at the current iteration compute as :math:`\frac{1}{2} \sum_j \rho_i\left(\| \mathbf{R}_{i,j}(\mathbf{p}) \|^2\right)` for each term :math:`i`. - "cost": Float representing the cost function value at the current iteration computed as :math:`\frac{1}{2} \sum_i w_i \sum_j \rho_i\left(\| \mathbf{R}_{i,j}(\mathbf{p}) \|^2\right)`. - "delta_cost": Float representing the change of the cost function value between the current and previous iteration (Only for iterations > 0). - "optimality": Float representing the optimality value at the current iteration computed as ``norm(b, ord=numpy.inf)`` where b is the scaled second term. - "residuals": A list of numpy arrays representing the residuals for each term in the least squares problem at the current iteration (Only for rJ terms). - "second_term": The scaled second term :math:`\mathbf{g}` of the linear system at the current iteration. - "hessian": The Hessian approximation :math:`\mathbf{H}` of the linear system at the current iteration. The optimization process is performed by iteratively solving a linearized version of the least squares problem at each iteration for different values of the regularization factor, and then selecting the optimal regularization factor based on the L-curve analysis. The step of each loop iteration is as follows: .. code-block:: text While NOT converged: 1. Build the system ``H Δp = -g``. 2. Call the 'callback' function and STOP if it returns False. 3. Check the stopping criteria and STOP if any of them is satisfied. 4. If CONTINUE solve the linear system ``H Δp = -g`` for Δp. 5. Update the parameters as ``p = p + Δp`` (or using 'update_func' if provided). Version ------- - 0.1.0: Initial version for the solver method. """ # Check the validity of the input arguments and raise appropriate errors if necessary. if not isinstance(terms, Sequence): raise ValueError("terms must be a sequence of Term objects.") if len(terms) == 0: raise ValueError("terms sequence cannot be empty.") for term in terms: if not isinstance(term, Term): raise ValueError( "All elements of terms must be instances of the Term class." ) x0 = numpy.asarray(x0, dtype=numpy.float64) if x0.ndim != 1: raise ValueError(f"x0 must be a 1D array, got {x0.ndim} dimensions.") if max_iteration is not None: if not isinstance(max_iteration, Integral): raise ValueError("max_iteration must be an integer.") max_iteration = int(max_iteration) if max_iteration < 0: raise ValueError("max_iteration must be a non-negative integer.") if max_time is not None: if not isinstance(max_time, Real): raise ValueError("max_time must be a real number.") max_time = float(max_time) if max_time < 0: raise ValueError("max_time must be a non-negative real number.") if ftol is not None: if not isinstance(ftol, Real): raise ValueError("ftol must be a real number.") ftol = float(ftol) if ftol <= 0: raise ValueError("ftol must be a positive real number.") if xtol is not None: if not isinstance(xtol, Real): raise ValueError("xtol must be a real number.") xtol = float(xtol) if xtol <= 0: raise ValueError("xtol must be a positive real number.") if gtol is not None: if not isinstance(gtol, Real): raise ValueError("gtol must be a real number.") gtol = float(gtol) if gtol <= 0: raise ValueError("gtol must be a positive real number.") if atol is not None: if not isinstance(atol, Real): raise ValueError("atol must be a real number.") atol = float(atol) if atol <= 0: raise ValueError("atol must be a positive real number.") if ptol is not None: if not isinstance(ptol, Real): raise ValueError("ptol must be a real number.") ptol = float(ptol) if ptol <= 0: raise ValueError("ptol must be a positive real number.") if all( criterion is None for criterion in [max_iteration, max_time, ftol, xtol, gtol, atol, ptol] ): raise ValueError( "At least one stopping criterion must be provided " "(max_iteration, max_time, ftol, xtol, gtol, atol, or ptol)." ) if callback_func is not None and not callable(callback_func): raise ValueError("callback_func must be a callable function.") if update_func is not None and not callable(update_func): raise ValueError("update_func must be a callable function.") if not isinstance(verbosity, Integral): raise ValueError("verbosity must be an integer.") verbosity = int(verbosity) if verbosity < 0 or verbosity > 3: raise ValueError("verbosity must be an integer between 0 and 3 inclusive.") if not isinstance(naninf, bool): raise ValueError("naninf must be a boolean value.") naninf = bool(naninf) if not isinstance(history, bool): raise ValueError("history must be a boolean value.") history = bool(history) if history_details is None: history_details = [ "iteration", "elapsed_time", "parameters", "delta_params", "delta_cost", "cost", "optimality", ] if isinstance(history_details, str): history_details = [history_details] if not isinstance(history_details, Sequence): raise ValueError("history_details must be a string or a sequence of strings.") for detail in history_details: if detail not in _IMPLEMENTED_HISTORY_DETAILS and detail != "all": raise ValueError( f"Invalid history detail: {detail}. Valid details are: {_IMPLEMENTED_HISTORY_DETAILS} and 'all'." ) if "all" in history_details: history_details = list(_IMPLEMENTED_HISTORY_DETAILS) # -- Select Computation -- _compute_history = history or callback_func is not None _compute_cost = ( ftol is not None or atol is not None or verbosity >= 2 or ( _compute_history and any(detail in ["cost", "costs"] for detail in history_details) ) ) _compute_optimality = ( gtol is not None or verbosity >= 2 or ( _compute_history and any(detail in ["optimality"] for detail in history_details) ) ) _compute_delta_norm = ( xtol is not None or ptol is not None or verbosity >= 2 or ( _compute_history and any(detail in ["delta_norm"] for detail in history_details) ) ) _compute_convergence_analysis = verbosity >= 3 # -- Solver Implementation -- _n_terms = len(terms) _n_parameters = x0.shape[0] _parameters = x0.copy() _iteration = 0 _history = [] _starting_time = time.time() _end_flag = False # ! (end-flag is used to BREAK the optimization loop) _end_message = "" _delta = None _delta_norm = None _last_total_cost = None r_vectors: List[numpy.ndarray] = None J_matrices: List[numpy.ndarray] = None H_matrices: List[numpy.ndarray] = None g_vectors: List[numpy.ndarray] = None costs: List[float] = None total_cost: float = None optimality: float = None rhos_arrays: List[Tuple[numpy.ndarray]] = None Hessian = None second_term = None # Printing the header for the optimization process logging based on the verbosity level. _printed_detail = f"" _printed_header = f"" if verbosity >= 2: _printed_detail += ( f"\nIndividual costs [rJ term]: C_i = 0.5 * ρ(||r_i||^2) " f"\nCost: C = sum(w_i * C_i) " f"\nStep norm: ||Δp|| " f"\nOptimality: ||g|| " ) _printed_header += ( f"\n{'Iteration':^10} {'Total time (s)':^15} {'Cost C':^15} {'ΔC':^15}" + f" {'||Δp||_2':^15} {'||g||_∞':^15}" ) if verbosity >= 3: _printed_header += ( f" {'||Δp||_∞':^15} {'||g||_2':^15} {'Cond(H)':^15} {'Trace(H)':^15}" + " ".join( [f"{'Cost C_' + str(i):^15}" for i in range(_n_terms)], ) ) if verbosity >= 2: print(_printed_detail) print(_printed_header) # Start the optimization loop while True: # ! (ensure end-flag activation for term "break" statement) # Compute r, J, H, g for each term and the cost of each term if necessary r_vectors = [] J_matrices = [] rhos_arrays = [] H_matrices = [] g_vectors = [] for term in terms: if term.type == "rJ": r_vectors.append(term.r_func(_parameters)) J_matrices.append(term.J_func(_parameters)) g_vectors.append(None) H_matrices.append(None) # compute rho rho' and rho'' for each residual of the term if term.loss == "linear": rhos_arrays.append((r_vectors[-1] ** 2, 1.0, 0.0)) else: rhos_arrays.append(term.rho_function(r_vectors[-1])) elif term.type == "gH": r_vectors.append(None) J_matrices.append(None) rhos_arrays.append(None) g_vectors.append(term.g_func(_parameters)) H_matrices.append(term.H_func(_parameters)) # Build the modified residuals and Jacobian for each rJ term based on the robust cost function costs = [] if _compute_cost: for i in range(_n_terms): if terms[i].type == "rJ": costs.append(0.5 * numpy.sum(rhos_arrays[i][0])) elif terms[i].type == "gH": costs.append(0) total_cost = sum( w * c for w, c in zip([term.weight for term in terms], costs) ) for i in range(_n_terms): if terms[i].type == "rJ" and terms[i].loss != "linear": r_vectors[i], J_matrices[i] = _build_tilde_R_and_tilde_J( r_vectors[i], J_matrices[i], rhos_arrays[i][1], rhos_arrays[i][2], ) # Assemble the Hessian and second term for the linear system Hessian = sum( w * (J.T @ J) if terms[i].type == "rJ" else w * H for i, (w, J, H) in enumerate( zip([term.weight for term in terms], J_matrices, H_matrices) ) ) second_term = sum( w * (J.T @ r) if terms[i].type == "rJ" else w * g for i, (w, J, r, g) in enumerate( zip( [term.weight for term in terms], J_matrices, r_vectors, g_vectors, ) ) ) if _compute_optimality: optimality = numpy.linalg.norm(second_term, ord=numpy.inf) if _compute_convergence_analysis: optimality_2 = numpy.linalg.norm(second_term, ord=2) if scipy.sparse.issparse(Hessian): cond_Hessian = scipy.sparse.linalg.norm( Hessian ) * scipy.sparse.linalg.norm(scipy.sparse.linalg.inv(Hessian)) trace_Hessian = Hessian.diagonal().sum() else: cond_Hessian = numpy.linalg.cond(Hessian) trace_Hessian = numpy.trace(Hessian) _elapsed_time = time.time() - _starting_time # Update history if _compute_history: _h = {} if "iteration" in history_details: _h["iteration"] = _iteration if "parameters" in history_details: _h["parameters"] = _parameters.copy() if "delta_params" in history_details: _h["delta_params"] = _delta.copy() if _delta is not None else None if "delta_cost" in history_details: if _last_total_cost is None: _h["delta_cost"] = None else: _h["delta_cost"] = total_cost - _last_total_cost if "elapsed_time" in history_details: _h["elapsed_time"] = _elapsed_time if "costs" in history_details: _h["costs"] = costs if "cost" in history_details: _h["cost"] = total_cost if "optimality" in history_details: _h["optimality"] = optimality if "residuals" in history_details: _h["residuals"] = [ r.copy() if r is not None else None for r in r_vectors ] if "second_term" in history_details: _h["second_term"] = second_term.copy() if "hessian" in history_details: _h["hessian"] = Hessian.copy() _history.append(_h) _printed_row = f"" if verbosity >= 2: if _delta is None: strdp2 = f"{'':^15}" strdpinf = f"{'':^15}" else: strdp2 = f"{_delta_norm:^15.3e}" strdpinf = f"{numpy.linalg.norm(_delta, ord=numpy.inf):^15.3e}" if _last_total_cost is None: dC_str = f"{'':^15}" else: dC = total_cost - _last_total_cost dC_str = f"{dC:^15.3e}" _printed_row += ( f"{_iteration:^10} {_elapsed_time:^15.3e} {total_cost:^15.3e} {dC_str}" + f" {strdp2} {optimality:^15.3e}" ) if verbosity >= 3: _printed_row += ( f" {strdpinf} {optimality_2:^15.3e}" + f" {cond_Hessian:^15.3e} {trace_Hessian:^15.3e}" + " ".join([f"{c:^15.3e}" for c in costs]) ) if verbosity >= 2: print(_printed_row) # Convergence analysis and stopping criteria check if ftol is not None and _last_total_cost is not None: dF = abs(total_cost - _last_total_cost) if dF < ftol * total_cost: _end_flag = True _end_message += f"\n[ftol] Convergence achieved (df < ftol * F) : {dF} < {ftol * total_cost}." if atol is not None and total_cost < atol: _end_flag = True _end_message += ( f"\n[atol] Convergence achieved (F < atol) : {total_cost} < {atol}." ) if gtol is not None and optimality < gtol: _end_flag = True _end_message += f"\n[gtol] Convergence achieved (optimality < gtol) : {optimality} < {gtol}." if xtol is not None and _delta_norm is not None: _parameters_norm = numpy.linalg.norm(_parameters, ord=2) if _delta_norm < xtol * (xtol + _parameters_norm): _end_flag = True _end_message += f"\n[xtol] Convergence achieved (||Δp|| < xtol * (xtol + ||p||)) : {_delta_norm} < {xtol * (xtol + _parameters_norm)}." if ptol is not None and _delta_norm is not None and _delta_norm < ptol: _end_flag = True _end_message += f"\n[ptol] Convergence achieved (||Δp|| < ptol) : {_delta_norm} < {ptol}." if max_iteration is not None and _iteration >= max_iteration: _end_flag = True _end_message += f"\n[max_iterations] Maximum number of iterations reached: {max_iteration}." if max_time is not None and _elapsed_time >= max_time: _end_flag = True _end_message += ( f"\n[max_time] Maximum computation time reached: {max_time} seconds." ) if naninf: if total_cost is not None and ( numpy.isnan(total_cost) or numpy.isinf(total_cost) ): _end_flag = True _end_message += f"\n[naninf] Optimization stopped due to NaN or Inf value in cost: {total_cost}." elif numpy.any(numpy.isnan(_parameters)) or numpy.any( numpy.isinf(_parameters) ): _end_flag = True _end_message += f"\n[naninf] Optimization stopped due to NaN or Inf value in parameters." if callback_func is not None: callback_result = callback_func(_history[-1]) if callback_result is not None and not isinstance(callback_result, bool): raise ValueError("Callback function must return a boolean or None.") if callback_result is False: _end_flag = True _end_message += ( "\n[callback] Optimization stopped by callback function." ) if _end_flag: if verbosity >= 1: print(_end_message) break if scipy.sparse.issparse(Hessian): _delta = scipy.sparse.linalg.spsolve(Hessian, -second_term) else: _delta = numpy.linalg.solve(Hessian, -second_term) # Update parameters if update_func is not None: _new_parameters = update_func(_parameters, _delta) _new_parameters = numpy.asarray(_new_parameters, dtype=numpy.float64) if not _new_parameters.ndim == 1 or _new_parameters.size != _n_parameters: raise ValueError( f"update_func must return a 1D array with shape ({_n_parameters},)." ) _delta = _new_parameters - _parameters _parameters = _new_parameters else: _parameters = _parameters + _delta if _compute_delta_norm: _delta_norm = numpy.linalg.norm(_delta, ord=2) if _compute_cost: _last_total_cost = total_cost _iteration += 1 if history: return _parameters, _history else: return _parameters.copy()