diff --git a/run.slurm b/run.slurm index e947ec9..19d5a3f 100644 --- a/run.slurm +++ b/run.slurm @@ -7,9 +7,10 @@ # Путь к файлу данных (должен существовать на всех узлах) export DATA_PATH="/mnt/shared/supercomputers/data/data.csv" +# export DATA_PATH="/data/data.csv" # Доли данных для каждого ранка (сумма определяет пропорции) -export DATA_READ_SHARES="10,12,13,13" +export DATA_READ_SHARES="10,14,18,22" # Размер перекрытия в байтах для обработки границ строк export READ_OVERLAP_BYTES=131072 diff --git a/src/intervals.cpp b/src/intervals.cpp index deca317..f01d470 100644 --- a/src/intervals.cpp +++ b/src/intervals.cpp @@ -1,58 +1,309 @@ #include "intervals.hpp" +#include #include #include #include #include #include #include +#include -std::vector find_intervals(const std::vector& days, double threshold) { - if (days.empty()) { - return {}; +// Вспомогательная структура для накопления min/max в интервале +struct IntervalAccumulator { + DayIndex start_day; + double start_avg; + double open_min; + double open_max; + double close_min; + double close_max; + + void init(const DayStats& day) { + start_day = day.day; + start_avg = day.avg; + open_min = day.open_min; + open_max = day.open_max; + close_min = day.close_min; + close_max = day.close_max; } - std::vector intervals; + void update(const DayStats& day) { + open_min = std::min(open_min, day.open_min); + open_max = std::max(open_max, day.open_max); + close_min = std::min(close_min, day.close_min); + close_max = std::max(close_max, day.close_max); + } + Interval finalize(const DayStats& end_day, double change) const { + Interval iv; + iv.start_day = start_day; + iv.end_day = end_day.day; + iv.start_avg = start_avg; + iv.end_avg = end_day.avg; + iv.change = change; + iv.open_min = std::min(open_min, end_day.open_min); + iv.open_max = std::max(open_max, end_day.open_max); + iv.close_min = std::min(close_min, end_day.close_min); + iv.close_max = std::max(close_max, end_day.close_max); + return iv; + } +}; + +// Упакованная структура DayStats для MPI передачи (8 doubles) +struct PackedDayStats { + double day; // DayIndex as double + double avg; + double open_min; + double open_max; + double close_min; + double close_max; + double count; // int64_t as double + double valid; // флаг валидности (1.0 = valid, 0.0 = invalid) + + void pack(const DayStats& ds) { + day = static_cast(ds.day); + avg = ds.avg; + open_min = ds.open_min; + open_max = ds.open_max; + close_min = ds.close_min; + close_max = ds.close_max; + count = static_cast(ds.count); + valid = 1.0; + } + + DayStats unpack() const { + DayStats ds; + ds.day = static_cast(day); + ds.avg = avg; + ds.open_min = open_min; + ds.open_max = open_max; + ds.close_min = close_min; + ds.close_max = close_max; + ds.count = static_cast(count); + return ds; + } + + bool is_valid() const { return valid > 0.5; } + void set_invalid() { valid = 0.0; } +}; + +IntervalResult find_intervals_parallel( + const std::vector& days, + int rank, int size, + double threshold) +{ + IntervalResult result; + result.compute_time = 0.0; + result.wait_time = 0.0; + + if (days.empty()) { + // Передаём невалидный DayStats следующему ранку + if (rank < size - 1) { + PackedDayStats invalid; + invalid.set_invalid(); + MPI_Send(&invalid, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD); + } + return result; + } + + double compute_start = MPI_Wtime(); + + // Определяем, до какого индекса обрабатывать + // Для последнего ранка - до конца, для остальных - до предпоследнего дня + size_t process_until = (rank == size - 1) ? days.size() : days.size() - 1; + + IntervalAccumulator acc; size_t start_idx = 0; - double price_base = days[start_idx].avg; + bool have_pending_interval = false; - for (size_t i = 1; i < days.size(); i++) { - double price_now = days[i].avg; - double change = std::abs(price_now - price_base) / price_base; + // Если не первый ранк - ждём данные от предыдущего + if (rank > 0) { + double wait_start = MPI_Wtime(); - if (change >= threshold) { - Interval interval; - interval.start_day = days[start_idx].day; - interval.end_day = days[i].day; - interval.start_avg = price_base; - interval.end_avg = price_now; - interval.change = change; + PackedDayStats received; + MPI_Recv(&received, 8, MPI_DOUBLE, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + result.wait_time = MPI_Wtime() - wait_start; + compute_start = MPI_Wtime(); + + if (received.is_valid()) { + DayStats prev_day = received.unpack(); - // Находим min/max Open и Close в интервале - interval.open_min = days[start_idx].open_min; - interval.open_max = days[start_idx].open_max; - interval.close_min = days[start_idx].close_min; - interval.close_max = days[start_idx].close_max; - - for (size_t j = start_idx + 1; j <= i; j++) { - interval.open_min = std::min(interval.open_min, days[j].open_min); - interval.open_max = std::max(interval.open_max, days[j].open_max); - interval.close_min = std::min(interval.close_min, days[j].close_min); - interval.close_max = std::max(interval.close_max, days[j].close_max); + // Ищем первый день с индексом > prev_day.day + for (start_idx = 0; start_idx < days.size(); start_idx++) { + if (days[start_idx].day > prev_day.day) { + break; + } } - intervals.push_back(interval); - - // Начинаем новый интервал - start_idx = i + 1; - if (start_idx >= days.size()) { - break; + if (start_idx < process_until) { + // Инициализируем аккумулятор данными от предыдущего ранка + acc.init(prev_day); + have_pending_interval = true; + + // Продолжаем строить интервал + for (size_t i = start_idx; i < process_until; i++) { + acc.update(days[i]); + + double change = std::abs(days[i].avg - acc.start_avg) / acc.start_avg; + + if (change >= threshold) { + result.intervals.push_back(acc.finalize(days[i], change)); + have_pending_interval = false; + + // Начинаем новый интервал + start_idx = i + 1; + if (start_idx < process_until) { + acc.init(days[start_idx]); + have_pending_interval = true; + } + } + } } - price_base = days[start_idx].avg; + } else { + // Предыдущий ранк не передал валидные данные, начинаем с начала + if (process_until > 0) { + acc.init(days[0]); + have_pending_interval = true; + start_idx = 0; + } + } + } else { + // Первый ранк - начинаем с первого дня + if (process_until > 0) { + acc.init(days[0]); + have_pending_interval = true; + start_idx = 0; } } - return intervals; + // Обрабатываем дни (если ещё не обработали выше) + if (rank == 0 && have_pending_interval) { + for (size_t i = 1; i < process_until; i++) { + acc.update(days[i]); + + double change = std::abs(days[i].avg - acc.start_avg) / acc.start_avg; + + if (change >= threshold) { + result.intervals.push_back(acc.finalize(days[i], change)); + have_pending_interval = false; + + // Начинаем новый интервал + start_idx = i + 1; + if (start_idx < process_until) { + acc.init(days[start_idx]); + have_pending_interval = true; + } + } + } + } + + // Для последнего ранка: завершаем последний интервал на последнем дне + if (rank == size - 1 && have_pending_interval && !days.empty()) { + const auto& last_day = days.back(); + double change = std::abs(last_day.avg - acc.start_avg) / acc.start_avg; + result.intervals.push_back(acc.finalize(last_day, change)); + } + + result.compute_time = MPI_Wtime() - compute_start; + + // Передаём данные следующему ранку + if (rank < size - 1) { + PackedDayStats to_send; + + if (have_pending_interval) { + // Передаём день, с которого начался незавершённый интервал + DayStats start_day; + start_day.day = acc.start_day; + start_day.avg = acc.start_avg; + start_day.open_min = acc.open_min; + start_day.open_max = acc.open_max; + start_day.close_min = acc.close_min; + start_day.close_max = acc.close_max; + start_day.count = 0; + to_send.pack(start_day); + } else if (!days.empty()) { + // Интервал завершился, передаём предпоследний день + to_send.pack(days[days.size() - 2]); + } else { + to_send.set_invalid(); + } + + MPI_Send(&to_send, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD); + } + + return result; +} + +double collect_intervals( + std::vector& local_intervals, + int rank, int size) +{ + double wait_time = 0.0; + + // Упакованный Interval для MPI (9 doubles) + // start_day, end_day, open_min, open_max, close_min, close_max, start_avg, end_avg, change + + if (rank == 0) { + // Собираем интервалы со всех остальных ранков + for (int r = 1; r < size; r++) { + double wait_start = MPI_Wtime(); + + // Сначала получаем количество интервалов + int count; + MPI_Recv(&count, 1, MPI_INT, r, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + if (count > 0) { + std::vector buffer(count * 9); + MPI_Recv(buffer.data(), count * 9, MPI_DOUBLE, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + // Распаковываем + for (int i = 0; i < count; i++) { + Interval iv; + iv.start_day = static_cast(buffer[i * 9 + 0]); + iv.end_day = static_cast(buffer[i * 9 + 1]); + iv.open_min = buffer[i * 9 + 2]; + iv.open_max = buffer[i * 9 + 3]; + iv.close_min = buffer[i * 9 + 4]; + iv.close_max = buffer[i * 9 + 5]; + iv.start_avg = buffer[i * 9 + 6]; + iv.end_avg = buffer[i * 9 + 7]; + iv.change = buffer[i * 9 + 8]; + local_intervals.push_back(iv); + } + } + + wait_time += MPI_Wtime() - wait_start; + } + + // Сортируем по start_day + std::sort(local_intervals.begin(), local_intervals.end(), + [](const Interval& a, const Interval& b) { + return a.start_day < b.start_day; + }); + } else { + // Отправляем свои интервалы на ранк 0 + int count = static_cast(local_intervals.size()); + MPI_Send(&count, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); + + if (count > 0) { + std::vector buffer(count * 9); + for (int i = 0; i < count; i++) { + const auto& iv = local_intervals[i]; + buffer[i * 9 + 0] = static_cast(iv.start_day); + buffer[i * 9 + 1] = static_cast(iv.end_day); + buffer[i * 9 + 2] = iv.open_min; + buffer[i * 9 + 3] = iv.open_max; + buffer[i * 9 + 4] = iv.close_min; + buffer[i * 9 + 5] = iv.close_max; + buffer[i * 9 + 6] = iv.start_avg; + buffer[i * 9 + 7] = iv.end_avg; + buffer[i * 9 + 8] = iv.change; + } + MPI_Send(buffer.data(), count * 9, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD); + } + } + + return wait_time; } std::string day_index_to_date(DayIndex day) { diff --git a/src/intervals.hpp b/src/intervals.hpp index 8c4a4e7..4e211c0 100644 --- a/src/intervals.hpp +++ b/src/intervals.hpp @@ -8,17 +8,36 @@ struct Interval { DayIndex start_day; DayIndex end_day; - double open_min; // минимальный Open в интервале - double open_max; // максимальный Open в интервале - double close_min; // минимальный Close в интервале - double close_max; // максимальный Close в интервале + double open_min; + double open_max; + double close_min; + double close_max; double start_avg; double end_avg; double change; }; -// Вычисление интервалов с изменением >= threshold (по умолчанию 10%) -std::vector find_intervals(const std::vector& days, double threshold = 0.10); +// Результат параллельного построения интервалов +struct IntervalResult { + std::vector intervals; + double compute_time; // время вычислений + double wait_time; // время ожидания данных от предыдущего ранка +}; + +// Параллельное построение интервалов с использованием MPI +// Каждый ранк обрабатывает свою часть дней и передаёт незавершённый интервал следующему +IntervalResult find_intervals_parallel( + const std::vector& days, + int rank, int size, + double threshold = 0.10 +); + +// Сбор интервалов со всех ранков на ранк 0 +// Возвращает время ожидания данных +double collect_intervals( + std::vector& local_intervals, + int rank, int size +); // Вывод интервалов в файл void write_intervals(const std::string& filename, const std::vector& intervals); diff --git a/src/main.cpp b/src/main.cpp index 1dea0f6..e4bcbe4 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -7,6 +7,7 @@ #include "record.hpp" #include "day_stats.hpp" #include "aggregation.hpp" +#include "intervals.hpp" #include "utils.hpp" int main(int argc, char** argv) { @@ -47,6 +48,35 @@ int main(int argc, char** argv) { << ".." << (days.empty() ? 0 : days.back().day) << "]" << std::endl; + // Параллельное построение интервалов + IntervalResult iv_result = find_intervals_parallel(days, rank, size); + + std::cout << "Rank " << rank + << ": found " << iv_result.intervals.size() << " intervals" + << ", compute " << std::fixed << std::setprecision(6) << iv_result.compute_time << " sec" + << ", wait " << iv_result.wait_time << " sec" + << std::endl; + + // Сбор интервалов на ранке 0 + double collect_wait = collect_intervals(iv_result.intervals, rank, size); + + if (rank == 0) { + std::cout << "Rank 0: collected " << iv_result.intervals.size() << " total intervals" + << ", wait " << std::fixed << std::setprecision(3) << collect_wait << " sec" + << std::endl; + } + + // Запись результатов в файл (только ранк 0) + if (rank == 0) { + double write_start = MPI_Wtime(); + write_intervals("result.csv", iv_result.intervals); + double write_time = MPI_Wtime() - write_start; + + std::cout << "Rank 0: wrote result.csv" + << " in " << std::fixed << std::setprecision(3) << write_time << " sec" + << std::endl; + } + MPI_Finalize(); return 0; }