Source code for pygenalgo.engines.generic_ga

from math import isnan
from os import cpu_count
from operator import attrgetter
from collections import defaultdict
from typing import Callable, Optional

from joblib import Parallel, delayed
from numpy.random import default_rng, Generator

from pygenalgo.engines import logger
from pygenalgo.genome.chromosome import Chromosome
from pygenalgo.operators.genetic_operator import GeneticOperator
from pygenalgo.operators.mutation.mutate_operator import MutationOperator
from pygenalgo.operators.selection.select_operator import SelectionOperator
from pygenalgo.operators.crossover.crossover_operator import CrossoverOperator

# Public interface.
__all__ = ["GenericGA"]


class GenericGA:
    """
    Description:

        Generic GA class models the interface of a specific genetic algorithm
        model (or engine). It provides the common variables and functionality
        that all GA models should share. Concrete engines are expected to
        override the 'run' method.
    """

    rng_GA: Generator = default_rng()
    """
    Random Number Generator for the whole class.
    """

    MAX_CPUs: int = cpu_count() or 1
    """
    Set the maximum number of CPUs (at least one).
    """

    # Object variables.
    __slots__ = ("population", "fitness_func", "_select_op", "_crossx_op",
                 "_mutate_op", "_stats", "_n_cpus", "_f_evals", "_iteration")

    def __init__(self, initial_pop: list[Chromosome], fit_func: Callable,
                 select_op: Optional[SelectionOperator] = None,
                 mutate_op: Optional[MutationOperator] = None,
                 crossx_op: Optional[CrossoverOperator] = None,
                 n_cpus: Optional[int] = None) -> None:
        """
        Default constructor of GenericGA object.

        :param initial_pop: list of the initial population of (randomized)
            chromosomes.

        :param fit_func: callable fitness function.

        :param select_op: selection operator (must inherit from class
            SelectionOperator).

        :param mutate_op: mutation operator (must inherit from class
            MutationOperator).

        :param crossx_op: crossover operator (must inherit from class
            CrossoverOperator).

        :param n_cpus: Number of requested CPUs for the evolution process.
            When omitted, all but one of the available CPUs are used (at
            least one).

        :raises TypeError: if the fitness function is not callable.

        :raises ValueError: if any of the three genetic operators is missing.
        """

        # Sanity check.
        if not callable(fit_func):
            raise TypeError(f"{self.__class__.__name__}: "
                            f"Fitness function is not callable.")
        # _end_if_

        # Sanity check.
        if select_op is None:
            raise ValueError(f"{self.__class__.__name__}: "
                             f"Selection operator is missing.")
        # _end_if_

        # Sanity check.
        if mutate_op is None:
            raise ValueError(f"{self.__class__.__name__}: "
                             f"Mutation operator is missing.")
        # _end_if_

        # Sanity check.
        if crossx_op is None:
            raise ValueError(f"{self.__class__.__name__}: "
                             f"Crossover operator is missing.")
        # _end_if_

        # Keep a shallow copy of the population list: the list object is
        # new, but the chromosomes inside are shared with the caller.
        self.population = initial_pop.copy()

        # Get the fitness function.
        self.fitness_func = fit_func

        # Get Selection Operator.
        self._select_op = select_op

        # Get Mutation Operator.
        self._mutate_op = mutate_op

        # Get Crossover Operator.
        self._crossx_op = crossx_op

        # Get the number of requested CPUs.
        if n_cpus is None:

            # This is the default option.
            self._n_cpus = max(1, GenericGA.MAX_CPUs-1)
        else:

            # Assign the requested number, making sure we have
            # enough CPUs and the value entered has the correct
            # type.
            self._n_cpus = max(1, min(GenericGA.MAX_CPUs-1, int(n_cpus)))
        # _end_if_

        # Log the number of CPUs.
        logger.debug(f"{self.__class__.__name__} uses {self._n_cpus} CPUs.")

        # Dictionary with statistics.
        self._stats = defaultdict(list)

        # Set the function evaluation to zero.
        self._f_evals = 0

        # Set the iterations counter to zero.
        self._iteration = 0

        # Log the object initialization.
        logger.debug(f"{self.__class__.__name__} initialization complete.")
    # _end_def_

    @property
    def rng(self) -> Generator:
        """
        Get access of the Class variable (rng_GA).

        :return: the random number generator.
        """
        return self.rng_GA
    # _end_def_

    @property
    def iteration(self) -> int:
        """
        Accessor (getter) of the iteration parameter.

        :return: the iteration value.
        """
        return self._iteration
    # _end_def_

    @iteration.setter
    def iteration(self, value: int) -> None:
        """
        Accessor (setter) of the iteration value. The new value is also
        forwarded to the GeneticOperator class.

        :param value: (int) non-negative iteration counter.

        :raises RuntimeError: if the value is not an int or is negative.
        """

        # Check for correct type and reject the negative values.
        if not isinstance(value, int) or value < 0:
            raise RuntimeError(f"{self.__class__.__name__}: "
                               f"Iteration value should be positive int: {type(value)}.")
        # _end_if_

        # Update the iteration value.
        self._iteration = value

        # Update the iteration value in the GeneticOperator Class.
        GeneticOperator.set_iteration(value)
    # _end_def_

    @classmethod
    def set_seed(cls, new_seed=None) -> None:
        """
        Sets a new seed for the random number generator.

        :param new_seed: New seed value (default=None).

        :return: None.
        """

        # Re-initialize the class variable.
        cls.rng_GA = default_rng(seed=new_seed)

        # Log the new seed.
        logger.debug(f"{cls.__name__} has a new seed.")
    # _end_def_

    @property
    def f_evals(self) -> int:
        """
        Accessor method that returns the value of the f_eval.

        :return: (int) the counted number of function evaluations.
        """
        return self._f_evals
    # _end_def_

    @property
    def stats(self) -> dict:
        """
        Accessor method that returns the 'stats' dictionary.

        :return: the dictionary with the statistics from the run.
        """
        return self._stats
    # _end_def_

    @property
    def select_op(self) -> SelectionOperator:
        """
        Accessor method that returns the selection operator reference.

        :return: the SelectionOperator.
        """
        return self._select_op
    # _end_def_

    @property
    def crossx_op(self) -> CrossoverOperator:
        """
        Accessor method that returns the crossover operator reference.

        :return: the CrossoverOperator.
        """
        return self._crossx_op
    # _end_def_

    @property
    def mutate_op(self) -> MutationOperator:
        """
        Accessor method that returns the mutation operator reference.

        :return: the MutationOperator.
        """
        return self._mutate_op
    # _end_def_

    @property
    def n_cpus(self) -> int:
        """
        Accessor method that returns the number of CPUs.

        :return: the n_cpus.
        """
        return self._n_cpus
    # _end_def_

    def f_eval_increase_by(self, new_counts: int) -> None:
        """
        Utility method to allow the '_f_eval' to be updated with new counts
        outside the main class. This can happen during gene correction.

        :param new_counts: the new value of 'counts' that we want to add on
            the current f_eval. Must be a non-negative int.

        :return: None.

        :raises ValueError: if 'new_counts' is not an int or is negative.
        """

        # Sanity check.
        if not isinstance(new_counts, int) or new_counts < 0:
            raise ValueError(f"{self.__class__.__name__}: "
                             "New counts must be positive integer.")
        # _end_if_

        # Update the function evaluation counter.
        self._f_evals += new_counts
    # _end_def_

    def clear_all(self) -> None:
        """
        Make sure all the genetic operator counters and the stats are cleared.
        It resets the operator counters, the statistics dictionary and the
        function evaluations counter. The iteration counter is not touched.

        :return: None.
        """

        # Ensure the genetic operator counters are reset before.
        self._crossx_op.reset_counter()
        self._mutate_op.reset_counter()
        self._select_op.reset_counter()

        # Reset stats dictionary.
        self._stats.clear()

        # Reset f_eval counter.
        self._f_evals = 0

        # Log the cleanup.
        logger.debug(f"{self.__class__.__name__} cleared.")
    # _end_def_

    def best_chromosome(self) -> Optional[Chromosome]:
        """
        Auxiliary method that returns the chromosome with the highest fitness
        value. Safeguarded with ignoring NaNs.

        :return: Return the chromosome with the highest fitness, or None if
            no chromosome has a non-NaN fitness.
        """

        # NaN fitness values are filtered out first, because comparisons
        # against NaN would make the result of max() order dependent.
        return max((p for p in self.population if not isnan(p.fitness)),
                   key=attrgetter("fitness"), default=None)
    # _end_def_

    def best_n(self, n: int = 1) -> list[Chromosome]:
        """
        Auxiliary method that returns the best 'n' chromosomes with the highest
        fitness value. Chromosomes with NaN fitness are ignored, therefore the
        returned list can be shorter than 'n'.

        :param n: the number of the best chromosomes. Default = 1.

        :return: Return the (at most) 'n' chromosomes with the highest fitness,
            in descending fitness order.

        :raises ValueError: if 'n' is not a positive int.

        :raises RuntimeError: if 'n' exceeds the size of the population.
        """

        # Make sure 'n' is positive integer.
        if not isinstance(n, int) or n <= 0:
            raise ValueError(f"{self.__class__.__name__}: "
                             "Input must be a positive integer.")
        # _end_if_

        # Ensure the number of return chromosome
        # do not exceed the size of the population.
        if n > len(self.population):
            raise RuntimeError(f"{self.__class__.__name__}: "
                               f"Best {n} exceeds population size.")
        # _end_if_

        # Sort the (NaN free) population in descending order.
        sorted_population = sorted((p for p in self.population
                                    if not isnan(p.fitness)),
                                   key=attrgetter("fitness"), reverse=True)

        # Return the best 'n' chromosomes.
        return sorted_population[0:n]
    # _end_def_

    def crossover_mutate(self, input_population: list[Chromosome]) -> None:
        """
        This is an auxiliary method that combines the crossover and mutation
        operations in one call. Since these operations happen in place the
        'input_population' will be modified directly.

        This method should be called AFTER the selection of the parents that
        have been selected for breeding.

        :param input_population: this is the population that we will apply
            the two genetic operators.

        :return: None.
        """

        # Get the size of the input population.
        pop_size: int = len(input_population)

        # CROSSOVER and MUTATE to produce the new offsprings.
        for j in range(0, pop_size, 2):

            # In case of 'odd sized' populations the last chromosome
            # has no partner, so it skips the crossover.
            if j == pop_size-1:

                # MUTATE in place the last offspring.
                self._mutate_op(input_population[j])

                # Exit the loop.
                break
            # _end_if_

            # Get the index of the next chromosome.
            k = j+1

            # Replace directly the OLD parents with the NEW offsprings.
            input_population[j], input_population[k] = self._crossx_op(input_population[j],
                                                                       input_population[k])
            # MUTATE in place the 1st offspring.
            self._mutate_op(input_population[j])

            # MUTATE in place the 2nd offspring.
            self._mutate_op(input_population[k])
        # _end_for_
    # _end_def_

    def adapt_probabilities(self, threshold: Optional[float] = None) -> bool:
        """
        This method is used (optionally) to adjust simultaneously the crossover
        and mutation parameters of the GenericGA object. Below 0.1 the crossover
        probability is scaled by 0.9 and the mutation probability by 1.1. Above
        0.8 the scaling is reversed. In between nothing changes.

        :param threshold: This float parameter is used to determine whether
            we are going to increase or decrease the crossover and mutation
            parameters.

        :return: True if the parameters have changed, else False.

        :raises RuntimeError: if the threshold is missing (None).

        :raises ValueError: if the threshold is not a float in [0.0, 1.0].
        """

        # Check if the threshold value is missing.
        if threshold is None:
            raise RuntimeError(f"{self.__class__.__name__}: "
                               "Threshold parameter is missing.")
        # _end_if_

        # Sanity check.
        if not isinstance(threshold, float) or threshold < 0.0 or threshold > 1.0:
            raise ValueError(f"{self.__class__.__name__}: "
                             "Threshold value must be float in [0.0, 1.0].")
        # _end_if_

        # Initialize the trial values with the current
        # probabilities to avoid going out of limits.
        trial_pc = self._crossx_op.probability
        trial_pm = self._mutate_op.probability

        # Initialize the flag with "False"
        # to avoid unnecessary assignments.
        have_changed: bool = False

        # Use the threshold value to adjust
        # the probabilities accordingly.
        if threshold < 0.1:

            trial_pc *= 0.9
            trial_pm *= 1.1

            have_changed = True

        elif threshold > 0.8:

            trial_pc *= 1.1
            trial_pm *= 0.9

            have_changed = True
        # _end_if_

        # Check if the probabilities have changed.
        if have_changed:

            # Ensure the probabilities stay within the range [0, 1].
            self._crossx_op.probability = min(max(trial_pc, 0.0), 1.0)
            self._mutate_op.probability = min(max(trial_pm, 0.0), 1.0)

            # Log the update of the parameters.
            logger.debug(f"{self.__class__.__name__} "
                         "probabilities have been updated.")
        # _end_if_

        return have_changed
    # _end_def_

    def population_fitness(self) -> list[float]:
        """
        Get the fitness values of all the population.

        :return: A list with all the fitness values.
        """
        return [p.fitness for p in self.population]
    # _end_def_

    def individual_fitness(self, index: int) -> float:
        """
        Get the fitness value of an individual member of the population.

        :param index: Position of the individual in the population.

        :return: The fitness value (float).
        """
        return self.population[index].fitness
    # _end_def_

    def evaluate_fitness(self, input_population: list[Chromosome],
                         parallel_mode: bool = False,
                         backend: str = "threading") -> tuple[list[float], bool]:
        """
        Evaluate all the chromosomes of the input list with the custom fitness
        function. The parallel_mode is optional. Moreover, the default backend
        is "threading", but in the IslandModelGA it is better to select "loky".

        The fitness function is called once per chromosome and must return a
        mapping with the keys "f_value" and "solution_is_found".

        :param input_population: (list) The population of Chromosomes that we
            want to evaluate their fitness.

        :param parallel_mode: (bool) Enables parallel computation of the
            fitness function.

        :param backend: (str) Backend for the parallel Joblib framework.

        :return: a list with the fitness values and the found solution flag.
        """

        # Get a local copy of the fitness function.
        fit_func: Callable = self.fitness_func

        # Check the 'parallel_mode' flag.
        if parallel_mode:

            # Evaluate the chromosomes in parallel mode.
            fitness_i = Parallel(n_jobs=self._n_cpus, backend=backend)(
                delayed(fit_func)(p) for p in input_population
            )
        else:

            # Evaluate the chromosomes in serial mode.
            fitness_i = [fit_func(p) for p in input_population]
        # _end_if_

        # Get the size of the population.
        p_size: int = len(fitness_i)

        # Preallocate the fitness list.
        fitness_values: list[float] = p_size * [float("NaN")]

        # Flag to indicate if a solution has been found.
        found_solution: bool = False

        # Update all chromosomes with their fitness and check if a solution
        # has been found.
        for n, (p, fit_result) in enumerate(zip(input_population, fitness_i)):

            # Extract the fitness value only once.
            f_value = fit_result["f_value"]

            # Attach the fitness to each chromosome.
            p.fitness = f_value

            # Collect the fitness in a separate list.
            fitness_values[n] = f_value

            # Update the "found solution". The flag is kept as a strict
            # 'bool', even if the fitness function reports it with another
            # truthy type (e.g. int or numpy.bool_).
            if fit_result["solution_is_found"]:
                found_solution = True
            # _end_if_
        # _end_for_

        # Update the counter of function evaluations.
        self._f_evals += p_size

        # Return the fitness values.
        return fitness_values, found_solution
    # _end_def_

    def run(self, *args, **kwargs) -> None:
        """
        Main method of the Generic GA class, that implements the evolutionary
        routine. It must be overridden by the concrete engines.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError(f"{self.__class__.__name__}: "
                                  "You should implement this method!")
    # _end_def_

    def __call__(self, *args, **kwargs) -> None:
        """
        This method is only a wrapper of the "run" method.
        """
        return self.run(*args, **kwargs)
    # _end_def_

# _end_class_