Files
2025-10-15 16:43:11 +03:00

532 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import random
import shutil
import time
from copy import deepcopy
from dataclasses import asdict, dataclass
from typing import Callable
import numpy as np
import plotly.graph_objects as go
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
from mpl_toolkits.mplot3d import Axes3D
from numpy.typing import NDArray
type Cites = list[tuple[float, float]]
type InitializePopulationFn = Callable[[int, Cites], Population]
type Chromosome = list[int]
type Population = list[Chromosome]
type Fitnesses = NDArray[np.float64]
type FitnessFn = Callable[[Chromosome], float]
type CrossoverFn = Callable[[Chromosome, Chromosome], tuple[Chromosome, Chromosome]]
type MutationFn = Callable[[Chromosome], Chromosome]
@dataclass
class GARunConfig:
fitness_func: FitnessFn
cities: Cites
initialize_population_fn: InitializePopulationFn
crossover_fn: CrossoverFn
mutation_fn: MutationFn
pop_size: int # размер популяции
pc: float # вероятность кроссинговера
pm: float # вероятность мутации
max_generations: int # максимальное количество поколений
elitism: int = (
0 # сколько лучших особей перенести без изменения в следующее поколение
)
max_best_repetitions: int | None = (
None # остановка при повторении лучшего результата
)
seed: int | None = None # seed для генератора случайных чисел
minimize: bool = False # если True, ищем минимум вместо максимума
save_generations: list[int] | None = (
None # индексы поколений для сохранения графиков
)
results_dir: str = "results" # папка для сохранения графиков
fitness_avg_threshold: float | None = (
None # порог среднего значения фитнес функции для остановки
)
best_value_threshold: float | None = (
None # остановка при достижении значения фитнеса лучше заданного
)
log_every_generation: bool = False # логировать каждое поколение
def save(self, filename: str = "GARunConfig.txt"):
"""Сохраняет конфиг в results_dir."""
os.makedirs(self.results_dir, exist_ok=True)
path = os.path.join(self.results_dir, filename)
with open(path, "w", encoding="utf-8") as f:
for k, v in asdict(self).items():
f.write(f"{k}: {v}\n")
@dataclass(frozen=True)
class Generation:
number: int
best: Chromosome
best_fitness: float
population: Population
fitnesses: Fitnesses
@dataclass(frozen=True)
class GARunResult:
generations_count: int
best_generation: Generation
history: list[Generation]
time_ms: float
def save(self, path: str, filename: str = "GARunResult.txt"):
"""Сохраняет конфиг в results_dir."""
os.makedirs(path, exist_ok=True)
path = os.path.join(path, filename)
with open(path, "w", encoding="utf-8") as f:
for k, v in asdict(self).items():
if k == "history":
continue
if k == "best_generation":
f.write(
f"{k}: Number: {v['number']}, Best Fitness: {v['best_fitness']}, Best: {v['best']}\n"
)
else:
f.write(f"{k}: {v}\n")
def initialize_random_population(pop_size: int, cities: Cites) -> Population:
"""Инициализирует популяцию случайными маршрутами без повторений городов."""
return [random.sample(range(len(cities)), len(cities)) for _ in range(pop_size)]
def reproduction(population: Population, fitnesses: Fitnesses) -> Population:
"""Репродукция (селекция) методом рулетки.
Чем больше значение фитнеса, тем больше вероятность выбора особи. Для минимизации
значения фитнеса нужно предварительно инвертировать.
"""
# Чтобы работать с отрицательными f, сдвигаем значения фитнес функции на минимальное
# значение в популяции. Вычитаем min_fit, т. к. min_fit может быть отрицательным.
min_fit = np.min(fitnesses)
shifted_fitnesses = fitnesses - min_fit + 1e-12
# Получаем вероятности для каждой особи
probs = shifted_fitnesses / np.sum(shifted_fitnesses)
cum = np.cumsum(probs)
# Выбираем особей методом рулетки
selected = []
for _ in population:
r = np.random.random()
idx = int(np.searchsorted(cum, r, side="left"))
selected.append(population[idx])
return selected
def partially_mapped_crossover_fn(
p1: Chromosome,
p2: Chromosome,
cut1: int | None = None,
cut2: int | None = None,
) -> tuple[Chromosome, Chromosome]:
n = len(p1)
# если разрезы не заданы — выберем случайные
if cut1 is None or cut2 is None:
cut1 = random.randint(1, n - 2) # [1, n-2]
cut2 = random.randint(cut1 + 1, n - 1) # (cut1, n-1]
# отображения внутри среднего сегмента
mapping12 = {p1[i]: p2[i] for i in range(cut1, cut2)}
mapping21 = {p2[i]: p1[i] for i in range(cut1, cut2)}
# будущие потомки
o1 = p2[:cut1] + p1[cut1:cut2] + p2[cut2:]
o2 = p1[:cut1] + p2[cut1:cut2] + p1[cut2:]
# разрешаем конфликты по цепочке
def resolve(x: int, mapping: dict[int, int]) -> int:
while x in mapping:
x = mapping[x]
return x
# исправляем только вне среднего сегмента
for i in (*range(0, cut1), *range(cut2, n)):
o1[i] = resolve(o1[i], mapping12)
o2[i] = resolve(o2[i], mapping21)
return o1, o2
def ordered_crossover_fn(
p1: Chromosome,
p2: Chromosome,
cut1: int | None = None,
cut2: int | None = None,
) -> tuple[Chromosome, Chromosome]:
n = len(p1)
# если разрезы не заданы — выберем случайные корректно
if cut1 is None or cut2 is None:
cut1 = random.randint(1, n - 2) # [1, n-2]
cut2 = random.randint(cut1 + 1, n - 1) # [cut1+1, n-1]
# --- o1: сегмент от p1, остальное — порядок из p2
o1: Chromosome = [None] * n # type: ignore
o1[cut1:cut2] = p1[cut1:cut2]
segment1 = set(p1[cut1:cut2])
fill_idx = cut2 % n
for x in (p2[i % n] for i in range(cut2, cut2 + n)):
if x not in segment1:
# прокручиваем fill_idx до ближайшей пустой ячейки
while o1[fill_idx] is not None:
fill_idx = (fill_idx + 1) % n
o1[fill_idx] = x
fill_idx = (fill_idx + 1) % n
# --- o2: сегмент от p2, остальное — порядок из p1
o2: Chromosome = [None] * n # type: ignore
o2[cut1:cut2] = p2[cut1:cut2]
segment2 = set(p2[cut1:cut2])
fill_idx = cut2 % n
for x in (p1[i % n] for i in range(cut2, cut2 + n)):
if x not in segment2:
while o2[fill_idx] is not None:
fill_idx = (fill_idx + 1) % n
o2[fill_idx] = x
fill_idx = (fill_idx + 1) % n
return o1, o2
def cycle_crossover_fn(p1: Chromosome, p2: Chromosome) -> tuple[Chromosome, Chromosome]:
n = len(p1)
o1 = [None] * n
o2 = [None] * n
# быстрый поиск позиций элементов p1
pos_in_p1 = {val: i for i, val in enumerate(p1)}
used = [False] * n
cycle_index = 0
for start in range(n):
if used[start]:
continue
# строим цикл индексов
idx = start
cycle = []
while not used[idx]:
used[idx] = True
cycle.append(idx)
# переход: idx -> элемент p2[idx] -> его позиция в p1
val = p2[idx]
idx = pos_in_p1[val]
# нечётные циклы: из p1 в o1, из p2 в o2
# чётные циклы: наоборот
if cycle_index % 2 == 0:
for i in cycle:
o1[i] = p1[i]
o2[i] = p2[i]
else:
for i in cycle:
o1[i] = p2[i]
o2[i] = p1[i]
cycle_index += 1
return o1, o2 # type: ignore
def crossover(
population: Population,
pc: float,
crossover_fn: CrossoverFn,
) -> Population:
"""Оператор кроссинговера (скрещивания) выполняется с заданной вероятностью pc.
Две хромосомы (родители) выбираются случайно из промежуточной популяции.
Если популяция нечетного размера, то последняя хромосома скрещивается со случайной
другой хромосомой из популяции. В таком случае одна из хромосом может поучаствовать
в кроссовере дважды.
"""
# Создаем копию популяции и перемешиваем её для случайного выбора пар
shuffled_population = population.copy()
np.random.shuffle(shuffled_population)
next_population = []
pop_size = len(shuffled_population)
for i in range(0, pop_size, 2):
p1 = shuffled_population[i]
p2 = shuffled_population[(i + 1) % pop_size]
if np.random.random() <= pc:
p1, p2 = crossover_fn(p1, p2)
next_population.append(p1)
next_population.append(p2)
return next_population[:pop_size]
def swap_mutation_fn(chrom: Chromosome) -> Chromosome:
"""Меняем два случайных города в маршруте местами."""
chrom = chrom.copy()
a, b = random.sample(range(len(chrom)), 2)
chrom[a], chrom[b] = chrom[b], chrom[a]
return chrom
def inversion_mutation_fn(chrom: Chromosome) -> Chromosome:
"""Инвертируем случайный сегмент маршрута."""
chrom = chrom.copy()
a, b = sorted(random.sample(range(len(chrom)), 2))
chrom[a:b] = reversed(chrom[a:b])
return chrom
def insertion_mutation_fn(chrom: Chromosome) -> Chromosome:
"""Вырезаем случайный город и вставляем его в случайное место маршрута."""
chrom = chrom.copy()
a, b = random.sample(range(len(chrom)), 2)
city = chrom.pop(a)
chrom.insert(b, city)
return chrom
def mutation(population: Population, pm: float, mutation_fn: MutationFn) -> Population:
"""Мутация происходит с вероятностью pm."""
next_population = []
for chrom in population:
next_population.append(
mutation_fn(chrom) if np.random.random() <= pm else chrom
)
return next_population
def clear_results_directory(results_dir: str) -> None:
"""Очищает папку с результатами перед началом эксперимента."""
if os.path.exists(results_dir):
shutil.rmtree(results_dir)
os.makedirs(results_dir, exist_ok=True)
def eval_population(population: Population, fitness_func: FitnessFn) -> Fitnesses:
return np.array([fitness_func(chrom) for chrom in population])
def plot_tour(cities: list[tuple[float, float]], tour: list[int], ax: Axes):
"""Рисует маршрут обхода городов."""
x = [cities[i][0] for i in tour]
y = [cities[i][1] for i in tour]
ax.plot(x + [x[0]], y + [y[0]], "k-", linewidth=1)
ax.plot(x, y, "ro", markersize=4)
# for i, (cx, cy) in enumerate(cities):
# plt.text(cx, cy, str(i), fontsize=7, ha="right", va="bottom")
ax.axis("equal")
def save_generation(
generation: Generation, history: list[Generation], config: GARunConfig
) -> None:
os.makedirs(config.results_dir, exist_ok=True)
fig = plt.figure(figsize=(7, 7))
fig.suptitle(
f"Поколение #{generation.number}. "
f"Лучшая особь: {generation.best_fitness:.0f}. "
f"Среднее значение: {np.mean(generation.fitnesses):.0f}",
fontsize=14,
y=0.95,
)
# Рисуем лучший маршрут в поколении
ax = fig.add_subplot(1, 1, 1)
plot_tour(config.cities, generation.best, ax)
filename = f"generation_{generation.number:03d}.png"
path_png = os.path.join(config.results_dir, filename)
fig.savefig(path_png, dpi=150, bbox_inches="tight")
plt.close(fig)
def genetic_algorithm(config: GARunConfig) -> GARunResult:
if config.seed is not None:
random.seed(config.seed)
np.random.seed(config.seed)
if config.save_generations:
clear_results_directory(config.results_dir)
population = config.initialize_population_fn(config.pop_size, config.cities)
start = time.perf_counter()
history: list[Generation] = []
best: Generation | None = None
generation_number = 1
best_repetitions = 0
while True:
# Вычисляем фитнес для всех особей в популяции
fitnesses = eval_population(population, config.fitness_func)
# Сохраняем лучших особей для переноса в следующее поколение
elites: list[Chromosome] = []
if config.elitism:
elites = deepcopy(
[
population[i]
for i in sorted(
range(len(fitnesses)),
key=lambda i: fitnesses[i],
reverse=not config.minimize,
)
][: config.elitism]
)
# Находим лучшую особь в поколении
best_index = (
int(np.argmin(fitnesses)) if config.minimize else int(np.argmax(fitnesses))
)
# Добавляем эпоху в историю
current = Generation(
number=generation_number,
best=population[best_index],
best_fitness=fitnesses[best_index],
population=deepcopy(population),
fitnesses=deepcopy(fitnesses),
)
history.append(current)
if config.log_every_generation:
print(
f"Generation #{generation_number} best: {current.best_fitness},"
f" avg: {np.mean(current.fitnesses)}"
)
# Обновляем лучшую эпоху
if (
best is None
or (config.minimize and current.best_fitness < best.best_fitness)
or (not config.minimize and current.best_fitness > best.best_fitness)
):
best = current
# Проверка критериев остановки
stop_algorithm = False
if generation_number >= config.max_generations:
stop_algorithm = True
if config.max_best_repetitions is not None and generation_number > 1:
if history[-2].best_fitness == current.best_fitness:
best_repetitions += 1
if best_repetitions == config.max_best_repetitions:
stop_algorithm = True
else:
best_repetitions = 0
# if config.variance_threshold is not None:
# fitness_variance = np.var(fitnesses)
# if fitness_variance < config.variance_threshold:
# stop_algorithm = True
if config.best_value_threshold is not None:
if (
config.minimize and current.best_fitness < config.best_value_threshold
) or (
not config.minimize
and current.best_fitness > config.best_value_threshold
):
stop_algorithm = True
if config.fitness_avg_threshold is not None:
mean_fitness = np.mean(fitnesses)
if (config.minimize and mean_fitness < config.fitness_avg_threshold) or (
not config.minimize and mean_fitness > config.fitness_avg_threshold
):
stop_algorithm = True
# Сохраняем указанные поколения и последнее поколение
if config.save_generations and (
stop_algorithm or generation_number in config.save_generations
):
save_generation(current, history, config)
if stop_algorithm:
break
# селекция (для минимума инвертируем знак)
parents = reproduction(
population, fitnesses if not config.minimize else -fitnesses
)
# кроссинговер попарно
next_population = crossover(parents, config.pc, config.crossover_fn)
# мутация
next_population = mutation(
next_population,
config.pm,
config.mutation_fn,
)
# Вставляем элиту в новую популяцию
population = next_population[: config.pop_size - config.elitism] + elites
generation_number += 1
end = time.perf_counter()
assert best is not None, "Best was never set"
return GARunResult(
len(history),
best,
history,
(end - start) * 1000.0,
)
def plot_fitness_history(result: GARunResult, save_path: str | None = None) -> None:
"""Рисует график изменения лучших и средних значений фитнеса по поколениям."""
generations = [gen.number for gen in result.history]
best_fitnesses = [gen.best_fitness for gen in result.history]
avg_fitnesses = [np.mean(gen.fitnesses) for gen in result.history]
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(
generations, best_fitnesses, label="Лучшее значение", linewidth=2, color="blue"
)
ax.plot(
generations,
avg_fitnesses,
label="Среднее значение",
linewidth=2,
color="orange",
)
ax.set_xlabel("Поколение", fontsize=12)
ax.set_ylabel("Значение фитнес-функции", fontsize=12)
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
print(f"График сохранен в {save_path}")
else:
plt.show()
plt.close(fig)