"""Evolution strategy implementation for laboratory work #5.""" from __future__ import annotations import math import os import random import shutil import time from collections import deque from dataclasses import dataclass from typing import Callable, Iterable, Literal, Sequence import matplotlib.pyplot as plt import numpy as np from matplotlib.axes import Axes from mpl_toolkits.mplot3d import Axes3D # noqa: F401 - required for 3D plotting from numpy.typing import NDArray Array = NDArray[np.float64] FitnessFn = Callable[[Array], float] @dataclass class Individual: """Single individual of the evolution strategy population.""" x: Array sigma: Array fitness: float def copy(self) -> "Individual": return Individual(self.x.copy(), self.sigma.copy(), float(self.fitness)) @dataclass(frozen=True) class Generation: number: int population: tuple[Individual, ...] best: Individual mean_fitness: float sigma_scale: float @dataclass class EvolutionStrategyResult: generations_count: int best_generation: Generation history: list[Generation] time_ms: float @dataclass class EvolutionStrategyConfig: fitness_func: FitnessFn dimension: int x_min: Array x_max: Array mu: int lambda_: int mutation_probability: float initial_sigma: Array | float max_generations: int selection: Literal["plus", "comma"] = "comma" recombination: Literal["intermediate", "discrete", "none"] = "intermediate" parents_per_offspring: int = 2 success_rule_window: int = 10 success_rule_target: float = 0.2 sigma_increase: float = 1.22 sigma_decrease: float = 0.82 sigma_scale_min: float = 1e-3 sigma_scale_max: float = 100.0 tau: float | None = None tau_prime: float | None = None sigma_min: float = 1e-6 sigma_max: float = 10.0 best_value_threshold: float | None = None max_stagnation_generations: int | None = None save_generations: list[int] | None = None results_dir: str = "results" log_every_generation: bool = False seed: int | None = None def __post_init__(self) -> None: assert self.dimension == self.x_min.shape[0] == self.x_max.shape[0], ( "Bounds dimensionality must match the dimension of the problem" ) assert 0 < self.mu <= self.lambda_, "Require mu <= lambda and positive" assert 0.0 < self.mutation_probability <= 1.0, ( "Mutation probability must be within (0, 1]" ) if isinstance(self.initial_sigma, (int, float)): if self.initial_sigma <= 0: raise ValueError("Initial sigma must be positive") else: if self.initial_sigma.shape != (self.dimension,): raise ValueError("initial_sigma must be scalar or an array of given dimension") if np.any(self.initial_sigma <= 0): raise ValueError("All sigma values must be positive") if self.tau is None: object.__setattr__(self, "tau", 1.0 / math.sqrt(2.0 * math.sqrt(self.dimension))) if self.tau_prime is None: object.__setattr__(self, "tau_prime", 1.0 / math.sqrt(2.0 * self.dimension)) def make_initial_sigma(self) -> Array: if isinstance(self.initial_sigma, (int, float)): return np.full(self.dimension, float(self.initial_sigma), dtype=np.float64) return self.initial_sigma.astype(np.float64, copy=True) # --------------------------------------------------------------------------- # Helper utilities # --------------------------------------------------------------------------- def clear_results_directory(results_dir: str) -> None: if os.path.exists(results_dir): shutil.rmtree(results_dir) os.makedirs(results_dir, exist_ok=True) def evaluate_population(population: Iterable[Individual], fitness_func: FitnessFn) -> None: for individual in population: individual.fitness = float(fitness_func(individual.x)) def recombine( parents: Sequence[Individual], config: EvolutionStrategyConfig, ) -> tuple[Array, Array, float]: """Recombine parent individuals before mutation. Returns the base vector, sigma and the best parent fitness. """ if config.recombination == "none" or config.parents_per_offspring == 1: parent = random.choice(parents) return parent.x.copy(), parent.sigma.copy(), parent.fitness selected = random.choices(parents, k=config.parents_per_offspring) if config.recombination == "intermediate": x = np.mean([p.x for p in selected], axis=0) sigma = np.mean([p.sigma for p in selected], axis=0) elif config.recombination == "discrete": mask = np.random.randint(0, len(selected), size=config.dimension) indices = np.arange(config.dimension) x = np.array([selected[mask[i]].x[i] for i in indices], dtype=np.float64) sigma = np.array([selected[mask[i]].sigma[i] for i in indices], dtype=np.float64) else: # pragma: no cover - defensive raise ValueError(f"Unsupported recombination type: {config.recombination}") parent_fitness = min(p.fitness for p in selected) return x, sigma, parent_fitness def mutate( x: Array, sigma: Array, config: EvolutionStrategyConfig, sigma_scale: float, ) -> tuple[Array, Array]: """Apply log-normal mutation with optional per-coordinate masking.""" global_noise = np.random.normal() coordinate_noise = np.random.normal(size=config.dimension) sigma_new = sigma * np.exp(config.tau_prime * global_noise + config.tau * coordinate_noise) sigma_new = np.clip(sigma_new, config.sigma_min, config.sigma_max) sigma_new = np.clip(sigma_new * sigma_scale, config.sigma_min, config.sigma_max) steps = np.random.normal(size=config.dimension) * sigma_new if config.mutation_probability < 1.0: mask = np.random.random(config.dimension) < config.mutation_probability if not np.any(mask): mask[np.random.randint(0, config.dimension)] = True steps = steps * mask sigma_new = np.where(mask, sigma_new, sigma) x_new = np.clip(x + steps, config.x_min, config.x_max) return x_new, sigma_new def create_offspring( parents: Sequence[Individual], config: EvolutionStrategyConfig, sigma_scale: float, ) -> tuple[list[Individual], list[bool]]: offspring: list[Individual] = [] successes: list[bool] = [] for _ in range(config.lambda_): base_x, base_sigma, best_parent_fitness = recombine(parents, config) mutated_x, mutated_sigma = mutate(base_x, base_sigma, config, sigma_scale) fitness = float(config.fitness_func(mutated_x)) child = Individual(mutated_x, mutated_sigma, fitness) offspring.append(child) successes.append(fitness < best_parent_fitness) return offspring, successes def select_next_generation( parents: list[Individual], offspring: list[Individual], config: EvolutionStrategyConfig, ) -> list[Individual]: if config.selection == "plus": pool = parents + offspring else: pool = offspring pool.sort(key=lambda ind: ind.fitness) next_generation = [ind.copy() for ind in pool[: config.mu]] return next_generation def compute_best(population: Sequence[Individual]) -> Individual: best = min(population, key=lambda ind: ind.fitness) return best.copy() def build_generation( number: int, population: list[Individual], sigma_scale: float, ) -> Generation: copies = tuple(ind.copy() for ind in population) best = compute_best(copies) mean_fitness = float(np.mean([ind.fitness for ind in copies])) return Generation(number, copies, best, mean_fitness, sigma_scale) def save_generation(generation: Generation, config: EvolutionStrategyConfig) -> None: if config.dimension != 2: raise ValueError("Visualization is only supported for 2D problems") os.makedirs(config.results_dir, exist_ok=True) fig = plt.figure(figsize=(21, 7)) fig.suptitle( ( f"Поколение #{generation.number}. " f"Лучшее значение: {generation.best.fitness:.6f}. " f"Среднее: {generation.mean_fitness:.6f}. " f"Масштаб σ: {generation.sigma_scale:.4f}" ), fontsize=14, y=0.88, ) ax_contour = fig.add_subplot(1, 3, 1) plot_fitness_contour(config.fitness_func, config.x_min, config.x_max, ax_contour) arr = np.array([ind.x for ind in generation.population]) ax_contour.scatter(arr[:, 1], arr[:, 0], c="red", s=20, alpha=0.9) ax_contour.scatter( generation.best.x[1], generation.best.x[0], c="black", s=60, marker="*", label="Лучший" ) ax_contour.legend(loc="upper right") ax_contour.text(0.5, -0.25, "(a)", transform=ax_contour.transAxes, ha="center", fontsize=16) views = [(50, -45), (60, 30)] fitnesses = np.array([ind.fitness for ind in generation.population]) for idx, (elev, azim) in enumerate(views, start=1): ax = fig.add_subplot(1, 3, idx + 1, projection="3d", computed_zorder=False) plot_fitness_surface(config.fitness_func, config.x_min, config.x_max, ax) ax.scatter(arr[:, 0], arr[:, 1], fitnesses, c="red", s=12, alpha=0.9) ax.scatter( generation.best.x[0], generation.best.x[1], generation.best.fitness, c="black", s=60, marker="*", ) ax.view_init(elev=elev, azim=azim) label = chr(ord("a") + idx) ax.text2D(0.5, -0.15, f"({label})", transform=ax.transAxes, ha="center", fontsize=16) ax.set_xlabel("X₁") ax.set_ylabel("X₂") ax.set_zlabel("f(x)") filename = os.path.join(config.results_dir, f"generation_{generation.number:03d}.png") fig.savefig(filename, dpi=150, bbox_inches="tight") plt.close(fig) def plot_fitness_surface( fitness_func: FitnessFn, x_min: Array, x_max: Array, ax: Axes3D, num_points: int = 100, ) -> None: if x_min.shape != (2,) or x_max.shape != (2,): raise ValueError("Surface plotting is only available for 2D functions") xs = np.linspace(x_min[0], x_max[0], num_points) ys = np.linspace(x_min[1], x_max[1], num_points) X, Y = np.meshgrid(xs, ys) vectorized = np.vectorize(lambda a, b: fitness_func(np.array([a, b]))) Z = vectorized(X, Y) ax.plot_surface(X, Y, Z, cmap="viridis", edgecolor="none", alpha=0.7, shade=False) def plot_fitness_contour( fitness_func: FitnessFn, x_min: Array, x_max: Array, ax: Axes, num_points: int = 100, ) -> None: xs = np.linspace(x_min[0], x_max[0], num_points) ys = np.linspace(x_min[1], x_max[1], num_points) X, Y = np.meshgrid(xs, ys) vectorized = np.vectorize(lambda a, b: fitness_func(np.array([a, b]))) Z = vectorized(X, Y) contour = ax.contourf(Y, X, Z, levels=25, cmap="viridis", alpha=0.8) plt.colorbar(contour, ax=ax, shrink=0.6) ax.set_aspect("equal") ax.set_xlabel("X₂") ax.set_ylabel("X₁") # --------------------------------------------------------------------------- # Main algorithm # --------------------------------------------------------------------------- def run_evolution_strategy(config: EvolutionStrategyConfig) -> EvolutionStrategyResult: if config.seed is not None: random.seed(config.seed) np.random.seed(config.seed) if config.save_generations: clear_results_directory(config.results_dir) start = time.perf_counter() parents = [ Individual( np.random.uniform(config.x_min, config.x_max), config.make_initial_sigma(), 0.0, ) for _ in range(config.mu) ] evaluate_population(parents, config.fitness_func) sigma_scale = 1.0 success_window: deque[float] = deque() history: list[Generation] = [] best_overall: Generation | None = None stagnation_counter = 0 for generation_number in range(1, config.max_generations + 1): current_generation = build_generation(generation_number, parents, sigma_scale) history.append(current_generation) if config.log_every_generation: print( f"Generation #{generation_number}: best={current_generation.best.fitness:.6f} " f"mean={current_generation.mean_fitness:.6f}" ) if ( best_overall is None or current_generation.best.fitness < best_overall.best.fitness ): best_overall = current_generation stagnation_counter = 0 else: stagnation_counter += 1 if ( config.best_value_threshold is not None and current_generation.best.fitness <= config.best_value_threshold ): break if ( config.max_stagnation_generations is not None and stagnation_counter >= config.max_stagnation_generations ): break offspring, successes = create_offspring(parents, config, sigma_scale) success_ratio = sum(successes) / len(successes) if successes else 0.0 success_window.append(success_ratio) if len(success_window) == config.success_rule_window: average_success = sum(success_window) / len(success_window) if average_success > config.success_rule_target: sigma_scale = min( sigma_scale * config.sigma_increase, config.sigma_scale_max ) elif average_success < config.success_rule_target: sigma_scale = max( sigma_scale * config.sigma_decrease, config.sigma_scale_min ) success_window.clear() parents = select_next_generation(parents, offspring, config) if config.save_generations and ( generation_number in config.save_generations or generation_number == config.max_generations ): save_generation(current_generation, config) end = time.perf_counter() assert best_overall is not None # Сохраняем последнее поколение, если нужно if config.save_generations and history: last_number = history[-1].number if last_number not in config.save_generations: save_generation(history[-1], config) return EvolutionStrategyResult( generations_count=len(history), best_generation=best_overall, history=history, time_ms=(end - start) * 1000.0, )