From 07dcda12a5b8f516105c6661d7331e2d554499dd Mon Sep 17 00:00:00 2001 From: Arity-T Date: Tue, 16 Dec 2025 15:19:50 +0000 Subject: [PATCH] Cuda --- run.slurm | 3 + src/gpu_loader.cpp | 188 +++++++------- src/gpu_loader.hpp | 45 +--- src/gpu_plugin.cu | 610 +++++++++++++++++++++++++++++---------------- src/main.cpp | 25 +- src/utils.cpp | 4 + src/utils.hpp | 1 + 7 files changed, 518 insertions(+), 358 deletions(-) diff --git a/run.slurm b/run.slurm index 41a2f25..725339d 100644 --- a/run.slurm +++ b/run.slurm @@ -17,5 +17,8 @@ export READ_OVERLAP_BYTES=131072 # Интервал агрегации в секундах (60 = минуты, 600 = 10 минут, 86400 = дни) export AGGREGATION_INTERVAL=60 +# Использовать ли CUDA для агрегации (0 = нет, 1 = да) +export USE_CUDA=1 + cd /mnt/shared/supercomputers/build mpirun -np $SLURM_NTASKS ./bitcoin_app diff --git a/src/gpu_loader.cpp b/src/gpu_loader.cpp index 5128f08..6f62b0f 100644 --- a/src/gpu_loader.cpp +++ b/src/gpu_loader.cpp @@ -1,137 +1,133 @@ #include "gpu_loader.hpp" -#include "utils.hpp" #include -#include -#include #include -#include -#include +#include + +// Структура результата GPU (должна совпадать с gpu_plugin.cu) +struct GpuPeriodStats { + int64_t period; + double avg; + double open_min; + double open_max; + double close_min; + double close_max; + int64_t count; +}; + +// Типы функций из GPU плагина +using gpu_is_available_fn = int (*)(); + +using gpu_aggregate_periods_fn = int (*)( + const double* h_timestamps, + const double* h_open, + const double* h_high, + const double* h_low, + const double* h_close, + int num_ticks, + int64_t interval, + GpuPeriodStats** h_out_stats, + int* out_num_periods +); + +using gpu_free_results_fn = void (*)(GpuPeriodStats*); static void* get_gpu_lib_handle() { static void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL); return h; } -gpu_is_available_fn load_gpu_is_available() { +bool gpu_is_available() { void* h = get_gpu_lib_handle(); - if (!h) return nullptr; - - auto fn = (gpu_is_available_fn)dlsym(h, "gpu_is_available"); - return fn; -} - -bool gpu_is_available() { - auto gpu_is_available_fn = load_gpu_is_available(); + if (!h) return false; - if (gpu_is_available_fn && gpu_is_available_fn()) { - return true; - } + auto fn = reinterpret_cast(dlsym(h, "gpu_is_available")); + if (!fn) return false; - return false; -} - -gpu_aggregate_periods_fn load_gpu_aggregate_periods() { - void* h = get_gpu_lib_handle(); - if (!h) return nullptr; - - auto fn = (gpu_aggregate_periods_fn)dlsym(h, "gpu_aggregate_periods"); - return fn; + return fn() != 0; } bool aggregate_periods_gpu( const std::vector& records, - std::vector& out_stats, - gpu_aggregate_periods_fn gpu_fn) + int64_t aggregation_interval, + std::vector& out_stats) { - if (!gpu_fn || records.empty()) { + if (records.empty()) { + out_stats.clear(); + return true; + } + + void* h = get_gpu_lib_handle(); + if (!h) { + std::cerr << "GPU: Failed to load libgpu_compute.so" << std::endl; return false; } - - int64_t interval = get_aggregation_interval(); - - double t_total_start = omp_get_wtime(); - double t_preprocess_start = omp_get_wtime(); - - std::map> period_record_indices; - for (size_t i = 0; i < records.size(); i++) { - PeriodIndex period = static_cast(records[i].timestamp) / interval; - period_record_indices[period].push_back(i); - } - - int num_periods = static_cast(period_record_indices.size()); + auto aggregate_fn = reinterpret_cast( + dlsym(h, "gpu_aggregate_periods")); + auto free_fn = reinterpret_cast( + dlsym(h, "gpu_free_results")); - std::vector gpu_records; 
-    std::vector<int> period_offsets;
-    std::vector<int> period_counts;
-    std::vector<long long> period_indices;
-
-    gpu_records.reserve(records.size());
-    period_offsets.reserve(num_periods);
-    period_counts.reserve(num_periods);
-    period_indices.reserve(num_periods);
-
-    int current_offset = 0;
-
-    for (auto& [period, indices] : period_record_indices) {
-        period_indices.push_back(period);
-        period_offsets.push_back(current_offset);
-        period_counts.push_back(static_cast<int>(indices.size()));
-
-        for (size_t idx : indices) {
-            const auto& r = records[idx];
-            GpuRecord gr;
-            gr.timestamp = r.timestamp;
-            gr.open = r.open;
-            gr.high = r.high;
-            gr.low = r.low;
-            gr.close = r.close;
-            gr.volume = r.volume;
-            gpu_records.push_back(gr);
-        }
-
-        current_offset += static_cast<int>(indices.size());
+    if (!aggregate_fn || !free_fn) {
+        std::cerr << "GPU: Failed to load functions from plugin" << std::endl;
+        return false;
     }
 
-    std::vector<GpuPeriodStats> gpu_stats(num_periods);
+    int num_ticks = static_cast<int>(records.size());
 
-    double t_preprocess_ms = (omp_get_wtime() - t_preprocess_start) * 1000.0;
-    std::cout << "  GPU CPU preprocessing: " << std::fixed << std::setprecision(3)
-              << std::setw(7) << t_preprocess_ms << " ms" << std::endl << std::flush;
+    // Convert AoS (array of Record structs) into SoA arrays for the GPU
+    std::vector<double> timestamps(num_ticks);
+    std::vector<double> open(num_ticks);
+    std::vector<double> high(num_ticks);
+    std::vector<double> low(num_ticks);
+    std::vector<double> close(num_ticks);
 
-    int result = gpu_fn(
-        gpu_records.data(),
-        static_cast<int>(gpu_records.size()),
-        period_offsets.data(),
-        period_counts.data(),
-        period_indices.data(),
-        num_periods,
-        gpu_stats.data()
+    for (int i = 0; i < num_ticks; i++) {
+        timestamps[i] = records[i].timestamp;
+        open[i] = records[i].open;
+        high[i] = records[i].high;
+        low[i] = records[i].low;
+        close[i] = records[i].close;
+    }
+
+    // Call the GPU plugin; it allocates and returns the per-period results
+    GpuPeriodStats* gpu_stats = nullptr;
+    int num_periods = 0;
+
+    int result = aggregate_fn(
+        timestamps.data(),
+        open.data(),
+        high.data(),
+        low.data(),
+        close.data(),
+        num_ticks,
+        aggregation_interval,
+        &gpu_stats,
+        &num_periods
     );
 
     if (result != 0) {
-        std::cout << "  GPU: Function returned error code " << result << std::endl;
+        std::cerr << "GPU: Aggregation failed with code " << result << std::endl;
         return false;
     }
 
+    // Convert the result into PeriodStats
     out_stats.clear();
     out_stats.reserve(num_periods);
 
-    for (const auto& gs : gpu_stats) {
+    for (int i = 0; i < num_periods; i++) {
         PeriodStats ps;
-        ps.period = gs.period;
-        ps.avg = gs.avg;
-        ps.open_min = gs.open_min;
-        ps.open_max = gs.open_max;
-        ps.close_min = gs.close_min;
-        ps.close_max = gs.close_max;
-        ps.count = gs.count;
+        ps.period = gpu_stats[i].period;
+        ps.avg = gpu_stats[i].avg;
+        ps.open_min = gpu_stats[i].open_min;
+        ps.open_max = gpu_stats[i].open_max;
+        ps.close_min = gpu_stats[i].close_min;
+        ps.close_max = gpu_stats[i].close_max;
+        ps.count = gpu_stats[i].count;
         out_stats.push_back(ps);
     }
 
-    double t_total_ms = (omp_get_wtime() - t_total_start) * 1000.0;
-    std::cout << "  GPU TOTAL (with prep): " << std::fixed << std::setprecision(3)
-              << std::setw(7) << t_total_ms << " ms" << std::endl << std::flush;
+    // Free the buffer allocated inside the plugin
+    free_fn(gpu_stats);
 
     return true;
 }
diff --git a/src/gpu_loader.hpp b/src/gpu_loader.hpp
index 3c9db97..0281e22 100644
--- a/src/gpu_loader.hpp
+++ b/src/gpu_loader.hpp
@@ -3,48 +3,13 @@
 #include "record.hpp"
 #include <vector>
 
+// Check whether CUDA is available
 bool gpu_is_available();
 
-// Function pointer types from the GPU plugin
-using gpu_is_available_fn = int (*)();
-
-// Structures shared with the GPU plugin (must match gpu_plugin.cu)
-struct GpuRecord { - double timestamp; - double open; - double high; - double low; - double close; - double volume; -}; - -struct GpuPeriodStats { - long long period; - double avg; - double open_min; - double open_max; - double close_min; - double close_max; - long long count; -}; - -using gpu_aggregate_periods_fn = int (*)( - const GpuRecord* h_records, - int num_records, - const int* h_period_offsets, - const int* h_period_counts, - const long long* h_period_indices, - int num_periods, - GpuPeriodStats* h_out_stats -); - -// Загрузка функций из плагина -gpu_is_available_fn load_gpu_is_available(); -gpu_aggregate_periods_fn load_gpu_aggregate_periods(); - -// Обёртка для агрегации на GPU (возвращает true если успешно) +// Агрегация периодов на GPU +// Возвращает true если успешно, false если GPU недоступен или ошибка bool aggregate_periods_gpu( const std::vector& records, - std::vector& out_stats, - gpu_aggregate_periods_fn gpu_fn + int64_t aggregation_interval, + std::vector& out_stats ); diff --git a/src/gpu_plugin.cu b/src/gpu_plugin.cu index 3f22939..9a55f53 100644 --- a/src/gpu_plugin.cu +++ b/src/gpu_plugin.cu @@ -1,262 +1,430 @@ #include +#include #include #include #include #include +#include +#include +#include + +// ============================================================================ +// Структуры данных +// ============================================================================ + +// SoA (Structure of Arrays) для входных данных на GPU +struct GpuTicksSoA { + double* timestamp; + double* open; + double* high; + double* low; + double* close; + int n; +}; + +// Результат агрегации одного периода +struct GpuPeriodStats { + int64_t period; + double avg; + double open_min; + double open_max; + double close_min; + double close_max; + int64_t count; +}; + +// ============================================================================ +// Вспомогательные функции +// ============================================================================ -// CPU таймер в миллисекундах static double get_time_ms() { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0; } -// Структуры данных (должны совпадать с C++ кодом) -struct GpuRecord { - double timestamp; - double open; - double high; - double low; - double close; - double volume; -}; +#define CUDA_CHECK(call) do { \ + cudaError_t err = call; \ + if (err != cudaSuccess) { \ + printf("CUDA error at %s:%d: %s\n", __FILE__, __LINE__, cudaGetErrorString(err)); \ + return -1; \ + } \ +} while(0) -struct GpuDayStats { - long long day; - double avg; - double open_min; - double open_max; - double close_min; - double close_max; - long long count; -}; +// ============================================================================ +// Kernel: вычисление period_id для каждого тика +// ============================================================================ + +__global__ void compute_period_ids_kernel( + const double* __restrict__ timestamps, + int64_t* __restrict__ period_ids, + int n, + int64_t interval) +{ + int idx = blockIdx.x * blockDim.x + threadIdx.x; + if (idx < n) { + period_ids[idx] = static_cast(timestamps[idx]) / interval; + } +} + +// ============================================================================ +// Kernel: агрегация одного периода (один блок на период) +// ============================================================================ + +__global__ void aggregate_periods_kernel( + const double* __restrict__ open, + const double* __restrict__ high, + const 
double* __restrict__ low, + const double* __restrict__ close, + const int64_t* __restrict__ unique_periods, + const int* __restrict__ offsets, + const int* __restrict__ counts, + int num_periods, + GpuPeriodStats* __restrict__ out_stats) +{ + int period_idx = blockIdx.x; + if (period_idx >= num_periods) return; + + int offset = offsets[period_idx]; + int count = counts[period_idx]; + + // Используем shared memory для редукции внутри блока + __shared__ double s_avg_sum; + __shared__ double s_open_min; + __shared__ double s_open_max; + __shared__ double s_close_min; + __shared__ double s_close_max; + + // Инициализация shared memory первым потоком + if (threadIdx.x == 0) { + s_avg_sum = 0.0; + s_open_min = DBL_MAX; + s_open_max = -DBL_MAX; + s_close_min = DBL_MAX; + s_close_max = -DBL_MAX; + } + __syncthreads(); + + // Локальные аккумуляторы для каждого потока + double local_avg_sum = 0.0; + double local_open_min = DBL_MAX; + double local_open_max = -DBL_MAX; + double local_close_min = DBL_MAX; + double local_close_max = -DBL_MAX; + + // Каждый поток обрабатывает свою часть тиков + for (int i = threadIdx.x; i < count; i += blockDim.x) { + int tick_idx = offset + i; + double avg = (low[tick_idx] + high[tick_idx]) / 2.0; + local_avg_sum += avg; + local_open_min = min(local_open_min, open[tick_idx]); + local_open_max = max(local_open_max, open[tick_idx]); + local_close_min = min(local_close_min, close[tick_idx]); + local_close_max = max(local_close_max, close[tick_idx]); + } + + // Редукция с использованием атомарных операций + atomicAdd(&s_avg_sum, local_avg_sum); + atomicMin(reinterpret_cast(&s_open_min), + __double_as_longlong(local_open_min)); + atomicMax(reinterpret_cast(&s_open_max), + __double_as_longlong(local_open_max)); + atomicMin(reinterpret_cast(&s_close_min), + __double_as_longlong(local_close_min)); + atomicMax(reinterpret_cast(&s_close_max), + __double_as_longlong(local_close_max)); + + __syncthreads(); + + // Первый поток записывает результат + if (threadIdx.x == 0) { + GpuPeriodStats stats; + stats.period = unique_periods[period_idx]; + stats.avg = s_avg_sum / static_cast(count); + stats.open_min = s_open_min; + stats.open_max = s_open_max; + stats.close_min = s_close_min; + stats.close_max = s_close_max; + stats.count = count; + out_stats[period_idx] = stats; + } +} + +// ============================================================================ +// Простой kernel для агрегации (один поток на период) +// Используется когда периодов много и тиков в каждом мало +// ============================================================================ + +__global__ void aggregate_periods_simple_kernel( + const double* __restrict__ open, + const double* __restrict__ high, + const double* __restrict__ low, + const double* __restrict__ close, + const int64_t* __restrict__ unique_periods, + const int* __restrict__ offsets, + const int* __restrict__ counts, + int num_periods, + GpuPeriodStats* __restrict__ out_stats) +{ + int period_idx = blockIdx.x * blockDim.x + threadIdx.x; + if (period_idx >= num_periods) return; + + int offset = offsets[period_idx]; + int count = counts[period_idx]; + + double avg_sum = 0.0; + double open_min = DBL_MAX; + double open_max = -DBL_MAX; + double close_min = DBL_MAX; + double close_max = -DBL_MAX; + + for (int i = 0; i < count; i++) { + int tick_idx = offset + i; + double avg = (low[tick_idx] + high[tick_idx]) / 2.0; + avg_sum += avg; + open_min = min(open_min, open[tick_idx]); + open_max = max(open_max, open[tick_idx]); + close_min = min(close_min, 
close[tick_idx]); + close_max = max(close_max, close[tick_idx]); + } + + GpuPeriodStats stats; + stats.period = unique_periods[period_idx]; + stats.avg = avg_sum / static_cast(count); + stats.open_min = open_min; + stats.open_max = open_max; + stats.close_min = close_min; + stats.close_max = close_max; + stats.count = count; + out_stats[period_idx] = stats; +} + +// ============================================================================ +// Проверка доступности GPU +// ============================================================================ extern "C" int gpu_is_available() { int n = 0; cudaError_t err = cudaGetDeviceCount(&n); if (err != cudaSuccess) return 0; if (n > 0) { - // Инициализируем CUDA контекст заранее (cudaFree(0) форсирует инициализацию) - cudaFree(0); + cudaFree(0); // Форсируем инициализацию контекста } return (n > 0) ? 1 : 0; } -// Kernel для агрегации (каждый поток обрабатывает один день) -__global__ void aggregate_kernel( - const GpuRecord* records, - int num_records, - const int* day_offsets, // начало каждого дня в массиве records - const int* day_counts, // количество записей в каждом дне - const long long* day_indices, // индексы дней - int num_days, - GpuDayStats* out_stats) -{ - // Глобальный индекс потока = индекс дня - int d = blockIdx.x * blockDim.x + threadIdx.x; - - if (d >= num_days) return; - - int offset = day_offsets[d]; - int count = day_counts[d]; - - GpuDayStats stats; - stats.day = day_indices[d]; - stats.open_min = DBL_MAX; - stats.open_max = -DBL_MAX; - stats.close_min = DBL_MAX; - stats.close_max = -DBL_MAX; - stats.count = count; - - double avg_sum = 0.0; - - for (int i = 0; i < count; i++) { - const GpuRecord& r = records[offset + i]; - - // Accumulate avg = (low + high) / 2 - avg_sum += (r.low + r.high) / 2.0; - - // min/max Open - if (r.open < stats.open_min) stats.open_min = r.open; - if (r.open > stats.open_max) stats.open_max = r.open; - - // min/max Close - if (r.close < stats.close_min) stats.close_min = r.close; - if (r.close > stats.close_max) stats.close_max = r.close; - } - - stats.avg = avg_sum / static_cast(count); - out_stats[d] = stats; -} +// ============================================================================ +// Главная функция агрегации на GPU +// ============================================================================ -// Функция агрегации, вызываемая из C++ -extern "C" int gpu_aggregate_days( - const GpuRecord* h_records, - int num_records, - const int* h_day_offsets, - const int* h_day_counts, - const long long* h_day_indices, - int num_days, - GpuDayStats* h_out_stats) +extern "C" int gpu_aggregate_periods( + const double* h_timestamps, + const double* h_open, + const double* h_high, + const double* h_low, + const double* h_close, + int num_ticks, + int64_t interval, + GpuPeriodStats** h_out_stats, + int* out_num_periods) { - double cpu_total_start = get_time_ms(); - - // === Создаём CUDA события для измерения времени === - double cpu_event_create_start = get_time_ms(); - - cudaEvent_t start_malloc, stop_malloc; - cudaEvent_t start_transfer, stop_transfer; - cudaEvent_t start_kernel, stop_kernel; - cudaEvent_t start_copy_back, stop_copy_back; - cudaEvent_t start_free, stop_free; - - cudaEventCreate(&start_malloc); - cudaEventCreate(&stop_malloc); - cudaEventCreate(&start_transfer); - cudaEventCreate(&stop_transfer); - cudaEventCreate(&start_kernel); - cudaEventCreate(&stop_kernel); - cudaEventCreate(&start_copy_back); - cudaEventCreate(&stop_copy_back); - cudaEventCreate(&start_free); - 
cudaEventCreate(&stop_free); - - double cpu_event_create_ms = get_time_ms() - cpu_event_create_start; - - // === ИЗМЕРЕНИЕ cudaMalloc === - cudaEventRecord(start_malloc); - - GpuRecord* d_records = nullptr; - int* d_day_offsets = nullptr; - int* d_day_counts = nullptr; - long long* d_day_indices = nullptr; - GpuDayStats* d_out_stats = nullptr; - - cudaError_t err; - - err = cudaMalloc(&d_records, num_records * sizeof(GpuRecord)); - if (err != cudaSuccess) return -1; - - err = cudaMalloc(&d_day_offsets, num_days * sizeof(int)); - if (err != cudaSuccess) { cudaFree(d_records); return -2; } - - err = cudaMalloc(&d_day_counts, num_days * sizeof(int)); - if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); return -3; } - - err = cudaMalloc(&d_day_indices, num_days * sizeof(long long)); - if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); return -4; } - - err = cudaMalloc(&d_out_stats, num_days * sizeof(GpuDayStats)); - if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); cudaFree(d_day_indices); return -5; } - - cudaEventRecord(stop_malloc); - cudaEventSynchronize(stop_malloc); - - float time_malloc_ms = 0; - cudaEventElapsedTime(&time_malloc_ms, start_malloc, stop_malloc); - - // === ИЗМЕРЕНИЕ memcpy H->D === - cudaEventRecord(start_transfer); - - err = cudaMemcpy(d_records, h_records, num_records * sizeof(GpuRecord), cudaMemcpyHostToDevice); - if (err != cudaSuccess) return -10; - - err = cudaMemcpy(d_day_offsets, h_day_offsets, num_days * sizeof(int), cudaMemcpyHostToDevice); - if (err != cudaSuccess) return -11; - - err = cudaMemcpy(d_day_counts, h_day_counts, num_days * sizeof(int), cudaMemcpyHostToDevice); - if (err != cudaSuccess) return -12; - - err = cudaMemcpy(d_day_indices, h_day_indices, num_days * sizeof(long long), cudaMemcpyHostToDevice); - if (err != cudaSuccess) return -13; - - cudaEventRecord(stop_transfer); - cudaEventSynchronize(stop_transfer); - - float time_transfer_ms = 0; - cudaEventElapsedTime(&time_transfer_ms, start_transfer, stop_transfer); - - // === ИЗМЕРЕНИЕ kernel === - const int THREADS_PER_BLOCK = 256; - int num_blocks = (num_days + THREADS_PER_BLOCK - 1) / THREADS_PER_BLOCK; - - cudaEventRecord(start_kernel); - - aggregate_kernel<<>>( - d_records, num_records, - d_day_offsets, d_day_counts, d_day_indices, - num_days, d_out_stats - ); - - err = cudaGetLastError(); - if (err != cudaSuccess) { - cudaFree(d_records); - cudaFree(d_day_offsets); - cudaFree(d_day_counts); - cudaFree(d_day_indices); - cudaFree(d_out_stats); - return -7; + if (num_ticks == 0) { + *h_out_stats = nullptr; + *out_num_periods = 0; + return 0; } - cudaEventRecord(stop_kernel); - cudaEventSynchronize(stop_kernel); + std::ostringstream output; + double total_start = get_time_ms(); - float time_kernel_ms = 0; - cudaEventElapsedTime(&time_kernel_ms, start_kernel, stop_kernel); + // ======================================================================== + // Шаг 1: Выделение памяти и копирование данных на GPU + // ======================================================================== + double step1_start = get_time_ms(); - // === ИЗМЕРЕНИЕ memcpy D->H === - cudaEventRecord(start_copy_back); - cudaMemcpy(h_out_stats, d_out_stats, num_days * sizeof(GpuDayStats), cudaMemcpyDeviceToHost); - cudaEventRecord(stop_copy_back); - cudaEventSynchronize(stop_copy_back); + double* d_timestamps = nullptr; + double* d_open = nullptr; + double* d_high = nullptr; + double* d_low = nullptr; + double* 
d_close = nullptr; + int64_t* d_period_ids = nullptr; - float time_copy_back_ms = 0; - cudaEventElapsedTime(&time_copy_back_ms, start_copy_back, stop_copy_back); + size_t ticks_bytes = num_ticks * sizeof(double); - // === ИЗМЕРЕНИЕ cudaFree === - cudaEventRecord(start_free); + CUDA_CHECK(cudaMalloc(&d_timestamps, ticks_bytes)); + CUDA_CHECK(cudaMalloc(&d_open, ticks_bytes)); + CUDA_CHECK(cudaMalloc(&d_high, ticks_bytes)); + CUDA_CHECK(cudaMalloc(&d_low, ticks_bytes)); + CUDA_CHECK(cudaMalloc(&d_close, ticks_bytes)); + CUDA_CHECK(cudaMalloc(&d_period_ids, num_ticks * sizeof(int64_t))); - cudaFree(d_records); - cudaFree(d_day_offsets); - cudaFree(d_day_counts); - cudaFree(d_day_indices); + CUDA_CHECK(cudaMemcpy(d_timestamps, h_timestamps, ticks_bytes, cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_open, h_open, ticks_bytes, cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_high, h_high, ticks_bytes, cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_low, h_low, ticks_bytes, cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaMemcpy(d_close, h_close, ticks_bytes, cudaMemcpyHostToDevice)); + + double step1_ms = get_time_ms() - step1_start; + + // ======================================================================== + // Шаг 2: Вычисление period_id для каждого тика + // ======================================================================== + double step2_start = get_time_ms(); + + const int BLOCK_SIZE = 256; + int num_blocks = (num_ticks + BLOCK_SIZE - 1) / BLOCK_SIZE; + + compute_period_ids_kernel<<>>( + d_timestamps, d_period_ids, num_ticks, interval); + CUDA_CHECK(cudaGetLastError()); + CUDA_CHECK(cudaDeviceSynchronize()); + + double step2_ms = get_time_ms() - step2_start; + + // ======================================================================== + // Шаг 3: RLE (Run-Length Encode) для нахождения уникальных периодов + // ======================================================================== + double step3_start = get_time_ms(); + + int64_t* d_unique_periods = nullptr; + int* d_counts = nullptr; + int* d_num_runs = nullptr; + + CUDA_CHECK(cudaMalloc(&d_unique_periods, num_ticks * sizeof(int64_t))); + CUDA_CHECK(cudaMalloc(&d_counts, num_ticks * sizeof(int))); + CUDA_CHECK(cudaMalloc(&d_num_runs, sizeof(int))); + + // Определяем размер временного буфера для CUB + void* d_temp_storage = nullptr; + size_t temp_storage_bytes = 0; + + cub::DeviceRunLengthEncode::Encode( + d_temp_storage, temp_storage_bytes, + d_period_ids, d_unique_periods, d_counts, d_num_runs, + num_ticks); + + CUDA_CHECK(cudaMalloc(&d_temp_storage, temp_storage_bytes)); + + cub::DeviceRunLengthEncode::Encode( + d_temp_storage, temp_storage_bytes, + d_period_ids, d_unique_periods, d_counts, d_num_runs, + num_ticks); + CUDA_CHECK(cudaGetLastError()); + + // Копируем количество уникальных периодов + int num_periods = 0; + CUDA_CHECK(cudaMemcpy(&num_periods, d_num_runs, sizeof(int), cudaMemcpyDeviceToHost)); + + cudaFree(d_temp_storage); + d_temp_storage = nullptr; + + double step3_ms = get_time_ms() - step3_start; + + // ======================================================================== + // Шаг 4: Exclusive Scan для вычисления offsets + // ======================================================================== + double step4_start = get_time_ms(); + + int* d_offsets = nullptr; + CUDA_CHECK(cudaMalloc(&d_offsets, num_periods * sizeof(int))); + + temp_storage_bytes = 0; + cub::DeviceScan::ExclusiveSum( + d_temp_storage, temp_storage_bytes, + d_counts, d_offsets, num_periods); + + 
CUDA_CHECK(cudaMalloc(&d_temp_storage, temp_storage_bytes)); + + cub::DeviceScan::ExclusiveSum( + d_temp_storage, temp_storage_bytes, + d_counts, d_offsets, num_periods); + CUDA_CHECK(cudaGetLastError()); + + cudaFree(d_temp_storage); + + double step4_ms = get_time_ms() - step4_start; + + // ======================================================================== + // Шаг 5: Агрегация периодов + // ======================================================================== + double step5_start = get_time_ms(); + + GpuPeriodStats* d_out_stats = nullptr; + CUDA_CHECK(cudaMalloc(&d_out_stats, num_periods * sizeof(GpuPeriodStats))); + + // Используем простой kernel (один поток на период) + // т.к. обычно тиков в периоде немного + int agg_blocks = (num_periods + BLOCK_SIZE - 1) / BLOCK_SIZE; + + aggregate_periods_simple_kernel<<>>( + d_open, d_high, d_low, d_close, + d_unique_periods, d_offsets, d_counts, + num_periods, d_out_stats); + CUDA_CHECK(cudaGetLastError()); + CUDA_CHECK(cudaDeviceSynchronize()); + + double step5_ms = get_time_ms() - step5_start; + + // ======================================================================== + // Шаг 6: Копирование результатов на CPU + // ======================================================================== + double step6_start = get_time_ms(); + + GpuPeriodStats* h_stats = new GpuPeriodStats[num_periods]; + CUDA_CHECK(cudaMemcpy(h_stats, d_out_stats, num_periods * sizeof(GpuPeriodStats), + cudaMemcpyDeviceToHost)); + + double step6_ms = get_time_ms() - step6_start; + + // ======================================================================== + // Шаг 7: Освобождение GPU памяти + // ======================================================================== + double step7_start = get_time_ms(); + + cudaFree(d_timestamps); + cudaFree(d_open); + cudaFree(d_high); + cudaFree(d_low); + cudaFree(d_close); + cudaFree(d_period_ids); + cudaFree(d_unique_periods); + cudaFree(d_counts); + cudaFree(d_offsets); + cudaFree(d_num_runs); cudaFree(d_out_stats); - cudaEventRecord(stop_free); - cudaEventSynchronize(stop_free); + double step7_ms = get_time_ms() - step7_start; - float time_free_ms = 0; - cudaEventElapsedTime(&time_free_ms, start_free, stop_free); + // ======================================================================== + // Итого + // ======================================================================== + double total_ms = get_time_ms() - total_start; - // Общее время GPU - float time_total_ms = time_malloc_ms + time_transfer_ms + time_kernel_ms + time_copy_back_ms + time_free_ms; + // Формируем весь вывод одной строкой + output << " GPU aggregation (" << num_ticks << " ticks, interval=" << interval << " sec):\n"; + output << " 1. Malloc + H->D copy: " << std::fixed << std::setprecision(3) << std::setw(7) << step1_ms << " ms\n"; + output << " 2. Compute period_ids: " << std::setw(7) << step2_ms << " ms\n"; + output << " 3. RLE (CUB): " << std::setw(7) << step3_ms << " ms (" << num_periods << " periods)\n"; + output << " 4. Exclusive scan: " << std::setw(7) << step4_ms << " ms\n"; + output << " 5. Aggregation kernel: " << std::setw(7) << step5_ms << " ms\n"; + output << " 6. D->H copy: " << std::setw(7) << step6_ms << " ms\n"; + output << " 7. 
Free GPU memory:      " << std::setw(7) << step7_ms << " ms\n";
+    output << "    GPU TOTAL:             " << std::setw(7) << total_ms << " ms\n";
 
