import csv import random from datetime import datetime, timedelta import psycopg2 def load_csv_to_db( conn_params, table_name, csv_file_path, date_columns=None, delimiter=";" ): """Добавляет данные из csv таблицы в указанную таблицу в бд. Важно, чтобы названия столбцов и типы данных в таблицах совпадали. Преобразует даты в указанных столбцах. """ # Соединение с базой данных with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Открытие CSV-файла with open(csv_file_path, mode="r", encoding="windows-1251") as file: reader = csv.reader(file, delimiter=delimiter) # Считывание заголовков для формирования SQL запроса headers = next(reader) # Формирование строки запроса с плейсхолдерами для значений placeholders = ", ".join(["%s"] * len(headers)) insert_query = f"INSERT INTO {table_name} ({', '.join(headers)}) VALUES ({placeholders});" # Вставка данных в таблицу for row in reader: # Преобразование дат, если требуется if date_columns: for col_index in date_columns: if row[col_index]: # Проверяем, не пустое ли значение row[col_index] = datetime.strptime( row[col_index], "%d.%m.%Y" ).strftime("%Y-%m-%d") cur.execute(insert_query, row) conn.commit() def print_rows_count_summary(conn_params): """Выводит в консолько количество строк во всех таблицах в базе данных, а также выводит суммарное количество всех строк в базе данных.""" total_rows = 0 with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Получение списка всех таблиц в текущей базе данных cur.execute( """ SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' """ ) tables = cur.fetchall() # Подсчет строк в каждой таблице for (table_name,) in tables: cur.execute(f"SELECT COUNT(*) FROM {table_name}") count = cur.fetchone()[0] total_rows += count print(f"Таблица {table_name}: {count} строк") print("Всего строк в базе данных: ", total_rows) print() def delete_all_rows_from_all_tables(conn_params): """Очищает базу данных. Удаляет все строки из всех таблиц, но оставляет сами таблицы.""" with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Отключение ограничений foreign key для избежания конфликтов при удалении cur.execute("SET session_replication_role = 'replica';") # Получение списка всех таблиц в текущей базе данных cur.execute( """ SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE' """ ) tables = cur.fetchall() # Удаление всех строк из каждой таблицы for (table_name,) in tables: cur.execute(f"DELETE FROM {table_name}") print(f"Все строки удалены из таблицы {table_name}") # Восстановление ограничений foreign key cur.execute("SET session_replication_role = 'origin';") # Подтверждение всех изменений conn.commit() print() def generate_competitions(conn_params, num_records): adjectives = [ "Супер", "Мега", "Ультра", "Эпический", "Ежегодный", "Ежеквартальный", "Основной", "Главный", "Золотой", "Платиновый", ] competition_types = [ "кубок", "чемпионат", "марафон", "турнир", "фестиваль", "конкурс", "поединок", ] with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Получение списка ID организаторов cur.execute("SELECT id_organizer FROM organizer") organizer_ids = [row[0] for row in cur.fetchall()] for _ in range(num_records): # Генерация случайного организатора id_organizer = random.choice(organizer_ids) # Генерация случайных дат start_date = datetime.now() - timedelta(days=random.randint(1, 5 * 365)) end_date = start_date + timedelta(days=random.randint(1, 10)) # Форматирование дат для SQL start_date = start_date.strftime("%Y-%m-%d") end_date = end_date.strftime("%Y-%m-%d") # Генерация случайного названия title = f"{random.choice(adjectives)} {random.choice(competition_types)} №{random.randint(1, 50)}" # Вставка данных в таблицу competition cur.execute( """ INSERT INTO competition (title, address, logo, start_date, end_date, id_organizer) VALUES (%s, %s, %s, %s, %s, %s); """, ( title, "Competition Address", "Competition Logo", start_date, end_date, id_organizer, ), ) conn.commit() def generate_protocols(conn_params): with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Получение ID соревнований, которые еще не имеют протоколов cur.execute( """ SELECT id_competition FROM competition WHERE id_competition NOT IN (SELECT id_competition FROM protocol) """ ) competition_ids = [row[0] for row in cur.fetchall()] for comp_id in competition_ids: # Генерация случайной даты и имени файла date = datetime.now() - timedelta(days=random.randint(1, 5 * 365)) date = date.strftime("%Y-%m-%d") file_name = f"protocol_{comp_id}.pdf" # Вставка данных в таблицу protocol cur.execute( """ INSERT INTO protocol (name, date, file, id_competition) VALUES (%s, %s, %s, %s); """, (f"Protocol for Competition {comp_id}", date, file_name, comp_id), ) conn.commit() def generate_judge_requests(conn_params): with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Получение списка всех соревнований cur.execute("SELECT id_competition FROM competition") competitions = [row[0] for row in cur.fetchall()] # Получение списка всех судей cur.execute( "SELECT id_judge, name, surname, patronymic, category, region, federation FROM judge" ) judges = cur.fetchall() for competition_id in competitions: # Выбор случайного количества судей от 5 до 7 для каждого соревнования selected_judges = random.sample(judges, random.randint(5, 7)) for judge in selected_judges: ( id_judge, name, surname, patronymic, category, region, federation, ) = judge # Определение статуса регистрации is_registered = random.choices([True, False], [0.9, 0.1])[0] # Вставка данных в таблицу judge_request cur.execute( """ INSERT INTO judge_request (name, surname, patronymic, category, region, federation, is_registered, id_competition, id_judge) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s); """, ( name, surname, patronymic, category, region, federation, is_registered, competition_id, id_judge, ), ) conn.commit() def generate_participant_requests(conn_params): with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Получение списка всех соревнований cur.execute("SELECT id_competition FROM competition") competitions = [row[0] for row in cur.fetchall()] # Получение списка всех спортсменов cur.execute( "SELECT id_sportsman, name, surname, patronymic, gender, birthday, region, club, federation, category FROM sportsman" ) sportsmen = cur.fetchall() for competition_id in competitions: # Выбор случайного количества участников selected_sportsmen = random.sample(sportsmen, random.randint(30, 60)) for sportsman in selected_sportsmen: ( id_sportsman, name, surname, patronymic, gender, birthday, region, club, federation, category, ) = sportsman # Определение статуса регистрации is_registered = random.choices([True, False], [0.9, 0.1])[0] # Выбор подходящих дивизионов с учетом гендера cur.execute( """ SELECT id_division FROM division WHERE gender = %s """, (gender,), ) divisions = [row[0] for row in cur.fetchall()] # Выбор случайного дивизиона из подходящих id_division = random.choice(divisions) # Вставка данных в таблицу participant_request cur.execute( """ INSERT INTO participant_request ( name, surname, patronymic, gender, birthday, region, club, federation, category, is_registered, id_competition, id_sportsman, id_division ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """, ( name, surname, patronymic, gender, birthday, region, club, federation, category, is_registered, competition_id, id_sportsman, id_division, ), ) conn.commit() def generate_division_in_competitions(conn_params): with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Получение списка всех соревнований cur.execute("SELECT id_competition FROM competition") competitions = [row[0] for row in cur.fetchall()] # Получение списка всех дивизионов cur.execute("SELECT id_division FROM division") divisions = [row[0] for row in cur.fetchall()] for competition_id in competitions: # Выбор случайного количества дивизионов selected_divisions = random.sample(divisions, random.randint(8, 16)) for division_id in selected_divisions: # Вставка данных в таблицу division_in_competition cur.execute( """ INSERT INTO division_in_competition (id_division, id_competition) VALUES (%s, %s); """, (division_id, competition_id), ) conn.commit() def generate_result_in_stage(conn_params): with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Получение списка всех заявок участников cur.execute( """ SELECT id_participant_request, id_division FROM participant_request """ ) participant_requests = cur.fetchall() # Получение списка всех этапов соревнований cur.execute("SELECT id_competition_stage FROM competition_stage") competition_stages = [row[0] for row in cur.fetchall()] for request in participant_requests: id_participant_request, id_division = request # Количество результатов для каждой заявки num_results = random.randint(2, 6) for _ in range(num_results): score = random.randint(150, 300) shield_index = random.randint(0, 20) target_index = random.choice(["A", "B", "C", "D"]) id_competition_stage = random.choice(competition_stages) # Вставка данных в таблицу result_in_stage cur.execute( """ INSERT INTO result_in_stage (score, shield_index, target_index, id_participant_request, id_division, id_competition_stage) VALUES (%s, %s, %s, %s, %s, %s); """, ( score, shield_index, target_index, id_participant_request, id_division, id_competition_stage, ), ) conn.commit() def generate_shot_series(conn_params): with psycopg2.connect(**conn_params) as conn: with conn.cursor() as cur: # Получение всех записей result_in_stage и связанных с ними данных cur.execute( """ SELECT rs.id_result_in_stage, rs.id_participant_request, pr.id_competition FROM result_in_stage rs JOIN participant_request pr ON rs.id_participant_request = pr.id_participant_request """ ) results = cur.fetchall() # Получение id судей с учетом id_competition cur.execute( """ SELECT id_judge_request, id_competition FROM judge_request """ ) judges = cur.fetchall() # Сопоставление судей с соревнованиями judge_map = {} for judge_id, comp_id in judges: if comp_id in judge_map: judge_map[comp_id].append(judge_id) else: judge_map[comp_id] = [judge_id] for result in results: id_result_in_stage, id_participant_request, id_competition = result # Выбор случайного судьи для данного соревнования if id_competition in judge_map: id_judge_request = random.choice(judge_map[id_competition]) else: # Если нет судьи для соревнования, пропускаем raise KeyError(f"Нету судей для соревнования {id_competition}") # Выбираем случайное количество серий выстрелов num_shot_series = random.randint(3, 10) for _ in range(num_shot_series): score = random.randint(2, 10) photo = "path/to/photo.jpg" # Пример пути к фото # Вставка данных в таблицу shot_series cur.execute( """ INSERT INTO shot_series (score, photo, id_participant_request, id_result_in_stage, id_judge_request) VALUES (%s, %s, %s, %s, %s); """, ( score, photo, id_participant_request, id_result_in_stage, id_judge_request, ), ) conn.commit()