From ab18d9770f9bf389c5ae3b984dd37aa2d80f5400 Mon Sep 17 00:00:00 2001 From: Arity-T Date: Sat, 13 Dec 2025 12:31:21 +0000 Subject: [PATCH] =?UTF-8?q?=D0=90=D0=B3=D1=80=D0=B5=D0=B3=D0=B0=D1=86?= =?UTF-8?q?=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/aggregation.cpp | 97 ++++++++++++++------------------------------- src/aggregation.hpp | 6 --- src/day_stats.hpp | 27 ++++--------- src/gpu_loader.cpp | 11 +++-- src/gpu_loader.hpp | 11 +++-- src/gpu_plugin.cu | 47 ++++++++++------------ src/intervals.cpp | 25 +++++++----- src/intervals.hpp | 14 ++++++- src/main.cpp | 22 +++++++--- 9 files changed, 112 insertions(+), 148 deletions(-) diff --git a/src/aggregation.cpp b/src/aggregation.cpp index 16a64d9..98dcf75 100644 --- a/src/aggregation.cpp +++ b/src/aggregation.cpp @@ -1,87 +1,48 @@ #include "aggregation.hpp" +#include #include #include -#include std::vector aggregate_days(const std::vector& records) { - // Группируем записи по дням - std::map> day_records; + // Накопители для каждого дня + struct DayAccumulator { + double avg_sum = 0.0; + double open_min = std::numeric_limits::max(); + double open_max = std::numeric_limits::lowest(); + double close_min = std::numeric_limits::max(); + double close_max = std::numeric_limits::lowest(); + int64_t count = 0; + }; + + std::map days; for (const auto& r : records) { DayIndex day = static_cast(r.timestamp) / 86400; - day_records[day].push_back(&r); + auto& acc = days[day]; + + double avg = (r.low + r.high) / 2.0; + acc.avg_sum += avg; + acc.open_min = std::min(acc.open_min, r.open); + acc.open_max = std::max(acc.open_max, r.open); + acc.close_min = std::min(acc.close_min, r.close); + acc.close_max = std::max(acc.close_max, r.close); + acc.count++; } std::vector result; - result.reserve(day_records.size()); + result.reserve(days.size()); - for (auto& [day, recs] : day_records) { - // Сортируем по timestamp для определения first/last - std::sort(recs.begin(), recs.end(), - [](const Record* a, const Record* b) { - return a->timestamp < b->timestamp; - }); - + for (const auto& [day, acc] : days) { DayStats stats; stats.day = day; - stats.low = std::numeric_limits::max(); - stats.high = std::numeric_limits::lowest(); - stats.open = recs.front()->open; - stats.close = recs.back()->close; - stats.first_ts = recs.front()->timestamp; - stats.last_ts = recs.back()->timestamp; - - for (const auto* r : recs) { - stats.low = std::min(stats.low, r->low); - stats.high = std::max(stats.high, r->high); - } - - stats.avg = (stats.low + stats.high) / 2.0; - + stats.avg = acc.avg_sum / static_cast(acc.count); + stats.open_min = acc.open_min; + stats.open_max = acc.open_max; + stats.close_min = acc.close_min; + stats.close_max = acc.close_max; + stats.count = acc.count; result.push_back(stats); } return result; } - -std::vector merge_day_stats(const std::vector& all_stats) { - // Объединяем статистику по одинаковым дням (если такие есть) - std::map merged; - - for (const auto& s : all_stats) { - auto it = merged.find(s.day); - if (it == merged.end()) { - merged[s.day] = s; - } else { - // Объединяем данные за один день - auto& m = it->second; - m.low = std::min(m.low, s.low); - m.high = std::max(m.high, s.high); - - // open берём от записи с меньшим timestamp - if (s.first_ts < m.first_ts) { - m.open = s.open; - m.first_ts = s.first_ts; - } - - // close берём от записи с большим timestamp - if (s.last_ts > m.last_ts) { - m.close = s.close; - m.last_ts = s.last_ts; - } - - m.avg = (m.low + m.high) / 2.0; - } - } - - // Преобразуем в отсортированный вектор - std::vector result; - result.reserve(merged.size()); - - for (auto& [day, stats] : merged) { - result.push_back(stats); - } - - return result; -} - diff --git a/src/aggregation.hpp b/src/aggregation.hpp index 67e9478..acceb05 100644 --- a/src/aggregation.hpp +++ b/src/aggregation.hpp @@ -3,12 +3,6 @@ #include "record.hpp" #include "day_stats.hpp" #include -#include // Агрегация записей по дням на одном узле std::vector aggregate_days(const std::vector& records); - -// Объединение агрегированных данных с разных узлов -// (на случай если один день попал на разные узлы - но в нашей схеме это не должно случиться) -std::vector merge_day_stats(const std::vector& all_stats); - diff --git a/src/day_stats.hpp b/src/day_stats.hpp index 017dbf8..a4dbbf1 100644 --- a/src/day_stats.hpp +++ b/src/day_stats.hpp @@ -1,28 +1,15 @@ #pragma once #include -using DayIndex = long long; +using DayIndex = int64_t; // Агрегированные данные за один день struct DayStats { DayIndex day; // индекс дня (timestamp / 86400) - double low; // минимальный Low за день - double high; // максимальный High за день - double open; // первый Open за день - double close; // последний Close за день - double avg; // среднее = (low + high) / 2 - double first_ts; // timestamp первой записи (для определения порядка open) - double last_ts; // timestamp последней записи (для определения close) + double avg; // среднее значение (Low + High) / 2 по всем записям + double open_min; // минимальный Open за день + double open_max; // максимальный Open за день + double close_min; // минимальный Close за день + double close_max; // максимальный Close за день + int64_t count; // количество записей, по которым агрегировали }; - -// Интервал с изменением >= 10% -struct Interval { - DayIndex start_day; - DayIndex end_day; - double min_open; - double max_close; - double start_avg; - double end_avg; - double change; -}; - diff --git a/src/gpu_loader.cpp b/src/gpu_loader.cpp index e19e575..de5563b 100644 --- a/src/gpu_loader.cpp +++ b/src/gpu_loader.cpp @@ -127,13 +127,12 @@ bool aggregate_days_gpu( for (const auto& gs : gpu_stats) { DayStats ds; ds.day = gs.day; - ds.low = gs.low; - ds.high = gs.high; - ds.open = gs.open; - ds.close = gs.close; ds.avg = gs.avg; - ds.first_ts = gs.first_ts; - ds.last_ts = gs.last_ts; + ds.open_min = gs.open_min; + ds.open_max = gs.open_max; + ds.close_min = gs.close_min; + ds.close_max = gs.close_max; + ds.count = gs.count; out_stats.push_back(ds); } diff --git a/src/gpu_loader.hpp b/src/gpu_loader.hpp index fffd33f..1617eea 100644 --- a/src/gpu_loader.hpp +++ b/src/gpu_loader.hpp @@ -20,13 +20,12 @@ struct GpuRecord { struct GpuDayStats { long long day; - double low; - double high; - double open; - double close; double avg; - double first_ts; - double last_ts; + double open_min; + double open_max; + double close_min; + double close_max; + long long count; }; using gpu_aggregate_days_fn = int (*)( diff --git a/src/gpu_plugin.cu b/src/gpu_plugin.cu index c6b8ffc..3f22939 100644 --- a/src/gpu_plugin.cu +++ b/src/gpu_plugin.cu @@ -23,13 +23,12 @@ struct GpuRecord { struct GpuDayStats { long long day; - double low; - double high; - double open; - double close; double avg; - double first_ts; - double last_ts; + double open_min; + double open_max; + double close_min; + double close_max; + long long count; }; extern "C" int gpu_is_available() { @@ -63,32 +62,30 @@ __global__ void aggregate_kernel( GpuDayStats stats; stats.day = day_indices[d]; - stats.low = DBL_MAX; - stats.high = -DBL_MAX; - stats.first_ts = DBL_MAX; - stats.last_ts = -DBL_MAX; - stats.open = 0; - stats.close = 0; + stats.open_min = DBL_MAX; + stats.open_max = -DBL_MAX; + stats.close_min = DBL_MAX; + stats.close_max = -DBL_MAX; + stats.count = count; + + double avg_sum = 0.0; for (int i = 0; i < count; i++) { const GpuRecord& r = records[offset + i]; - // min/max - if (r.low < stats.low) stats.low = r.low; - if (r.high > stats.high) stats.high = r.high; + // Accumulate avg = (low + high) / 2 + avg_sum += (r.low + r.high) / 2.0; - // first/last по timestamp - if (r.timestamp < stats.first_ts) { - stats.first_ts = r.timestamp; - stats.open = r.open; - } - if (r.timestamp > stats.last_ts) { - stats.last_ts = r.timestamp; - stats.close = r.close; - } + // min/max Open + if (r.open < stats.open_min) stats.open_min = r.open; + if (r.open > stats.open_max) stats.open_max = r.open; + + // min/max Close + if (r.close < stats.close_min) stats.close_min = r.close; + if (r.close > stats.close_max) stats.close_max = r.close; } - stats.avg = (stats.low + stats.high) / 2.0; + stats.avg = avg_sum / static_cast(count); out_stats[d] = stats; } diff --git a/src/intervals.cpp b/src/intervals.cpp index 3c1bb18..deca317 100644 --- a/src/intervals.cpp +++ b/src/intervals.cpp @@ -28,13 +28,17 @@ std::vector find_intervals(const std::vector& days, double t interval.end_avg = price_now; interval.change = change; - // Находим min(Open) и max(Close) в интервале - interval.min_open = days[start_idx].open; - interval.max_close = days[start_idx].close; + // Находим min/max Open и Close в интервале + interval.open_min = days[start_idx].open_min; + interval.open_max = days[start_idx].open_max; + interval.close_min = days[start_idx].close_min; + interval.close_max = days[start_idx].close_max; - for (size_t j = start_idx; j <= i; j++) { - interval.min_open = std::min(interval.min_open, days[j].open); - interval.max_close = std::max(interval.max_close, days[j].close); + for (size_t j = start_idx + 1; j <= i; j++) { + interval.open_min = std::min(interval.open_min, days[j].open_min); + interval.open_max = std::max(interval.open_max, days[j].open_max); + interval.close_min = std::min(interval.close_min, days[j].close_min); + interval.close_max = std::max(interval.close_max, days[j].close_max); } intervals.push_back(interval); @@ -68,16 +72,17 @@ void write_intervals(const std::string& filename, const std::vector& i std::ofstream out(filename); out << std::fixed << std::setprecision(2); - out << "start_date,end_date,min_open,max_close,start_avg,end_avg,change\n"; + out << "start_date,end_date,open_min,open_max,close_min,close_max,start_avg,end_avg,change\n"; for (const auto& iv : intervals) { out << day_index_to_date(iv.start_day) << "," << day_index_to_date(iv.end_day) << "," - << iv.min_open << "," - << iv.max_close << "," + << iv.open_min << "," + << iv.open_max << "," + << iv.close_min << "," + << iv.close_max << "," << iv.start_avg << "," << iv.end_avg << "," << std::setprecision(6) << iv.change << "\n"; } } - diff --git a/src/intervals.hpp b/src/intervals.hpp index 5a7d606..8c4a4e7 100644 --- a/src/intervals.hpp +++ b/src/intervals.hpp @@ -4,6 +4,19 @@ #include #include +// Интервал с изменением >= threshold +struct Interval { + DayIndex start_day; + DayIndex end_day; + double open_min; // минимальный Open в интервале + double open_max; // максимальный Open в интервале + double close_min; // минимальный Close в интервале + double close_max; // максимальный Close в интервале + double start_avg; + double end_avg; + double change; +}; + // Вычисление интервалов с изменением >= threshold (по умолчанию 10%) std::vector find_intervals(const std::vector& days, double threshold = 0.10); @@ -12,4 +25,3 @@ void write_intervals(const std::string& filename, const std::vector& i // Преобразование DayIndex в строку даты (YYYY-MM-DD) std::string day_index_to_date(DayIndex day); - diff --git a/src/main.cpp b/src/main.cpp index 42229ce..7f05cba 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -4,8 +4,9 @@ #include #include "csv_loader.hpp" -#include "utils.hpp" #include "record.hpp" +#include "day_stats.hpp" +#include "aggregation.hpp" int main(int argc, char** argv) { MPI_Init(&argc, &argv); @@ -15,18 +16,27 @@ int main(int argc, char** argv) { MPI_Comm_size(MPI_COMM_WORLD, &size); // Параллельное чтение данных - double start_time = MPI_Wtime(); - + double read_start = MPI_Wtime(); std::vector records = load_csv_parallel(rank, size); - - double end_time = MPI_Wtime(); - double read_time = end_time - start_time; + double read_time = MPI_Wtime() - read_start; std::cout << "Rank " << rank << ": read " << records.size() << " records" << " in " << std::fixed << std::setprecision(3) << read_time << " sec" << std::endl; + // Агрегация по дням + double agg_start = MPI_Wtime(); + std::vector days = aggregate_days(records); + double agg_time = MPI_Wtime() - agg_start; + + std::cout << "Rank " << rank + << ": aggregated " << days.size() << " days" + << " [" << (days.empty() ? 0 : days.front().day) + << ".." << (days.empty() ? 0 : days.back().day) << "]" + << " in " << std::fixed << std::setprecision(3) << agg_time << " sec" + << std::endl; + MPI_Finalize(); return 0; }