-    // === Destroy the timing events ===
-    double cpu_event_destroy_start = get_time_ms();
-
-    cudaEventDestroy(start_malloc);
-    cudaEventDestroy(stop_malloc);
-    cudaEventDestroy(start_transfer);
-    cudaEventDestroy(stop_transfer);
-    cudaEventDestroy(start_kernel);
-    cudaEventDestroy(stop_kernel);
-    cudaEventDestroy(start_copy_back);
-    cudaEventDestroy(stop_copy_back);
-    cudaEventDestroy(start_free);
-    cudaEventDestroy(stop_free);
-
-    double cpu_event_destroy_ms = get_time_ms() - cpu_event_destroy_start;
-    double cpu_total_ms = get_time_ms() - cpu_total_start;
-
-    // Print detailed statistics
-    printf("  GPU Timings (%d records, %d days):\n", num_records, num_days);
-    printf("    cudaMalloc:        %7.3f ms\n", time_malloc_ms);
-    printf("    memcpy H->D:       %7.3f ms\n", time_transfer_ms);
-    printf("    kernel execution:  %7.3f ms\n", time_kernel_ms);
-    printf("    memcpy D->H:       %7.3f ms\n", time_copy_back_ms);
-    printf("    cudaFree:          %7.3f ms\n", time_free_ms);
-    printf("    GPU TOTAL:         %7.3f ms\n", cpu_total_ms);
+    // Print the whole report with a single printf call
+    printf("%s", output.str().c_str());
     fflush(stdout);
 
