Source code for pygenalgo.utils.utilities

from typing import Callable
from functools import wraps, partial

import numpy as np
from numpy.typing import NDArray

# Public interface.
__all__ = ["pareto_front", "cost_function", "np_cdist",
           "pareto_dominance", "np_pareto_front", "clamp"]

[docs] def clamp(x: int | float, x_lower: int | float, x_upper: int | float) -> int | float: """ Clamps a value within a specified range. :param x: value to clamp. :param x_lower: lower bound. :param x_upper: upper bound. :return: clamped value. """ return min(max(x, x_lower), x_upper)
# _end_def_
[docs] def pareto_dominance(point_a: tuple | list, point_b: tuple | list) -> bool: """ Implements a shortcut version of the Pareto dominance condition: all(p <= q for p, q in zip(point_i, point_j)) && any(p < q for p, q in zip(point_i, point_j)) NOTE: It is assumed that both points have the same size (length). :param point_a: the first point (as tuple or list). :param point_b: the second point (as tuple or list). :return: if the condition is satisfied. """ # Second condition. at_least_one_greater = False # Scan both points elementwise. for p, q in zip(point_a, point_b): # Return immediately. if p < q: return False # _end_if_ # Check if the second condition is satisfied. if not at_least_one_greater and p > q: # Switch the flag value to avoid re-setting it. at_least_one_greater = True # _end_if_ # _end_for_ # If we got here return only the second condition. return at_least_one_greater
# _end_def_
[docs] def pareto_front(points: list) -> list: """ Simple function that calculates the Pareto (optimal) front points from a given input points list. NOTE: The function is working directly with lists, even though it can be optimized using numpy arrays. :param points: list of points [(fx1, fx2, ..., fxn), (fy1, fy2, ..., fyn), ...................., (fk1, fk2, ..., fkn)] :return: List of points that lie on the Pareto front. """ # Create a set with all the points sizes. check_size = {len(point) for point in points} # Sanity check. if len(check_size) > 1: raise RuntimeError("All point must have the same size.") # _end_if_ # Create a set that will hold # all the Pareto front points. pareto_points = set() # Iterate through every point in the list. for i, point_i in enumerate(points): # Set the pareto optimal flag value to True. is_pareto_optimal = True # Compare it against every other point. for j, point_j in enumerate(points): # Check if "dominance" condition is satisfied. if i != j and pareto_dominance(point_i, point_j): # We swap the flag value. is_pareto_optimal = False # Break the internal loop and # continue to the next point. break # _end_for_ # If we get here and the flag hasn't changed # it means that 'point_i' is on the frontier. if is_pareto_optimal: pareto_points.add(point_i) # _end_for_ # Return the points as list. return list(pareto_points)
# _end_def_
[docs] def np_pareto_front(points: NDArray) -> NDArray: """ Simple function that calculates the Pareto (optimal) front points from a given input points numpy array. :param points: array of points [(fx1, fx2, ..., fxn), (fy1, fy2, ..., fyn), ...................., (fk1, fk2, ..., fkn)] :return: array of points that lie on the Pareto front. """ # Sanity check. if points.ndim != 2: raise RuntimeError("Points must be a 2-D array.") # _end_if_ # Get the number of points. num_points = points.shape[0] # Create a boolean array to track Pareto optimal points. is_pareto_optimal = np.ones(num_points, dtype=bool) for i, point_i in enumerate(points): # Compare point i-th with all other points. is_dominated = np.any(np.all(points <= point_i, axis=1) & np.any(points < point_i, axis=1)) # Set the flag appropriately. is_pareto_optimal[i] = not is_dominated # _end_for_ # Return only the unique Pareto optimal points. return np.unique(points[is_pareto_optimal], axis=0)
# _end_def_
[docs] def cost_function(func: Callable = None, minimize: bool = False): """ Decorator for the function that we want to optimize. The default setting is maximization. :param func: the function to be optimized. :param minimize: if 'True' it will return the negative function value to allow for the minimization. Default is set to 'False'. :return: the 'function_wrapper' method. """ # This allows the decorator to be called with # parenthesis and using the default parameters. if func is None: return partial(cost_function, minimize=minimize) # _end_if_ @wraps(func) def function_wrapper(*args, **kwargs) -> dict: """ Internal function wrapper. :param args: function positional arguments. :param kwargs: function keywords arguments. :return: a dictionary with two key-values. """ # Run the function we want to optimize. result = func(*args, **kwargs) # Check if the function returns a tuple (with two values) # or a single output parameter. In the former, the second # value should be boolean to signal that the solution meets # the termination requirements. if isinstance(result, tuple) and len(result) == 2: f_value, solution_is_found = result[0], bool(result[1]) else: f_value, solution_is_found = result, False # _end_if_ return {"f_value": -f_value if minimize else f_value, "solution_is_found": solution_is_found} # _end_def_ return function_wrapper
# _end_def_
[docs] def np_cdist(x_pos: NDArray, scaled: bool = False) -> NDArray: """ This is equivalent to the scipy.spatial.distance.cdist method with Euclidean distance metric. It is a tailored version for the purposes of the multimodal operation mode. :param x_pos: a ndarray of positions. The dimensions of the input array should be [n_rows, n_cols], where n_rows is the number of particles and n_cols are the number of positions. In special 3D cases the input can have a shape of [n_sample, n_rows, n_cols], in these cases the input will be first reshaped to [n_sample, (n_rows*n_cols)] before we continue. :param scaled: boolean flag that allows the input array to be scaled, using the MaxAbsScaler, before computing the distances. :return: a square [n_rows, n_rows] ndarray of distances. """ # Check if the input array is 3D. if x_pos.ndim == 3: # Get the original shape of the matrix. n_samples, n_rows, n_cols = x_pos.shape # Reshape it by combining the n_rows and n_cols. x_pos = x_pos.reshape((n_samples, n_rows * n_cols), copy=False) # _end_if_ # Get the number of rows/cols. n_rows, n_cols = x_pos.shape # Check if we want the input data to be scaled. if scaled: # Scale with the MaxAbsScaler. x_pos /= np.max(np.abs(x_pos), axis=0) # _end_if_ # Create a square matrix with zeros. dist_x = np.zeros((n_rows, n_rows), dtype=float) # Iterate through all vectors. for i in range(n_rows): # Compute the Euclidean norm of the 'i-th' element with the rest of them. dist_x[i, i + 1:] = np.sqrt(np.sum((x_pos[i] - x_pos[i + 1:, :]) ** 2, axis=1)) # Since the array is symmetric store the result in the 'i-th' column too. dist_x[:, i] = dist_x[i, :] # _end_for_ return dist_x
# _end_def_