Source code for pysolvegn.solver

"""
pysolve-gn - Robust Gauss-Newton Least Squares Solver.
Copyright (C) 2026 Artezaru, artezaru.github@proton.me

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""

from typing import Optional, Sequence, Union, Tuple, Callable, Dict, List, Any
from numbers import Real, Integral
from numpy.typing import ArrayLike

import numpy
import scipy
import time

from .rho_functions import (
    linear_rho_at_R2,
    soft_l1_rho_at_R2,
    cauchy_rho_at_R2,
    arctan_rho_at_R2,
    _build_tilde_R_and_tilde_J,
)

from .study_optimization import study_jacobian


[docs] def solve_gauss_newton( residual_func: Union[Callable, Sequence[Callable]], jacobian_func: Union[Callable, Sequence[Callable]], parameters: ArrayLike, *, weight: Optional[Union[str, Sequence[str]]] = None, loss: Optional[Union[str, Sequence[str]]] = None, max_iterations: Optional[Integral] = None, max_time: Optional[Real] = None, ftol: Optional[Real] = None, xtol: Optional[Real] = None, gtol: Optional[Real] = None, atol: Optional[Real] = None, ptol: Optional[Real] = None, callback: Optional[Callable[[Dict], bool]] = None, update_func: Optional[Callable[[numpy.ndarray, numpy.ndarray], ArrayLike]] = None, verbosity: Integral = 0, history: bool = False, ) -> Tuple[numpy.ndarray, Tuple[Dict]]: r""" Solve the least squares problem using the Gauss-Newton method with robust cost functions. The function accepts multiple terms in the least squares problem, each with its own residual function, Jacobian function, weight, and loss function solving the following optimization problem .. math:: \min_{\mathbf{p}} \frac{1}{2} \sum_{i} w_i \sum_j \rho_i\left(\| \mathbf{R}_{i,j}(\mathbf{p}) \|^2\right) where :math:`w_i` is a weight for each sub least squares problem, and :math:`\rho_i` is a robust cost function for each sub least squares problem. For more details on the optimization problem and the algorithm, please refer to the documentation. .. note:: By nomenclature, we assume that the first sub least squares problem (i.e. :math:`i=0`) is the main least squares problem containing the data residuals, and the other sub least squares problems (i.e. :math:`i \geq 1`) are regularization terms. .. important:: At least one of the stopping criteria (i.e. ``max_iterations``, ``max_time``, ``ftol``, ``xtol``, ``gtol``, ``atol``, ``ptol``) must be provided to ensure the optimization process will stop. Parameters ---------- residual_func: Union[Callable, Sequence[Callable]] The function(s) to compute the residuals for each term in the least squares problem. Can be a single function or a sequence of functions. Each callable represent the `i`-th term :math:`\mathbf{R}_i(\mathbf{p})` in the least squares problem and should take as inputs the parameters (array-like with shape (n_parameters,)) and return the residuals as a 1D numpy array with shape (n_residuals_i,) representing each :math:`\mathbf{R}_{i,j}(\mathbf{p})` where :math:`j=0,...,n_{\text{residuals}_i-1}`. jacobian_func: Union[Callable, Sequence[Callable]] The function(s) to compute the Jacobian matrices for each term in the least squares problem. Can be a single function or a sequence of functions. Each callable represent the `i`-th term :math:`\mathbf{J}_i(\mathbf{p})` in the least squares problem such as :math:`\mathbf{J}_i(\mathbf{p}) = \frac{\partial \mathbf{R}_i(\mathbf{p})}{\partial \mathbf{p}}` and should take as inputs the parameters (array-like with shape (n_parameters,)) and return the Jacobian matrix as a 2D numpy array with shape (n_residuals_i, n_parameters) representing the Jacobian of each residual :math:`\mathbf{R}_{i,j}(\mathbf{p})` where :math:`j=0,...,n_{\text{residuals}_i-1}`. parameters: ArrayLike The starting parameters of the optimization with shape (n_parameters,). The array will not be modified by this function, a copy of the parameters will be used for the optimization process. weight: Optional[Union[Real, Sequence[Real]]], optional (default=None) The weight(s) for each term in the least squares problem. Can be a single value or a sequence of values. Each weight :math:`w_i` is a weight for the corresponding term in the least squares problem and should be a non-negative real number. If a single value is provided, it will be used for all terms in the least squares problem. If None is provided, it will be set to 1.0. loss: Optional[Union[str, Sequence[str]]], optional (default=None) The loss function(s) for each term in the least squares problem. Can be a single string or a sequence of strings. Each loss function :math:`\rho_i` is a robust cost function for the corresponding term in the least squares problem and should be one of the following strings: ["linear", "soft_l1", "cauchy", "arctan"]. If a single string is provided, it will be used for all terms in the least squares problem. If None is provided, it will be set to "linear". max_iterations: Optional[Integral], optional (default=None) Stop criterion by the number of iterations. The optimization process is stopped when the number of iterations exceeds ``max_iterations``. If None, no limit on the number of iterations is considered. max_time: Optional[Real], optional (default=None) Stop criterion by the elapsed time of optimization. The optimization process is stopped when the time elapsed since the beginning of the optimization exceeds ``max_time`` seconds. If None, no limit on the computation time is considered. ftol: Optional[Real], optional (default=None) Stop criterion by the change of the cost function value. The optimization process is stopped when ``dF < ftol * F`` where F is the cost function value and dF is the change of the cost function value between two iterations. If None, this criterion is not considered. xtol: Optional[Real], optional (default=None) Stop criterion by the change of the parameters. The optimization process is stopped when ``||dp|| < xtol * (xtol + ||p||)`` where p is the parameters and dp is the change of the parameters between two iterations. If None, this criterion is not considered. gtol: Optional[Real], optional (default=None) Stop criterion by the optimality value. The optimization process is stopped when the optimality verifies ``norm(b, ord=numpy.inf) < gtol`` where b is the scaled second term. If None, this criterion is not considered. atol: Optional[Real], optional (default=None) Stop criterion by the absolute cost function value. The optimization process is stopped when ``F < atol`` where F is the cost function value. If None, this criterion is not considered. ptol: Optional[Real], optional (default=None) Stop criterion by the parameters value. The optimization process is stopped when ``||p|| < ptol`` where p is the parameters. If None, this criterion is not considered. callback: Optional[Callable[[Dict], bool]], optional (default=None) A callback function that is called at the end of each iteration of the optimization process. The callback function should take a dictionary similar to the one returned in the history of the optimization process as input. If the return value of the callback function is True (or None), the optimization process will continue. If the return value is False, the optimization process will be stopped. If None, no callback function is used. update_func: Optional[Callable[[numpy.ndarray, numpy.ndarray], ArrayLike]], optional (default=None) A function that is called at the end of each iteration of the optimization process to compute the parameters for the next iteration. The function should take the current parameters and update ``(p, dp)`` as inputs and return the parameters with shape (n_parameters,) to use for the next iteration. If None, the parameters for the next iteration will be computed as ``p + dp`` where p is the current parameters and dp is the update computed by solving the linear system. verbosity : Integral, optional The level of verbosity for logging the optimization process. 0: No logging (default) 1: Log only the final results of the optimization process. 2: Log the results at each iteration of the optimization process. 3: Log detailed information about the optimization process. history: bool If True, the function will also return a tuple containing the history of the optimization process. Returns ------- parameters: numpy.ndarray The parameters that minimize the least squares problem with shape (n_parameters,). history: Tuple[Dict], optional Only returned if ``history`` is True. A tuple containing the history of the optimization process. Each element of the tuple is a dictionary containing the keys described below. Notes ----- The history contains the following keys: - "iteration": Integer representing the iteration number. - "costs": List of floats representing the cost function value for each term in the least squares problem at the current iteration compute as :math:`\frac{1}{2} \sum_j \rho_i\left(\| \mathbf{R}_{i,j}(\mathbf{p}) \|^2\right)` for each term :math:`i`. - "cost": Float representing the cost function value at the current iteration computed as :math:`\frac{1}{2} \sum_i w_i \sum_j \rho_i\left(\| \mathbf{R}_{i,j}(\mathbf{p}) \|^2\right)`. - "parameters": Numpy array representing the parameters at the current iteration. - "residuals": The residual of the data term (i.e. the first term in the least squares problem) at the current iteration. The optimization process is performed by iteratively solving a linearized version of the least squares problem at each iteration for different values of the regularization factor, and then selecting the optimal regularization factor based on the L-curve analysis. The step of each loop iteration is as follows: .. code-block:: text While NOT converged: 1. Build the system M Δp = -b from 'residual_func', 'jacobian_func', 'loss' and 'weight'. 2. Call the 'callback' function and STOP if it returns False. 3. Check the stopping criteria and STOP if any of them is satisfied. 4. If CONTINUE solve the linear system M Δp = -b for Δp. 5. Update the parameters as p = p + Δp (or using 'update_func' if provided). Version ------- - 0.0.1: Initial version. """ if not isinstance(residual_func, Sequence): residual_func = [residual_func] if not isinstance(jacobian_func, Sequence): jacobian_func = [jacobian_func] if not len(residual_func) == len(jacobian_func): raise ValueError( "The length of residual_func and jacobian_func must be the same." ) if not all(callable(func) for func in residual_func + jacobian_func): raise ValueError( "All elements in residual_func and jacobian_func must be callable." ) if weight is None: weight = 1.0 if not isinstance(weight, Sequence): weight = [weight] if len(weight) == 1: weight = weight * len(residual_func) if not len(weight) == len(residual_func): raise ValueError( "The length of weight must be the same as the length of residual_func and jacobian_func." ) if not all(isinstance(w, Real) for w in weight): raise ValueError("All elements in weight must be real numbers.") weight = [float(w) for w in weight] # Ensure weights are floats if loss is None: loss = "linear" if isinstance(loss, str): loss = [loss] loss = [l if l is not None else "linear" for l in loss] # None -> "linear" if len(loss) == 1: loss = loss * len(residual_func) if not len(loss) == len(residual_func): raise ValueError( "The length of loss must be the same as the length of residual_func and jacobian_func." ) if not all(isinstance(l, str) for l in loss): raise ValueError("All elements in loss must be strings.") valid_loss_functions = {"linear", "soft_l1", "cauchy", "arctan"} if not all(l in valid_loss_functions for l in loss): raise ValueError(f"All elements in loss must be one of {valid_loss_functions}.") parameters = numpy.asarray(parameters, dtype=numpy.float64) if not isinstance(parameters, numpy.ndarray) or parameters.ndim != 1: raise ValueError("Parameters must be a 1D numpy array.") if not isinstance(verbosity, Integral): raise TypeError("verbosity must be an integer.") if not (0 <= verbosity <= 3): raise ValueError("verbosity must be an integer between 0 and 3.") verbosity = int(verbosity) if max_iterations is not None: if not isinstance(max_iterations, Integral): raise TypeError("max_iterations must be an integer.") if max_iterations <= 0: raise ValueError("max_iterations must be a positive integer.") max_iterations = int(max_iterations) if max_time is not None: if not isinstance(max_time, Real): raise TypeError("max_time must be a real number.") if max_time <= 0: raise ValueError("max_time must be a positive real number.") max_time = float(max_time) if ftol is not None: if not isinstance(ftol, Real): raise TypeError("ftol must be a real number.") if ftol <= 0: raise ValueError("ftol must be a positive real number.") ftol = float(ftol) if xtol is not None: if not isinstance(xtol, Real): raise TypeError("xtol must be a real number.") if xtol <= 0: raise ValueError("xtol must be a positive real number.") xtol = float(xtol) if gtol is not None: if not isinstance(gtol, Real): raise TypeError("gtol must be a real number.") if gtol <= 0: raise ValueError("gtol must be a positive real number.") gtol = float(gtol) if atol is not None: if not isinstance(atol, Real): raise TypeError("atol must be a real number.") if atol <= 0: raise ValueError("atol must be a positive real number.") atol = float(atol) if ptol is not None: if not isinstance(ptol, Real): raise TypeError("ptol must be a real number.") if ptol <= 0: raise ValueError("ptol must be a positive real number.") ptol = float(ptol) if all( criterion is None for criterion in [max_iterations, max_time, ftol, xtol, gtol, atol, ptol] ): raise ValueError( "At least one stopping criterion must be provided (max_iterations, max_time, ftol, xtol, gtol, atol, or ptol)." ) if not isinstance(history, bool): raise TypeError("history must be a boolean value.") history = bool(history) if callback is not None and not isinstance(callback, Callable): raise TypeError("callback must be a callable function or None.") if update_func is not None and not isinstance(update_func, Callable): raise TypeError("update_func must be a callable function or None.") # ----------- Process optimisation ------------------ rho_func = [] for l in loss: if l == "linear": rho_func.append(linear_rho_at_R2) elif l == "soft_l1": rho_func.append(soft_l1_rho_at_R2) elif l == "cauchy": rho_func.append(cauchy_rho_at_R2) elif l == "arctan": rho_func.append(arctan_rho_at_R2) else: raise ValueError( f"Invalid loss function '{l}'. Valid options are 'linear', 'soft_l1', 'cauchy', 'arctan'." ) _n_terms = len(residual_func) _n_parameters = parameters.size _starting_time = time.time() _parameters = parameters.copy() _iteration = 0 _history = [] _end_flag = False # ! (end-flag is used to BREAK the optimization loop) _end_message = "" _delta = None _last_total_cost = None _compute_history = history or callback is not None _compute_cost = ( ftol is not None or atol is not None or verbosity >= 2 or _compute_history ) _compute_optimality = gtol is not None or verbosity >= 2 or _compute_history # Cached values residual_arrays: List[numpy.ndarray] = None jacobian_matrices: List[numpy.ndarray] = None costs: List[float] = None total_cost: float = None optimality: float = None rhos_arrays: List[Tuple[numpy.ndarray]] = None M = None b = None if verbosity >= 3: study_jacobian( residual_func=residual_func, jacobian_func=jacobian_func, parameters=_parameters, weight=weight, loss=loss, title="Initial Jacobian Analysis Before Optimization", ) if verbosity >= 2: detail = ( f"\nIndividual costs: C_i = 0.5 * ρ(||R_i||^2) " f"\nCost: C = sum(w_i * C_i) " f"\nStep norm: ||Δp||_2 and ||Δp||_∞ " f"\nOptimality: ||J^T R||_∞" ) header = ( f"\n{'Iteration':^10} {'Cost C':^15}" + " ".join( [f"{'Cost C_' + str(i):^15}" for i in range(len(residual_func))], ) + f" {'||Δp||_2':^15} {'||Δp||_∞':^15} {'Optimality':^15}" ) print(detail) print(header) while True: # ! (ensure end-flag activation for term "break" statement) # Compute residuals, Jacobians, costs residual_arrays = [R_func(_parameters) for R_func in residual_func] jacobian_matrices = [J_func(_parameters) for J_func in jacobian_func] rhos_arrays = [r_func(R) for r_func, R in zip(rho_func, residual_arrays)] if _compute_cost: costs = [0.5 * numpy.sum(rho[0]) for rho in rhos_arrays] total_cost = sum(w * c for w, c in zip(weight, costs)) # Build tilted residuals and Jacobian based on the robust cost function for i in range(_n_terms): residual_arrays[i], jacobian_matrices[i] = _build_tilde_R_and_tilde_J( residual_arrays[i], jacobian_matrices[i], rhos_arrays[i][1], rhos_arrays[i][2], ) # Assemble the operator M and the second term b for the linear system M Δp = -b M = sum(w * J.T @ J for w, J in zip(weight, jacobian_matrices)) b = sum( w * J.T @ R for w, J, R in zip(weight, jacobian_matrices, residual_arrays) ) if _compute_optimality: optimality = numpy.linalg.norm(b, ord=numpy.inf) # Update history if _compute_history: _history.append( { "iteration": _iteration, "costs": costs, "cost": total_cost, "parameters": _parameters.copy(), "residuals": residual_arrays[0].copy(), # Data term residuals } ) # Check for convergence before solving the linear system if verbosity >= 2: if _delta is None: strdp2 = f"{'':^15}" strdpinf = f"{'':^15}" else: strdp2 = f"{numpy.linalg.norm(_delta, ord=2):^15.3e}" strdpinf = f"{numpy.linalg.norm(_delta, ord=numpy.inf):^15.3e}" row = ( f"{_iteration:^10} {total_cost:^15.3e}" + " ".join([f"{c:^15.3e}" for c in costs]) + f" {strdp2} {strdpinf} {optimality:^15.3e}" ) print(row) if _compute_cost and ftol is not None and _last_total_cost is not None: dF = abs(total_cost - _last_total_cost) if dF < ftol * total_cost: _end_flag = True _end_message += f"\n[ftol] Convergence achieved (df < ftol * F) : {dF} < {ftol * total_cost}." if _compute_cost and atol is not None and total_cost < atol: _end_flag = True _end_message += ( f"\n[atol] Convergence achieved (F < atol) : {total_cost} < {atol}." ) if _compute_optimality and gtol is not None and optimality < gtol: _end_flag = True _end_message += f"\n[gtol] Convergence achieved (optimality < gtol) : {optimality} < {gtol}." if ptol is not None and numpy.linalg.norm(_parameters, ord=2) < ptol: _end_flag = True _end_message += f"\n[ptol] Convergence achieved (||p|| < ptol) : {numpy.linalg.norm(_parameters, ord=2)} < {ptol}." if max_iterations is not None and _iteration >= max_iterations: _end_flag = True _end_message += f"\n[max_iterations] Maximum number of iterations reached: {max_iterations}." if max_time is not None and (time.time() - _starting_time) >= max_time: _end_flag = True _end_message += ( f"\n[max_time] Maximum computation time reached: {max_time} seconds." ) if callback is not None: callback_result = callback(_history[-1]) if callback_result is not None and not isinstance(callback_result, bool): raise ValueError("Callback function must return a boolean or None.") if callback_result is False: _end_flag = True _end_message += ( "\n[callback] Optimization stopped by callback function." ) if _end_flag: if verbosity >= 1: print(_end_message) break # Solve the linear system M Δp = -b if not numpy.linalg.cond(M) < 1 / numpy.finfo(M.dtype).eps: raise numpy.linalg.LinAlgError( "The Jacobian matrix is singular or ill-conditioned." "Consider using a different robust cost function or adding regularization." ) if scipy.sparse.issparse(M): _delta = scipy.sparse.linalg.spsolve(M, -b) else: _delta = numpy.linalg.solve(M, -b) # Update parameters if update_func is not None: _parameters = update_func(_parameters, _delta) _parameters = numpy.asarray(_parameters, dtype=numpy.float64) if not _parameters.ndim == 1 or _parameters.size != _n_parameters: raise ValueError( f"update_func must return a 1D array with shape ({_n_parameters},)." ) else: _parameters += _delta _last_total_cost = total_cost _iteration += 1 if verbosity >= 3: study_jacobian( residual_func=residual_func, jacobian_func=jacobian_func, parameters=_parameters, weight=weight, loss=loss, title="Final Jacobian Analysis After Optimization", ) if history: return _parameters, tuple(_history) else: return _parameters.copy()