+    *h_out_stats = h_stats;
+    *out_num_periods = num_periods;
+
     return 0;
 }
+
+// ============================================================================
+// Free the result buffer allocated by gpu_aggregate_periods()
+// ============================================================================
+
+extern "C" void gpu_free_results(GpuPeriodStats* stats) {
+    delete[] stats;
+}
diff --git a/src/main.cpp b/src/main.cpp
index 50e1583..1eafef5 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -9,6 +9,7 @@
 #include "aggregation.hpp"
 #include "intervals.hpp"
 #include "utils.hpp"
+#include "gpu_loader.hpp"
 
 int main(int argc, char** argv) {
     MPI_Init(&argc, &argv);
@@ -18,6 +19,17 @@ int main(int argc, char** argv) {
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     MPI_Comm_size(MPI_COMM_WORLD, &size);
 
+    // Use the GPU only if both the USE_CUDA flag is set and a device is visible
+    bool use_cuda = get_use_cuda();
+    bool have_gpu = gpu_is_available();
+    bool use_gpu = use_cuda && have_gpu;
+
+    std::cout << "Rank " << rank
+              << ": USE_CUDA=" << use_cuda
+              << ", GPU available=" << have_gpu
+              << ", using " << (use_gpu ? "GPU" : "CPU")
+              << std::endl;
+
     // Parallel data loading
     double read_start = MPI_Wtime();
     std::vector<Record> records = load_csv_parallel(rank, size);
@@ -30,7 +42,18 @@ int main(int argc, char** argv) {
 
     // Aggregation by period
     double agg_start = MPI_Wtime();
-    std::vector<PeriodStats> periods = aggregate_periods(records);
+    std::vector<PeriodStats> periods;
+
+    if (use_gpu) {
+        int64_t interval = get_aggregation_interval();
+        if (!aggregate_periods_gpu(records, interval, periods)) {
+            std::cerr << "Rank " << rank << ": GPU aggregation failed, falling back to CPU" << std::endl;
+            periods = aggregate_periods(records);
+        }
+    } else {
+        periods = aggregate_periods(records);
+    }
+
     double agg_time = MPI_Wtime() - agg_start;
 
     std::cout << "Rank " << rank
diff --git a/src/utils.cpp b/src/utils.cpp
index 95bf399..e2f7689 100644
--- a/src/utils.cpp
+++ b/src/utils.cpp
@@ -44,6 +44,10 @@ int64_t get_aggregation_interval() {
     return std::stoll(get_env("AGGREGATION_INTERVAL"));
 }
 
+bool get_use_cuda() {
+    return std::stoi(get_env("USE_CUDA")) != 0;
+}
+
 int64_t get_file_size(const std::string& path) {
     std::ifstream file(path, std::ios::binary | std::ios::ate);
     if (!file.is_open()) {
diff --git a/src/utils.hpp b/src/utils.hpp
index 8bd1346..4637ecb 100644
--- a/src/utils.hpp
+++ b/src/utils.hpp
@@ -14,6 +14,7 @@ std::string get_data_path();
 std::vector get_data_read_shares();
 int64_t get_read_overlap_bytes();
 int64_t get_aggregation_interval();
+bool get_use_cuda();
 
 // Structure holding the byte range to read
 struct ByteRange {