This commit is contained in:
2025-10-15 16:43:11 +03:00
parent 2cf0693070
commit 740a7be984
18 changed files with 2087 additions and 0 deletions

531
lab3/gen.py Normal file
View File

@@ -0,0 +1,531 @@
import os
import random
import shutil
import time
from copy import deepcopy
from dataclasses import asdict, dataclass
from typing import Callable
import numpy as np
import plotly.graph_objects as go
from matplotlib import pyplot as plt
from matplotlib.axes import Axes
from mpl_toolkits.mplot3d import Axes3D
from numpy.typing import NDArray
type Cites = list[tuple[float, float]]
type InitializePopulationFn = Callable[[int, Cites], Population]
type Chromosome = list[int]
type Population = list[Chromosome]
type Fitnesses = NDArray[np.float64]
type FitnessFn = Callable[[Chromosome], float]
type CrossoverFn = Callable[[Chromosome, Chromosome], tuple[Chromosome, Chromosome]]
type MutationFn = Callable[[Chromosome], Chromosome]
@dataclass
class GARunConfig:
fitness_func: FitnessFn
cities: Cites
initialize_population_fn: InitializePopulationFn
crossover_fn: CrossoverFn
mutation_fn: MutationFn
pop_size: int # размер популяции
pc: float # вероятность кроссинговера
pm: float # вероятность мутации
max_generations: int # максимальное количество поколений
elitism: int = (
0 # сколько лучших особей перенести без изменения в следующее поколение
)
max_best_repetitions: int | None = (
None # остановка при повторении лучшего результата
)
seed: int | None = None # seed для генератора случайных чисел
minimize: bool = False # если True, ищем минимум вместо максимума
save_generations: list[int] | None = (
None # индексы поколений для сохранения графиков
)
results_dir: str = "results" # папка для сохранения графиков
fitness_avg_threshold: float | None = (
None # порог среднего значения фитнес функции для остановки
)
best_value_threshold: float | None = (
None # остановка при достижении значения фитнеса лучше заданного
)
log_every_generation: bool = False # логировать каждое поколение
def save(self, filename: str = "GARunConfig.txt"):
"""Сохраняет конфиг в results_dir."""
os.makedirs(self.results_dir, exist_ok=True)
path = os.path.join(self.results_dir, filename)
with open(path, "w", encoding="utf-8") as f:
for k, v in asdict(self).items():
f.write(f"{k}: {v}\n")
@dataclass(frozen=True)
class Generation:
number: int
best: Chromosome
best_fitness: float
population: Population
fitnesses: Fitnesses
@dataclass(frozen=True)
class GARunResult:
generations_count: int
best_generation: Generation
history: list[Generation]
time_ms: float
def save(self, path: str, filename: str = "GARunResult.txt"):
"""Сохраняет конфиг в results_dir."""
os.makedirs(path, exist_ok=True)
path = os.path.join(path, filename)
with open(path, "w", encoding="utf-8") as f:
for k, v in asdict(self).items():
if k == "history":
continue
if k == "best_generation":
f.write(
f"{k}: Number: {v['number']}, Best Fitness: {v['best_fitness']}, Best: {v['best']}\n"
)
else:
f.write(f"{k}: {v}\n")
def initialize_random_population(pop_size: int, cities: Cites) -> Population:
"""Инициализирует популяцию случайными маршрутами без повторений городов."""
return [random.sample(range(len(cities)), len(cities)) for _ in range(pop_size)]
def reproduction(population: Population, fitnesses: Fitnesses) -> Population:
"""Репродукция (селекция) методом рулетки.
Чем больше значение фитнеса, тем больше вероятность выбора особи. Для минимизации
значения фитнеса нужно предварительно инвертировать.
"""
# Чтобы работать с отрицательными f, сдвигаем значения фитнес функции на минимальное
# значение в популяции. Вычитаем min_fit, т. к. min_fit может быть отрицательным.
min_fit = np.min(fitnesses)
shifted_fitnesses = fitnesses - min_fit + 1e-12
# Получаем вероятности для каждой особи
probs = shifted_fitnesses / np.sum(shifted_fitnesses)
cum = np.cumsum(probs)
# Выбираем особей методом рулетки
selected = []
for _ in population:
r = np.random.random()
idx = int(np.searchsorted(cum, r, side="left"))
selected.append(population[idx])
return selected
def partially_mapped_crossover_fn(
p1: Chromosome,
p2: Chromosome,
cut1: int | None = None,
cut2: int | None = None,
) -> tuple[Chromosome, Chromosome]:
n = len(p1)
# если разрезы не заданы — выберем случайные
if cut1 is None or cut2 is None:
cut1 = random.randint(1, n - 2) # [1, n-2]
cut2 = random.randint(cut1 + 1, n - 1) # (cut1, n-1]
# отображения внутри среднего сегмента
mapping12 = {p1[i]: p2[i] for i in range(cut1, cut2)}
mapping21 = {p2[i]: p1[i] for i in range(cut1, cut2)}
# будущие потомки
o1 = p2[:cut1] + p1[cut1:cut2] + p2[cut2:]
o2 = p1[:cut1] + p2[cut1:cut2] + p1[cut2:]
# разрешаем конфликты по цепочке
def resolve(x: int, mapping: dict[int, int]) -> int:
while x in mapping:
x = mapping[x]
return x
# исправляем только вне среднего сегмента
for i in (*range(0, cut1), *range(cut2, n)):
o1[i] = resolve(o1[i], mapping12)
o2[i] = resolve(o2[i], mapping21)
return o1, o2
def ordered_crossover_fn(
p1: Chromosome,
p2: Chromosome,
cut1: int | None = None,
cut2: int | None = None,
) -> tuple[Chromosome, Chromosome]:
n = len(p1)
# если разрезы не заданы — выберем случайные корректно
if cut1 is None or cut2 is None:
cut1 = random.randint(1, n - 2) # [1, n-2]
cut2 = random.randint(cut1 + 1, n - 1) # [cut1+1, n-1]
# --- o1: сегмент от p1, остальное — порядок из p2
o1: Chromosome = [None] * n # type: ignore
o1[cut1:cut2] = p1[cut1:cut2]
segment1 = set(p1[cut1:cut2])
fill_idx = cut2 % n
for x in (p2[i % n] for i in range(cut2, cut2 + n)):
if x not in segment1:
# прокручиваем fill_idx до ближайшей пустой ячейки
while o1[fill_idx] is not None:
fill_idx = (fill_idx + 1) % n
o1[fill_idx] = x
fill_idx = (fill_idx + 1) % n
# --- o2: сегмент от p2, остальное — порядок из p1
o2: Chromosome = [None] * n # type: ignore
o2[cut1:cut2] = p2[cut1:cut2]
segment2 = set(p2[cut1:cut2])
fill_idx = cut2 % n
for x in (p1[i % n] for i in range(cut2, cut2 + n)):
if x not in segment2:
while o2[fill_idx] is not None:
fill_idx = (fill_idx + 1) % n
o2[fill_idx] = x
fill_idx = (fill_idx + 1) % n
return o1, o2
def cycle_crossover_fn(p1: Chromosome, p2: Chromosome) -> tuple[Chromosome, Chromosome]:
n = len(p1)
o1 = [None] * n
o2 = [None] * n
# быстрый поиск позиций элементов p1
pos_in_p1 = {val: i for i, val in enumerate(p1)}
used = [False] * n
cycle_index = 0
for start in range(n):
if used[start]:
continue
# строим цикл индексов
idx = start
cycle = []
while not used[idx]:
used[idx] = True
cycle.append(idx)
# переход: idx -> элемент p2[idx] -> его позиция в p1
val = p2[idx]
idx = pos_in_p1[val]
# нечётные циклы: из p1 в o1, из p2 в o2
# чётные циклы: наоборот
if cycle_index % 2 == 0:
for i in cycle:
o1[i] = p1[i]
o2[i] = p2[i]
else:
for i in cycle:
o1[i] = p2[i]
o2[i] = p1[i]
cycle_index += 1
return o1, o2 # type: ignore
def crossover(
population: Population,
pc: float,
crossover_fn: CrossoverFn,
) -> Population:
"""Оператор кроссинговера (скрещивания) выполняется с заданной вероятностью pc.
Две хромосомы (родители) выбираются случайно из промежуточной популяции.
Если популяция нечетного размера, то последняя хромосома скрещивается со случайной
другой хромосомой из популяции. В таком случае одна из хромосом может поучаствовать
в кроссовере дважды.
"""
# Создаем копию популяции и перемешиваем её для случайного выбора пар
shuffled_population = population.copy()
np.random.shuffle(shuffled_population)
next_population = []
pop_size = len(shuffled_population)
for i in range(0, pop_size, 2):
p1 = shuffled_population[i]
p2 = shuffled_population[(i + 1) % pop_size]
if np.random.random() <= pc:
p1, p2 = crossover_fn(p1, p2)
next_population.append(p1)
next_population.append(p2)
return next_population[:pop_size]
def swap_mutation_fn(chrom: Chromosome) -> Chromosome:
"""Меняем два случайных города в маршруте местами."""
chrom = chrom.copy()
a, b = random.sample(range(len(chrom)), 2)
chrom[a], chrom[b] = chrom[b], chrom[a]
return chrom
def inversion_mutation_fn(chrom: Chromosome) -> Chromosome:
"""Инвертируем случайный сегмент маршрута."""
chrom = chrom.copy()
a, b = sorted(random.sample(range(len(chrom)), 2))
chrom[a:b] = reversed(chrom[a:b])
return chrom
def insertion_mutation_fn(chrom: Chromosome) -> Chromosome:
"""Вырезаем случайный город и вставляем его в случайное место маршрута."""
chrom = chrom.copy()
a, b = random.sample(range(len(chrom)), 2)
city = chrom.pop(a)
chrom.insert(b, city)
return chrom
def mutation(population: Population, pm: float, mutation_fn: MutationFn) -> Population:
"""Мутация происходит с вероятностью pm."""
next_population = []
for chrom in population:
next_population.append(
mutation_fn(chrom) if np.random.random() <= pm else chrom
)
return next_population
def clear_results_directory(results_dir: str) -> None:
"""Очищает папку с результатами перед началом эксперимента."""
if os.path.exists(results_dir):
shutil.rmtree(results_dir)
os.makedirs(results_dir, exist_ok=True)
def eval_population(population: Population, fitness_func: FitnessFn) -> Fitnesses:
return np.array([fitness_func(chrom) for chrom in population])
def plot_tour(cities: list[tuple[float, float]], tour: list[int], ax: Axes):
"""Рисует маршрут обхода городов."""
x = [cities[i][0] for i in tour]
y = [cities[i][1] for i in tour]
ax.plot(x + [x[0]], y + [y[0]], "k-", linewidth=1)
ax.plot(x, y, "ro", markersize=4)
# for i, (cx, cy) in enumerate(cities):
# plt.text(cx, cy, str(i), fontsize=7, ha="right", va="bottom")
ax.axis("equal")
def save_generation(
generation: Generation, history: list[Generation], config: GARunConfig
) -> None:
os.makedirs(config.results_dir, exist_ok=True)
fig = plt.figure(figsize=(7, 7))
fig.suptitle(
f"Поколение #{generation.number}. "
f"Лучшая особь: {generation.best_fitness:.0f}. "
f"Среднее значение: {np.mean(generation.fitnesses):.0f}",
fontsize=14,
y=0.95,
)
# Рисуем лучший маршрут в поколении
ax = fig.add_subplot(1, 1, 1)
plot_tour(config.cities, generation.best, ax)
filename = f"generation_{generation.number:03d}.png"
path_png = os.path.join(config.results_dir, filename)
fig.savefig(path_png, dpi=150, bbox_inches="tight")
plt.close(fig)
def genetic_algorithm(config: GARunConfig) -> GARunResult:
if config.seed is not None:
random.seed(config.seed)
np.random.seed(config.seed)
if config.save_generations:
clear_results_directory(config.results_dir)
population = config.initialize_population_fn(config.pop_size, config.cities)
start = time.perf_counter()
history: list[Generation] = []
best: Generation | None = None
generation_number = 1
best_repetitions = 0
while True:
# Вычисляем фитнес для всех особей в популяции
fitnesses = eval_population(population, config.fitness_func)
# Сохраняем лучших особей для переноса в следующее поколение
elites: list[Chromosome] = []
if config.elitism:
elites = deepcopy(
[
population[i]
for i in sorted(
range(len(fitnesses)),
key=lambda i: fitnesses[i],
reverse=not config.minimize,
)
][: config.elitism]
)
# Находим лучшую особь в поколении
best_index = (
int(np.argmin(fitnesses)) if config.minimize else int(np.argmax(fitnesses))
)
# Добавляем эпоху в историю
current = Generation(
number=generation_number,
best=population[best_index],
best_fitness=fitnesses[best_index],
population=deepcopy(population),
fitnesses=deepcopy(fitnesses),
)
history.append(current)
if config.log_every_generation:
print(
f"Generation #{generation_number} best: {current.best_fitness},"
f" avg: {np.mean(current.fitnesses)}"
)
# Обновляем лучшую эпоху
if (
best is None
or (config.minimize and current.best_fitness < best.best_fitness)
or (not config.minimize and current.best_fitness > best.best_fitness)
):
best = current
# Проверка критериев остановки
stop_algorithm = False
if generation_number >= config.max_generations:
stop_algorithm = True
if config.max_best_repetitions is not None and generation_number > 1:
if history[-2].best_fitness == current.best_fitness:
best_repetitions += 1
if best_repetitions == config.max_best_repetitions:
stop_algorithm = True
else:
best_repetitions = 0
# if config.variance_threshold is not None:
# fitness_variance = np.var(fitnesses)
# if fitness_variance < config.variance_threshold:
# stop_algorithm = True
if config.best_value_threshold is not None:
if (
config.minimize and current.best_fitness < config.best_value_threshold
) or (
not config.minimize
and current.best_fitness > config.best_value_threshold
):
stop_algorithm = True
if config.fitness_avg_threshold is not None:
mean_fitness = np.mean(fitnesses)
if (config.minimize and mean_fitness < config.fitness_avg_threshold) or (
not config.minimize and mean_fitness > config.fitness_avg_threshold
):
stop_algorithm = True
# Сохраняем указанные поколения и последнее поколение
if config.save_generations and (
stop_algorithm or generation_number in config.save_generations
):
save_generation(current, history, config)
if stop_algorithm:
break
# селекция (для минимума инвертируем знак)
parents = reproduction(
population, fitnesses if not config.minimize else -fitnesses
)
# кроссинговер попарно
next_population = crossover(parents, config.pc, config.crossover_fn)
# мутация
next_population = mutation(
next_population,
config.pm,
config.mutation_fn,
)
# Вставляем элиту в новую популяцию
population = next_population[: config.pop_size - config.elitism] + elites
generation_number += 1
end = time.perf_counter()
assert best is not None, "Best was never set"
return GARunResult(
len(history),
best,
history,
(end - start) * 1000.0,
)
def plot_fitness_history(result: GARunResult, save_path: str | None = None) -> None:
"""Рисует график изменения лучших и средних значений фитнеса по поколениям."""
generations = [gen.number for gen in result.history]
best_fitnesses = [gen.best_fitness for gen in result.history]
avg_fitnesses = [np.mean(gen.fitnesses) for gen in result.history]
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(
generations, best_fitnesses, label="Лучшее значение", linewidth=2, color="blue"
)
ax.plot(
generations,
avg_fitnesses,
label="Среднее значение",
linewidth=2,
color="orange",
)
ax.set_xlabel("Поколение", fontsize=12)
ax.set_ylabel("Значение фитнес-функции", fontsize=12)
ax.legend(fontsize=11)
ax.grid(True, alpha=0.3)
if save_path:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
print(f"График сохранен в {save_path}")
else:
plt.show()
plt.close(fig)