Поиск интервалов параллельный

This commit is contained in:
2025-12-15 12:57:56 +00:00
parent f4ade418d6
commit dea7940e29
4 changed files with 342 additions and 41 deletions

View File

@@ -7,9 +7,10 @@
# Путь к файлу данных (должен существовать на всех узлах) # Путь к файлу данных (должен существовать на всех узлах)
export DATA_PATH="/mnt/shared/supercomputers/data/data.csv" export DATA_PATH="/mnt/shared/supercomputers/data/data.csv"
# export DATA_PATH="/data/data.csv"
# Доли данных для каждого ранка (сумма определяет пропорции) # Доли данных для каждого ранка (сумма определяет пропорции)
export DATA_READ_SHARES="10,12,13,13" export DATA_READ_SHARES="10,14,18,22"
# Размер перекрытия в байтах для обработки границ строк # Размер перекрытия в байтах для обработки границ строк
export READ_OVERLAP_BYTES=131072 export READ_OVERLAP_BYTES=131072

View File

@@ -1,58 +1,309 @@
#include "intervals.hpp" #include "intervals.hpp"
#include <mpi.h>
#include <algorithm> #include <algorithm>
#include <cmath> #include <cmath>
#include <fstream> #include <fstream>
#include <iomanip> #include <iomanip>
#include <sstream> #include <sstream>
#include <ctime> #include <ctime>
#include <limits>
std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double threshold) { // Вспомогательная структура для накопления min/max в интервале
if (days.empty()) { struct IntervalAccumulator {
return {}; DayIndex start_day;
double start_avg;
double open_min;
double open_max;
double close_min;
double close_max;
void init(const DayStats& day) {
start_day = day.day;
start_avg = day.avg;
open_min = day.open_min;
open_max = day.open_max;
close_min = day.close_min;
close_max = day.close_max;
} }
std::vector<Interval> intervals; void update(const DayStats& day) {
open_min = std::min(open_min, day.open_min);
open_max = std::max(open_max, day.open_max);
close_min = std::min(close_min, day.close_min);
close_max = std::max(close_max, day.close_max);
}
Interval finalize(const DayStats& end_day, double change) const {
Interval iv;
iv.start_day = start_day;
iv.end_day = end_day.day;
iv.start_avg = start_avg;
iv.end_avg = end_day.avg;
iv.change = change;
iv.open_min = std::min(open_min, end_day.open_min);
iv.open_max = std::max(open_max, end_day.open_max);
iv.close_min = std::min(close_min, end_day.close_min);
iv.close_max = std::max(close_max, end_day.close_max);
return iv;
}
};
// Упакованная структура DayStats для MPI передачи (8 doubles)
struct PackedDayStats {
double day; // DayIndex as double
double avg;
double open_min;
double open_max;
double close_min;
double close_max;
double count; // int64_t as double
double valid; // флаг валидности (1.0 = valid, 0.0 = invalid)
void pack(const DayStats& ds) {
day = static_cast<double>(ds.day);
avg = ds.avg;
open_min = ds.open_min;
open_max = ds.open_max;
close_min = ds.close_min;
close_max = ds.close_max;
count = static_cast<double>(ds.count);
valid = 1.0;
}
DayStats unpack() const {
DayStats ds;
ds.day = static_cast<DayIndex>(day);
ds.avg = avg;
ds.open_min = open_min;
ds.open_max = open_max;
ds.close_min = close_min;
ds.close_max = close_max;
ds.count = static_cast<int64_t>(count);
return ds;
}
bool is_valid() const { return valid > 0.5; }
void set_invalid() { valid = 0.0; }
};
IntervalResult find_intervals_parallel(
const std::vector<DayStats>& days,
int rank, int size,
double threshold)
{
IntervalResult result;
result.compute_time = 0.0;
result.wait_time = 0.0;
if (days.empty()) {
// Передаём невалидный DayStats следующему ранку
if (rank < size - 1) {
PackedDayStats invalid;
invalid.set_invalid();
MPI_Send(&invalid, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
}
return result;
}
double compute_start = MPI_Wtime();
// Определяем, до какого индекса обрабатывать
// Для последнего ранка - до конца, для остальных - до предпоследнего дня
size_t process_until = (rank == size - 1) ? days.size() : days.size() - 1;
IntervalAccumulator acc;
size_t start_idx = 0; size_t start_idx = 0;
double price_base = days[start_idx].avg; bool have_pending_interval = false;
for (size_t i = 1; i < days.size(); i++) { // Если не первый ранк - ждём данные от предыдущего
double price_now = days[i].avg; if (rank > 0) {
double change = std::abs(price_now - price_base) / price_base; double wait_start = MPI_Wtime();
if (change >= threshold) { PackedDayStats received;
Interval interval; MPI_Recv(&received, 8, MPI_DOUBLE, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
interval.start_day = days[start_idx].day;
interval.end_day = days[i].day;
interval.start_avg = price_base;
interval.end_avg = price_now;
interval.change = change;
// Находим min/max Open и Close в интервале result.wait_time = MPI_Wtime() - wait_start;
interval.open_min = days[start_idx].open_min; compute_start = MPI_Wtime();
interval.open_max = days[start_idx].open_max;
interval.close_min = days[start_idx].close_min;
interval.close_max = days[start_idx].close_max;
for (size_t j = start_idx + 1; j <= i; j++) { if (received.is_valid()) {
interval.open_min = std::min(interval.open_min, days[j].open_min); DayStats prev_day = received.unpack();
interval.open_max = std::max(interval.open_max, days[j].open_max);
interval.close_min = std::min(interval.close_min, days[j].close_min); // Ищем первый день с индексом > prev_day.day
interval.close_max = std::max(interval.close_max, days[j].close_max); for (start_idx = 0; start_idx < days.size(); start_idx++) {
if (days[start_idx].day > prev_day.day) {
break;
}
} }
intervals.push_back(interval); if (start_idx < process_until) {
// Инициализируем аккумулятор данными от предыдущего ранка
acc.init(prev_day);
have_pending_interval = true;
// Начинаем новый интервал // Продолжаем строить интервал
start_idx = i + 1; for (size_t i = start_idx; i < process_until; i++) {
if (start_idx >= days.size()) { acc.update(days[i]);
break;
double change = std::abs(days[i].avg - acc.start_avg) / acc.start_avg;
if (change >= threshold) {
result.intervals.push_back(acc.finalize(days[i], change));
have_pending_interval = false;
// Начинаем новый интервал
start_idx = i + 1;
if (start_idx < process_until) {
acc.init(days[start_idx]);
have_pending_interval = true;
}
}
}
} }
price_base = days[start_idx].avg; } else {
// Предыдущий ранк не передал валидные данные, начинаем с начала
if (process_until > 0) {
acc.init(days[0]);
have_pending_interval = true;
start_idx = 0;
}
}
} else {
// Первый ранк - начинаем с первого дня
if (process_until > 0) {
acc.init(days[0]);
have_pending_interval = true;
start_idx = 0;
} }
} }
return intervals; // Обрабатываем дни (если ещё не обработали выше)
if (rank == 0 && have_pending_interval) {
for (size_t i = 1; i < process_until; i++) {
acc.update(days[i]);
double change = std::abs(days[i].avg - acc.start_avg) / acc.start_avg;
if (change >= threshold) {
result.intervals.push_back(acc.finalize(days[i], change));
have_pending_interval = false;
// Начинаем новый интервал
start_idx = i + 1;
if (start_idx < process_until) {
acc.init(days[start_idx]);
have_pending_interval = true;
}
}
}
}
// Для последнего ранка: завершаем последний интервал на последнем дне
if (rank == size - 1 && have_pending_interval && !days.empty()) {
const auto& last_day = days.back();
double change = std::abs(last_day.avg - acc.start_avg) / acc.start_avg;
result.intervals.push_back(acc.finalize(last_day, change));
}
result.compute_time = MPI_Wtime() - compute_start;
// Передаём данные следующему ранку
if (rank < size - 1) {
PackedDayStats to_send;
if (have_pending_interval) {
// Передаём день, с которого начался незавершённый интервал
DayStats start_day;
start_day.day = acc.start_day;
start_day.avg = acc.start_avg;
start_day.open_min = acc.open_min;
start_day.open_max = acc.open_max;
start_day.close_min = acc.close_min;
start_day.close_max = acc.close_max;
start_day.count = 0;
to_send.pack(start_day);
} else if (!days.empty()) {
// Интервал завершился, передаём предпоследний день
to_send.pack(days[days.size() - 2]);
} else {
to_send.set_invalid();
}
MPI_Send(&to_send, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
}
return result;
}
double collect_intervals(
std::vector<Interval>& local_intervals,
int rank, int size)
{
double wait_time = 0.0;
// Упакованный Interval для MPI (9 doubles)
// start_day, end_day, open_min, open_max, close_min, close_max, start_avg, end_avg, change
if (rank == 0) {
// Собираем интервалы со всех остальных ранков
for (int r = 1; r < size; r++) {
double wait_start = MPI_Wtime();
// Сначала получаем количество интервалов
int count;
MPI_Recv(&count, 1, MPI_INT, r, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if (count > 0) {
std::vector<double> buffer(count * 9);
MPI_Recv(buffer.data(), count * 9, MPI_DOUBLE, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// Распаковываем
for (int i = 0; i < count; i++) {
Interval iv;
iv.start_day = static_cast<DayIndex>(buffer[i * 9 + 0]);
iv.end_day = static_cast<DayIndex>(buffer[i * 9 + 1]);
iv.open_min = buffer[i * 9 + 2];
iv.open_max = buffer[i * 9 + 3];
iv.close_min = buffer[i * 9 + 4];
iv.close_max = buffer[i * 9 + 5];
iv.start_avg = buffer[i * 9 + 6];
iv.end_avg = buffer[i * 9 + 7];
iv.change = buffer[i * 9 + 8];
local_intervals.push_back(iv);
}
}
wait_time += MPI_Wtime() - wait_start;
}
// Сортируем по start_day
std::sort(local_intervals.begin(), local_intervals.end(),
[](const Interval& a, const Interval& b) {
return a.start_day < b.start_day;
});
} else {
// Отправляем свои интервалы на ранк 0
int count = static_cast<int>(local_intervals.size());
MPI_Send(&count, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
if (count > 0) {
std::vector<double> buffer(count * 9);
for (int i = 0; i < count; i++) {
const auto& iv = local_intervals[i];
buffer[i * 9 + 0] = static_cast<double>(iv.start_day);
buffer[i * 9 + 1] = static_cast<double>(iv.end_day);
buffer[i * 9 + 2] = iv.open_min;
buffer[i * 9 + 3] = iv.open_max;
buffer[i * 9 + 4] = iv.close_min;
buffer[i * 9 + 5] = iv.close_max;
buffer[i * 9 + 6] = iv.start_avg;
buffer[i * 9 + 7] = iv.end_avg;
buffer[i * 9 + 8] = iv.change;
}
MPI_Send(buffer.data(), count * 9, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD);
}
}
return wait_time;
} }
std::string day_index_to_date(DayIndex day) { std::string day_index_to_date(DayIndex day) {

View File

@@ -8,17 +8,36 @@
struct Interval { struct Interval {
DayIndex start_day; DayIndex start_day;
DayIndex end_day; DayIndex end_day;
double open_min; // минимальный Open в интервале double open_min;
double open_max; // максимальный Open в интервале double open_max;
double close_min; // минимальный Close в интервале double close_min;
double close_max; // максимальный Close в интервале double close_max;
double start_avg; double start_avg;
double end_avg; double end_avg;
double change; double change;
}; };
// Вычисление интервалов с изменением >= threshold (по умолчанию 10%) // Результат параллельного построения интервалов
std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double threshold = 0.10); struct IntervalResult {
std::vector<Interval> intervals;
double compute_time; // время вычислений
double wait_time; // время ожидания данных от предыдущего ранка
};
// Параллельное построение интервалов с использованием MPI
// Каждый ранк обрабатывает свою часть дней и передаёт незавершённый интервал следующему
IntervalResult find_intervals_parallel(
const std::vector<DayStats>& days,
int rank, int size,
double threshold = 0.10
);
// Сбор интервалов со всех ранков на ранк 0
// Возвращает время ожидания данных
double collect_intervals(
std::vector<Interval>& local_intervals,
int rank, int size
);
// Вывод интервалов в файл // Вывод интервалов в файл
void write_intervals(const std::string& filename, const std::vector<Interval>& intervals); void write_intervals(const std::string& filename, const std::vector<Interval>& intervals);

View File

@@ -7,6 +7,7 @@
#include "record.hpp" #include "record.hpp"
#include "day_stats.hpp" #include "day_stats.hpp"
#include "aggregation.hpp" #include "aggregation.hpp"
#include "intervals.hpp"
#include "utils.hpp" #include "utils.hpp"
int main(int argc, char** argv) { int main(int argc, char** argv) {
@@ -47,6 +48,35 @@ int main(int argc, char** argv) {
<< ".." << (days.empty() ? 0 : days.back().day) << "]" << ".." << (days.empty() ? 0 : days.back().day) << "]"
<< std::endl; << std::endl;
// Параллельное построение интервалов
IntervalResult iv_result = find_intervals_parallel(days, rank, size);
std::cout << "Rank " << rank
<< ": found " << iv_result.intervals.size() << " intervals"
<< ", compute " << std::fixed << std::setprecision(6) << iv_result.compute_time << " sec"
<< ", wait " << iv_result.wait_time << " sec"
<< std::endl;
// Сбор интервалов на ранке 0
double collect_wait = collect_intervals(iv_result.intervals, rank, size);
if (rank == 0) {
std::cout << "Rank 0: collected " << iv_result.intervals.size() << " total intervals"
<< ", wait " << std::fixed << std::setprecision(3) << collect_wait << " sec"
<< std::endl;
}
// Запись результатов в файл (только ранк 0)
if (rank == 0) {
double write_start = MPI_Wtime();
write_intervals("result.csv", iv_result.intervals);
double write_time = MPI_Wtime() - write_start;
std::cout << "Rank 0: wrote result.csv"
<< " in " << std::fixed << std::setprecision(3) << write_time << " sec"
<< std::endl;
}
MPI_Finalize(); MPI_Finalize();
return 0; return 0;
} }