import os import random import shutil import time from copy import deepcopy from dataclasses import asdict, dataclass from typing import Callable import numpy as np import plotly.graph_objects as go from matplotlib import pyplot as plt from matplotlib.axes import Axes from mpl_toolkits.mplot3d import Axes3D from numpy.typing import NDArray type Cites = list[tuple[float, float]] type InitializePopulationFn = Callable[[int, Cites], Population] type Chromosome = list[int] type Population = list[Chromosome] type Fitnesses = NDArray[np.float64] type FitnessFn = Callable[[Chromosome], float] type CrossoverFn = Callable[[Chromosome, Chromosome], tuple[Chromosome, Chromosome]] type MutationFn = Callable[[Chromosome], Chromosome] @dataclass class GARunConfig: fitness_func: FitnessFn cities: Cites initialize_population_fn: InitializePopulationFn crossover_fn: CrossoverFn mutation_fn: MutationFn pop_size: int # размер популяции pc: float # вероятность кроссинговера pm: float # вероятность мутации max_generations: int # максимальное количество поколений elitism: int = ( 0 # сколько лучших особей перенести без изменения в следующее поколение ) max_best_repetitions: int | None = ( None # остановка при повторении лучшего результата ) seed: int | None = None # seed для генератора случайных чисел minimize: bool = False # если True, ищем минимум вместо максимума save_generations: list[int] | None = ( None # индексы поколений для сохранения графиков ) results_dir: str = "results" # папка для сохранения графиков fitness_avg_threshold: float | None = ( None # порог среднего значения фитнес функции для остановки ) best_value_threshold: float | None = ( None # остановка при достижении значения фитнеса лучше заданного ) log_every_generation: bool = False # логировать каждое поколение def save(self, filename: str = "GARunConfig.txt"): """Сохраняет конфиг в results_dir.""" os.makedirs(self.results_dir, exist_ok=True) path = os.path.join(self.results_dir, filename) with open(path, "w", encoding="utf-8") as f: for k, v in asdict(self).items(): f.write(f"{k}: {v}\n") @dataclass(frozen=True) class Generation: number: int best: Chromosome best_fitness: float population: Population fitnesses: Fitnesses @dataclass(frozen=True) class GARunResult: generations_count: int best_generation: Generation history: list[Generation] time_ms: float def save(self, path: str, filename: str = "GARunResult.txt"): """Сохраняет конфиг в results_dir.""" os.makedirs(path, exist_ok=True) path = os.path.join(path, filename) with open(path, "w", encoding="utf-8") as f: for k, v in asdict(self).items(): if k == "history": continue if k == "best_generation": f.write( f"{k}: Number: {v['number']}, Best Fitness: {v['best_fitness']}, Best: {v['best']}\n" ) else: f.write(f"{k}: {v}\n") def initialize_random_population(pop_size: int, cities: Cites) -> Population: """Инициализирует популяцию случайными маршрутами без повторений городов.""" return [random.sample(range(len(cities)), len(cities)) for _ in range(pop_size)] def reproduction(population: Population, fitnesses: Fitnesses) -> Population: """Репродукция (селекция) методом рулетки. Чем больше значение фитнеса, тем больше вероятность выбора особи. Для минимизации значения фитнеса нужно предварительно инвертировать. """ # Чтобы работать с отрицательными f, сдвигаем значения фитнес функции на минимальное # значение в популяции. Вычитаем min_fit, т. к. min_fit может быть отрицательным. min_fit = np.min(fitnesses) shifted_fitnesses = fitnesses - min_fit + 1e-12 # Получаем вероятности для каждой особи probs = shifted_fitnesses / np.sum(shifted_fitnesses) cum = np.cumsum(probs) # Выбираем особей методом рулетки selected = [] for _ in population: r = np.random.random() idx = int(np.searchsorted(cum, r, side="left")) selected.append(population[idx]) return selected def partially_mapped_crossover_fn( p1: Chromosome, p2: Chromosome, cut1: int | None = None, cut2: int | None = None, ) -> tuple[Chromosome, Chromosome]: n = len(p1) # если разрезы не заданы — выберем случайные if cut1 is None or cut2 is None: cut1 = random.randint(1, n - 2) # [1, n-2] cut2 = random.randint(cut1 + 1, n - 1) # (cut1, n-1] # отображения внутри среднего сегмента mapping12 = {p1[i]: p2[i] for i in range(cut1, cut2)} mapping21 = {p2[i]: p1[i] for i in range(cut1, cut2)} # будущие потомки o1 = p2[:cut1] + p1[cut1:cut2] + p2[cut2:] o2 = p1[:cut1] + p2[cut1:cut2] + p1[cut2:] # разрешаем конфликты по цепочке def resolve(x: int, mapping: dict[int, int]) -> int: while x in mapping: x = mapping[x] return x # исправляем только вне среднего сегмента for i in (*range(0, cut1), *range(cut2, n)): o1[i] = resolve(o1[i], mapping12) o2[i] = resolve(o2[i], mapping21) return o1, o2 def ordered_crossover_fn( p1: Chromosome, p2: Chromosome, cut1: int | None = None, cut2: int | None = None, ) -> tuple[Chromosome, Chromosome]: n = len(p1) # если разрезы не заданы — выберем случайные корректно if cut1 is None or cut2 is None: cut1 = random.randint(1, n - 2) # [1, n-2] cut2 = random.randint(cut1 + 1, n - 1) # [cut1+1, n-1] # --- o1: сегмент от p1, остальное — порядок из p2 o1: Chromosome = [None] * n # type: ignore o1[cut1:cut2] = p1[cut1:cut2] segment1 = set(p1[cut1:cut2]) fill_idx = cut2 % n for x in (p2[i % n] for i in range(cut2, cut2 + n)): if x not in segment1: # прокручиваем fill_idx до ближайшей пустой ячейки while o1[fill_idx] is not None: fill_idx = (fill_idx + 1) % n o1[fill_idx] = x fill_idx = (fill_idx + 1) % n # --- o2: сегмент от p2, остальное — порядок из p1 o2: Chromosome = [None] * n # type: ignore o2[cut1:cut2] = p2[cut1:cut2] segment2 = set(p2[cut1:cut2]) fill_idx = cut2 % n for x in (p1[i % n] for i in range(cut2, cut2 + n)): if x not in segment2: while o2[fill_idx] is not None: fill_idx = (fill_idx + 1) % n o2[fill_idx] = x fill_idx = (fill_idx + 1) % n return o1, o2 def cycle_crossover_fn(p1: Chromosome, p2: Chromosome) -> tuple[Chromosome, Chromosome]: n = len(p1) o1 = [None] * n o2 = [None] * n # быстрый поиск позиций элементов p1 pos_in_p1 = {val: i for i, val in enumerate(p1)} used = [False] * n cycle_index = 0 for start in range(n): if used[start]: continue # строим цикл индексов idx = start cycle = [] while not used[idx]: used[idx] = True cycle.append(idx) # переход: idx -> элемент p2[idx] -> его позиция в p1 val = p2[idx] idx = pos_in_p1[val] # нечётные циклы: из p1 в o1, из p2 в o2 # чётные циклы: наоборот if cycle_index % 2 == 0: for i in cycle: o1[i] = p1[i] o2[i] = p2[i] else: for i in cycle: o1[i] = p2[i] o2[i] = p1[i] cycle_index += 1 return o1, o2 # type: ignore def crossover( population: Population, pc: float, crossover_fn: CrossoverFn, ) -> Population: """Оператор кроссинговера (скрещивания) выполняется с заданной вероятностью pc. Две хромосомы (родители) выбираются случайно из промежуточной популяции. Если популяция нечетного размера, то последняя хромосома скрещивается со случайной другой хромосомой из популяции. В таком случае одна из хромосом может поучаствовать в кроссовере дважды. """ # Создаем копию популяции и перемешиваем её для случайного выбора пар shuffled_population = population.copy() np.random.shuffle(shuffled_population) next_population = [] pop_size = len(shuffled_population) for i in range(0, pop_size, 2): p1 = shuffled_population[i] p2 = shuffled_population[(i + 1) % pop_size] if np.random.random() <= pc: p1, p2 = crossover_fn(p1, p2) next_population.append(p1) next_population.append(p2) return next_population[:pop_size] def swap_mutation_fn(chrom: Chromosome) -> Chromosome: """Меняем два случайных города в маршруте местами.""" chrom = chrom.copy() a, b = random.sample(range(len(chrom)), 2) chrom[a], chrom[b] = chrom[b], chrom[a] return chrom def inversion_mutation_fn(chrom: Chromosome) -> Chromosome: """Инвертируем случайный сегмент маршрута.""" chrom = chrom.copy() a, b = sorted(random.sample(range(len(chrom)), 2)) chrom[a:b] = reversed(chrom[a:b]) return chrom def insertion_mutation_fn(chrom: Chromosome) -> Chromosome: """Вырезаем случайный город и вставляем его в случайное место маршрута.""" chrom = chrom.copy() a, b = random.sample(range(len(chrom)), 2) city = chrom.pop(a) chrom.insert(b, city) return chrom def mutation(population: Population, pm: float, mutation_fn: MutationFn) -> Population: """Мутация происходит с вероятностью pm.""" next_population = [] for chrom in population: next_population.append( mutation_fn(chrom) if np.random.random() <= pm else chrom ) return next_population def clear_results_directory(results_dir: str) -> None: """Очищает папку с результатами перед началом эксперимента.""" if os.path.exists(results_dir): shutil.rmtree(results_dir) os.makedirs(results_dir, exist_ok=True) def eval_population(population: Population, fitness_func: FitnessFn) -> Fitnesses: return np.array([fitness_func(chrom) for chrom in population]) def plot_tour(cities: list[tuple[float, float]], tour: list[int], ax: Axes): """Рисует маршрут обхода городов.""" x = [cities[i][0] for i in tour] y = [cities[i][1] for i in tour] ax.plot(x + [x[0]], y + [y[0]], "k-", linewidth=1) ax.plot(x, y, "ro", markersize=4) # for i, (cx, cy) in enumerate(cities): # plt.text(cx, cy, str(i), fontsize=7, ha="right", va="bottom") ax.axis("equal") def save_generation( generation: Generation, history: list[Generation], config: GARunConfig ) -> None: os.makedirs(config.results_dir, exist_ok=True) fig = plt.figure(figsize=(7, 7)) fig.suptitle( f"Поколение #{generation.number}. " f"Лучшая особь: {generation.best_fitness:.0f}. " f"Среднее значение: {np.mean(generation.fitnesses):.0f}", fontsize=14, y=0.95, ) # Рисуем лучший маршрут в поколении ax = fig.add_subplot(1, 1, 1) plot_tour(config.cities, generation.best, ax) filename = f"generation_{generation.number:03d}.png" path_png = os.path.join(config.results_dir, filename) fig.savefig(path_png, dpi=150, bbox_inches="tight") plt.close(fig) def genetic_algorithm(config: GARunConfig) -> GARunResult: if config.seed is not None: random.seed(config.seed) np.random.seed(config.seed) if config.save_generations: clear_results_directory(config.results_dir) population = config.initialize_population_fn(config.pop_size, config.cities) start = time.perf_counter() history: list[Generation] = [] best: Generation | None = None generation_number = 1 best_repetitions = 0 while True: # Вычисляем фитнес для всех особей в популяции fitnesses = eval_population(population, config.fitness_func) # Сохраняем лучших особей для переноса в следующее поколение elites: list[Chromosome] = [] if config.elitism: elites = deepcopy( [ population[i] for i in sorted( range(len(fitnesses)), key=lambda i: fitnesses[i], reverse=not config.minimize, ) ][: config.elitism] ) # Находим лучшую особь в поколении best_index = ( int(np.argmin(fitnesses)) if config.minimize else int(np.argmax(fitnesses)) ) # Добавляем эпоху в историю current = Generation( number=generation_number, best=population[best_index], best_fitness=fitnesses[best_index], population=deepcopy(population), fitnesses=deepcopy(fitnesses), ) history.append(current) if config.log_every_generation: print( f"Generation #{generation_number} best: {current.best_fitness}," f" avg: {np.mean(current.fitnesses)}" ) # Обновляем лучшую эпоху if ( best is None or (config.minimize and current.best_fitness < best.best_fitness) or (not config.minimize and current.best_fitness > best.best_fitness) ): best = current # Проверка критериев остановки stop_algorithm = False if generation_number >= config.max_generations: stop_algorithm = True if config.max_best_repetitions is not None and generation_number > 1: if history[-2].best_fitness == current.best_fitness: best_repetitions += 1 if best_repetitions == config.max_best_repetitions: stop_algorithm = True else: best_repetitions = 0 # if config.variance_threshold is not None: # fitness_variance = np.var(fitnesses) # if fitness_variance < config.variance_threshold: # stop_algorithm = True if config.best_value_threshold is not None: if ( config.minimize and current.best_fitness < config.best_value_threshold ) or ( not config.minimize and current.best_fitness > config.best_value_threshold ): stop_algorithm = True if config.fitness_avg_threshold is not None: mean_fitness = np.mean(fitnesses) if (config.minimize and mean_fitness < config.fitness_avg_threshold) or ( not config.minimize and mean_fitness > config.fitness_avg_threshold ): stop_algorithm = True # Сохраняем указанные поколения и последнее поколение if config.save_generations and ( stop_algorithm or generation_number in config.save_generations ): save_generation(current, history, config) if stop_algorithm: break # селекция (для минимума инвертируем знак) parents = reproduction( population, fitnesses if not config.minimize else -fitnesses ) # кроссинговер попарно next_population = crossover(parents, config.pc, config.crossover_fn) # мутация next_population = mutation( next_population, config.pm, config.mutation_fn, ) # Вставляем элиту в новую популяцию population = next_population[: config.pop_size - config.elitism] + elites generation_number += 1 end = time.perf_counter() assert best is not None, "Best was never set" return GARunResult( len(history), best, history, (end - start) * 1000.0, ) def plot_fitness_history(result: GARunResult, save_path: str | None = None) -> None: """Рисует график изменения лучших и средних значений фитнеса по поколениям.""" generations = [gen.number for gen in result.history] best_fitnesses = [gen.best_fitness for gen in result.history] avg_fitnesses = [np.mean(gen.fitnesses) for gen in result.history] fig, ax = plt.subplots(figsize=(10, 6)) ax.plot( generations, best_fitnesses, label="Лучшее значение", linewidth=2, color="blue" ) ax.plot( generations, avg_fitnesses, label="Среднее значение", linewidth=2, color="orange", ) ax.set_xlabel("Поколение", fontsize=12) ax.set_ylabel("Значение фитнес-функции", fontsize=12) ax.legend(fontsize=11) ax.grid(True, alpha=0.3) if save_path: fig.savefig(save_path, dpi=150, bbox_inches="tight") print(f"График сохранен в {save_path}") else: plt.show() plt.close(fig)