Оптимизировал агрегацию на процессоре

This commit is contained in:
2025-12-16 14:16:40 +00:00
parent a5aadbc774
commit e84d1e9fe3

View File

@@ -1,12 +1,17 @@
#include "aggregation.hpp" #include "aggregation.hpp"
#include "utils.hpp" #include "utils.hpp"
#include <map>
#include <algorithm> #include <algorithm>
#include <cstdint>
#include <limits> #include <limits>
#include <vector>
std::vector<PeriodStats> aggregate_periods(const std::vector<Record>& records) { std::vector<PeriodStats> aggregate_periods(const std::vector<Record>& records) {
int64_t interval = get_aggregation_interval(); const int64_t interval = get_aggregation_interval();
std::vector<PeriodStats> result;
if (records.empty()) return result;
struct PeriodAccumulator { struct PeriodAccumulator {
double avg_sum = 0.0; double avg_sum = 0.0;
double open_min = std::numeric_limits<double>::max(); double open_min = std::numeric_limits<double>::max();
@@ -14,37 +19,57 @@ std::vector<PeriodStats> aggregate_periods(const std::vector<Record>& records) {
double close_min = std::numeric_limits<double>::max(); double close_min = std::numeric_limits<double>::max();
double close_max = std::numeric_limits<double>::lowest(); double close_max = std::numeric_limits<double>::lowest();
int64_t count = 0; int64_t count = 0;
void add(const Record& r) {
const double avg = (r.low + r.high) / 2.0;
avg_sum += avg;
open_min = std::min(open_min, r.open);
open_max = std::max(open_max, r.open);
close_min = std::min(close_min, r.close);
close_max = std::max(close_max, r.close);
++count;
}
}; };
std::map<PeriodIndex, PeriodAccumulator> periods; PeriodIndex current_period =
static_cast<PeriodIndex>(records[0].timestamp) / interval;
for (const auto& r : records) {
PeriodIndex period = static_cast<PeriodIndex>(r.timestamp) / interval; PeriodAccumulator acc;
auto& acc = periods[period]; acc.add(records[0]);
double avg = (r.low + r.high) / 2.0; for (size_t i = 1; i < records.size(); ++i) {
acc.avg_sum += avg; const Record& r = records[i];
acc.open_min = std::min(acc.open_min, r.open); const PeriodIndex period =
acc.open_max = std::max(acc.open_max, r.open); static_cast<PeriodIndex>(r.timestamp) / interval;
acc.close_min = std::min(acc.close_min, r.close);
acc.close_max = std::max(acc.close_max, r.close); if (period != current_period) {
acc.count++; PeriodStats stats;
stats.period = current_period;
stats.avg = acc.avg_sum / static_cast<double>(acc.count);
stats.open_min = acc.open_min;
stats.open_max = acc.open_max;
stats.close_min = acc.close_min;
stats.close_max = acc.close_max;
stats.count = acc.count;
result.push_back(stats);
current_period = period;
acc = PeriodAccumulator{};
}
acc.add(r);
} }
std::vector<PeriodStats> result; // последний период
result.reserve(periods.size()); PeriodStats stats;
stats.period = current_period;
for (const auto& [period, acc] : periods) { stats.avg = acc.avg_sum / static_cast<double>(acc.count);
PeriodStats stats; stats.open_min = acc.open_min;
stats.period = period; stats.open_max = acc.open_max;
stats.avg = acc.avg_sum / static_cast<double>(acc.count); stats.close_min = acc.close_min;
stats.open_min = acc.open_min; stats.close_max = acc.close_max;
stats.open_max = acc.open_max; stats.count = acc.count;
stats.close_min = acc.close_min; result.push_back(stats);
stats.close_max = acc.close_max;
stats.count = acc.count;
result.push_back(stats);
}
return result; return result;
} }