diff --git a/run.slurm b/run.slurm index 9ff1a2a..41a2f25 100644 --- a/run.slurm +++ b/run.slurm @@ -6,13 +6,16 @@ #SBATCH --output=out.txt # Путь к файлу данных (должен существовать на всех узлах) -export DATA_PATH="/mnt/shared/supercomputers/data/data.csv" +export DATA_PATH="/mnt/shared/supercomputers/data/data_10s.csv" # Доли данных для каждого ранка (сумма определяет пропорции) -export DATA_READ_SHARES="10,14,18,22" +export DATA_READ_SHARES="10,11,13,14" # Размер перекрытия в байтах для обработки границ строк export READ_OVERLAP_BYTES=131072 +# Интервал агрегации в секундах (60 = минуты, 600 = 10 минут, 86400 = дни) +export AGGREGATION_INTERVAL=60 + cd /mnt/shared/supercomputers/build mpirun -np $SLURM_NTASKS ./bitcoin_app diff --git a/src/aggregation.cpp b/src/aggregation.cpp index 98dcf75..e9d676f 100644 --- a/src/aggregation.cpp +++ b/src/aggregation.cpp @@ -1,11 +1,13 @@ #include "aggregation.hpp" +#include "utils.hpp" #include #include #include -std::vector aggregate_days(const std::vector& records) { - // Накопители для каждого дня - struct DayAccumulator { +std::vector aggregate_periods(const std::vector& records) { + int64_t interval = get_aggregation_interval(); + + struct PeriodAccumulator { double avg_sum = 0.0; double open_min = std::numeric_limits::max(); double open_max = std::numeric_limits::lowest(); @@ -14,11 +16,11 @@ std::vector aggregate_days(const std::vector& records) { int64_t count = 0; }; - std::map days; + std::map periods; for (const auto& r : records) { - DayIndex day = static_cast(r.timestamp) / 86400; - auto& acc = days[day]; + PeriodIndex period = static_cast(r.timestamp) / interval; + auto& acc = periods[period]; double avg = (r.low + r.high) / 2.0; acc.avg_sum += avg; @@ -29,12 +31,12 @@ std::vector aggregate_days(const std::vector& records) { acc.count++; } - std::vector result; - result.reserve(days.size()); + std::vector result; + result.reserve(periods.size()); - for (const auto& [day, acc] : days) { - DayStats stats; - stats.day = day; + for (const auto& [period, acc] : periods) { + PeriodStats stats; + stats.period = period; stats.avg = acc.avg_sum / static_cast(acc.count); stats.open_min = acc.open_min; stats.open_max = acc.open_max; diff --git a/src/aggregation.hpp b/src/aggregation.hpp index acceb05..668ed60 100644 --- a/src/aggregation.hpp +++ b/src/aggregation.hpp @@ -1,8 +1,8 @@ #pragma once #include "record.hpp" -#include "day_stats.hpp" +#include "period_stats.hpp" #include -// Агрегация записей по дням на одном узле -std::vector aggregate_days(const std::vector& records); +// Агрегация записей по периодам на одном узле +std::vector aggregate_periods(const std::vector& records); diff --git a/src/day_stats.hpp b/src/day_stats.hpp deleted file mode 100644 index a4dbbf1..0000000 --- a/src/day_stats.hpp +++ /dev/null @@ -1,15 +0,0 @@ -#pragma once -#include - -using DayIndex = int64_t; - -// Агрегированные данные за один день -struct DayStats { - DayIndex day; // индекс дня (timestamp / 86400) - double avg; // среднее значение (Low + High) / 2 по всем записям - double open_min; // минимальный Open за день - double open_max; // максимальный Open за день - double close_min; // минимальный Close за день - double close_max; // максимальный Close за день - int64_t count; // количество записей, по которым агрегировали -}; diff --git a/src/gpu_loader.cpp b/src/gpu_loader.cpp index de5563b..5128f08 100644 --- a/src/gpu_loader.cpp +++ b/src/gpu_loader.cpp @@ -1,4 +1,5 @@ #include "gpu_loader.hpp" +#include "utils.hpp" #include #include #include @@ -29,58 +30,54 @@ bool gpu_is_available() { return false; } -gpu_aggregate_days_fn load_gpu_aggregate_days() { +gpu_aggregate_periods_fn load_gpu_aggregate_periods() { void* h = get_gpu_lib_handle(); if (!h) return nullptr; - auto fn = (gpu_aggregate_days_fn)dlsym(h, "gpu_aggregate_days"); + auto fn = (gpu_aggregate_periods_fn)dlsym(h, "gpu_aggregate_periods"); return fn; } -bool aggregate_days_gpu( +bool aggregate_periods_gpu( const std::vector& records, - std::vector& out_stats, - gpu_aggregate_days_fn gpu_fn) + std::vector& out_stats, + gpu_aggregate_periods_fn gpu_fn) { if (!gpu_fn || records.empty()) { return false; } - // Общий таймер всей функции + int64_t interval = get_aggregation_interval(); + double t_total_start = omp_get_wtime(); - - // Таймер CPU preprocessing double t_preprocess_start = omp_get_wtime(); - // Группируем записи по дням и подготавливаем данные для GPU - std::map> day_record_indices; + std::map> period_record_indices; for (size_t i = 0; i < records.size(); i++) { - DayIndex day = static_cast(records[i].timestamp) / 86400; - day_record_indices[day].push_back(i); + PeriodIndex period = static_cast(records[i].timestamp) / interval; + period_record_indices[period].push_back(i); } - int num_days = static_cast(day_record_indices.size()); + int num_periods = static_cast(period_record_indices.size()); - // Подготавливаем массивы для GPU std::vector gpu_records; - std::vector day_offsets; - std::vector day_counts; - std::vector day_indices; + std::vector period_offsets; + std::vector period_counts; + std::vector period_indices; gpu_records.reserve(records.size()); - day_offsets.reserve(num_days); - day_counts.reserve(num_days); - day_indices.reserve(num_days); + period_offsets.reserve(num_periods); + period_counts.reserve(num_periods); + period_indices.reserve(num_periods); int current_offset = 0; - for (auto& [day, indices] : day_record_indices) { - day_indices.push_back(day); - day_offsets.push_back(current_offset); - day_counts.push_back(static_cast(indices.size())); + for (auto& [period, indices] : period_record_indices) { + period_indices.push_back(period); + period_offsets.push_back(current_offset); + period_counts.push_back(static_cast(indices.size())); - // Добавляем записи этого дня for (size_t idx : indices) { const auto& r = records[idx]; GpuRecord gr; @@ -96,22 +93,19 @@ bool aggregate_days_gpu( current_offset += static_cast(indices.size()); } - // Выделяем память для результата - std::vector gpu_stats(num_days); + std::vector gpu_stats(num_periods); double t_preprocess_ms = (omp_get_wtime() - t_preprocess_start) * 1000.0; std::cout << " GPU CPU preprocessing: " << std::fixed << std::setprecision(3) << std::setw(7) << t_preprocess_ms << " ms" << std::endl << std::flush; - // Вызываем GPU функцию (включает: malloc, memcpy H->D, kernel, memcpy D->H, free) - // Детальные тайминги выводятся внутри GPU функции int result = gpu_fn( gpu_records.data(), static_cast(gpu_records.size()), - day_offsets.data(), - day_counts.data(), - day_indices.data(), - num_days, + period_offsets.data(), + period_counts.data(), + period_indices.data(), + num_periods, gpu_stats.data() ); @@ -120,23 +114,21 @@ bool aggregate_days_gpu( return false; } - // Конвертируем результат в DayStats out_stats.clear(); - out_stats.reserve(num_days); + out_stats.reserve(num_periods); for (const auto& gs : gpu_stats) { - DayStats ds; - ds.day = gs.day; - ds.avg = gs.avg; - ds.open_min = gs.open_min; - ds.open_max = gs.open_max; - ds.close_min = gs.close_min; - ds.close_max = gs.close_max; - ds.count = gs.count; - out_stats.push_back(ds); + PeriodStats ps; + ps.period = gs.period; + ps.avg = gs.avg; + ps.open_min = gs.open_min; + ps.open_max = gs.open_max; + ps.close_min = gs.close_min; + ps.close_max = gs.close_max; + ps.count = gs.count; + out_stats.push_back(ps); } - // Общее время всей GPU функции (включая preprocessing) double t_total_ms = (omp_get_wtime() - t_total_start) * 1000.0; std::cout << " GPU TOTAL (with prep): " << std::fixed << std::setprecision(3) << std::setw(7) << t_total_ms << " ms" << std::endl << std::flush; diff --git a/src/gpu_loader.hpp b/src/gpu_loader.hpp index 1617eea..3c9db97 100644 --- a/src/gpu_loader.hpp +++ b/src/gpu_loader.hpp @@ -1,5 +1,5 @@ #pragma once -#include "day_stats.hpp" +#include "period_stats.hpp" #include "record.hpp" #include @@ -18,8 +18,8 @@ struct GpuRecord { double volume; }; -struct GpuDayStats { - long long day; +struct GpuPeriodStats { + long long period; double avg; double open_min; double open_max; @@ -28,23 +28,23 @@ struct GpuDayStats { long long count; }; -using gpu_aggregate_days_fn = int (*)( +using gpu_aggregate_periods_fn = int (*)( const GpuRecord* h_records, int num_records, - const int* h_day_offsets, - const int* h_day_counts, - const long long* h_day_indices, - int num_days, - GpuDayStats* h_out_stats + const int* h_period_offsets, + const int* h_period_counts, + const long long* h_period_indices, + int num_periods, + GpuPeriodStats* h_out_stats ); // Загрузка функций из плагина gpu_is_available_fn load_gpu_is_available(); -gpu_aggregate_days_fn load_gpu_aggregate_days(); +gpu_aggregate_periods_fn load_gpu_aggregate_periods(); // Обёртка для агрегации на GPU (возвращает true если успешно) -bool aggregate_days_gpu( +bool aggregate_periods_gpu( const std::vector& records, - std::vector& out_stats, - gpu_aggregate_days_fn gpu_fn + std::vector& out_stats, + gpu_aggregate_periods_fn gpu_fn ); diff --git a/src/intervals.cpp b/src/intervals.cpp index f01d470..20b7fe1 100644 --- a/src/intervals.cpp +++ b/src/intervals.cpp @@ -1,4 +1,5 @@ #include "intervals.hpp" +#include "utils.hpp" #include #include #include @@ -10,47 +11,47 @@ // Вспомогательная структура для накопления min/max в интервале struct IntervalAccumulator { - DayIndex start_day; + PeriodIndex start_period; double start_avg; double open_min; double open_max; double close_min; double close_max; - void init(const DayStats& day) { - start_day = day.day; - start_avg = day.avg; - open_min = day.open_min; - open_max = day.open_max; - close_min = day.close_min; - close_max = day.close_max; + void init(const PeriodStats& p) { + start_period = p.period; + start_avg = p.avg; + open_min = p.open_min; + open_max = p.open_max; + close_min = p.close_min; + close_max = p.close_max; } - void update(const DayStats& day) { - open_min = std::min(open_min, day.open_min); - open_max = std::max(open_max, day.open_max); - close_min = std::min(close_min, day.close_min); - close_max = std::max(close_max, day.close_max); + void update(const PeriodStats& p) { + open_min = std::min(open_min, p.open_min); + open_max = std::max(open_max, p.open_max); + close_min = std::min(close_min, p.close_min); + close_max = std::max(close_max, p.close_max); } - Interval finalize(const DayStats& end_day, double change) const { + Interval finalize(const PeriodStats& end_period, double change) const { Interval iv; - iv.start_day = start_day; - iv.end_day = end_day.day; + iv.start_period = start_period; + iv.end_period = end_period.period; iv.start_avg = start_avg; - iv.end_avg = end_day.avg; + iv.end_avg = end_period.avg; iv.change = change; - iv.open_min = std::min(open_min, end_day.open_min); - iv.open_max = std::max(open_max, end_day.open_max); - iv.close_min = std::min(close_min, end_day.close_min); - iv.close_max = std::max(close_max, end_day.close_max); + iv.open_min = std::min(open_min, end_period.open_min); + iv.open_max = std::max(open_max, end_period.open_max); + iv.close_min = std::min(close_min, end_period.close_min); + iv.close_max = std::max(close_max, end_period.close_max); return iv; } }; -// Упакованная структура DayStats для MPI передачи (8 doubles) -struct PackedDayStats { - double day; // DayIndex as double +// Упакованная структура PeriodStats для MPI передачи (8 doubles) +struct PackedPeriodStats { + double period; // PeriodIndex as double double avg; double open_min; double open_max; @@ -59,27 +60,27 @@ struct PackedDayStats { double count; // int64_t as double double valid; // флаг валидности (1.0 = valid, 0.0 = invalid) - void pack(const DayStats& ds) { - day = static_cast(ds.day); - avg = ds.avg; - open_min = ds.open_min; - open_max = ds.open_max; - close_min = ds.close_min; - close_max = ds.close_max; - count = static_cast(ds.count); + void pack(const PeriodStats& ps) { + period = static_cast(ps.period); + avg = ps.avg; + open_min = ps.open_min; + open_max = ps.open_max; + close_min = ps.close_min; + close_max = ps.close_max; + count = static_cast(ps.count); valid = 1.0; } - DayStats unpack() const { - DayStats ds; - ds.day = static_cast(day); - ds.avg = avg; - ds.open_min = open_min; - ds.open_max = open_max; - ds.close_min = close_min; - ds.close_max = close_max; - ds.count = static_cast(count); - return ds; + PeriodStats unpack() const { + PeriodStats ps; + ps.period = static_cast(period); + ps.avg = avg; + ps.open_min = open_min; + ps.open_max = open_max; + ps.close_min = close_min; + ps.close_max = close_max; + ps.count = static_cast(count); + return ps; } bool is_valid() const { return valid > 0.5; } @@ -87,7 +88,7 @@ struct PackedDayStats { }; IntervalResult find_intervals_parallel( - const std::vector& days, + const std::vector& periods, int rank, int size, double threshold) { @@ -95,10 +96,9 @@ IntervalResult find_intervals_parallel( result.compute_time = 0.0; result.wait_time = 0.0; - if (days.empty()) { - // Передаём невалидный DayStats следующему ранку + if (periods.empty()) { if (rank < size - 1) { - PackedDayStats invalid; + PackedPeriodStats invalid; invalid.set_invalid(); MPI_Send(&invalid, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD); } @@ -107,123 +107,108 @@ IntervalResult find_intervals_parallel( double compute_start = MPI_Wtime(); - // Определяем, до какого индекса обрабатывать - // Для последнего ранка - до конца, для остальных - до предпоследнего дня - size_t process_until = (rank == size - 1) ? days.size() : days.size() - 1; + size_t process_until = (rank == size - 1) ? periods.size() : periods.size() - 1; IntervalAccumulator acc; size_t start_idx = 0; bool have_pending_interval = false; - // Если не первый ранк - ждём данные от предыдущего if (rank > 0) { double wait_start = MPI_Wtime(); - PackedDayStats received; + PackedPeriodStats received; MPI_Recv(&received, 8, MPI_DOUBLE, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); result.wait_time = MPI_Wtime() - wait_start; compute_start = MPI_Wtime(); if (received.is_valid()) { - DayStats prev_day = received.unpack(); + PeriodStats prev_period = received.unpack(); - // Ищем первый день с индексом > prev_day.day - for (start_idx = 0; start_idx < days.size(); start_idx++) { - if (days[start_idx].day > prev_day.day) { + for (start_idx = 0; start_idx < periods.size(); start_idx++) { + if (periods[start_idx].period > prev_period.period) { break; } } if (start_idx < process_until) { - // Инициализируем аккумулятор данными от предыдущего ранка - acc.init(prev_day); + acc.init(prev_period); have_pending_interval = true; - // Продолжаем строить интервал for (size_t i = start_idx; i < process_until; i++) { - acc.update(days[i]); + acc.update(periods[i]); - double change = std::abs(days[i].avg - acc.start_avg) / acc.start_avg; + double change = std::abs(periods[i].avg - acc.start_avg) / acc.start_avg; if (change >= threshold) { - result.intervals.push_back(acc.finalize(days[i], change)); + result.intervals.push_back(acc.finalize(periods[i], change)); have_pending_interval = false; - // Начинаем новый интервал start_idx = i + 1; if (start_idx < process_until) { - acc.init(days[start_idx]); + acc.init(periods[start_idx]); have_pending_interval = true; } } } } } else { - // Предыдущий ранк не передал валидные данные, начинаем с начала if (process_until > 0) { - acc.init(days[0]); + acc.init(periods[0]); have_pending_interval = true; start_idx = 0; } } } else { - // Первый ранк - начинаем с первого дня if (process_until > 0) { - acc.init(days[0]); + acc.init(periods[0]); have_pending_interval = true; start_idx = 0; } } - // Обрабатываем дни (если ещё не обработали выше) if (rank == 0 && have_pending_interval) { for (size_t i = 1; i < process_until; i++) { - acc.update(days[i]); + acc.update(periods[i]); - double change = std::abs(days[i].avg - acc.start_avg) / acc.start_avg; + double change = std::abs(periods[i].avg - acc.start_avg) / acc.start_avg; if (change >= threshold) { - result.intervals.push_back(acc.finalize(days[i], change)); + result.intervals.push_back(acc.finalize(periods[i], change)); have_pending_interval = false; - // Начинаем новый интервал start_idx = i + 1; if (start_idx < process_until) { - acc.init(days[start_idx]); + acc.init(periods[start_idx]); have_pending_interval = true; } } } } - // Для последнего ранка: завершаем последний интервал на последнем дне - if (rank == size - 1 && have_pending_interval && !days.empty()) { - const auto& last_day = days.back(); - double change = std::abs(last_day.avg - acc.start_avg) / acc.start_avg; - result.intervals.push_back(acc.finalize(last_day, change)); + if (rank == size - 1 && have_pending_interval && !periods.empty()) { + const auto& last_period = periods.back(); + double change = std::abs(last_period.avg - acc.start_avg) / acc.start_avg; + result.intervals.push_back(acc.finalize(last_period, change)); } result.compute_time = MPI_Wtime() - compute_start; - // Передаём данные следующему ранку if (rank < size - 1) { - PackedDayStats to_send; + PackedPeriodStats to_send; if (have_pending_interval) { - // Передаём день, с которого начался незавершённый интервал - DayStats start_day; - start_day.day = acc.start_day; - start_day.avg = acc.start_avg; - start_day.open_min = acc.open_min; - start_day.open_max = acc.open_max; - start_day.close_min = acc.close_min; - start_day.close_max = acc.close_max; - start_day.count = 0; - to_send.pack(start_day); - } else if (!days.empty()) { - // Интервал завершился, передаём предпоследний день - to_send.pack(days[days.size() - 2]); + PeriodStats start_period; + start_period.period = acc.start_period; + start_period.avg = acc.start_avg; + start_period.open_min = acc.open_min; + start_period.open_max = acc.open_max; + start_period.close_min = acc.close_min; + start_period.close_max = acc.close_max; + start_period.count = 0; + to_send.pack(start_period); + } else if (periods.size() >= 2) { + to_send.pack(periods[periods.size() - 2]); } else { to_send.set_invalid(); } @@ -240,15 +225,10 @@ double collect_intervals( { double wait_time = 0.0; - // Упакованный Interval для MPI (9 doubles) - // start_day, end_day, open_min, open_max, close_min, close_max, start_avg, end_avg, change - if (rank == 0) { - // Собираем интервалы со всех остальных ранков for (int r = 1; r < size; r++) { double wait_start = MPI_Wtime(); - // Сначала получаем количество интервалов int count; MPI_Recv(&count, 1, MPI_INT, r, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE); @@ -256,11 +236,10 @@ double collect_intervals( std::vector buffer(count * 9); MPI_Recv(buffer.data(), count * 9, MPI_DOUBLE, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE); - // Распаковываем for (int i = 0; i < count; i++) { Interval iv; - iv.start_day = static_cast(buffer[i * 9 + 0]); - iv.end_day = static_cast(buffer[i * 9 + 1]); + iv.start_period = static_cast(buffer[i * 9 + 0]); + iv.end_period = static_cast(buffer[i * 9 + 1]); iv.open_min = buffer[i * 9 + 2]; iv.open_max = buffer[i * 9 + 3]; iv.close_min = buffer[i * 9 + 4]; @@ -275,13 +254,11 @@ double collect_intervals( wait_time += MPI_Wtime() - wait_start; } - // Сортируем по start_day std::sort(local_intervals.begin(), local_intervals.end(), [](const Interval& a, const Interval& b) { - return a.start_day < b.start_day; + return a.start_period < b.start_period; }); } else { - // Отправляем свои интервалы на ранк 0 int count = static_cast(local_intervals.size()); MPI_Send(&count, 1, MPI_INT, 0, 1, MPI_COMM_WORLD); @@ -289,8 +266,8 @@ double collect_intervals( std::vector buffer(count * 9); for (int i = 0; i < count; i++) { const auto& iv = local_intervals[i]; - buffer[i * 9 + 0] = static_cast(iv.start_day); - buffer[i * 9 + 1] = static_cast(iv.end_day); + buffer[i * 9 + 0] = static_cast(iv.start_period); + buffer[i * 9 + 1] = static_cast(iv.end_period); buffer[i * 9 + 2] = iv.open_min; buffer[i * 9 + 3] = iv.open_max; buffer[i * 9 + 4] = iv.close_min; @@ -306,15 +283,19 @@ double collect_intervals( return wait_time; } -std::string day_index_to_date(DayIndex day) { - time_t ts = static_cast(day) * 86400; +std::string period_index_to_datetime(PeriodIndex period) { + int64_t interval = get_aggregation_interval(); + time_t ts = static_cast(period) * interval; struct tm* tm_info = gmtime(&ts); std::ostringstream oss; oss << std::setfill('0') << (tm_info->tm_year + 1900) << "-" << std::setw(2) << (tm_info->tm_mon + 1) << "-" - << std::setw(2) << tm_info->tm_mday; + << std::setw(2) << tm_info->tm_mday << " " + << std::setw(2) << tm_info->tm_hour << ":" + << std::setw(2) << tm_info->tm_min << ":" + << std::setw(2) << tm_info->tm_sec; return oss.str(); } @@ -323,11 +304,11 @@ void write_intervals(const std::string& filename, const std::vector& i std::ofstream out(filename); out << std::fixed << std::setprecision(2); - out << "start_date,end_date,open_min,open_max,close_min,close_max,start_avg,end_avg,change\n"; + out << "start_datetime,end_datetime,open_min,open_max,close_min,close_max,start_avg,end_avg,change\n"; for (const auto& iv : intervals) { - out << day_index_to_date(iv.start_day) << "," - << day_index_to_date(iv.end_day) << "," + out << period_index_to_datetime(iv.start_period) << "," + << period_index_to_datetime(iv.end_period) << "," << iv.open_min << "," << iv.open_max << "," << iv.close_min << "," diff --git a/src/intervals.hpp b/src/intervals.hpp index 4e211c0..b5ba4bc 100644 --- a/src/intervals.hpp +++ b/src/intervals.hpp @@ -1,13 +1,13 @@ #pragma once -#include "day_stats.hpp" +#include "period_stats.hpp" #include #include // Интервал с изменением >= threshold struct Interval { - DayIndex start_day; - DayIndex end_day; + PeriodIndex start_period; + PeriodIndex end_period; double open_min; double open_max; double close_min; @@ -25,15 +25,13 @@ struct IntervalResult { }; // Параллельное построение интервалов с использованием MPI -// Каждый ранк обрабатывает свою часть дней и передаёт незавершённый интервал следующему IntervalResult find_intervals_parallel( - const std::vector& days, + const std::vector& periods, int rank, int size, double threshold = 0.10 ); // Сбор интервалов со всех ранков на ранк 0 -// Возвращает время ожидания данных double collect_intervals( std::vector& local_intervals, int rank, int size @@ -42,5 +40,5 @@ double collect_intervals( // Вывод интервалов в файл void write_intervals(const std::string& filename, const std::vector& intervals); -// Преобразование DayIndex в строку даты (YYYY-MM-DD) -std::string day_index_to_date(DayIndex day); +// Преобразование PeriodIndex в строку даты/времени +std::string period_index_to_datetime(PeriodIndex period); diff --git a/src/main.cpp b/src/main.cpp index 8ba17e5..50e1583 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -5,7 +5,7 @@ #include "csv_loader.hpp" #include "record.hpp" -#include "day_stats.hpp" +#include "period_stats.hpp" #include "aggregation.hpp" #include "intervals.hpp" #include "utils.hpp" @@ -28,29 +28,29 @@ int main(int argc, char** argv) { << " in " << std::fixed << std::setprecision(3) << read_time << " sec" << std::endl; - // Агрегация по дням + // Агрегация по периодам double agg_start = MPI_Wtime(); - std::vector days = aggregate_days(records); + std::vector periods = aggregate_periods(records); double agg_time = MPI_Wtime() - agg_start; std::cout << "Rank " << rank - << ": aggregated " << days.size() << " days" - << " [" << (days.empty() ? 0 : days.front().day) - << ".." << (days.empty() ? 0 : days.back().day) << "]" + << ": aggregated " << periods.size() << " periods" + << " [" << (periods.empty() ? 0 : periods.front().period) + << ".." << (periods.empty() ? 0 : periods.back().period) << "]" << " in " << std::fixed << std::setprecision(3) << agg_time << " sec" << std::endl; - // Удаляем крайние дни (могут быть неполными из-за параллельного чтения) - trim_edge_days(days, rank, size); + // Удаляем крайние периоды (могут быть неполными из-за параллельного чтения) + trim_edge_periods(periods, rank, size); std::cout << "Rank " << rank - << ": after trim " << days.size() << " days" - << " [" << (days.empty() ? 0 : days.front().day) - << ".." << (days.empty() ? 0 : days.back().day) << "]" + << ": after trim " << periods.size() << " periods" + << " [" << (periods.empty() ? 0 : periods.front().period) + << ".." << (periods.empty() ? 0 : periods.back().period) << "]" << std::endl; // Параллельное построение интервалов - IntervalResult iv_result = find_intervals_parallel(days, rank, size); + IntervalResult iv_result = find_intervals_parallel(periods, rank, size); std::cout << "Rank " << rank << ": found " << iv_result.intervals.size() << " intervals" diff --git a/src/period_stats.hpp b/src/period_stats.hpp new file mode 100644 index 0000000..675f513 --- /dev/null +++ b/src/period_stats.hpp @@ -0,0 +1,15 @@ +#pragma once +#include + +using PeriodIndex = int64_t; + +// Агрегированные данные за один период +struct PeriodStats { + PeriodIndex period; // индекс периода (timestamp / AGGREGATION_INTERVAL) + double avg; // среднее значение (Low + High) / 2 по всем записям + double open_min; // минимальный Open за период + double open_max; // максимальный Open за период + double close_min; // минимальный Close за период + double close_max; // максимальный Close за период + int64_t count; // количество записей, по которым агрегировали +}; diff --git a/src/utils.cpp b/src/utils.cpp index ba62b76..95bf399 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -4,29 +4,6 @@ #include #include -std::map> group_by_day(const std::vector& recs) { - std::map> days; - - for (const auto& r : recs) { - DayIndex day = static_cast(r.timestamp) / 86400; - days[day].push_back(r); - } - - return days; -} - -std::vector> split_days(const std::map>& days, int parts) { - std::vector> out(parts); - - int i = 0; - for (auto& kv : days) { - out[i % parts].push_back(kv.first); - i++; - } - - return out; -} - int get_num_cpu_threads() { const char* env_threads = std::getenv("NUM_CPU_THREADS"); int num_cpu_threads = 1; @@ -63,6 +40,10 @@ int64_t get_read_overlap_bytes() { return std::stoll(get_env("READ_OVERLAP_BYTES")); } +int64_t get_aggregation_interval() { + return std::stoll(get_env("AGGREGATION_INTERVAL")); +} + int64_t get_file_size(const std::string& path) { std::ifstream file(path, std::ios::binary | std::ios::ate); if (!file.is_open()) { @@ -73,7 +54,6 @@ int64_t get_file_size(const std::string& path) { ByteRange calculate_byte_range(int rank, int size, int64_t file_size, const std::vector& shares, int64_t overlap_bytes) { - // Если shares пустой или не соответствует size, используем равные доли std::vector effective_shares; if (shares.size() == static_cast(size)) { effective_shares = shares; @@ -82,8 +62,6 @@ ByteRange calculate_byte_range(int rank, int size, int64_t file_size, } int total_shares = std::accumulate(effective_shares.begin(), effective_shares.end(), 0); - - // Вычисляем базовые границы для каждого ранка int64_t bytes_per_share = file_size / total_shares; int64_t base_start = 0; @@ -93,19 +71,15 @@ ByteRange calculate_byte_range(int rank, int size, int64_t file_size, int64_t base_end = base_start + bytes_per_share * effective_shares[rank]; - // Применяем overlap ByteRange range; if (rank == 0) { - // Первый ранк: начинаем с 0, добавляем overlap в конце range.start = 0; range.end = std::min(base_end + overlap_bytes, file_size); } else if (rank == size - 1) { - // Последний ранк: вычитаем overlap в начале, читаем до конца файла range.start = std::max(base_start - overlap_bytes, static_cast(0)); range.end = file_size; } else { - // Промежуточные ранки: overlap с обеих сторон range.start = std::max(base_start - overlap_bytes, static_cast(0)); range.end = std::min(base_end + overlap_bytes, file_size); } @@ -113,15 +87,15 @@ ByteRange calculate_byte_range(int rank, int size, int64_t file_size, return range; } -void trim_edge_days(std::vector& days, int rank, int size) { - if (days.empty()) return; +void trim_edge_periods(std::vector& periods, int rank, int size) { + if (periods.empty()) return; if (rank == 0) { - days.pop_back(); + periods.pop_back(); } else if (rank == size - 1) { - days.erase(days.begin()); + periods.erase(periods.begin()); } else { - days.pop_back(); - days.erase(days.begin()); + periods.pop_back(); + periods.erase(periods.begin()); } } diff --git a/src/utils.hpp b/src/utils.hpp index 56b4b05..8bd1346 100644 --- a/src/utils.hpp +++ b/src/utils.hpp @@ -1,22 +1,19 @@ #pragma once #include "record.hpp" -#include "day_stats.hpp" +#include "period_stats.hpp" #include #include #include #include #include -// Группировка записей по дням -std::map> group_by_day(const std::vector& recs); -std::vector> split_days(const std::map>& days, int parts); - // Чтение переменных окружения int get_num_cpu_threads(); std::string get_data_path(); std::vector get_data_read_shares(); int64_t get_read_overlap_bytes(); +int64_t get_aggregation_interval(); // Структура для хранения диапазона байт для чтения struct ByteRange { @@ -31,8 +28,5 @@ ByteRange calculate_byte_range(int rank, int size, int64_t file_size, // Получение размера файла int64_t get_file_size(const std::string& path); -// Удаляет крайние дни, которые могут быть неполными из-за параллельного чтения -// rank 0: удаляет последний день -// последний rank: удаляет первый день -// промежуточные: удаляют первый и последний дни -void trim_edge_days(std::vector& days, int rank, int size); +// Удаляет крайние периоды, которые могут быть неполными из-за параллельного чтения +void trim_edge_periods(std::vector& periods, int rank, int size);