This commit is contained in:
2025-12-16 15:19:50 +00:00
parent e84d1e9fe3
commit 07dcda12a5
7 changed files with 518 additions and 358 deletions

View File

@@ -1,137 +1,133 @@
#include "gpu_loader.hpp"
#include "utils.hpp"
#include <dlfcn.h>
#include <map>
#include <algorithm>
#include <iostream>
#include <iomanip>
#include <omp.h>
#include <cstdint>
// GPU result record for one aggregated period (layout must match gpu_plugin.cu).
// Each field is copied one-to-one into the same-named PeriodStats field by
// aggregate_periods_gpu().
// NOTE(review): the per-field meanings below are inferred from the names —
// confirm against the plugin source.
struct GpuPeriodStats {
int64_t period;    // period identifier
double avg;        // presumably an average price over the period
double open_min;   // presumably min of `open` within the period
double open_max;   // presumably max of `open` within the period
double close_min;  // presumably min of `close` within the period
double close_max;  // presumably max of `close` within the period
int64_t count;     // presumably number of ticks in the period
};
// Function-pointer types for the symbols resolved from the GPU plugin.

// Type of the plugin's "gpu_is_available" symbol. This file treats a non-zero
// return value as "GPU available".
using gpu_is_available_fn = int (*)();
// Type of the plugin's "gpu_aggregate_periods" symbol. The tick data is passed
// as five parallel arrays of `num_ticks` doubles plus the aggregation
// `interval`. The plugin hands back its result array through `h_out_stats`
// and the element count through `out_num_periods`. This file treats a
// non-zero return value as failure and releases the result array with the
// "gpu_free_results" symbol.
using gpu_aggregate_periods_fn = int (*)(
const double* h_timestamps,
const double* h_open,
const double* h_high,
const double* h_low,
const double* h_close,
int num_ticks,
int64_t interval,
GpuPeriodStats** h_out_stats,
int* out_num_periods
);
// Type of the plugin's "gpu_free_results" symbol; called with the array
// received from gpu_aggregate_periods_fn once the results have been copied out.
using gpu_free_results_fn = void (*)(GpuPeriodStats*);
// Returns the handle of the GPU plugin shared library.
// dlopen() runs only once, when the function-local static is initialized on
// the first call; every later call returns the same cached value, which is
// nullptr if the library could not be opened. The handle is never closed here.
static void* get_gpu_lib_handle() {
    static void* const plugin_handle = [] {
        return dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL);
    }();
    return plugin_handle;
}
gpu_is_available_fn load_gpu_is_available() {
bool gpu_is_available() {
void* h = get_gpu_lib_handle();
if (!h) return nullptr;
auto fn = (gpu_is_available_fn)dlsym(h, "gpu_is_available");
return fn;
}
bool gpu_is_available() {
auto gpu_is_available_fn = load_gpu_is_available();
if (!h) return false;
if (gpu_is_available_fn && gpu_is_available_fn()) {
return true;
}
auto fn = reinterpret_cast<gpu_is_available_fn>(dlsym(h, "gpu_is_available"));
if (!fn) return false;
return false;
}
gpu_aggregate_periods_fn load_gpu_aggregate_periods() {
void* h = get_gpu_lib_handle();
if (!h) return nullptr;
auto fn = (gpu_aggregate_periods_fn)dlsym(h, "gpu_aggregate_periods");
return fn;
return fn() != 0;
}
bool aggregate_periods_gpu(
const std::vector<Record>& records,
std::vector<PeriodStats>& out_stats,
gpu_aggregate_periods_fn gpu_fn)
int64_t aggregation_interval,
std::vector<PeriodStats>& out_stats)
{
if (!gpu_fn || records.empty()) {
if (records.empty()) {
out_stats.clear();
return true;
}
void* h = get_gpu_lib_handle();
if (!h) {
std::cerr << "GPU: Failed to load libgpu_compute.so" << std::endl;
return false;
}
int64_t interval = get_aggregation_interval();
double t_total_start = omp_get_wtime();
double t_preprocess_start = omp_get_wtime();
std::map<PeriodIndex, std::vector<size_t>> period_record_indices;
for (size_t i = 0; i < records.size(); i++) {
PeriodIndex period = static_cast<PeriodIndex>(records[i].timestamp) / interval;
period_record_indices[period].push_back(i);
}
int num_periods = static_cast<int>(period_record_indices.size());
auto aggregate_fn = reinterpret_cast<gpu_aggregate_periods_fn>(
dlsym(h, "gpu_aggregate_periods"));
auto free_fn = reinterpret_cast<gpu_free_results_fn>(
dlsym(h, "gpu_free_results"));
std::vector<GpuRecord> gpu_records;
std::vector<int> period_offsets;
std::vector<int> period_counts;
std::vector<long long> period_indices;
gpu_records.reserve(records.size());
period_offsets.reserve(num_periods);
period_counts.reserve(num_periods);
period_indices.reserve(num_periods);
int current_offset = 0;
for (auto& [period, indices] : period_record_indices) {
period_indices.push_back(period);
period_offsets.push_back(current_offset);
period_counts.push_back(static_cast<int>(indices.size()));
for (size_t idx : indices) {
const auto& r = records[idx];
GpuRecord gr;
gr.timestamp = r.timestamp;
gr.open = r.open;
gr.high = r.high;
gr.low = r.low;
gr.close = r.close;
gr.volume = r.volume;
gpu_records.push_back(gr);
}
current_offset += static_cast<int>(indices.size());
if (!aggregate_fn || !free_fn) {
std::cerr << "GPU: Failed to load functions from plugin" << std::endl;
return false;
}
std::vector<GpuPeriodStats> gpu_stats(num_periods);
int num_ticks = static_cast<int>(records.size());
double t_preprocess_ms = (omp_get_wtime() - t_preprocess_start) * 1000.0;
std::cout << " GPU CPU preprocessing: " << std::fixed << std::setprecision(3)
<< std::setw(7) << t_preprocess_ms << " ms" << std::endl << std::flush;
// Конвертируем AoS в SoA
std::vector<double> timestamps(num_ticks);
std::vector<double> open(num_ticks);
std::vector<double> high(num_ticks);
std::vector<double> low(num_ticks);
std::vector<double> close(num_ticks);
int result = gpu_fn(
gpu_records.data(),
static_cast<int>(gpu_records.size()),
period_offsets.data(),
period_counts.data(),
period_indices.data(),
num_periods,
gpu_stats.data()
for (int i = 0; i < num_ticks; i++) {
timestamps[i] = records[i].timestamp;
open[i] = records[i].open;
high[i] = records[i].high;
low[i] = records[i].low;
close[i] = records[i].close;
}
// Вызываем GPU функцию
GpuPeriodStats* gpu_stats = nullptr;
int num_periods = 0;
int result = aggregate_fn(
timestamps.data(),
open.data(),
high.data(),
low.data(),
close.data(),
num_ticks,
aggregation_interval,
&gpu_stats,
&num_periods
);
if (result != 0) {
std::cout << " GPU: Function returned error code " << result << std::endl;
std::cerr << "GPU: Aggregation failed with code " << result << std::endl;
return false;
}
// Конвертируем результат в PeriodStats
out_stats.clear();
out_stats.reserve(num_periods);
for (const auto& gs : gpu_stats) {
for (int i = 0; i < num_periods; i++) {
PeriodStats ps;
ps.period = gs.period;
ps.avg = gs.avg;
ps.open_min = gs.open_min;
ps.open_max = gs.open_max;
ps.close_min = gs.close_min;
ps.close_max = gs.close_max;
ps.count = gs.count;
ps.period = gpu_stats[i].period;
ps.avg = gpu_stats[i].avg;
ps.open_min = gpu_stats[i].open_min;
ps.open_max = gpu_stats[i].open_max;
ps.close_min = gpu_stats[i].close_min;
ps.close_max = gpu_stats[i].close_max;
ps.count = gpu_stats[i].count;
out_stats.push_back(ps);
}
double t_total_ms = (omp_get_wtime() - t_total_start) * 1000.0;
std::cout << " GPU TOTAL (with prep): " << std::fixed << std::setprecision(3)
<< std::setw(7) << t_total_ms << " ms" << std::endl << std::flush;
// Освобождаем память
free_fn(gpu_stats);
return true;
}