Files
databases/fill_db_script/utils.py

453 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import csv
import random
from datetime import datetime, timedelta
import psycopg2
def load_csv_to_db(
conn_params, table_name, csv_file_path, date_columns=None, delimiter=";"
):
"""Добавляет данные из csv таблицы в указанную таблицу в бд. Важно, чтобы названия
столбцов и типы данных в таблицах совпадали. Преобразует даты в указанных столбцах.
"""
# Соединение с базой данных
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Открытие CSV-файла
with open(csv_file_path, mode="r", encoding="windows-1251") as file:
reader = csv.reader(file, delimiter=delimiter)
# Считывание заголовков для формирования SQL запроса
headers = next(reader)
# Формирование строки запроса с плейсхолдерами для значений
placeholders = ", ".join(["%s"] * len(headers))
insert_query = f"INSERT INTO {table_name} ({', '.join(headers)}) VALUES ({placeholders});"
# Вставка данных в таблицу
for row in reader:
# Преобразование дат, если требуется
if date_columns:
for col_index in date_columns:
if row[col_index]: # Проверяем, не пустое ли значение
row[col_index] = datetime.strptime(
row[col_index], "%d.%m.%Y"
).strftime("%Y-%m-%d")
cur.execute(insert_query, row)
conn.commit()
def print_rows_count_summary(conn_params):
"""Выводит в консолько количество строк во всех таблицах в базе данных, а также
выводит суммарное количество всех строк в базе данных."""
total_rows = 0
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Получение списка всех таблиц в текущей базе данных
cur.execute(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
"""
)
tables = cur.fetchall()
# Подсчет строк в каждой таблице
for (table_name,) in tables:
cur.execute(f"SELECT COUNT(*) FROM {table_name}")
count = cur.fetchone()[0]
total_rows += count
print(f"Таблица {table_name}: {count} строк")
print("Всего строк в базе данных: ", total_rows)
print()
def delete_all_rows_from_all_tables(conn_params):
"""Очищает базу данных. Удаляет все строки из всех таблиц, но оставляет
сами таблицы."""
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Отключение ограничений foreign key для избежания конфликтов при удалении
cur.execute("SET session_replication_role = 'replica';")
# Получение списка всех таблиц в текущей базе данных
cur.execute(
"""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
"""
)
tables = cur.fetchall()
# Удаление всех строк из каждой таблицы
for (table_name,) in tables:
cur.execute(f"DELETE FROM {table_name}")
print(f"Все строки удалены из таблицы {table_name}")
# Восстановление ограничений foreign key
cur.execute("SET session_replication_role = 'origin';")
# Подтверждение всех изменений
conn.commit()
print()
def generate_competitions(conn_params, num_records):
adjectives = [
"Супер",
"Мега",
"Ультра",
"Эпический",
"Ежегодный",
"Ежеквартальный",
"Основной",
"Главный",
"Золотой",
"Платиновый",
]
competition_types = [
"кубок",
"чемпионат",
"марафон",
"турнир",
"фестиваль",
"конкурс",
"поединок",
]
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Получение списка ID организаторов
cur.execute("SELECT id_organizer FROM organizer")
organizer_ids = [row[0] for row in cur.fetchall()]
for _ in range(num_records):
# Генерация случайного организатора
id_organizer = random.choice(organizer_ids)
# Генерация случайных дат
start_date = datetime.now() - timedelta(days=random.randint(1, 5 * 365))
end_date = start_date + timedelta(days=random.randint(1, 10))
# Форматирование дат для SQL
start_date = start_date.strftime("%Y-%m-%d")
end_date = end_date.strftime("%Y-%m-%d")
# Генерация случайного названия
title = f"{random.choice(adjectives)} {random.choice(competition_types)}{random.randint(1, 50)}"
# Вставка данных в таблицу competition
cur.execute(
"""
INSERT INTO competition (title, address, logo, start_date, end_date, id_organizer)
VALUES (%s, %s, %s, %s, %s, %s);
""",
(
title,
"Competition Address",
"Competition Logo",
start_date,
end_date,
id_organizer,
),
)
conn.commit()
def generate_protocols(conn_params):
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Получение ID соревнований, которые еще не имеют протоколов
cur.execute(
"""
SELECT id_competition FROM competition
WHERE id_competition NOT IN (SELECT id_competition FROM protocol)
"""
)
competition_ids = [row[0] for row in cur.fetchall()]
for comp_id in competition_ids:
# Генерация случайной даты и имени файла
date = datetime.now() - timedelta(days=random.randint(1, 5 * 365))
date = date.strftime("%Y-%m-%d")
file_name = f"protocol_{comp_id}.pdf"
# Вставка данных в таблицу protocol
cur.execute(
"""
INSERT INTO protocol (name, date, file, id_competition)
VALUES (%s, %s, %s, %s);
""",
(f"Protocol for Competition {comp_id}", date, file_name, comp_id),
)
conn.commit()
def generate_judge_requests(conn_params):
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Получение списка всех соревнований
cur.execute("SELECT id_competition FROM competition")
competitions = [row[0] for row in cur.fetchall()]
# Получение списка всех судей
cur.execute(
"SELECT id_judge, name, surname, patronymic, category, region, federation FROM judge"
)
judges = cur.fetchall()
for competition_id in competitions:
# Выбор случайного количества судей от 5 до 7 для каждого соревнования
selected_judges = random.sample(judges, random.randint(5, 7))
for judge in selected_judges:
(
id_judge,
name,
surname,
patronymic,
category,
region,
federation,
) = judge
# Определение статуса регистрации
is_registered = random.choices([True, False], [0.9, 0.1])[0]
# Вставка данных в таблицу judge_request
cur.execute(
"""
INSERT INTO judge_request (name, surname, patronymic, category, region, federation, is_registered, id_competition, id_judge)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
""",
(
name,
surname,
patronymic,
category,
region,
federation,
is_registered,
competition_id,
id_judge,
),
)
conn.commit()
def generate_participant_requests(conn_params):
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Получение списка всех соревнований
cur.execute("SELECT id_competition FROM competition")
competitions = [row[0] for row in cur.fetchall()]
# Получение списка всех спортсменов
cur.execute(
"SELECT id_sportsman, name, surname, patronymic, gender, birthday, region, club, federation, category FROM sportsman"
)
sportsmen = cur.fetchall()
for competition_id in competitions:
# Выбор случайного количества участников
selected_sportsmen = random.sample(sportsmen, random.randint(30, 60))
for sportsman in selected_sportsmen:
(
id_sportsman,
name,
surname,
patronymic,
gender,
birthday,
region,
club,
federation,
category,
) = sportsman
# Определение статуса регистрации
is_registered = random.choices([True, False], [0.9, 0.1])[0]
# Выбор подходящих дивизионов с учетом гендера
cur.execute(
"""
SELECT id_division FROM division
WHERE gender = %s
""",
(gender,),
)
divisions = [row[0] for row in cur.fetchall()]
# Выбор случайного дивизиона из подходящих
id_division = random.choice(divisions)
# Вставка данных в таблицу participant_request
cur.execute(
"""
INSERT INTO participant_request (
name, surname, patronymic, gender, birthday, region, club, federation, category, is_registered, id_competition, id_sportsman, id_division
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
""",
(
name,
surname,
patronymic,
gender,
birthday,
region,
club,
federation,
category,
is_registered,
competition_id,
id_sportsman,
id_division,
),
)
conn.commit()
def generate_division_in_competitions(conn_params):
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Получение списка всех соревнований
cur.execute("SELECT id_competition FROM competition")
competitions = [row[0] for row in cur.fetchall()]
# Получение списка всех дивизионов
cur.execute("SELECT id_division FROM division")
divisions = [row[0] for row in cur.fetchall()]
for competition_id in competitions:
# Выбор случайного количества дивизионов
selected_divisions = random.sample(divisions, random.randint(8, 16))
for division_id in selected_divisions:
# Вставка данных в таблицу division_in_competition
cur.execute(
"""
INSERT INTO division_in_competition (id_division, id_competition)
VALUES (%s, %s);
""",
(division_id, competition_id),
)
conn.commit()
def generate_result_in_stage(conn_params):
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Получение списка всех заявок участников
cur.execute(
"""
SELECT id_participant_request, id_division
FROM participant_request
"""
)
participant_requests = cur.fetchall()
# Получение списка всех этапов соревнований
cur.execute("SELECT id_competition_stage FROM competition_stage")
competition_stages = [row[0] for row in cur.fetchall()]
for request in participant_requests:
id_participant_request, id_division = request
# Количество результатов для каждой заявки
num_results = random.randint(2, 6)
for _ in range(num_results):
score = random.randint(150, 300)
shield_index = random.randint(0, 20)
target_index = random.choice(["A", "B", "C", "D"])
id_competition_stage = random.choice(competition_stages)
# Вставка данных в таблицу result_in_stage
cur.execute(
"""
INSERT INTO result_in_stage (score, shield_index, target_index, id_participant_request, id_division, id_competition_stage)
VALUES (%s, %s, %s, %s, %s, %s);
""",
(
score,
shield_index,
target_index,
id_participant_request,
id_division,
id_competition_stage,
),
)
conn.commit()
def generate_shot_series(conn_params):
with psycopg2.connect(**conn_params) as conn:
with conn.cursor() as cur:
# Получение всех записей result_in_stage и связанных с ними данных
cur.execute(
"""
SELECT rs.id_result_in_stage, rs.id_participant_request, pr.id_competition
FROM result_in_stage rs
JOIN participant_request pr ON rs.id_participant_request = pr.id_participant_request
"""
)
results = cur.fetchall()
# Получение id судей с учетом id_competition
cur.execute(
"""
SELECT id_judge_request, id_competition FROM judge_request
"""
)
judges = cur.fetchall()
# Сопоставление судей с соревнованиями
judge_map = {}
for judge_id, comp_id in judges:
if comp_id in judge_map:
judge_map[comp_id].append(judge_id)
else:
judge_map[comp_id] = [judge_id]
for result in results:
id_result_in_stage, id_participant_request, id_competition = result
# Выбор случайного судьи для данного соревнования
if id_competition in judge_map:
id_judge_request = random.choice(judge_map[id_competition])
else:
# Если нет судьи для соревнования, пропускаем
raise KeyError(f"Нету судей для соревнования {id_competition}")
# Выбираем случайное количество серий выстрелов
num_shot_series = random.randint(3, 10)
for _ in range(num_shot_series):
score = random.randint(2, 10)
photo = "path/to/photo.jpg" # Пример пути к фото
# Вставка данных в таблицу shot_series
cur.execute(
"""
INSERT INTO shot_series (score, photo, id_participant_request, id_result_in_stage, id_judge_request)
VALUES (%s, %s, %s, %s, %s);
""",
(
score,
photo,
id_participant_request,
id_result_in_stage,
id_judge_request,
),
)
conn.commit()