From 6a22dc3ef7f85c82609a45734c6eaf557c2d07f2 Mon Sep 17 00:00:00 2001 From: Arity-T Date: Sat, 13 Dec 2025 12:13:12 +0000 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B0=D1=80=D0=B0=D0=BB=D0=BB=D0=B5?= =?UTF-8?q?=D0=BB=D1=8C=D0=BD=D0=BE=D0=B5=20=D1=87=D1=82=D0=B5=D0=BD=D0=B8?= =?UTF-8?q?=D0=B5=20=D0=B4=D0=B0=D0=BD=D0=BD=D1=8B=D1=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- run.slurm | 10 +- src/csv_loader.cpp | 160 +++++++++++++++++++++------ src/csv_loader.hpp | 11 +- src/main.cpp | 265 ++------------------------------------------- src/utils.cpp | 89 +++++++++++++++ src/utils.hpp | 23 ++++ 6 files changed, 263 insertions(+), 295 deletions(-) diff --git a/run.slurm b/run.slurm index 9087ae7..e947ec9 100644 --- a/run.slurm +++ b/run.slurm @@ -5,8 +5,14 @@ #SBATCH --cpus-per-task=2 #SBATCH --output=out.txt -# Количество CPU потоков на узел (должно соответствовать cpus-per-task) -export NUM_CPU_THREADS=2 +# Путь к файлу данных (должен существовать на всех узлах) +export DATA_PATH="/mnt/shared/supercomputers/data/data.csv" + +# Доли данных для каждого ранка (сумма определяет пропорции) +export DATA_READ_SHARES="10,12,13,13" + +# Размер перекрытия в байтах для обработки границ строк +export READ_OVERLAP_BYTES=131072 cd /mnt/shared/supercomputers/build mpirun -np $SLURM_NTASKS ./bitcoin_app diff --git a/src/csv_loader.cpp b/src/csv_loader.cpp index 11b67f9..9b26dbe 100644 --- a/src/csv_loader.cpp +++ b/src/csv_loader.cpp @@ -2,46 +2,134 @@ #include #include #include +#include -std::vector load_csv(const std::string& filename) { +bool parse_csv_line(const std::string& line, Record& record) { + if (line.empty()) { + return false; + } + + std::stringstream ss(line); + std::string item; + + try { + // timestamp + if (!std::getline(ss, item, ',') || item.empty()) return false; + record.timestamp = std::stod(item); + + // open + if (!std::getline(ss, item, ',') || item.empty()) return false; + record.open = std::stod(item); + + // high + if (!std::getline(ss, item, ',') || item.empty()) return false; + record.high = std::stod(item); + + // low + if (!std::getline(ss, item, ',') || item.empty()) return false; + record.low = std::stod(item); + + // close + if (!std::getline(ss, item, ',') || item.empty()) return false; + record.close = std::stod(item); + + // volume + if (!std::getline(ss, item, ',')) return false; + // Volume может быть пустым или содержать данные + if (item.empty()) { + record.volume = 0.0; + } else { + record.volume = std::stod(item); + } + + return true; + } catch (const std::exception&) { + return false; + } +} + +std::vector load_csv_parallel(int rank, int size) { std::vector data; - std::ifstream file(filename); - + + // Читаем настройки из переменных окружения + std::string data_path = get_data_path(); + std::vector shares = get_data_read_shares(); + int64_t overlap_bytes = get_read_overlap_bytes(); + + // Получаем размер файла + int64_t file_size = get_file_size(data_path); + + // Вычисляем диапазон байт для этого ранка + ByteRange range = calculate_byte_range(rank, size, file_size, shares, overlap_bytes); + + // Открываем файл и читаем нужный диапазон + std::ifstream file(data_path, std::ios::binary); if (!file.is_open()) { - throw std::runtime_error("Cannot open file: " + filename); + throw std::runtime_error("Cannot open file: " + data_path); } - - std::string line; - - // читаем первую строку (заголовок) - std::getline(file, line); - - while (std::getline(file, line)) { - std::stringstream ss(line); - std::string item; - - Record row; - - std::getline(ss, item, ','); - row.timestamp = std::stod(item); - - std::getline(ss, item, ','); - row.open = std::stod(item); - - std::getline(ss, item, ','); - row.high = std::stod(item); - - std::getline(ss, item, ','); - row.low = std::stod(item); - - std::getline(ss, item, ','); - row.close = std::stod(item); - - std::getline(ss, item, ','); - row.volume = std::stod(item); - - data.push_back(row); + + // Переходим к началу диапазона + file.seekg(range.start); + + // Читаем данные в буфер + int64_t bytes_to_read = range.end - range.start; + std::vector buffer(bytes_to_read); + file.read(buffer.data(), bytes_to_read); + int64_t bytes_read = file.gcount(); + + file.close(); + + // Преобразуем в строку для удобства парсинга + std::string content(buffer.data(), bytes_read); + + // Находим позицию начала первой полной строки + size_t parse_start = 0; + if (rank == 0) { + // Первый ранк: пропускаем заголовок (первую строку) + size_t header_end = content.find('\n'); + if (header_end != std::string::npos) { + parse_start = header_end + 1; + } + } else { + // Остальные ранки: начинаем с первого \n (пропускаем неполную строку) + size_t first_newline = content.find('\n'); + if (first_newline != std::string::npos) { + parse_start = first_newline + 1; + } } - + + // Находим позицию конца последней полной строки + size_t parse_end = content.size(); + if (rank != size - 1) { + // Не последний ранк: ищем последний \n + size_t last_newline = content.rfind('\n'); + if (last_newline != std::string::npos && last_newline > parse_start) { + parse_end = last_newline; + } + } + + // Парсим строки + size_t pos = parse_start; + while (pos < parse_end) { + size_t line_end = content.find('\n', pos); + if (line_end == std::string::npos || line_end > parse_end) { + line_end = parse_end; + } + + std::string line = content.substr(pos, line_end - pos); + + // Убираем \r если есть (Windows line endings) + if (!line.empty() && line.back() == '\r') { + line.pop_back(); + } + + Record record; + if (parse_csv_line(line, record)) { + data.push_back(record); + } + + pos = line_end + 1; + } + return data; } diff --git a/src/csv_loader.hpp b/src/csv_loader.hpp index 727aef7..c9288e2 100644 --- a/src/csv_loader.hpp +++ b/src/csv_loader.hpp @@ -2,5 +2,14 @@ #include #include #include "record.hpp" +#include "utils.hpp" -std::vector load_csv(const std::string& filename); +// Параллельное чтение CSV файла для MPI +// rank - номер текущего ранка +// size - общее количество ранков +// Возвращает вектор записей, прочитанных этим ранком +std::vector load_csv_parallel(int rank, int size); + +// Парсинг одной строки CSV в Record +// Возвращает true если парсинг успешен +bool parse_csv_line(const std::string& line, Record& record); diff --git a/src/main.cpp b/src/main.cpp index 929d406..42229ce 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,54 +1,11 @@ #include -#include #include #include -#include #include -#include #include "csv_loader.hpp" #include "utils.hpp" #include "record.hpp" -#include "day_stats.hpp" -#include "aggregation.hpp" -#include "intervals.hpp" -#include "gpu_loader.hpp" - -// Функция: отобрать записи для конкретного ранга -std::vector select_records_for_rank( - const std::map>& days, - const std::vector& day_list) -{ - std::vector out; - for (auto d : day_list) { - auto it = days.find(d); - if (it != days.end()) { - const auto& vec = it->second; - out.insert(out.end(), vec.begin(), vec.end()); - } - } - return out; -} - -// Разделить записи на N частей (по дням) -std::vector> split_records(const std::vector& records, int n_parts) { - // Группируем по дням - std::map> by_day; - for (const auto& r : records) { - DayIndex day = static_cast(r.timestamp) / 86400; - by_day[day].push_back(r); - } - - // Распределяем дни по частям - std::vector> parts(n_parts); - int i = 0; - for (auto& [day, recs] : by_day) { - parts[i % n_parts].insert(parts[i % n_parts].end(), recs.begin(), recs.end()); - i++; - } - - return parts; -} int main(int argc, char** argv) { MPI_Init(&argc, &argv); @@ -57,222 +14,18 @@ int main(int argc, char** argv) { MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &size); - // Читаем количество CPU потоков из переменной окружения - int num_cpu_threads = 2; - const char* env_threads = std::getenv("NUM_CPU_THREADS"); - if (env_threads) { - num_cpu_threads = std::atoi(env_threads); - if (num_cpu_threads < 1) num_cpu_threads = 1; - } - omp_set_num_threads(num_cpu_threads + 1); // +1 для GPU потока если есть - - // Проверка доступности GPU - bool have_gpu = gpu_is_available(); + // Параллельное чтение данных + double start_time = MPI_Wtime(); - if (have_gpu) { - std::cout << "Rank " << rank << ": GPU available + " << num_cpu_threads << " CPU threads" << std::endl; - } else { - std::cout << "Rank " << rank << ": " << num_cpu_threads << " CPU threads only" << std::endl; - } - - std::vector local_records; - - // ====== ТАЙМЕРЫ ====== - double time_load_data = 0.0; - double time_distribute = 0.0; - - if (rank == 0) { - std::cout << "Rank 0 loading CSV..." << std::endl; - - // Таймер загрузки данных - double t_load_start = MPI_Wtime(); - - // Запускаем из build - auto records = load_csv("../data/data.csv"); - - auto days = group_by_day(records); - auto parts = split_days(days, size); - - time_load_data = MPI_Wtime() - t_load_start; - std::cout << "Rank 0: Data loading time: " << std::fixed << std::setprecision(3) - << time_load_data << "s" << std::endl; - - // Таймер рассылки данных - double t_distribute_start = MPI_Wtime(); - - // Рассылаем данные - for (int r = 0; r < size; r++) { - auto vec = select_records_for_rank(days, parts[r]); - - if (r == 0) { - // себе не отправляем — сразу сохраняем - local_records = vec; - continue; - } - - int count = static_cast(vec.size()); - MPI_Send(&count, 1, MPI_INT, r, 0, MPI_COMM_WORLD); - MPI_Send(vec.data(), count * sizeof(Record), MPI_BYTE, r, 1, MPI_COMM_WORLD); - } - - time_distribute = MPI_Wtime() - t_distribute_start; - } - else { - // Таймер получения данных - double t_receive_start = MPI_Wtime(); - - // Принимает данные - int count = 0; - MPI_Recv(&count, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - local_records.resize(count); - MPI_Recv(local_records.data(), count * sizeof(Record), - MPI_BYTE, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - time_distribute = MPI_Wtime() - t_receive_start; - } - - MPI_Barrier(MPI_COMM_WORLD); + std::vector records = load_csv_parallel(rank, size); - // Вывод времени рассылки/получения данных - std::cout << "Rank " << rank << ": Data distribution time: " << std::fixed - << std::setprecision(3) << time_distribute << "s" << std::endl; + double end_time = MPI_Wtime(); + double read_time = end_time - start_time; - std::cout << "Rank " << rank << " received " - << local_records.size() << " records" << std::endl; - - // ====== АГРЕГАЦИЯ НА КАЖДОМ УЗЛЕ ====== - std::vector local_stats; - double time_start = omp_get_wtime(); - - // Время работы: [0] = GPU (если есть), [1..n] = CPU потоки - std::vector worker_times(num_cpu_threads + 1, 0.0); - - if (have_gpu) { - // GPU узел: делим на (1 + num_cpu_threads) частей - int n_workers = 1 + num_cpu_threads; - auto parts = split_records(local_records, n_workers); - - std::vector> results(n_workers); - std::vector success(n_workers, true); - - #pragma omp parallel - { - int tid = omp_get_thread_num(); - if (tid < n_workers) { - double t0 = omp_get_wtime(); - if (tid == 0) { - // GPU поток - success[0] = aggregate_days_gpu_simple(parts[0], results[0]); - } else { - // CPU потоки - results[tid] = aggregate_days(parts[tid]); - } - worker_times[tid] = omp_get_wtime() - t0; - } - } - - // Объединяем результаты - for (int i = 0; i < n_workers; i++) { - if (i == 0 && !success[0]) { - // GPU failed - обработаем на CPU - std::cout << "Rank " << rank << ": GPU failed, processing on CPU" << std::endl; - double t0 = omp_get_wtime(); - results[0] = aggregate_days(parts[0]); - worker_times[0] = omp_get_wtime() - t0; - } - local_stats.insert(local_stats.end(), results[i].begin(), results[i].end()); - } - - } else { - // CPU-only узел - auto parts = split_records(local_records, num_cpu_threads); - std::vector> results(num_cpu_threads); - - #pragma omp parallel - { - int tid = omp_get_thread_num(); - if (tid < num_cpu_threads) { - double t0 = omp_get_wtime(); - results[tid] = aggregate_days(parts[tid]); - worker_times[tid + 1] = omp_get_wtime() - t0; // +1 т.к. [0] для GPU - } - } - - for (int i = 0; i < num_cpu_threads; i++) { - local_stats.insert(local_stats.end(), results[i].begin(), results[i].end()); - } - } - - double time_total = omp_get_wtime() - time_start; - - // Вывод времени - std::cout << std::fixed << std::setprecision(3); - std::cout << "Rank " << rank << " aggregated " << local_stats.size() << " days in " - << time_total << "s ("; - if (have_gpu) { - std::cout << "GPU: " << worker_times[0] << "s, "; - } - for (int i = 0; i < num_cpu_threads; i++) { - int idx = have_gpu ? (i + 1) : (i + 1); - std::cout << "CPU" << i << ": " << worker_times[idx] << "s"; - if (i < num_cpu_threads - 1) std::cout << ", "; - } - std::cout << ")" << std::endl; - - // ====== СБОР АГРЕГИРОВАННЫХ ДАННЫХ НА RANK 0 ====== - std::vector all_stats; - - if (rank == 0) { - // Добавляем свои данные - all_stats.insert(all_stats.end(), local_stats.begin(), local_stats.end()); - - // Получаем данные от других узлов - for (int r = 1; r < size; r++) { - int count = 0; - MPI_Recv(&count, 1, MPI_INT, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - std::vector remote_stats(count); - MPI_Recv(remote_stats.data(), count * sizeof(DayStats), - MPI_BYTE, r, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - - all_stats.insert(all_stats.end(), remote_stats.begin(), remote_stats.end()); - } - } else { - // Отправляем свои агрегированные данные на rank 0 - int count = static_cast(local_stats.size()); - MPI_Send(&count, 1, MPI_INT, 0, 2, MPI_COMM_WORLD); - MPI_Send(local_stats.data(), count * sizeof(DayStats), MPI_BYTE, 0, 3, MPI_COMM_WORLD); - } - - // ====== ВЫЧИСЛЕНИЕ ИНТЕРВАЛОВ НА RANK 0 ====== - if (rank == 0) { - std::cout << "Rank 0: merging " << all_stats.size() << " day stats..." << std::endl; - - // Объединяем и сортируем - auto merged_stats = merge_day_stats(all_stats); - std::cout << "Rank 0: total " << merged_stats.size() << " unique days" << std::endl; - - // Вычисляем интервалы - auto intervals = find_intervals(merged_stats, 0.10); - std::cout << "Found " << intervals.size() << " intervals with >=10% change" << std::endl; - - // Записываем результат - write_intervals("../result.csv", intervals); - std::cout << "Results written to result.csv" << std::endl; - - // Выводим первые несколько интервалов - std::cout << "\nFirst 5 intervals:\n"; - std::cout << "start_date,end_date,min_open,max_close,change\n"; - for (size_t i = 0; i < std::min(intervals.size(), size_t(5)); i++) { - const auto& iv = intervals[i]; - std::cout << day_index_to_date(iv.start_day) << "," - << day_index_to_date(iv.end_day) << "," - << iv.min_open << "," - << iv.max_close << "," - << iv.change << "\n"; - } - } + std::cout << "Rank " << rank + << ": read " << records.size() << " records" + << " in " << std::fixed << std::setprecision(3) << read_time << " sec" + << std::endl; MPI_Finalize(); return 0; diff --git a/src/utils.cpp b/src/utils.cpp index aa78e7f..b19cc30 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -1,4 +1,8 @@ #include "utils.hpp" +#include +#include +#include +#include std::map> group_by_day(const std::vector& recs) { std::map> days; @@ -23,3 +27,88 @@ std::vector> split_days(const std::map get_data_read_shares() { + std::vector shares; + std::stringstream ss(get_env("DATA_READ_SHARES")); + std::string item; + while (std::getline(ss, item, ',')) { + shares.push_back(std::stoi(item)); + } + return shares; +} + +int64_t get_read_overlap_bytes() { + return std::stoll(get_env("READ_OVERLAP_BYTES")); +} + +int64_t get_file_size(const std::string& path) { + std::ifstream file(path, std::ios::binary | std::ios::ate); + if (!file.is_open()) { + throw std::runtime_error("Cannot open file: " + path); + } + return static_cast(file.tellg()); +} + +ByteRange calculate_byte_range(int rank, int size, int64_t file_size, + const std::vector& shares, int64_t overlap_bytes) { + // Если shares пустой или не соответствует size, используем равные доли + std::vector effective_shares; + if (shares.size() == static_cast(size)) { + effective_shares = shares; + } else { + effective_shares.assign(size, 1); + } + + int total_shares = std::accumulate(effective_shares.begin(), effective_shares.end(), 0); + + // Вычисляем базовые границы для каждого ранка + int64_t bytes_per_share = file_size / total_shares; + + int64_t base_start = 0; + for (int i = 0; i < rank; i++) { + base_start += bytes_per_share * effective_shares[i]; + } + + int64_t base_end = base_start + bytes_per_share * effective_shares[rank]; + + // Применяем overlap + ByteRange range; + + if (rank == 0) { + // Первый ранк: начинаем с 0, добавляем overlap в конце + range.start = 0; + range.end = std::min(base_end + overlap_bytes, file_size); + } else if (rank == size - 1) { + // Последний ранк: вычитаем overlap в начале, читаем до конца файла + range.start = std::max(base_start - overlap_bytes, static_cast(0)); + range.end = file_size; + } else { + // Промежуточные ранки: overlap с обеих сторон + range.start = std::max(base_start - overlap_bytes, static_cast(0)); + range.end = std::min(base_end + overlap_bytes, file_size); + } + + return range; +} diff --git a/src/utils.hpp b/src/utils.hpp index 5a8c18c..710c6a5 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -4,6 +4,29 @@ #include "day_stats.hpp" #include #include +#include +#include +#include +// Группировка записей по дням std::map> group_by_day(const std::vector& recs); std::vector> split_days(const std::map>& days, int parts); + +// Чтение переменных окружения +int get_num_cpu_threads(); +std::string get_data_path(); +std::vector get_data_read_shares(); +int64_t get_read_overlap_bytes(); + +// Структура для хранения диапазона байт для чтения +struct ByteRange { + int64_t start; + int64_t end; // exclusive +}; + +// Вычисляет диапазон байт для конкретного ранка +ByteRange calculate_byte_range(int rank, int size, int64_t file_size, + const std::vector& shares, int64_t overlap_bytes); + +// Получение размера файла +int64_t get_file_size(const std::string& path);