Document visualization workflow for lab 5 report
This commit is contained in:
423
lab5/es.py
Normal file
423
lab5/es.py
Normal file
@@ -0,0 +1,423 @@
|
||||
"""Evolution strategy implementation for laboratory work #5."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import os
|
||||
import random
|
||||
import shutil
|
||||
import time
|
||||
from collections import deque
|
||||
from dataclasses import dataclass
|
||||
from typing import Callable, Iterable, Literal, Sequence
|
||||
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
from matplotlib.axes import Axes
|
||||
from mpl_toolkits.mplot3d import Axes3D # noqa: F401 - required for 3D plotting
|
||||
from numpy.typing import NDArray
|
||||
|
||||
Array = NDArray[np.float64]
|
||||
FitnessFn = Callable[[Array], float]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Individual:
|
||||
"""Single individual of the evolution strategy population."""
|
||||
|
||||
x: Array
|
||||
sigma: Array
|
||||
fitness: float
|
||||
|
||||
def copy(self) -> "Individual":
|
||||
return Individual(self.x.copy(), self.sigma.copy(), float(self.fitness))
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Generation:
|
||||
number: int
|
||||
population: tuple[Individual, ...]
|
||||
best: Individual
|
||||
mean_fitness: float
|
||||
sigma_scale: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class EvolutionStrategyResult:
|
||||
generations_count: int
|
||||
best_generation: Generation
|
||||
history: list[Generation]
|
||||
time_ms: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class EvolutionStrategyConfig:
|
||||
fitness_func: FitnessFn
|
||||
dimension: int
|
||||
x_min: Array
|
||||
x_max: Array
|
||||
mu: int
|
||||
lambda_: int
|
||||
mutation_probability: float
|
||||
initial_sigma: Array | float
|
||||
max_generations: int
|
||||
selection: Literal["plus", "comma"] = "comma"
|
||||
recombination: Literal["intermediate", "discrete", "none"] = "intermediate"
|
||||
parents_per_offspring: int = 2
|
||||
success_rule_window: int = 10
|
||||
success_rule_target: float = 0.2
|
||||
sigma_increase: float = 1.22
|
||||
sigma_decrease: float = 0.82
|
||||
sigma_scale_min: float = 1e-3
|
||||
sigma_scale_max: float = 100.0
|
||||
tau: float | None = None
|
||||
tau_prime: float | None = None
|
||||
sigma_min: float = 1e-6
|
||||
sigma_max: float = 10.0
|
||||
best_value_threshold: float | None = None
|
||||
max_stagnation_generations: int | None = None
|
||||
save_generations: list[int] | None = None
|
||||
results_dir: str = "results"
|
||||
log_every_generation: bool = False
|
||||
seed: int | None = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
assert self.dimension == self.x_min.shape[0] == self.x_max.shape[0], (
|
||||
"Bounds dimensionality must match the dimension of the problem"
|
||||
)
|
||||
assert 0 < self.mu <= self.lambda_, "Require mu <= lambda and positive"
|
||||
assert 0.0 < self.mutation_probability <= 1.0, (
|
||||
"Mutation probability must be within (0, 1]"
|
||||
)
|
||||
if isinstance(self.initial_sigma, (int, float)):
|
||||
if self.initial_sigma <= 0:
|
||||
raise ValueError("Initial sigma must be positive")
|
||||
else:
|
||||
if self.initial_sigma.shape != (self.dimension,):
|
||||
raise ValueError("initial_sigma must be scalar or an array of given dimension")
|
||||
if np.any(self.initial_sigma <= 0):
|
||||
raise ValueError("All sigma values must be positive")
|
||||
|
||||
if self.tau is None:
|
||||
object.__setattr__(self, "tau", 1.0 / math.sqrt(2.0 * math.sqrt(self.dimension)))
|
||||
if self.tau_prime is None:
|
||||
object.__setattr__(self, "tau_prime", 1.0 / math.sqrt(2.0 * self.dimension))
|
||||
|
||||
def make_initial_sigma(self) -> Array:
|
||||
if isinstance(self.initial_sigma, (int, float)):
|
||||
return np.full(self.dimension, float(self.initial_sigma), dtype=np.float64)
|
||||
return self.initial_sigma.astype(np.float64, copy=True)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helper utilities
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def clear_results_directory(results_dir: str) -> None:
|
||||
if os.path.exists(results_dir):
|
||||
shutil.rmtree(results_dir)
|
||||
os.makedirs(results_dir, exist_ok=True)
|
||||
|
||||
|
||||
def evaluate_population(population: Iterable[Individual], fitness_func: FitnessFn) -> None:
|
||||
for individual in population:
|
||||
individual.fitness = float(fitness_func(individual.x))
|
||||
|
||||
|
||||
def recombine(
|
||||
parents: Sequence[Individual],
|
||||
config: EvolutionStrategyConfig,
|
||||
) -> tuple[Array, Array, float]:
|
||||
"""Recombine parent individuals before mutation.
|
||||
|
||||
Returns the base vector, sigma and the best parent fitness.
|
||||
"""
|
||||
if config.recombination == "none" or config.parents_per_offspring == 1:
|
||||
parent = random.choice(parents)
|
||||
return parent.x.copy(), parent.sigma.copy(), parent.fitness
|
||||
|
||||
selected = random.choices(parents, k=config.parents_per_offspring)
|
||||
if config.recombination == "intermediate":
|
||||
x = np.mean([p.x for p in selected], axis=0)
|
||||
sigma = np.mean([p.sigma for p in selected], axis=0)
|
||||
elif config.recombination == "discrete":
|
||||
mask = np.random.randint(0, len(selected), size=config.dimension)
|
||||
indices = np.arange(config.dimension)
|
||||
x = np.array([selected[mask[i]].x[i] for i in indices], dtype=np.float64)
|
||||
sigma = np.array([selected[mask[i]].sigma[i] for i in indices], dtype=np.float64)
|
||||
else: # pragma: no cover - defensive
|
||||
raise ValueError(f"Unsupported recombination type: {config.recombination}")
|
||||
|
||||
parent_fitness = min(p.fitness for p in selected)
|
||||
return x, sigma, parent_fitness
|
||||
|
||||
|
||||
def mutate(
|
||||
x: Array,
|
||||
sigma: Array,
|
||||
config: EvolutionStrategyConfig,
|
||||
sigma_scale: float,
|
||||
) -> tuple[Array, Array]:
|
||||
"""Apply log-normal mutation with optional per-coordinate masking."""
|
||||
global_noise = np.random.normal()
|
||||
coordinate_noise = np.random.normal(size=config.dimension)
|
||||
sigma_new = sigma * np.exp(config.tau_prime * global_noise + config.tau * coordinate_noise)
|
||||
sigma_new = np.clip(sigma_new, config.sigma_min, config.sigma_max)
|
||||
sigma_new = np.clip(sigma_new * sigma_scale, config.sigma_min, config.sigma_max)
|
||||
|
||||
steps = np.random.normal(size=config.dimension) * sigma_new
|
||||
|
||||
if config.mutation_probability < 1.0:
|
||||
mask = np.random.random(config.dimension) < config.mutation_probability
|
||||
if not np.any(mask):
|
||||
mask[np.random.randint(0, config.dimension)] = True
|
||||
steps = steps * mask
|
||||
sigma_new = np.where(mask, sigma_new, sigma)
|
||||
|
||||
x_new = np.clip(x + steps, config.x_min, config.x_max)
|
||||
return x_new, sigma_new
|
||||
|
||||
|
||||
def create_offspring(
|
||||
parents: Sequence[Individual],
|
||||
config: EvolutionStrategyConfig,
|
||||
sigma_scale: float,
|
||||
) -> tuple[list[Individual], list[bool]]:
|
||||
offspring: list[Individual] = []
|
||||
successes: list[bool] = []
|
||||
|
||||
for _ in range(config.lambda_):
|
||||
base_x, base_sigma, best_parent_fitness = recombine(parents, config)
|
||||
mutated_x, mutated_sigma = mutate(base_x, base_sigma, config, sigma_scale)
|
||||
fitness = float(config.fitness_func(mutated_x))
|
||||
child = Individual(mutated_x, mutated_sigma, fitness)
|
||||
offspring.append(child)
|
||||
successes.append(fitness < best_parent_fitness)
|
||||
|
||||
return offspring, successes
|
||||
|
||||
|
||||
def select_next_generation(
|
||||
parents: list[Individual],
|
||||
offspring: list[Individual],
|
||||
config: EvolutionStrategyConfig,
|
||||
) -> list[Individual]:
|
||||
if config.selection == "plus":
|
||||
pool = parents + offspring
|
||||
else:
|
||||
pool = offspring
|
||||
|
||||
pool.sort(key=lambda ind: ind.fitness)
|
||||
next_generation = [ind.copy() for ind in pool[: config.mu]]
|
||||
return next_generation
|
||||
|
||||
|
||||
def compute_best(population: Sequence[Individual]) -> Individual:
|
||||
best = min(population, key=lambda ind: ind.fitness)
|
||||
return best.copy()
|
||||
|
||||
|
||||
def build_generation(
|
||||
number: int,
|
||||
population: list[Individual],
|
||||
sigma_scale: float,
|
||||
) -> Generation:
|
||||
copies = tuple(ind.copy() for ind in population)
|
||||
best = compute_best(copies)
|
||||
mean_fitness = float(np.mean([ind.fitness for ind in copies]))
|
||||
return Generation(number, copies, best, mean_fitness, sigma_scale)
|
||||
|
||||
|
||||
def save_generation(generation: Generation, config: EvolutionStrategyConfig) -> None:
|
||||
if config.dimension != 2:
|
||||
raise ValueError("Visualization is only supported for 2D problems")
|
||||
|
||||
os.makedirs(config.results_dir, exist_ok=True)
|
||||
|
||||
fig = plt.figure(figsize=(21, 7))
|
||||
fig.suptitle(
|
||||
(
|
||||
f"Поколение #{generation.number}. "
|
||||
f"Лучшее значение: {generation.best.fitness:.6f}. "
|
||||
f"Среднее: {generation.mean_fitness:.6f}. "
|
||||
f"Масштаб σ: {generation.sigma_scale:.4f}"
|
||||
),
|
||||
fontsize=14,
|
||||
y=0.88,
|
||||
)
|
||||
|
||||
ax_contour = fig.add_subplot(1, 3, 1)
|
||||
plot_fitness_contour(config.fitness_func, config.x_min, config.x_max, ax_contour)
|
||||
arr = np.array([ind.x for ind in generation.population])
|
||||
ax_contour.scatter(arr[:, 1], arr[:, 0], c="red", s=20, alpha=0.9)
|
||||
ax_contour.scatter(
|
||||
generation.best.x[1], generation.best.x[0], c="black", s=60, marker="*", label="Лучший"
|
||||
)
|
||||
ax_contour.legend(loc="upper right")
|
||||
ax_contour.text(0.5, -0.25, "(a)", transform=ax_contour.transAxes, ha="center", fontsize=16)
|
||||
|
||||
views = [(50, -45), (60, 30)]
|
||||
fitnesses = np.array([ind.fitness for ind in generation.population])
|
||||
|
||||
for idx, (elev, azim) in enumerate(views, start=1):
|
||||
ax = fig.add_subplot(1, 3, idx + 1, projection="3d", computed_zorder=False)
|
||||
plot_fitness_surface(config.fitness_func, config.x_min, config.x_max, ax)
|
||||
ax.scatter(arr[:, 0], arr[:, 1], fitnesses, c="red", s=12, alpha=0.9)
|
||||
ax.scatter(
|
||||
generation.best.x[0],
|
||||
generation.best.x[1],
|
||||
generation.best.fitness,
|
||||
c="black",
|
||||
s=60,
|
||||
marker="*",
|
||||
)
|
||||
ax.view_init(elev=elev, azim=azim)
|
||||
label = chr(ord("a") + idx)
|
||||
ax.text2D(0.5, -0.15, f"({label})", transform=ax.transAxes, ha="center", fontsize=16)
|
||||
ax.set_xlabel("X₁")
|
||||
ax.set_ylabel("X₂")
|
||||
ax.set_zlabel("f(x)")
|
||||
|
||||
filename = os.path.join(config.results_dir, f"generation_{generation.number:03d}.png")
|
||||
fig.savefig(filename, dpi=150, bbox_inches="tight")
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
def plot_fitness_surface(
|
||||
fitness_func: FitnessFn,
|
||||
x_min: Array,
|
||||
x_max: Array,
|
||||
ax: Axes3D,
|
||||
num_points: int = 100,
|
||||
) -> None:
|
||||
if x_min.shape != (2,) or x_max.shape != (2,):
|
||||
raise ValueError("Surface plotting is only available for 2D functions")
|
||||
|
||||
xs = np.linspace(x_min[0], x_max[0], num_points)
|
||||
ys = np.linspace(x_min[1], x_max[1], num_points)
|
||||
X, Y = np.meshgrid(xs, ys)
|
||||
vectorized = np.vectorize(lambda a, b: fitness_func(np.array([a, b])))
|
||||
Z = vectorized(X, Y)
|
||||
ax.plot_surface(X, Y, Z, cmap="viridis", edgecolor="none", alpha=0.7, shade=False)
|
||||
|
||||
|
||||
def plot_fitness_contour(
|
||||
fitness_func: FitnessFn,
|
||||
x_min: Array,
|
||||
x_max: Array,
|
||||
ax: Axes,
|
||||
num_points: int = 100,
|
||||
) -> None:
|
||||
xs = np.linspace(x_min[0], x_max[0], num_points)
|
||||
ys = np.linspace(x_min[1], x_max[1], num_points)
|
||||
X, Y = np.meshgrid(xs, ys)
|
||||
vectorized = np.vectorize(lambda a, b: fitness_func(np.array([a, b])))
|
||||
Z = vectorized(X, Y)
|
||||
contour = ax.contourf(Y, X, Z, levels=25, cmap="viridis", alpha=0.8)
|
||||
plt.colorbar(contour, ax=ax, shrink=0.6)
|
||||
ax.set_aspect("equal")
|
||||
ax.set_xlabel("X₂")
|
||||
ax.set_ylabel("X₁")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main algorithm
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def run_evolution_strategy(config: EvolutionStrategyConfig) -> EvolutionStrategyResult:
|
||||
if config.seed is not None:
|
||||
random.seed(config.seed)
|
||||
np.random.seed(config.seed)
|
||||
|
||||
if config.save_generations:
|
||||
clear_results_directory(config.results_dir)
|
||||
|
||||
start = time.perf_counter()
|
||||
|
||||
parents = [
|
||||
Individual(
|
||||
np.random.uniform(config.x_min, config.x_max),
|
||||
config.make_initial_sigma(),
|
||||
0.0,
|
||||
)
|
||||
for _ in range(config.mu)
|
||||
]
|
||||
evaluate_population(parents, config.fitness_func)
|
||||
|
||||
sigma_scale = 1.0
|
||||
success_window: deque[float] = deque()
|
||||
history: list[Generation] = []
|
||||
best_overall: Generation | None = None
|
||||
stagnation_counter = 0
|
||||
|
||||
for generation_number in range(1, config.max_generations + 1):
|
||||
current_generation = build_generation(generation_number, parents, sigma_scale)
|
||||
history.append(current_generation)
|
||||
|
||||
if config.log_every_generation:
|
||||
print(
|
||||
f"Generation #{generation_number}: best={current_generation.best.fitness:.6f} "
|
||||
f"mean={current_generation.mean_fitness:.6f}"
|
||||
)
|
||||
|
||||
if (
|
||||
best_overall is None
|
||||
or current_generation.best.fitness < best_overall.best.fitness
|
||||
):
|
||||
best_overall = current_generation
|
||||
stagnation_counter = 0
|
||||
else:
|
||||
stagnation_counter += 1
|
||||
|
||||
if (
|
||||
config.best_value_threshold is not None
|
||||
and current_generation.best.fitness <= config.best_value_threshold
|
||||
):
|
||||
break
|
||||
|
||||
if (
|
||||
config.max_stagnation_generations is not None
|
||||
and stagnation_counter >= config.max_stagnation_generations
|
||||
):
|
||||
break
|
||||
|
||||
offspring, successes = create_offspring(parents, config, sigma_scale)
|
||||
success_ratio = sum(successes) / len(successes) if successes else 0.0
|
||||
success_window.append(success_ratio)
|
||||
|
||||
if len(success_window) == config.success_rule_window:
|
||||
average_success = sum(success_window) / len(success_window)
|
||||
if average_success > config.success_rule_target:
|
||||
sigma_scale = min(
|
||||
sigma_scale * config.sigma_increase, config.sigma_scale_max
|
||||
)
|
||||
elif average_success < config.success_rule_target:
|
||||
sigma_scale = max(
|
||||
sigma_scale * config.sigma_decrease, config.sigma_scale_min
|
||||
)
|
||||
success_window.clear()
|
||||
|
||||
parents = select_next_generation(parents, offspring, config)
|
||||
|
||||
if config.save_generations and (
|
||||
generation_number in config.save_generations
|
||||
or generation_number == config.max_generations
|
||||
):
|
||||
save_generation(current_generation, config)
|
||||
|
||||
end = time.perf_counter()
|
||||
|
||||
assert best_overall is not None
|
||||
|
||||
# Сохраняем последнее поколение, если нужно
|
||||
if config.save_generations and history:
|
||||
last_number = history[-1].number
|
||||
if last_number not in config.save_generations:
|
||||
save_generation(history[-1], config)
|
||||
|
||||
return EvolutionStrategyResult(
|
||||
generations_count=len(history),
|
||||
best_generation=best_overall,
|
||||
history=history,
|
||||
time_ms=(end - start) * 1000.0,
|
||||
)
|
||||
Reference in New Issue
Block a user