From 78bdb1ddb7e8910214315bc5129f08f57e9696a3 Mon Sep 17 00:00:00 2001 From: Arity-T Date: Tue, 2 Dec 2025 12:22:16 +0000 Subject: [PATCH] =?UTF-8?q?=D0=9D=D0=B0=20CPU=20=D0=B2=D1=8B=D1=87=D0=B8?= =?UTF-8?q?=D1=81=D0=BB=D0=B5=D0=BD=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 +- src/aggregation.cpp | 87 +++++++++++++++++++++++++++++++++++++++++++++ src/aggregation.hpp | 14 ++++++++ src/day_stats.hpp | 28 +++++++++++++++ src/intervals.cpp | 83 ++++++++++++++++++++++++++++++++++++++++++ src/intervals.hpp | 15 ++++++++ src/main.cpp | 69 ++++++++++++++++++++++++++++++++--- src/utils.hpp | 4 +-- 8 files changed, 295 insertions(+), 8 deletions(-) create mode 100644 src/aggregation.cpp create mode 100644 src/aggregation.hpp create mode 100644 src/day_stats.hpp create mode 100644 src/intervals.cpp create mode 100644 src/intervals.hpp diff --git a/.gitignore b/.gitignore index 381bf03..581f276 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ data build -out.txt \ No newline at end of file +out.txt +result.csv \ No newline at end of file diff --git a/src/aggregation.cpp b/src/aggregation.cpp new file mode 100644 index 0000000..16a64d9 --- /dev/null +++ b/src/aggregation.cpp @@ -0,0 +1,87 @@ +#include "aggregation.hpp" +#include +#include +#include + +std::vector aggregate_days(const std::vector& records) { + // Группируем записи по дням + std::map> day_records; + + for (const auto& r : records) { + DayIndex day = static_cast(r.timestamp) / 86400; + day_records[day].push_back(&r); + } + + std::vector result; + result.reserve(day_records.size()); + + for (auto& [day, recs] : day_records) { + // Сортируем по timestamp для определения first/last + std::sort(recs.begin(), recs.end(), + [](const Record* a, const Record* b) { + return a->timestamp < b->timestamp; + }); + + DayStats stats; + stats.day = day; + stats.low = std::numeric_limits::max(); + stats.high = std::numeric_limits::lowest(); + stats.open = recs.front()->open; + stats.close = recs.back()->close; + stats.first_ts = recs.front()->timestamp; + stats.last_ts = recs.back()->timestamp; + + for (const auto* r : recs) { + stats.low = std::min(stats.low, r->low); + stats.high = std::max(stats.high, r->high); + } + + stats.avg = (stats.low + stats.high) / 2.0; + + result.push_back(stats); + } + + return result; +} + +std::vector merge_day_stats(const std::vector& all_stats) { + // Объединяем статистику по одинаковым дням (если такие есть) + std::map merged; + + for (const auto& s : all_stats) { + auto it = merged.find(s.day); + if (it == merged.end()) { + merged[s.day] = s; + } else { + // Объединяем данные за один день + auto& m = it->second; + m.low = std::min(m.low, s.low); + m.high = std::max(m.high, s.high); + + // open берём от записи с меньшим timestamp + if (s.first_ts < m.first_ts) { + m.open = s.open; + m.first_ts = s.first_ts; + } + + // close берём от записи с большим timestamp + if (s.last_ts > m.last_ts) { + m.close = s.close; + m.last_ts = s.last_ts; + } + + m.avg = (m.low + m.high) / 2.0; + } + } + + // Преобразуем в отсортированный вектор + std::vector result; + result.reserve(merged.size()); + + for (auto& [day, stats] : merged) { + result.push_back(stats); + } + + return result; +} + diff --git a/src/aggregation.hpp b/src/aggregation.hpp new file mode 100644 index 0000000..67e9478 --- /dev/null +++ b/src/aggregation.hpp @@ -0,0 +1,14 @@ +#pragma once + +#include "record.hpp" +#include "day_stats.hpp" +#include +#include + +// Агрегация записей по дням на одном узле +std::vector aggregate_days(const std::vector& records); + +// Объединение агрегированных данных с разных узлов +// (на случай если один день попал на разные узлы - но в нашей схеме это не должно случиться) +std::vector merge_day_stats(const std::vector& all_stats); + diff --git a/src/day_stats.hpp b/src/day_stats.hpp new file mode 100644 index 0000000..017dbf8 --- /dev/null +++ b/src/day_stats.hpp @@ -0,0 +1,28 @@ +#pragma once +#include + +using DayIndex = long long; + +// Агрегированные данные за один день +struct DayStats { + DayIndex day; // индекс дня (timestamp / 86400) + double low; // минимальный Low за день + double high; // максимальный High за день + double open; // первый Open за день + double close; // последний Close за день + double avg; // среднее = (low + high) / 2 + double first_ts; // timestamp первой записи (для определения порядка open) + double last_ts; // timestamp последней записи (для определения close) +}; + +// Интервал с изменением >= 10% +struct Interval { + DayIndex start_day; + DayIndex end_day; + double min_open; + double max_close; + double start_avg; + double end_avg; + double change; +}; + diff --git a/src/intervals.cpp b/src/intervals.cpp new file mode 100644 index 0000000..3c1bb18 --- /dev/null +++ b/src/intervals.cpp @@ -0,0 +1,83 @@ +#include "intervals.hpp" +#include +#include +#include +#include +#include +#include + +std::vector find_intervals(const std::vector& days, double threshold) { + if (days.empty()) { + return {}; + } + + std::vector intervals; + + size_t start_idx = 0; + double price_base = days[start_idx].avg; + + for (size_t i = 1; i < days.size(); i++) { + double price_now = days[i].avg; + double change = std::abs(price_now - price_base) / price_base; + + if (change >= threshold) { + Interval interval; + interval.start_day = days[start_idx].day; + interval.end_day = days[i].day; + interval.start_avg = price_base; + interval.end_avg = price_now; + interval.change = change; + + // Находим min(Open) и max(Close) в интервале + interval.min_open = days[start_idx].open; + interval.max_close = days[start_idx].close; + + for (size_t j = start_idx; j <= i; j++) { + interval.min_open = std::min(interval.min_open, days[j].open); + interval.max_close = std::max(interval.max_close, days[j].close); + } + + intervals.push_back(interval); + + // Начинаем новый интервал + start_idx = i + 1; + if (start_idx >= days.size()) { + break; + } + price_base = days[start_idx].avg; + } + } + + return intervals; +} + +std::string day_index_to_date(DayIndex day) { + time_t ts = static_cast(day) * 86400; + struct tm* tm_info = gmtime(&ts); + + std::ostringstream oss; + oss << std::setfill('0') + << (tm_info->tm_year + 1900) << "-" + << std::setw(2) << (tm_info->tm_mon + 1) << "-" + << std::setw(2) << tm_info->tm_mday; + + return oss.str(); +} + +void write_intervals(const std::string& filename, const std::vector& intervals) { + std::ofstream out(filename); + + out << std::fixed << std::setprecision(2); + out << "start_date,end_date,min_open,max_close,start_avg,end_avg,change\n"; + + for (const auto& iv : intervals) { + out << day_index_to_date(iv.start_day) << "," + << day_index_to_date(iv.end_day) << "," + << iv.min_open << "," + << iv.max_close << "," + << iv.start_avg << "," + << iv.end_avg << "," + << std::setprecision(6) << iv.change << "\n"; + } +} + diff --git a/src/intervals.hpp b/src/intervals.hpp new file mode 100644 index 0000000..5a7d606 --- /dev/null +++ b/src/intervals.hpp @@ -0,0 +1,15 @@ +#pragma once + +#include "day_stats.hpp" +#include +#include + +// Вычисление интервалов с изменением >= threshold (по умолчанию 10%) +std::vector find_intervals(const std::vector& days, double threshold = 0.10); + +// Вывод интервалов в файл +void write_intervals(const std::string& filename, const std::vector& intervals); + +// Преобразование DayIndex в строку даты (YYYY-MM-DD) +std::string day_index_to_date(DayIndex day); + diff --git a/src/main.cpp b/src/main.cpp index 02d475f..66b9731 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -6,12 +6,15 @@ #include "csv_loader.hpp" #include "utils.hpp" #include "record.hpp" +#include "day_stats.hpp" +#include "aggregation.hpp" +#include "intervals.hpp" #include "gpu_loader.hpp" // Функция: отобрать записи для конкретного ранга std::vector select_records_for_rank( - const std::map>& days, - const std::vector& day_list) + const std::map>& days, + const std::vector& day_list) { std::vector out; for (auto d : day_list) { @@ -52,7 +55,7 @@ int main(int argc, char** argv) { continue; } - int count = vec.size(); + int count = static_cast(vec.size()); MPI_Send(&count, 1, MPI_INT, r, 0, MPI_COMM_WORLD); MPI_Send(vec.data(), count * sizeof(Record), MPI_BYTE, r, 1, MPI_COMM_WORLD); } @@ -72,14 +75,72 @@ int main(int argc, char** argv) { std::cout << "Rank " << rank << " received " << local_records.size() << " records" << std::endl; + // ====== АГРЕГАЦИЯ НА КАЖДОМ УЗЛЕ ====== + auto local_stats = aggregate_days(local_records); + std::cout << "Rank " << rank << " aggregated " + << local_stats.size() << " days" << std::endl; + // ====== СБОР АГРЕГИРОВАННЫХ ДАННЫХ НА RANK 0 ====== + std::vector all_stats; + + if (rank == 0) { + // Добавляем свои данные + all_stats.insert(all_stats.end(), local_stats.begin(), local_stats.end()); + + // Получаем данные от других узлов + for (int r = 1; r < size; r++) { + int count = 0; + MPI_Recv(&count, 1, MPI_INT, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + std::vector remote_stats(count); + MPI_Recv(remote_stats.data(), count * sizeof(DayStats), + MPI_BYTE, r, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + all_stats.insert(all_stats.end(), remote_stats.begin(), remote_stats.end()); + } + } else { + // Отправляем свои агрегированные данные на rank 0 + int count = static_cast(local_stats.size()); + MPI_Send(&count, 1, MPI_INT, 0, 2, MPI_COMM_WORLD); + MPI_Send(local_stats.data(), count * sizeof(DayStats), MPI_BYTE, 0, 3, MPI_COMM_WORLD); + } + + // ====== ВЫЧИСЛЕНИЕ ИНТЕРВАЛОВ НА RANK 0 ====== + if (rank == 0) { + std::cout << "Rank 0: merging " << all_stats.size() << " day stats..." << std::endl; + + // Объединяем и сортируем + auto merged_stats = merge_day_stats(all_stats); + std::cout << "Rank 0: total " << merged_stats.size() << " unique days" << std::endl; + + // Вычисляем интервалы + auto intervals = find_intervals(merged_stats, 0.10); + std::cout << "Found " << intervals.size() << " intervals with >=10% change" << std::endl; + + // Записываем результат + write_intervals("../result.csv", intervals); + std::cout << "Results written to result.csv" << std::endl; + + // Выводим первые несколько интервалов + std::cout << "\nFirst 5 intervals:\n"; + std::cout << "start_date,end_date,min_open,max_close,change\n"; + for (size_t i = 0; i < std::min(intervals.size(), size_t(5)); i++) { + const auto& iv = intervals[i]; + std::cout << day_index_to_date(iv.start_day) << "," + << day_index_to_date(iv.end_day) << "," + << iv.min_open << "," + << iv.max_close << "," + << iv.change << "\n"; + } + } + + // Проверка GPU (оставляем как есть) auto gpu_is_available = load_gpu_is_available(); int have_gpu = 0; if (gpu_is_available) { std::cout << "Rank " << rank << " dll loaded" << std::endl; have_gpu = gpu_is_available(); } - std::cout << "Rank " << rank << ": gpu_available=" << have_gpu << "\n"; MPI_Finalize(); diff --git a/src/utils.hpp b/src/utils.hpp index 515acfd..5a8c18c 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -1,11 +1,9 @@ #pragma once #include "record.hpp" +#include "day_stats.hpp" #include #include -using DayIndex = long long; - std::map> group_by_day(const std::vector& recs); std::vector> split_days(const std::map>& days, int parts); -