Compare commits

..

20 Commits

Author SHA1 Message Date
07dcda12a5 Cuda 2025-12-16 15:19:50 +00:00
e84d1e9fe3 Optimized aggregation on the CPU 2025-12-16 14:16:40 +00:00
a5aadbc774 Switched to arbitrary aggregation periods 2025-12-16 13:53:36 +00:00
2833d2f7b4 Data upsampling 2025-12-16 13:25:38 +00:00
e4e01e1df3 Outdated info 2025-12-16 12:45:11 +00:00
f5b6f0fc73 Script for averaging results 2025-12-15 14:46:27 +00:00
1cc9840d60 Total execution time 2025-12-15 13:04:26 +00:00
dea7940e29 Parallel interval search 2025-12-15 12:57:56 +00:00
f4ade418d6 Trim the edge days 2025-12-15 11:30:50 +00:00
ab18d9770f Aggregation 2025-12-13 12:45:29 +00:00
6a22dc3ef7 Parallel data reading 2025-12-13 12:13:23 +00:00
f90a641754 gpu_is_available 2025-12-13 11:07:31 +00:00
10bd6db2b8 Note about NFS 2025-12-13 11:06:41 +00:00
d82fde7116 Clarified the task 2025-12-11 16:47:30 +03:00
7f16a5c17a More timers 2025-12-11 10:08:22 +00:00
44f297e55a Removed useless files 2025-12-11 09:06:48 +00:00
68ea345a35 CUDA-level parallelism 2025-12-02 12:58:49 +00:00
143e01b2dd OpenMP within a single machine 2025-12-02 12:55:16 +00:00
73c9e580e4 Computations on the GPU 2025-12-02 12:39:09 +00:00
78bdb1ddb7 Computations on the CPU 2025-12-02 12:22:16 +00:00
22 changed files with 1897 additions and 344 deletions

.gitignore

@@ -1,3 +1,4 @@
 data
 build
 out.txt
+*.csv

Makefile

@@ -1,8 +1,8 @@
 CXX = mpic++
-CXXFLAGS = -std=c++17 -O2 -Wall -Wextra -Wno-cast-function-type
+CXXFLAGS = -std=c++17 -O2 -Wall -Wextra -Wno-cast-function-type -fopenmp
 NVCC = nvcc
-NVCCFLAGS = -O2 -Xcompiler -fPIC
+NVCCFLAGS = -O3 -std=c++17 -arch=sm_86 -Xcompiler -fPIC
 SRC_DIR = src
 BUILD_DIR = build

README.md

@@ -2,6 +2,16 @@
 [Kaggle Bitcoin Historical Data](https://www.kaggle.com/datasets/mczielinski/bitcoin-historical-data)
+The source data has one record per minute. To increase the data volume and
+make the benefit of parallel and GPU computation easier to demonstrate, the
+data was converted by linear interpolation from per-minute records to
+per-10-second records, i.e. the data volume grew by a factor of 6.
+```
+python3 upsample.py -i ./data/data.csv -o ./data/data_10s.csv -s 10
+```
 ## Task
 Group the data by day (Timestamp); for each day compute the average price
@@ -13,7 +23,7 @@
 ## Build
 The project must be located in a directory shared by all nodes,
-for example in `/mnt/shared/supercomputers/bitcoin-project/build`.
+for example in `/mnt/shared/supercomputers/build`.
 Before running, set the correct path in `run.slurm`.
 ```sh
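The `upsample.py` script invoked in the README above is not part of this diff. A minimal sketch of such a linear-interpolation upsampler in pandas, assuming only the `-i/-o/-s` flags seen in the command and a `Timestamp` column holding Unix seconds, might look like:

```python
# Hypothetical sketch of upsample.py; the real script is not shown in this
# diff, so everything beyond the -i/-o/-s flags is an assumption.
import argparse
import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument("-i", "--input", required=True)
parser.add_argument("-o", "--output", required=True)
parser.add_argument("-s", "--step", type=int, default=10)  # target step, seconds
args = parser.parse_args()

df = pd.read_csv(args.input)
# Interpret Timestamp as Unix seconds and use it as a datetime index.
df["Timestamp"] = pd.to_datetime(df["Timestamp"], unit="s")
df = df.set_index("Timestamp")
# Insert rows on the finer grid and fill the gaps by linear interpolation
# (one minute -> six 10-second rows, so the volume grows 6x).
up = df.resample(f"{args.step}s").asfreq().interpolate(method="linear")
up = up.reset_index()
# Convert the index back to Unix seconds before writing.
up["Timestamp"] = up["Timestamp"].astype("int64") // 10**9
up.to_csv(args.output, index=False)
```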

benchmark.py (new file)

@@ -0,0 +1,115 @@
"""
Runs `make run` <number_of_runs> times and computes statistics over the
execution times. Naively parses out.txt and takes the value from the
"Total execution time: <time> sec" line.

    python benchmark.py <number_of_runs>
"""
import os
import re
import sys
import time
import subprocess
import statistics

N = int(sys.argv[1]) if len(sys.argv) > 1 else 10
OUT = "out.txt"
TIME_RE = re.compile(r"Total execution time:\s*([0-9]*\.?[0-9]+)\s*sec")
JOB_RE = re.compile(r"Submitted batch job\s+(\d+)")
APPEAR_TIMEOUT = 300.0   # how long to wait for out.txt to appear (sec)
FINISH_TIMEOUT = 3600.0  # how long to wait for the Total execution time line (sec)
POLL = 0.2               # file polling interval

def wait_for_exists(path: str, timeout: float):
    t0 = time.time()
    while not os.path.exists(path):
        if time.time() - t0 > timeout:
            raise TimeoutError(f"{path} did not appear within {timeout} seconds")
        time.sleep(POLL)

def try_read(path: str) -> str:
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return f.read()
    except FileNotFoundError:
        return ""
    except OSError:
        # on NFS the file can be unreadable while it is being written
        return ""

def wait_for_time_line(path: str, timeout: float) -> float:
    t0 = time.time()
    last_report = 0.0
    while True:
        txt = try_read(path)
        matches = TIME_RE.findall(txt)
        if matches:
            return float(matches[-1])  # the last occurrence of the line
        now = time.time()
        if now - t0 > timeout:
            tail = txt[-800:] if txt else "<empty>"
            raise TimeoutError("Timed out waiting for 'Total execution time' line.\n"
                               f"Last 800 chars of out.txt:\n{tail}")
        # print progress roughly every ~5 sec
        if now - last_report > 5.0:
            last_report = now
            if txt:
                # show the last non-empty line
                lines = [l for l in txt.splitlines() if l.strip()]
                if lines:
                    print(f"  waiting... last line: {lines[-1][:120]}", flush=True)
                else:
                    print("  waiting... (out.txt empty)", flush=True)
            else:
                print("  waiting... (out.txt not readable yet)", flush=True)
        time.sleep(POLL)

times = []
for i in range(N):
    print(f"Run {i+1}/{N} ...", flush=True)
    # remove out.txt before the run
    try:
        os.remove(OUT)
    except FileNotFoundError:
        pass
    # run `make run` and grab stdout (it contains "Submitted batch job XXX")
    res = subprocess.run(["make", "run"], capture_output=True, text=True)
    out = (res.stdout or "") + "\n" + (res.stderr or "")
    job_id = None
    m = JOB_RE.search(out)
    if m:
        job_id = m.group(1)
        print(f"  submitted job {job_id}", flush=True)
    else:
        print("  (job id not detected; will only watch out.txt)", flush=True)
    # wait for out.txt to appear and then for the Total execution time line
    wait_for_exists(OUT, APPEAR_TIMEOUT)
    t = wait_for_time_line(OUT, FINISH_TIMEOUT)
    times.append(t)
    print(f"  time = {t:.3f} sec", flush=True)
    # optionally remove out.txt after parsing
    try:
        os.remove(OUT)
    except FileNotFoundError:
        pass

print("\n=== RESULTS ===")
print(f"Runs:   {len(times)}")
print(f"Mean:   {statistics.mean(times):.3f} sec")
print(f"Median: {statistics.median(times):.3f} sec")
print(f"Min:    {min(times):.3f} sec")
print(f"Max:    {max(times):.3f} sec")
if len(times) > 1:
    print(f"Stddev: {statistics.stdev(times):.3f} sec")

Jupyter notebook

@@ -2,7 +2,7 @@
  "cells": [
   {
    "cell_type": "code",
-   "execution_count": 1,
+   "execution_count": 20,
    "id": "2acce44b",
    "metadata": {},
    "outputs": [],
@@ -12,7 +12,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 14,
+   "execution_count": 21,
    "id": "5ba70af7",
    "metadata": {},
    "outputs": [
@@ -111,7 +111,7 @@
       "7317758  0.410369 "
      ]
     },
-    "execution_count": 14,
+    "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
@@ -124,8 +124,8 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 19,
-   "id": "d4b22f3b",
+   "execution_count": 23,
+   "id": "3b320537",
    "metadata": {},
    "outputs": [
     {
@@ -150,67 +150,190 @@
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>Timestamp</th>\n",
-      "      <th>Low</th>\n",
-      "      <th>High</th>\n",
       "      <th>Open</th>\n",
+      "      <th>High</th>\n",
+      "      <th>Low</th>\n",
       "      <th>Close</th>\n",
+      "      <th>Volume</th>\n",
+      "      <th>Avg</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
-      "      <th>5078</th>\n",
-      "      <td>2025-11-26</td>\n",
-      "      <td>86304.0</td>\n",
-      "      <td>90646.0</td>\n",
-      "      <td>87331.0</td>\n",
-      "      <td>90477.0</td>\n",
-      "    </tr>\n",
-      "    <tr>\n",
-      "      <th>5079</th>\n",
-      "      <td>2025-11-27</td>\n",
-      "      <td>90091.0</td>\n",
-      "      <td>91926.0</td>\n",
-      "      <td>90476.0</td>\n",
-      "      <td>91325.0</td>\n",
-      "    </tr>\n",
-      "    <tr>\n",
-      "      <th>5080</th>\n",
-      "      <td>2025-11-28</td>\n",
-      "      <td>90233.0</td>\n",
-      "      <td>93091.0</td>\n",
-      "      <td>91326.0</td>\n",
-      "      <td>90913.0</td>\n",
-      "    </tr>\n",
-      "    <tr>\n",
-      "      <th>5081</th>\n",
-      "      <td>2025-11-29</td>\n",
-      "      <td>90216.0</td>\n",
-      "      <td>91179.0</td>\n",
-      "      <td>90913.0</td>\n",
-      "      <td>90832.0</td>\n",
-      "    </tr>\n",
-      "    <tr>\n",
-      "      <th>5082</th>\n",
-      "      <td>2025-11-30</td>\n",
-      "      <td>90362.0</td>\n",
-      "      <td>91969.0</td>\n",
-      "      <td>90832.0</td>\n",
-      "      <td>90382.0</td>\n",
+      "      <th>7317754</th>\n",
+      "      <td>2025-11-30 23:55:00+00:00</td>\n",
+      "      <td>90405.0</td>\n",
+      "      <td>90452.0</td>\n",
+      "      <td>90403.0</td>\n",
+      "      <td>90452.0</td>\n",
+      "      <td>0.531700</td>\n",
+      "      <td>90427.5</td>\n",
+      "    </tr>\n",
+      "    <tr>\n",
+      "      <th>7317755</th>\n",
+      "      <td>2025-11-30 23:56:00+00:00</td>\n",
+      "      <td>90452.0</td>\n",
+      "      <td>90481.0</td>\n",
+      "      <td>90420.0</td>\n",
+      "      <td>90420.0</td>\n",
+      "      <td>0.055547</td>\n",
+      "      <td>90450.5</td>\n",
+      "    </tr>\n",
+      "    <tr>\n",
+      "      <th>7317756</th>\n",
+      "      <td>2025-11-30 23:57:00+00:00</td>\n",
+      "      <td>90412.0</td>\n",
+      "      <td>90458.0</td>\n",
+      "      <td>90396.0</td>\n",
+      "      <td>90435.0</td>\n",
+      "      <td>0.301931</td>\n",
+      "      <td>90427.0</td>\n",
+      "    </tr>\n",
+      "    <tr>\n",
+      "      <th>7317757</th>\n",
+      "      <td>2025-11-30 23:58:00+00:00</td>\n",
+      "      <td>90428.0</td>\n",
+      "      <td>90428.0</td>\n",
+      "      <td>90362.0</td>\n",
+      "      <td>90362.0</td>\n",
+      "      <td>4.591653</td>\n",
+      "      <td>90395.0</td>\n",
+      "    </tr>\n",
+      "    <tr>\n",
+      "      <th>7317758</th>\n",
+      "      <td>2025-11-30 23:59:00+00:00</td>\n",
+      "      <td>90363.0</td>\n",
+      "      <td>90386.0</td>\n",
+      "      <td>90362.0</td>\n",
+      "      <td>90382.0</td>\n",
+      "      <td>0.410369</td>\n",
+      "      <td>90374.0</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
-      "      Timestamp      Low     High     Open    Close\n",
-      "5078  2025-11-26  86304.0  90646.0  87331.0  90477.0\n",
-      "5079  2025-11-27  90091.0  91926.0  90476.0  91325.0\n",
-      "5080  2025-11-28  90233.0  93091.0  91326.0  90913.0\n",
-      "5081  2025-11-29  90216.0  91179.0  90913.0  90832.0\n",
-      "5082  2025-11-30  90362.0  91969.0  90832.0  90382.0"
+      "                        Timestamp     Open     High      Low    Close  \\\n",
+      "7317754 2025-11-30 23:55:00+00:00  90405.0  90452.0  90403.0  90452.0   \n",
+      "7317755 2025-11-30 23:56:00+00:00  90452.0  90481.0  90420.0  90420.0   \n",
+      "7317756 2025-11-30 23:57:00+00:00  90412.0  90458.0  90396.0  90435.0   \n",
+      "7317757 2025-11-30 23:58:00+00:00  90428.0  90428.0  90362.0  90362.0   \n",
+      "7317758 2025-11-30 23:59:00+00:00  90363.0  90386.0  90362.0  90382.0   \n",
+      "\n",
+      "           Volume      Avg  \n",
+      "7317754  0.531700  90427.5  \n",
+      "7317755  0.055547  90450.5  \n",
+      "7317756  0.301931  90427.0  \n",
+      "7317757  4.591653  90395.0  \n",
+      "7317758  0.410369  90374.0  "
      ]
     },
-    "execution_count": 19,
+    "execution_count": 23,
+    "metadata": {},
+    "output_type": "execute_result"
+   }
+  ],
+  "source": [
+   "df['Avg'] = (df['Low'] + df['High']) / 2\n",
+   "df.tail()"
+  ]
+ },
+ {
+  "cell_type": "code",
+  "execution_count": 25,
+  "id": "4b1cd63c",
+  "metadata": {},
+  "outputs": [
+   {
+    "data": {
+     "text/html": [
+      "<div>\n",
+      "<style scoped>\n",
+      "    .dataframe tbody tr th:only-of-type {\n",
+      "        vertical-align: middle;\n",
+      "    }\n",
+      "\n",
+      "    .dataframe tbody tr th {\n",
+      "        vertical-align: top;\n",
+      "    }\n",
+      "\n",
+      "    .dataframe thead th {\n",
+      "        text-align: right;\n",
+      "    }\n",
+      "</style>\n",
+      "<table border=\"1\" class=\"dataframe\">\n",
+      "  <thead>\n",
+      "    <tr style=\"text-align: right;\">\n",
+      "      <th></th>\n",
+      "      <th>Timestamp</th>\n",
+      "      <th>Avg</th>\n",
+      "      <th>OpenMin</th>\n",
+      "      <th>OpenMax</th>\n",
+      "      <th>CloseMin</th>\n",
+      "      <th>CloseMax</th>\n",
+      "    </tr>\n",
+      "  </thead>\n",
+      "  <tbody>\n",
+      "    <tr>\n",
+      "      <th>5078</th>\n",
+      "      <td>2025-11-26</td>\n",
+      "      <td>88057.301736</td>\n",
+      "      <td>86312.0</td>\n",
+      "      <td>90574.0</td>\n",
+      "      <td>86323.0</td>\n",
+      "      <td>90574.0</td>\n",
+      "    </tr>\n",
+      "    <tr>\n",
+      "      <th>5079</th>\n",
+      "      <td>2025-11-27</td>\n",
+      "      <td>91245.092708</td>\n",
+      "      <td>90126.0</td>\n",
+      "      <td>91888.0</td>\n",
+      "      <td>90126.0</td>\n",
+      "      <td>91925.0</td>\n",
+      "    </tr>\n",
+      "    <tr>\n",
+      "      <th>5080</th>\n",
+      "      <td>2025-11-28</td>\n",
+      "      <td>91324.308681</td>\n",
+      "      <td>90255.0</td>\n",
+      "      <td>92970.0</td>\n",
+      "      <td>90283.0</td>\n",
+      "      <td>92966.0</td>\n",
+      "    </tr>\n",
+      "    <tr>\n",
+      "      <th>5081</th>\n",
+      "      <td>2025-11-29</td>\n",
+      "      <td>90746.479514</td>\n",
+      "      <td>90265.0</td>\n",
+      "      <td>91158.0</td>\n",
+      "      <td>90279.0</td>\n",
+      "      <td>91179.0</td>\n",
+      "    </tr>\n",
+      "    <tr>\n",
+      "      <th>5082</th>\n",
+      "      <td>2025-11-30</td>\n",
+      "      <td>91187.356250</td>\n",
+      "      <td>90363.0</td>\n",
+      "      <td>91940.0</td>\n",
+      "      <td>90362.0</td>\n",
+      "      <td>91940.0</td>\n",
+      "    </tr>\n",
+      "  </tbody>\n",
+      "</table>\n",
+      "</div>"
+     ],
+     "text/plain": [
+      "     Timestamp           Avg  OpenMin  OpenMax  CloseMin  CloseMax\n",
+      "5078  2025-11-26  88057.301736  86312.0  90574.0   86323.0   90574.0\n",
+      "5079  2025-11-27  91245.092708  90126.0  91888.0   90126.0   91925.0\n",
+      "5080  2025-11-28  91324.308681  90255.0  92970.0   90283.0   92966.0\n",
+      "5081  2025-11-29  90746.479514  90265.0  91158.0   90279.0   91179.0\n",
+      "5082  2025-11-30  91187.356250  90363.0  91940.0   90362.0   91940.0"
+     ]
+    },
+    "execution_count": 25,
     "metadata": {},
     "output_type": "execute_result"
    }
@@ -218,7 +341,13 @@
   "source": [
    "df_days = (\n",
    "    df.groupby(df[\"Timestamp\"].dt.date)\n",
-   "    .agg({\"Low\": \"min\", \"High\": \"max\", \"Open\": \"first\", \"Close\": \"last\"})\n",
+   "    .agg(\n",
+   "        Avg=(\"Avg\", \"mean\"),\n",
+   "        OpenMin=(\"Open\", \"min\"),\n",
+   "        OpenMax=(\"Open\", \"max\"),\n",
+   "        CloseMin=(\"Close\", \"min\"),\n",
+   "        CloseMax=(\"Close\", \"max\"),\n",
+   "    )\n",
    "    .reset_index()\n",
    ")\n",
    "df_days.tail()"
@@ -226,111 +355,7 @@
  },
  {
   "cell_type": "code",
-  "execution_count": 21,
-  "id": "91823496",
-  "metadata": {},
-  "outputs": [
-   {
-    "data": {
-     "text/html": [
-      "<div>\n",
-      "<style scoped>\n",
-      "    .dataframe tbody tr th:only-of-type {\n",
-      "        vertical-align: middle;\n",
-      "    }\n",
-      "\n",
-      "    .dataframe tbody tr th {\n",
-      "        vertical-align: top;\n",
-      "    }\n",
-      "\n",
-      "    .dataframe thead th {\n",
-      "        text-align: right;\n",
-      "    }\n",
-      "</style>\n",
-      "<table border=\"1\" class=\"dataframe\">\n",
-      "  <thead>\n",
-      "    <tr style=\"text-align: right;\">\n",
-      "      <th></th>\n",
-      "      <th>Timestamp</th>\n",
-      "      <th>Low</th>\n",
-      "      <th>High</th>\n",
-      "      <th>Open</th>\n",
-      "      <th>Close</th>\n",
-      "      <th>Avg</th>\n",
-      "    </tr>\n",
-      "  </thead>\n",
-      "  <tbody>\n",
-      "    <tr>\n",
-      "      <th>5078</th>\n",
-      "      <td>2025-11-26</td>\n",
-      "      <td>86304.0</td>\n",
-      "      <td>90646.0</td>\n",
-      "      <td>87331.0</td>\n",
-      "      <td>90477.0</td>\n",
-      "      <td>88475.0</td>\n",
-      "    </tr>\n",
-      "    <tr>\n",
-      "      <th>5079</th>\n",
-      "      <td>2025-11-27</td>\n",
-      "      <td>90091.0</td>\n",
-      "      <td>91926.0</td>\n",
-      "      <td>90476.0</td>\n",
-      "      <td>91325.0</td>\n",
-      "      <td>91008.5</td>\n",
-      "    </tr>\n",
-      "    <tr>\n",
-      "      <th>5080</th>\n",
-      "      <td>2025-11-28</td>\n",
-      "      <td>90233.0</td>\n",
-      "      <td>93091.0</td>\n",
-      "      <td>91326.0</td>\n",
-      "      <td>90913.0</td>\n",
-      "      <td>91662.0</td>\n",
-      "    </tr>\n",
-      "    <tr>\n",
-      "      <th>5081</th>\n",
-      "      <td>2025-11-29</td>\n",
-      "      <td>90216.0</td>\n",
-      "      <td>91179.0</td>\n",
-      "      <td>90913.0</td>\n",
-      "      <td>90832.0</td>\n",
-      "      <td>90697.5</td>\n",
-      "    </tr>\n",
-      "    <tr>\n",
-      "      <th>5082</th>\n",
-      "      <td>2025-11-30</td>\n",
-      "      <td>90362.0</td>\n",
-      "      <td>91969.0</td>\n",
-      "      <td>90832.0</td>\n",
-      "      <td>90382.0</td>\n",
-      "      <td>91165.5</td>\n",
-      "    </tr>\n",
-      "  </tbody>\n",
-      "</table>\n",
-      "</div>"
-     ],
-     "text/plain": [
-      "      Timestamp      Low     High     Open    Close      Avg\n",
-      "5078  2025-11-26  86304.0  90646.0  87331.0  90477.0  88475.0\n",
-      "5079  2025-11-27  90091.0  91926.0  90476.0  91325.0  91008.5\n",
-      "5080  2025-11-28  90233.0  93091.0  91326.0  90913.0  91662.0\n",
-      "5081  2025-11-29  90216.0  91179.0  90913.0  90832.0  90697.5\n",
-      "5082  2025-11-30  90362.0  91969.0  90832.0  90382.0  91165.5"
-     ]
-    },
-    "execution_count": 21,
-    "metadata": {},
-    "output_type": "execute_result"
-   }
-  ],
-  "source": [
-   "df_days[\"Avg\"] = (df_days[\"Low\"] + df_days[\"High\"]) / 2\n",
-   "df_days.tail()"
-  ]
- },
- {
-  "cell_type": "code",
-  "execution_count": 25,
+  "execution_count": 26,
   "id": "9a7b3310",
   "metadata": {},
   "outputs": [
@@ -358,6 +383,8 @@
      "      <th>start_date</th>\n",
      "      <th>end_date</th>\n",
      "      <th>min_open</th>\n",
+     "      <th>max_open</th>\n",
+     "      <th>min_close</th>\n",
      "      <th>max_close</th>\n",
      "      <th>start_avg</th>\n",
      "      <th>end_avg</th>\n",
@@ -366,76 +393,86 @@
      "  </thead>\n",
      "  <tbody>\n",
      "    <tr>\n",
-     "      <th>335</th>\n",
-     "      <td>2025-02-27</td>\n",
-     "      <td>2025-04-23</td>\n",
-     "      <td>76252.0</td>\n",
-     "      <td>94273.0</td>\n",
-     "      <td>84801.5</td>\n",
-     "      <td>93335.0</td>\n",
-     "      <td>0.100629</td>\n",
-     "    </tr>\n",
-     "    <tr>\n",
-     "      <th>336</th>\n",
-     "      <td>2025-04-24</td>\n",
-     "      <td>2025-05-09</td>\n",
-     "      <td>93730.0</td>\n",
-     "      <td>103261.0</td>\n",
-     "      <td>92867.5</td>\n",
-     "      <td>103341.0</td>\n",
-     "      <td>0.112779</td>\n",
-     "    </tr>\n",
-     "    <tr>\n",
-     "      <th>337</th>\n",
-     "      <td>2025-05-10</td>\n",
-     "      <td>2025-07-11</td>\n",
-     "      <td>100990.0</td>\n",
-     "      <td>117579.0</td>\n",
-     "      <td>103915.0</td>\n",
-     "      <td>117032.5</td>\n",
-     "      <td>0.126233</td>\n",
-     "    </tr>\n",
-     "    <tr>\n",
-     "      <th>338</th>\n",
-     "      <td>2025-07-12</td>\n",
-     "      <td>2025-11-04</td>\n",
-     "      <td>106470.0</td>\n",
-     "      <td>124728.0</td>\n",
-     "      <td>117599.0</td>\n",
-     "      <td>103079.0</td>\n",
-     "      <td>0.123470</td>\n",
-     "    </tr>\n",
-     "    <tr>\n",
-     "      <th>339</th>\n",
-     "      <td>2025-11-05</td>\n",
-     "      <td>2025-11-18</td>\n",
-     "      <td>92112.0</td>\n",
-     "      <td>105972.0</td>\n",
-     "      <td>101737.5</td>\n",
-     "      <td>91471.0</td>\n",
-     "      <td>0.100912</td>\n",
+     "      <th>316</th>\n",
+     "      <td>2025-02-27</td>\n",
+     "      <td>2025-04-25</td>\n",
+     "      <td>74509.0</td>\n",
+     "      <td>95801.0</td>\n",
+     "      <td>74515.0</td>\n",
+     "      <td>95800.0</td>\n",
+     "      <td>85166.063889</td>\n",
+     "      <td>94303.907292</td>\n",
+     "      <td>0.107294</td>\n",
+     "    </tr>\n",
+     "    <tr>\n",
+     "      <th>317</th>\n",
+     "      <td>2025-04-26</td>\n",
+     "      <td>2025-05-11</td>\n",
+     "      <td>92877.0</td>\n",
+     "      <td>104971.0</td>\n",
+     "      <td>92872.0</td>\n",
+     "      <td>104965.0</td>\n",
+     "      <td>94500.950347</td>\n",
+     "      <td>104182.167708</td>\n",
+     "      <td>0.102446</td>\n",
+     "    </tr>\n",
+     "    <tr>\n",
+     "      <th>318</th>\n",
+     "      <td>2025-05-12</td>\n",
+     "      <td>2025-07-11</td>\n",
+     "      <td>98384.0</td>\n",
+     "      <td>118833.0</td>\n",
+     "      <td>98382.0</td>\n",
+     "      <td>118839.0</td>\n",
+     "      <td>103569.791319</td>\n",
+     "      <td>117463.666667</td>\n",
+     "      <td>0.134150</td>\n",
+     "    </tr>\n",
+     "    <tr>\n",
+     "      <th>319</th>\n",
+     "      <td>2025-07-12</td>\n",
+     "      <td>2025-11-04</td>\n",
+     "      <td>98944.0</td>\n",
+     "      <td>126202.0</td>\n",
+     "      <td>98943.0</td>\n",
+     "      <td>126202.0</td>\n",
+     "      <td>117640.026389</td>\n",
+     "      <td>103712.985764</td>\n",
+     "      <td>0.118387</td>\n",
+     "    </tr>\n",
+     "    <tr>\n",
+     "      <th>320</th>\n",
+     "      <td>2025-11-05</td>\n",
+     "      <td>2025-11-18</td>\n",
+     "      <td>89291.0</td>\n",
+     "      <td>107343.0</td>\n",
+     "      <td>89286.0</td>\n",
+     "      <td>107343.0</td>\n",
+     "      <td>102514.621181</td>\n",
+     "      <td>91705.833333</td>\n",
+     "      <td>0.105437</td>\n",
     "    </tr>\n",
      "  </tbody>\n",
      "</table>\n",
      "</div>"
     ],
     "text/plain": [
-     "    start_date    end_date  min_open  max_close  start_avg   end_avg  \\\n",
-     "335 2025-02-27  2025-04-23   76252.0    94273.0    84801.5   93335.0   \n",
-     "336 2025-04-24  2025-05-09   93730.0   103261.0    92867.5  103341.0   \n",
-     "337 2025-05-10  2025-07-11  100990.0   117579.0   103915.0  117032.5   \n",
-     "338 2025-07-12  2025-11-04  106470.0   124728.0   117599.0  103079.0   \n",
-     "339 2025-11-05  2025-11-18   92112.0   105972.0   101737.5   91471.0   \n",
+     "    start_date    end_date  min_open  max_open  min_close  max_close  \\\n",
+     "316 2025-02-27  2025-04-25   74509.0   95801.0    74515.0    95800.0   \n",
+     "317 2025-04-26  2025-05-11   92877.0  104971.0    92872.0   104965.0   \n",
+     "318 2025-05-12  2025-07-11   98384.0  118833.0    98382.0   118839.0   \n",
+     "319 2025-07-12  2025-11-04   98944.0  126202.0    98943.0   126202.0   \n",
+     "320 2025-11-05  2025-11-18   89291.0  107343.0    89286.0   107343.0   \n",
      "\n",
-     "      change  \n",
-     "335  0.100629  \n",
-     "336  0.112779  \n",
-     "337  0.126233  \n",
-     "338  0.123470  \n",
-     "339  0.100912  "
+     "        start_avg         end_avg    change  \n",
+     "316   85166.063889   94303.907292  0.107294  \n",
+     "317   94500.950347  104182.167708  0.102446  \n",
+     "318  103569.791319  117463.666667  0.134150  \n",
+     "319  117640.026389  103712.985764  0.118387  \n",
+     "320  102514.621181   91705.833333  0.105437  "
     ]
    },
-   "execution_count": 25,
+   "execution_count": 26,
    "metadata": {},
    "output_type": "execute_result"
   }
@@ -455,8 +492,10 @@
   "    intervals.append({\n",
   "        \"start_date\": df_days.loc[start_idx, \"Timestamp\"],\n",
   "        \"end_date\": df_days.loc[i, \"Timestamp\"],\n",
-  "        \"min_open\": interval[\"Open\"].min(),\n",
-  "        \"max_close\": interval[\"Close\"].max(),\n",
+  "        \"min_open\": interval[\"OpenMin\"].min(),\n",
+  "        \"max_open\": interval[\"OpenMax\"].max(),\n",
+  "        \"min_close\": interval[\"CloseMin\"].min(),\n",
+  "        \"max_close\": interval[\"CloseMax\"].max(),\n",
   "        \"start_avg\": price_base,\n",
   "        \"end_avg\": price_now,\n",
   "        \"change\": change,\n",
@@ -470,6 +509,14 @@
  "df_intervals = pd.DataFrame(intervals)\n",
  "df_intervals.tail()"
  ]
+ },
+ {
+  "cell_type": "code",
+  "execution_count": null,
+  "id": "07f1cd58",
+  "metadata": {},
+  "outputs": [],
+  "source": []
  }
 ],
 "metadata": {

run.slurm

@@ -2,9 +2,23 @@
 #SBATCH --job-name=btc
 #SBATCH --nodes=4
 #SBATCH --ntasks=4
+#SBATCH --cpus-per-task=2
 #SBATCH --output=out.txt
 
-# mpirun -np $SLURM_NTASKS ./build/bitcoin_app
+# Path to the data file (must exist on all nodes)
+export DATA_PATH="/mnt/shared/supercomputers/data/data_10s.csv"
+# Data shares per rank (their sum defines the proportions)
+export DATA_READ_SHARES="10,11,13,14"
+# Overlap size in bytes for handling line boundaries
+export READ_OVERLAP_BYTES=131072
+# Aggregation interval in seconds (60 = minutes, 600 = 10 minutes, 86400 = days)
+export AGGREGATION_INTERVAL=60
+# Whether to use CUDA for aggregation (0 = no, 1 = yes)
+export USE_CUDA=1
+
 cd /mnt/shared/supercomputers/build
 mpirun -np $SLURM_NTASKS ./bitcoin_app
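The `calculate_byte_range` helper that consumes `DATA_READ_SHARES` and `READ_OVERLAP_BYTES` lives in the utils module, which this diff does not show. Judging by the comments above and by `load_csv_parallel` in src/csv_loader.cpp, the intended share-based splitting could look roughly like the following sketch (all names here are illustrative, not the project's actual code):

```python
# Illustrative sketch of share-based file splitting; the project's real
# logic is in calculate_byte_range() (utils), not shown in this diff.
def byte_range(rank, size, file_size, shares, overlap):
    total = sum(shares)
    # Each rank's slice is proportional to its share of the total.
    start = file_size * sum(shares[:rank]) // total
    end = file_size * sum(shares[:rank + 1]) // total
    # Read a little past the nominal end so a line cut at the boundary can
    # be completed; the next rank skips everything up to its first '\n'.
    if rank != size - 1:
        end = min(end + overlap, file_size)
    return start, end

# With DATA_READ_SHARES="10,11,13,14" the shares sum to 48, so the four
# ranks get roughly 20.8%, 22.9%, 27.1% and 29.2% of the file.
print(byte_range(1, 4, 48_000_000, [10, 11, 13, 14], 131072))
# -> (10000000, 21131072)
```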

src/aggregation.cpp (new file)

@@ -0,0 +1,75 @@
#include "aggregation.hpp"
#include "utils.hpp"
#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

std::vector<PeriodStats> aggregate_periods(const std::vector<Record>& records) {
    const int64_t interval = get_aggregation_interval();
    std::vector<PeriodStats> result;
    if (records.empty()) return result;

    struct PeriodAccumulator {
        double avg_sum = 0.0;
        double open_min = std::numeric_limits<double>::max();
        double open_max = std::numeric_limits<double>::lowest();
        double close_min = std::numeric_limits<double>::max();
        double close_max = std::numeric_limits<double>::lowest();
        int64_t count = 0;

        void add(const Record& r) {
            const double avg = (r.low + r.high) / 2.0;
            avg_sum += avg;
            open_min = std::min(open_min, r.open);
            open_max = std::max(open_max, r.open);
            close_min = std::min(close_min, r.close);
            close_max = std::max(close_max, r.close);
            ++count;
        }
    };

    PeriodIndex current_period =
        static_cast<PeriodIndex>(records[0].timestamp) / interval;
    PeriodAccumulator acc;
    acc.add(records[0]);

    for (size_t i = 1; i < records.size(); ++i) {
        const Record& r = records[i];
        const PeriodIndex period =
            static_cast<PeriodIndex>(r.timestamp) / interval;
        if (period != current_period) {
            PeriodStats stats;
            stats.period = current_period;
            stats.avg = acc.avg_sum / static_cast<double>(acc.count);
            stats.open_min = acc.open_min;
            stats.open_max = acc.open_max;
            stats.close_min = acc.close_min;
            stats.close_max = acc.close_max;
            stats.count = acc.count;
            result.push_back(stats);
            current_period = period;
            acc = PeriodAccumulator{};
        }
        acc.add(r);
    }

    // the final period
    PeriodStats stats;
    stats.period = current_period;
    stats.avg = acc.avg_sum / static_cast<double>(acc.count);
    stats.open_min = acc.open_min;
    stats.open_max = acc.open_max;
    stats.close_min = acc.close_min;
    stats.close_max = acc.close_max;
    stats.count = acc.count;
    result.push_back(stats);
    return result;
}
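The bucketing above is plain integer division of the Unix timestamp by the interval, so a period index maps back to the period's start time by multiplying with the interval (which is what `period_index_to_datetime` in src/intervals.cpp below does). A quick sanity check of the arithmetic, using a timestamp that appears in the notebook output:

```python
interval = 60                   # AGGREGATION_INTERVAL in seconds
ts = 1764547140.0               # 2025-11-30 23:59:00 UTC
period = int(ts) // interval    # 29409119; every tick in that minute gets it
assert period * interval == 1764547140  # start of the period
```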

src/aggregation.hpp (new file)

@@ -0,0 +1,8 @@
#pragma once
#include "record.hpp"
#include "period_stats.hpp"
#include <vector>

// Aggregate records by period on a single node
std::vector<PeriodStats> aggregate_periods(const std::vector<Record>& records);

src/csv_loader.cpp

@@ -2,45 +2,133 @@
 #include <fstream>
 #include <sstream>
 #include <iostream>
+#include <stdexcept>
 
-std::vector<Record> load_csv(const std::string& filename) {
-    std::vector<Record> data;
-    std::ifstream file(filename);
-    if (!file.is_open()) {
-        throw std::runtime_error("Cannot open file: " + filename);
-    }
-    std::string line;
-    // read the first line (the header)
-    std::getline(file, line);
-    while (std::getline(file, line)) {
-        std::stringstream ss(line);
-        std::string item;
-        Record row;
-
-        std::getline(ss, item, ',');
-        row.timestamp = std::stod(item);
-
-        std::getline(ss, item, ',');
-        row.open = std::stod(item);
-
-        std::getline(ss, item, ',');
-        row.high = std::stod(item);
-
-        std::getline(ss, item, ',');
-        row.low = std::stod(item);
-
-        std::getline(ss, item, ',');
-        row.close = std::stod(item);
-
-        std::getline(ss, item, ',');
-        row.volume = std::stod(item);
-
-        data.push_back(row);
+bool parse_csv_line(const std::string& line, Record& record) {
+    if (line.empty()) {
+        return false;
     }
+    std::stringstream ss(line);
+    std::string item;
+    try {
+        // timestamp
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.timestamp = std::stod(item);
+        // open
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.open = std::stod(item);
+        // high
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.high = std::stod(item);
+        // low
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.low = std::stod(item);
+        // close
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.close = std::stod(item);
+        // volume
+        if (!std::getline(ss, item, ',')) return false;
+        // Volume may be empty or contain a value
+        if (item.empty()) {
+            record.volume = 0.0;
+        } else {
+            record.volume = std::stod(item);
+        }
+        return true;
+    } catch (const std::exception&) {
+        return false;
+    }
+}
+
+std::vector<Record> load_csv_parallel(int rank, int size) {
+    std::vector<Record> data;
+
+    // Read settings from environment variables
+    std::string data_path = get_data_path();
+    std::vector<int> shares = get_data_read_shares();
+    int64_t overlap_bytes = get_read_overlap_bytes();
+
+    // Get the file size
+    int64_t file_size = get_file_size(data_path);
+
+    // Compute this rank's byte range
+    ByteRange range = calculate_byte_range(rank, size, file_size, shares, overlap_bytes);
+
+    // Open the file and read the needed range
+    std::ifstream file(data_path, std::ios::binary);
+    if (!file.is_open()) {
+        throw std::runtime_error("Cannot open file: " + data_path);
+    }
+
+    // Seek to the start of the range
+    file.seekg(range.start);
+
+    // Read the data into a buffer
+    int64_t bytes_to_read = range.end - range.start;
+    std::vector<char> buffer(bytes_to_read);
+    file.read(buffer.data(), bytes_to_read);
+    int64_t bytes_read = file.gcount();
+    file.close();
+
+    // Convert to a string for easier parsing
+    std::string content(buffer.data(), bytes_read);
+
+    // Find where the first complete line starts
+    size_t parse_start = 0;
+    if (rank == 0) {
+        // Rank 0: skip the header (the first line)
+        size_t header_end = content.find('\n');
+        if (header_end != std::string::npos) {
+            parse_start = header_end + 1;
+        }
+    } else {
+        // Other ranks: start after the first \n (skip the partial line)
+        size_t first_newline = content.find('\n');
+        if (first_newline != std::string::npos) {
+            parse_start = first_newline + 1;
+        }
+    }
+
+    // Find where the last complete line ends
+    size_t parse_end = content.size();
+    if (rank != size - 1) {
+        // Not the last rank: look for the last \n
+        size_t last_newline = content.rfind('\n');
+        if (last_newline != std::string::npos && last_newline > parse_start) {
+            parse_end = last_newline;
+        }
+    }
+
+    // Parse the lines
+    size_t pos = parse_start;
+    while (pos < parse_end) {
+        size_t line_end = content.find('\n', pos);
+        if (line_end == std::string::npos || line_end > parse_end) {
+            line_end = parse_end;
+        }
+        std::string line = content.substr(pos, line_end - pos);
+        // Strip \r if present (Windows line endings)
+        if (!line.empty() && line.back() == '\r') {
+            line.pop_back();
+        }
+        Record record;
+        if (parse_csv_line(line, record)) {
+            data.push_back(record);
+        }
+        pos = line_end + 1;
+    }
     return data;

src/csv_loader.hpp

@@ -2,5 +2,14 @@
 #include <string>
 #include <vector>
 #include "record.hpp"
+#include "utils.hpp"
 
-std::vector<Record> load_csv(const std::string& filename);
+// Parallel CSV reading for MPI
+// rank - this process's rank
+// size - total number of ranks
+// Returns the records read by this rank
+std::vector<Record> load_csv_parallel(int rank, int size);
+
+// Parse a single CSV line into a Record
+// Returns true if parsing succeeded
+bool parse_csv_line(const std::string& line, Record& record);

src/gpu_loader.cpp

@@ -1,12 +1,133 @@
 #include "gpu_loader.hpp"
 #include <dlfcn.h>
+#include <iostream>
+#include <cstdint>
 
-gpu_is_available_fn load_gpu_is_available() {
-    void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL);
-    if (!h) return nullptr;
-    auto fn = (gpu_is_available_fn)dlsym(h, "gpu_is_available");
-    if (!fn) return nullptr;
-    return fn;
+// GPU result struct (must match gpu_plugin.cu)
+struct GpuPeriodStats {
+    int64_t period;
+    double avg;
+    double open_min;
+    double open_max;
+    double close_min;
+    double close_max;
+    int64_t count;
+};
+
+// Function types from the GPU plugin
+using gpu_is_available_fn = int (*)();
+using gpu_aggregate_periods_fn = int (*)(
+    const double* h_timestamps,
+    const double* h_open,
+    const double* h_high,
+    const double* h_low,
+    const double* h_close,
+    int num_ticks,
+    int64_t interval,
+    GpuPeriodStats** h_out_stats,
+    int* out_num_periods
+);
+using gpu_free_results_fn = void (*)(GpuPeriodStats*);
+
+static void* get_gpu_lib_handle() {
+    static void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL);
+    return h;
+}
+
+bool gpu_is_available() {
+    void* h = get_gpu_lib_handle();
+    if (!h) return false;
+    auto fn = reinterpret_cast<gpu_is_available_fn>(dlsym(h, "gpu_is_available"));
+    if (!fn) return false;
+    return fn() != 0;
+}
+
+bool aggregate_periods_gpu(
+    const std::vector<Record>& records,
+    int64_t aggregation_interval,
+    std::vector<PeriodStats>& out_stats)
+{
+    if (records.empty()) {
+        out_stats.clear();
+        return true;
+    }
+    void* h = get_gpu_lib_handle();
+    if (!h) {
+        std::cerr << "GPU: Failed to load libgpu_compute.so" << std::endl;
+        return false;
+    }
+    auto aggregate_fn = reinterpret_cast<gpu_aggregate_periods_fn>(
+        dlsym(h, "gpu_aggregate_periods"));
+    auto free_fn = reinterpret_cast<gpu_free_results_fn>(
+        dlsym(h, "gpu_free_results"));
+    if (!aggregate_fn || !free_fn) {
+        std::cerr << "GPU: Failed to load functions from plugin" << std::endl;
+        return false;
+    }
+    int num_ticks = static_cast<int>(records.size());
+
+    // Convert AoS to SoA
+    std::vector<double> timestamps(num_ticks);
+    std::vector<double> open(num_ticks);
+    std::vector<double> high(num_ticks);
+    std::vector<double> low(num_ticks);
+    std::vector<double> close(num_ticks);
+    for (int i = 0; i < num_ticks; i++) {
+        timestamps[i] = records[i].timestamp;
+        open[i] = records[i].open;
+        high[i] = records[i].high;
+        low[i] = records[i].low;
+        close[i] = records[i].close;
+    }
+
+    // Call the GPU function
+    GpuPeriodStats* gpu_stats = nullptr;
+    int num_periods = 0;
+    int result = aggregate_fn(
+        timestamps.data(),
+        open.data(),
+        high.data(),
+        low.data(),
+        close.data(),
+        num_ticks,
+        aggregation_interval,
+        &gpu_stats,
+        &num_periods
+    );
+    if (result != 0) {
+        std::cerr << "GPU: Aggregation failed with code " << result << std::endl;
+        return false;
+    }
+
+    // Convert the result to PeriodStats
+    out_stats.clear();
+    out_stats.reserve(num_periods);
+    for (int i = 0; i < num_periods; i++) {
+        PeriodStats ps;
+        ps.period = gpu_stats[i].period;
+        ps.avg = gpu_stats[i].avg;
+        ps.open_min = gpu_stats[i].open_min;
+        ps.open_max = gpu_stats[i].open_max;
+        ps.close_min = gpu_stats[i].close_min;
+        ps.close_max = gpu_stats[i].close_max;
+        ps.count = gpu_stats[i].count;
+        out_stats.push_back(ps);
+    }
+
+    // Free the plugin-allocated memory
+    free_fn(gpu_stats);
+    return true;
 }

src/gpu_loader.hpp

@@ -1,4 +1,15 @@
 #pragma once
-using gpu_is_available_fn = int (*)();
+#include "period_stats.hpp"
+#include "record.hpp"
+#include <vector>
 
-gpu_is_available_fn load_gpu_is_available();
+// Check CUDA availability
+bool gpu_is_available();
+
+// Aggregate periods on the GPU
+// Returns true on success, false if the GPU is unavailable or an error occurred
+bool aggregate_periods_gpu(
+    const std::vector<Record>& records,
+    int64_t aggregation_interval,
+    std::vector<PeriodStats>& out_stats
+);

src/gpu_plugin.cu

@@ -1,8 +1,430 @@
#include <cuda_runtime.h>
#include <cub/cub.cuh>
#include <cstdint>
#include <cfloat>
#include <cstdio>
#include <ctime>
#include <string>
#include <sstream>
#include <iomanip>
// ============================================================================
// Data structures
// ============================================================================
// SoA (Structure of Arrays) for the input data on the GPU
struct GpuTicksSoA {
double* timestamp;
double* open;
double* high;
double* low;
double* close;
int n;
};
// Aggregation result for one period
struct GpuPeriodStats {
int64_t period;
double avg;
double open_min;
double open_max;
double close_min;
double close_max;
int64_t count;
};
// ============================================================================
// Helper functions
// ============================================================================
static double get_time_ms() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0;
}
#define CUDA_CHECK(call) do { \
cudaError_t err = call; \
if (err != cudaSuccess) { \
printf("CUDA error at %s:%d: %s\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
return -1; \
} \
} while(0)
// ============================================================================
// Kernel: compute the period_id for each tick
// ============================================================================
__global__ void compute_period_ids_kernel(
const double* __restrict__ timestamps,
int64_t* __restrict__ period_ids,
int n,
int64_t interval)
{
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < n) {
period_ids[idx] = static_cast<int64_t>(timestamps[idx]) / interval;
}
}
// ============================================================================
// Kernel: aggregate one period (one block per period)
// ============================================================================
__global__ void aggregate_periods_kernel(
const double* __restrict__ open,
const double* __restrict__ high,
const double* __restrict__ low,
const double* __restrict__ close,
const int64_t* __restrict__ unique_periods,
const int* __restrict__ offsets,
const int* __restrict__ counts,
int num_periods,
GpuPeriodStats* __restrict__ out_stats)
{
int period_idx = blockIdx.x;
if (period_idx >= num_periods) return;
int offset = offsets[period_idx];
int count = counts[period_idx];
    // Use shared memory for the in-block reduction
__shared__ double s_avg_sum;
__shared__ double s_open_min;
__shared__ double s_open_max;
__shared__ double s_close_min;
__shared__ double s_close_max;
    // The first thread initializes shared memory
if (threadIdx.x == 0) {
s_avg_sum = 0.0;
s_open_min = DBL_MAX;
s_open_max = -DBL_MAX;
s_close_min = DBL_MAX;
s_close_max = -DBL_MAX;
}
__syncthreads();
    // Per-thread local accumulators
double local_avg_sum = 0.0;
double local_open_min = DBL_MAX;
double local_open_max = -DBL_MAX;
double local_close_min = DBL_MAX;
double local_close_max = -DBL_MAX;
    // Each thread processes its portion of the ticks
for (int i = threadIdx.x; i < count; i += blockDim.x) {
int tick_idx = offset + i;
double avg = (low[tick_idx] + high[tick_idx]) / 2.0;
local_avg_sum += avg;
local_open_min = min(local_open_min, open[tick_idx]);
local_open_max = max(local_open_max, open[tick_idx]);
local_close_min = min(local_close_min, close[tick_idx]);
local_close_max = max(local_close_max, close[tick_idx]);
}
    // Reduce using atomic operations
atomicAdd(&s_avg_sum, local_avg_sum);
atomicMin(reinterpret_cast<unsigned long long*>(&s_open_min),
__double_as_longlong(local_open_min));
atomicMax(reinterpret_cast<unsigned long long*>(&s_open_max),
__double_as_longlong(local_open_max));
atomicMin(reinterpret_cast<unsigned long long*>(&s_close_min),
__double_as_longlong(local_close_min));
atomicMax(reinterpret_cast<unsigned long long*>(&s_close_max),
__double_as_longlong(local_close_max));
__syncthreads();
    // The first thread writes the result
if (threadIdx.x == 0) {
GpuPeriodStats stats;
stats.period = unique_periods[period_idx];
stats.avg = s_avg_sum / static_cast<double>(count);
stats.open_min = s_open_min;
stats.open_max = s_open_max;
stats.close_min = s_close_min;
stats.close_max = s_close_max;
stats.count = count;
out_stats[period_idx] = stats;
}
}
// ============================================================================
// Simple aggregation kernel (one thread per period)
// Used when there are many periods with few ticks in each
// ============================================================================
__global__ void aggregate_periods_simple_kernel(
const double* __restrict__ open,
const double* __restrict__ high,
const double* __restrict__ low,
const double* __restrict__ close,
const int64_t* __restrict__ unique_periods,
const int* __restrict__ offsets,
const int* __restrict__ counts,
int num_periods,
GpuPeriodStats* __restrict__ out_stats)
{
int period_idx = blockIdx.x * blockDim.x + threadIdx.x;
if (period_idx >= num_periods) return;
int offset = offsets[period_idx];
int count = counts[period_idx];
double avg_sum = 0.0;
double open_min = DBL_MAX;
double open_max = -DBL_MAX;
double close_min = DBL_MAX;
double close_max = -DBL_MAX;
for (int i = 0; i < count; i++) {
int tick_idx = offset + i;
double avg = (low[tick_idx] + high[tick_idx]) / 2.0;
avg_sum += avg;
open_min = min(open_min, open[tick_idx]);
open_max = max(open_max, open[tick_idx]);
close_min = min(close_min, close[tick_idx]);
close_max = max(close_max, close[tick_idx]);
}
GpuPeriodStats stats;
stats.period = unique_periods[period_idx];
stats.avg = avg_sum / static_cast<double>(count);
stats.open_min = open_min;
stats.open_max = open_max;
stats.close_min = close_min;
stats.close_max = close_max;
stats.count = count;
out_stats[period_idx] = stats;
}
// ============================================================================
// GPU availability check
// ============================================================================
extern "C" int gpu_is_available() {
    int n = 0;
    cudaError_t err = cudaGetDeviceCount(&n);
    if (err != cudaSuccess) return 0;
    if (n > 0) {
        cudaFree(0);  // Force CUDA context initialization
    }
    return (n > 0) ? 1 : 0;
}
// ============================================================================
// Main GPU aggregation entry point
// ============================================================================
extern "C" int gpu_aggregate_periods(
const double* h_timestamps,
const double* h_open,
const double* h_high,
const double* h_low,
const double* h_close,
int num_ticks,
int64_t interval,
GpuPeriodStats** h_out_stats,
int* out_num_periods)
{
if (num_ticks == 0) {
*h_out_stats = nullptr;
*out_num_periods = 0;
return 0;
}
std::ostringstream output;
double total_start = get_time_ms();
// ========================================================================
    // Step 1: allocate memory and copy the data to the GPU
// ========================================================================
double step1_start = get_time_ms();
double* d_timestamps = nullptr;
double* d_open = nullptr;
double* d_high = nullptr;
double* d_low = nullptr;
double* d_close = nullptr;
int64_t* d_period_ids = nullptr;
size_t ticks_bytes = num_ticks * sizeof(double);
CUDA_CHECK(cudaMalloc(&d_timestamps, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_open, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_high, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_low, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_close, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_period_ids, num_ticks * sizeof(int64_t)));
CUDA_CHECK(cudaMemcpy(d_timestamps, h_timestamps, ticks_bytes, cudaMemcpyHostToDevice));
CUDA_CHECK(cudaMemcpy(d_open, h_open, ticks_bytes, cudaMemcpyHostToDevice));
CUDA_CHECK(cudaMemcpy(d_high, h_high, ticks_bytes, cudaMemcpyHostToDevice));
CUDA_CHECK(cudaMemcpy(d_low, h_low, ticks_bytes, cudaMemcpyHostToDevice));
CUDA_CHECK(cudaMemcpy(d_close, h_close, ticks_bytes, cudaMemcpyHostToDevice));
double step1_ms = get_time_ms() - step1_start;
// ========================================================================
    // Step 2: compute the period_id for each tick
// ========================================================================
double step2_start = get_time_ms();
const int BLOCK_SIZE = 256;
int num_blocks = (num_ticks + BLOCK_SIZE - 1) / BLOCK_SIZE;
compute_period_ids_kernel<<<num_blocks, BLOCK_SIZE>>>(
d_timestamps, d_period_ids, num_ticks, interval);
CUDA_CHECK(cudaGetLastError());
CUDA_CHECK(cudaDeviceSynchronize());
double step2_ms = get_time_ms() - step2_start;
// ========================================================================
    // Step 3: RLE (run-length encode) to find the unique periods
// ========================================================================
double step3_start = get_time_ms();
int64_t* d_unique_periods = nullptr;
int* d_counts = nullptr;
int* d_num_runs = nullptr;
CUDA_CHECK(cudaMalloc(&d_unique_periods, num_ticks * sizeof(int64_t)));
CUDA_CHECK(cudaMalloc(&d_counts, num_ticks * sizeof(int)));
CUDA_CHECK(cudaMalloc(&d_num_runs, sizeof(int)));
    // Determine the size of CUB's temporary buffer
void* d_temp_storage = nullptr;
size_t temp_storage_bytes = 0;
cub::DeviceRunLengthEncode::Encode(
d_temp_storage, temp_storage_bytes,
d_period_ids, d_unique_periods, d_counts, d_num_runs,
num_ticks);
CUDA_CHECK(cudaMalloc(&d_temp_storage, temp_storage_bytes));
cub::DeviceRunLengthEncode::Encode(
d_temp_storage, temp_storage_bytes,
d_period_ids, d_unique_periods, d_counts, d_num_runs,
num_ticks);
CUDA_CHECK(cudaGetLastError());
    // Copy back the number of unique periods
int num_periods = 0;
CUDA_CHECK(cudaMemcpy(&num_periods, d_num_runs, sizeof(int), cudaMemcpyDeviceToHost));
cudaFree(d_temp_storage);
d_temp_storage = nullptr;
double step3_ms = get_time_ms() - step3_start;
// ========================================================================
    // Step 4: exclusive scan to compute the offsets
// ========================================================================
double step4_start = get_time_ms();
int* d_offsets = nullptr;
CUDA_CHECK(cudaMalloc(&d_offsets, num_periods * sizeof(int)));
temp_storage_bytes = 0;
cub::DeviceScan::ExclusiveSum(
d_temp_storage, temp_storage_bytes,
d_counts, d_offsets, num_periods);
CUDA_CHECK(cudaMalloc(&d_temp_storage, temp_storage_bytes));
cub::DeviceScan::ExclusiveSum(
d_temp_storage, temp_storage_bytes,
d_counts, d_offsets, num_periods);
CUDA_CHECK(cudaGetLastError());
cudaFree(d_temp_storage);
double step4_ms = get_time_ms() - step4_start;
// ========================================================================
    // Step 5: aggregate the periods
// ========================================================================
double step5_start = get_time_ms();
GpuPeriodStats* d_out_stats = nullptr;
CUDA_CHECK(cudaMalloc(&d_out_stats, num_periods * sizeof(GpuPeriodStats)));
    // Use the simple kernel (one thread per period),
    // since a period usually contains few ticks
int agg_blocks = (num_periods + BLOCK_SIZE - 1) / BLOCK_SIZE;
aggregate_periods_simple_kernel<<<agg_blocks, BLOCK_SIZE>>>(
d_open, d_high, d_low, d_close,
d_unique_periods, d_offsets, d_counts,
num_periods, d_out_stats);
CUDA_CHECK(cudaGetLastError());
CUDA_CHECK(cudaDeviceSynchronize());
double step5_ms = get_time_ms() - step5_start;
// ========================================================================
    // Step 6: copy the results back to the CPU
// ========================================================================
double step6_start = get_time_ms();
GpuPeriodStats* h_stats = new GpuPeriodStats[num_periods];
CUDA_CHECK(cudaMemcpy(h_stats, d_out_stats, num_periods * sizeof(GpuPeriodStats),
cudaMemcpyDeviceToHost));
double step6_ms = get_time_ms() - step6_start;
// ========================================================================
    // Step 7: free GPU memory
// ========================================================================
double step7_start = get_time_ms();
cudaFree(d_timestamps);
cudaFree(d_open);
cudaFree(d_high);
cudaFree(d_low);
cudaFree(d_close);
cudaFree(d_period_ids);
cudaFree(d_unique_periods);
cudaFree(d_counts);
cudaFree(d_offsets);
cudaFree(d_num_runs);
cudaFree(d_out_stats);
double step7_ms = get_time_ms() - step7_start;
// ========================================================================
    // Totals
// ========================================================================
double total_ms = get_time_ms() - total_start;
    // Build the whole report as a single string
output << " GPU aggregation (" << num_ticks << " ticks, interval=" << interval << " sec):\n";
output << " 1. Malloc + H->D copy: " << std::fixed << std::setprecision(3) << std::setw(7) << step1_ms << " ms\n";
output << " 2. Compute period_ids: " << std::setw(7) << step2_ms << " ms\n";
output << " 3. RLE (CUB): " << std::setw(7) << step3_ms << " ms (" << num_periods << " periods)\n";
output << " 4. Exclusive scan: " << std::setw(7) << step4_ms << " ms\n";
output << " 5. Aggregation kernel: " << std::setw(7) << step5_ms << " ms\n";
output << " 6. D->H copy: " << std::setw(7) << step6_ms << " ms\n";
output << " 7. Free GPU memory: " << std::setw(7) << step7_ms << " ms\n";
output << " GPU TOTAL: " << std::setw(7) << total_ms << " ms\n";
    // Print everything in one call
printf("%s", output.str().c_str());
fflush(stdout);
*h_out_stats = h_stats;
*out_num_periods = num_periods;
return 0;
}
// ============================================================================
// Freeing the result memory
// ============================================================================
extern "C" void gpu_free_results(GpuPeriodStats* stats) {
delete[] stats;
}

src/intervals.cpp (new file)

@@ -0,0 +1,320 @@
#include "intervals.hpp"
#include "utils.hpp"
#include <mpi.h>
#include <algorithm>
#include <cmath>
#include <fstream>
#include <iomanip>
#include <sstream>
#include <ctime>
#include <limits>
// Helper structure for accumulating min/max over an interval
struct IntervalAccumulator {
PeriodIndex start_period;
double start_avg;
double open_min;
double open_max;
double close_min;
double close_max;
void init(const PeriodStats& p) {
start_period = p.period;
start_avg = p.avg;
open_min = p.open_min;
open_max = p.open_max;
close_min = p.close_min;
close_max = p.close_max;
}
void update(const PeriodStats& p) {
open_min = std::min(open_min, p.open_min);
open_max = std::max(open_max, p.open_max);
close_min = std::min(close_min, p.close_min);
close_max = std::max(close_max, p.close_max);
}
Interval finalize(const PeriodStats& end_period, double change) const {
Interval iv;
iv.start_period = start_period;
iv.end_period = end_period.period;
iv.start_avg = start_avg;
iv.end_avg = end_period.avg;
iv.change = change;
iv.open_min = std::min(open_min, end_period.open_min);
iv.open_max = std::max(open_max, end_period.open_max);
iv.close_min = std::min(close_min, end_period.close_min);
iv.close_max = std::max(close_max, end_period.close_max);
return iv;
}
};
// PeriodStats packed for MPI transfer (8 doubles)
struct PackedPeriodStats {
double period; // PeriodIndex as double
double avg;
double open_min;
double open_max;
double close_min;
double close_max;
double count; // int64_t as double
    double valid;      // validity flag (1.0 = valid, 0.0 = invalid)
void pack(const PeriodStats& ps) {
period = static_cast<double>(ps.period);
avg = ps.avg;
open_min = ps.open_min;
open_max = ps.open_max;
close_min = ps.close_min;
close_max = ps.close_max;
count = static_cast<double>(ps.count);
valid = 1.0;
}
PeriodStats unpack() const {
PeriodStats ps;
ps.period = static_cast<PeriodIndex>(period);
ps.avg = avg;
ps.open_min = open_min;
ps.open_max = open_max;
ps.close_min = close_min;
ps.close_max = close_max;
ps.count = static_cast<int64_t>(count);
return ps;
}
bool is_valid() const { return valid > 0.5; }
void set_invalid() { valid = 0.0; }
};
IntervalResult find_intervals_parallel(
const std::vector<PeriodStats>& periods,
int rank, int size,
double threshold)
{
IntervalResult result;
result.compute_time = 0.0;
result.wait_time = 0.0;
if (periods.empty()) {
if (rank < size - 1) {
PackedPeriodStats invalid;
invalid.set_invalid();
MPI_Send(&invalid, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
}
return result;
}
double compute_start = MPI_Wtime();
size_t process_until = (rank == size - 1) ? periods.size() : periods.size() - 1;
IntervalAccumulator acc;
size_t start_idx = 0;
bool have_pending_interval = false;
if (rank > 0) {
double wait_start = MPI_Wtime();
PackedPeriodStats received;
MPI_Recv(&received, 8, MPI_DOUBLE, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
result.wait_time = MPI_Wtime() - wait_start;
compute_start = MPI_Wtime();
if (received.is_valid()) {
PeriodStats prev_period = received.unpack();
for (start_idx = 0; start_idx < periods.size(); start_idx++) {
if (periods[start_idx].period > prev_period.period) {
break;
}
}
if (start_idx < process_until) {
acc.init(prev_period);
have_pending_interval = true;
for (size_t i = start_idx; i < process_until; i++) {
acc.update(periods[i]);
double change = std::abs(periods[i].avg - acc.start_avg) / acc.start_avg;
if (change >= threshold) {
result.intervals.push_back(acc.finalize(periods[i], change));
have_pending_interval = false;
start_idx = i + 1;
if (start_idx < process_until) {
acc.init(periods[start_idx]);
have_pending_interval = true;
}
}
}
}
} else {
if (process_until > 0) {
acc.init(periods[0]);
have_pending_interval = true;
start_idx = 0;
}
}
} else {
if (process_until > 0) {
acc.init(periods[0]);
have_pending_interval = true;
start_idx = 0;
}
}
if (rank == 0 && have_pending_interval) {
for (size_t i = 1; i < process_until; i++) {
acc.update(periods[i]);
double change = std::abs(periods[i].avg - acc.start_avg) / acc.start_avg;
if (change >= threshold) {
result.intervals.push_back(acc.finalize(periods[i], change));
have_pending_interval = false;
start_idx = i + 1;
if (start_idx < process_until) {
acc.init(periods[start_idx]);
have_pending_interval = true;
}
}
}
}
if (rank == size - 1 && have_pending_interval && !periods.empty()) {
const auto& last_period = periods.back();
double change = std::abs(last_period.avg - acc.start_avg) / acc.start_avg;
result.intervals.push_back(acc.finalize(last_period, change));
}
result.compute_time = MPI_Wtime() - compute_start;
if (rank < size - 1) {
PackedPeriodStats to_send;
if (have_pending_interval) {
PeriodStats start_period;
start_period.period = acc.start_period;
start_period.avg = acc.start_avg;
start_period.open_min = acc.open_min;
start_period.open_max = acc.open_max;
start_period.close_min = acc.close_min;
start_period.close_max = acc.close_max;
start_period.count = 0;
to_send.pack(start_period);
} else if (periods.size() >= 2) {
to_send.pack(periods[periods.size() - 2]);
} else {
to_send.set_invalid();
}
MPI_Send(&to_send, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
}
return result;
}
double collect_intervals(
std::vector<Interval>& local_intervals,
int rank, int size)
{
double wait_time = 0.0;
if (rank == 0) {
for (int r = 1; r < size; r++) {
double wait_start = MPI_Wtime();
int count;
MPI_Recv(&count, 1, MPI_INT, r, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if (count > 0) {
std::vector<double> buffer(count * 9);
MPI_Recv(buffer.data(), count * 9, MPI_DOUBLE, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for (int i = 0; i < count; i++) {
Interval iv;
iv.start_period = static_cast<PeriodIndex>(buffer[i * 9 + 0]);
iv.end_period = static_cast<PeriodIndex>(buffer[i * 9 + 1]);
iv.open_min = buffer[i * 9 + 2];
iv.open_max = buffer[i * 9 + 3];
iv.close_min = buffer[i * 9 + 4];
iv.close_max = buffer[i * 9 + 5];
iv.start_avg = buffer[i * 9 + 6];
iv.end_avg = buffer[i * 9 + 7];
iv.change = buffer[i * 9 + 8];
local_intervals.push_back(iv);
}
}
wait_time += MPI_Wtime() - wait_start;
}
std::sort(local_intervals.begin(), local_intervals.end(),
[](const Interval& a, const Interval& b) {
return a.start_period < b.start_period;
});
} else {
int count = static_cast<int>(local_intervals.size());
MPI_Send(&count, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
if (count > 0) {
std::vector<double> buffer(count * 9);
for (int i = 0; i < count; i++) {
const auto& iv = local_intervals[i];
buffer[i * 9 + 0] = static_cast<double>(iv.start_period);
buffer[i * 9 + 1] = static_cast<double>(iv.end_period);
buffer[i * 9 + 2] = iv.open_min;
buffer[i * 9 + 3] = iv.open_max;
buffer[i * 9 + 4] = iv.close_min;
buffer[i * 9 + 5] = iv.close_max;
buffer[i * 9 + 6] = iv.start_avg;
buffer[i * 9 + 7] = iv.end_avg;
buffer[i * 9 + 8] = iv.change;
}
MPI_Send(buffer.data(), count * 9, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD);
}
}
return wait_time;
}
std::string period_index_to_datetime(PeriodIndex period) {
int64_t interval = get_aggregation_interval();
time_t ts = static_cast<time_t>(period) * interval;
struct tm* tm_info = gmtime(&ts);
std::ostringstream oss;
oss << std::setfill('0')
<< (tm_info->tm_year + 1900) << "-"
<< std::setw(2) << (tm_info->tm_mon + 1) << "-"
<< std::setw(2) << tm_info->tm_mday << " "
<< std::setw(2) << tm_info->tm_hour << ":"
<< std::setw(2) << tm_info->tm_min << ":"
<< std::setw(2) << tm_info->tm_sec;
return oss.str();
}
void write_intervals(const std::string& filename, const std::vector<Interval>& intervals) {
std::ofstream out(filename);
out << std::fixed << std::setprecision(2);
out << "start_datetime,end_datetime,open_min,open_max,close_min,close_max,start_avg,end_avg,change\n";
for (const auto& iv : intervals) {
out << period_index_to_datetime(iv.start_period) << ","
<< period_index_to_datetime(iv.end_period) << ","
<< iv.open_min << ","
<< iv.open_max << ","
<< iv.close_min << ","
<< iv.close_max << ","
<< iv.start_avg << ","
<< iv.end_avg << ","
<< std::setprecision(6) << iv.change << "\n";
}
}

src/intervals.hpp (new file)

@@ -0,0 +1,44 @@
#pragma once
#include "period_stats.hpp"
#include <vector>
#include <string>

// An interval with change >= threshold
struct Interval {
    PeriodIndex start_period;
    PeriodIndex end_period;
    double open_min;
    double open_max;
    double close_min;
    double close_max;
    double start_avg;
    double end_avg;
    double change;
};

// Result of the parallel interval construction
struct IntervalResult {
    std::vector<Interval> intervals;
    double compute_time;  // computation time
    double wait_time;     // time spent waiting for data from the previous rank
};

// Parallel interval construction using MPI
IntervalResult find_intervals_parallel(
    const std::vector<PeriodStats>& periods,
    int rank, int size,
    double threshold = 0.10
);

// Collect the intervals from all ranks on rank 0
double collect_intervals(
    std::vector<Interval>& local_intervals,
    int rank, int size
);

// Write the intervals to a file
void write_intervals(const std::string& filename, const std::vector<Interval>& intervals);

// Convert a PeriodIndex to a date/time string
std::string period_index_to_datetime(PeriodIndex period);
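For intuition, the `change` test in find_intervals_parallel is the same relative-change criterion the notebook uses: an interval closes as soon as |avg_now - start_avg| / start_avg reaches the threshold. Plugging in the first row of the notebook's final table above:

```python
start_avg = 85166.063889   # interval 316 in the notebook output above
end_avg = 94303.907292
change = abs(end_avg - start_avg) / start_avg
print(f"{change:.6f}")     # 0.107294 >= 0.10, so the interval closes here
```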

View File

@@ -1,87 +1,115 @@
 #include <mpi.h>
 #include <iostream>
 #include <vector>
-#include <map>
+#include <iomanip>
 #include "csv_loader.hpp"
-#include "utils.hpp"
 #include "record.hpp"
+#include "period_stats.hpp"
+#include "aggregation.hpp"
+#include "intervals.hpp"
+#include "utils.hpp"
 #include "gpu_loader.hpp"
 
-// Select the records that belong to a specific rank
-std::vector<Record> select_records_for_rank(
-    const std::map<long long, std::vector<Record>>& days,
-    const std::vector<long long>& day_list)
-{
-    std::vector<Record> out;
-    for (auto d : day_list) {
-        auto it = days.find(d);
-        if (it != days.end()) {
-            const auto& vec = it->second;
-            out.insert(out.end(), vec.begin(), vec.end());
-        }
-    }
-    return out;
-}
-
 int main(int argc, char** argv) {
     MPI_Init(&argc, &argv);
+    double total_start = MPI_Wtime();
 
     int rank, size;
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     MPI_Comm_size(MPI_COMM_WORLD, &size);
 
-    std::vector<Record> local_records;
-
-    if (rank == 0) {
-        std::cout << "Rank 0 loading CSV..." << std::endl;
-
-        // launched from build/
-        auto records = load_csv("../data/data.csv");
-        auto days = group_by_day(records);
-        auto parts = split_days(days, size);
-
-        // Distribute the data
-        for (int r = 0; r < size; r++) {
-            auto vec = select_records_for_rank(days, parts[r]);
-            if (r == 0) {
-                // nothing is sent to rank 0 itself; store directly
-                local_records = vec;
-                continue;
-            }
-            int count = vec.size();
-            MPI_Send(&count, 1, MPI_INT, r, 0, MPI_COMM_WORLD);
-            MPI_Send(vec.data(), count * sizeof(Record), MPI_BYTE, r, 1, MPI_COMM_WORLD);
-        }
-    }
-    else {
-        // Receive the data
-        int count = 0;
-        MPI_Recv(&count, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-        local_records.resize(count);
-        MPI_Recv(local_records.data(), count * sizeof(Record),
-                 MPI_BYTE, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-    }
-
-    MPI_Barrier(MPI_COMM_WORLD);
-    std::cout << "Rank " << rank << " received "
-              << local_records.size() << " records" << std::endl;
-
-    auto gpu_is_available = load_gpu_is_available();
-    int have_gpu = 0;
-    if (gpu_is_available) {
-        std::cout << "Rank " << rank << " dll loaded" << std::endl;
-        have_gpu = gpu_is_available();
-    }
-    std::cout << "Rank " << rank << ": gpu_available=" << have_gpu << "\n";
+    // Check GPU availability
+    bool use_cuda = get_use_cuda();
+    bool have_gpu = gpu_is_available();
+    bool use_gpu = use_cuda && have_gpu;
+    std::cout << "Rank " << rank
+              << ": USE_CUDA=" << use_cuda
+              << ", GPU available=" << have_gpu
+              << ", using " << (use_gpu ? "GPU" : "CPU")
+              << std::endl;
+
+    // Parallel data reading
+    double read_start = MPI_Wtime();
+    std::vector<Record> records = load_csv_parallel(rank, size);
+    double read_time = MPI_Wtime() - read_start;
+    std::cout << "Rank " << rank
+              << ": read " << records.size() << " records"
+              << " in " << std::fixed << std::setprecision(3) << read_time << " sec"
+              << std::endl;
+
+    // Aggregation into periods
+    double agg_start = MPI_Wtime();
+    std::vector<PeriodStats> periods;
+    if (use_gpu) {
+        int64_t interval = get_aggregation_interval();
+        if (!aggregate_periods_gpu(records, interval, periods)) {
+            std::cerr << "Rank " << rank << ": GPU aggregation failed, falling back to CPU" << std::endl;
+            periods = aggregate_periods(records);
+        }
+    } else {
+        periods = aggregate_periods(records);
+    }
+    double agg_time = MPI_Wtime() - agg_start;
+    std::cout << "Rank " << rank
+              << ": aggregated " << periods.size() << " periods"
+              << " [" << (periods.empty() ? 0 : periods.front().period)
+              << ".." << (periods.empty() ? 0 : periods.back().period) << "]"
+              << " in " << std::fixed << std::setprecision(3) << agg_time << " sec"
+              << std::endl;
+
+    // Drop the edge periods (they may be incomplete because of parallel reading)
+    trim_edge_periods(periods, rank, size);
+    std::cout << "Rank " << rank
+              << ": after trim " << periods.size() << " periods"
+              << " [" << (periods.empty() ? 0 : periods.front().period)
+              << ".." << (periods.empty() ? 0 : periods.back().period) << "]"
+              << std::endl;
+
+    // Parallel interval construction
+    IntervalResult iv_result = find_intervals_parallel(periods, rank, size);
+    std::cout << "Rank " << rank
+              << ": found " << iv_result.intervals.size() << " intervals"
+              << ", compute " << std::fixed << std::setprecision(6) << iv_result.compute_time << " sec"
+              << ", wait " << iv_result.wait_time << " sec"
+              << std::endl;
+
+    // Gather the intervals on rank 0
+    double collect_wait = collect_intervals(iv_result.intervals, rank, size);
+    if (rank == 0) {
+        std::cout << "Rank 0: collected " << iv_result.intervals.size() << " total intervals"
+                  << ", wait " << std::fixed << std::setprecision(3) << collect_wait << " sec"
+                  << std::endl;
+    }
+
+    // Write the results to a file (rank 0 only)
+    if (rank == 0) {
+        double write_start = MPI_Wtime();
+        write_intervals("result.csv", iv_result.intervals);
+        double write_time = MPI_Wtime() - write_start;
+        std::cout << "Rank 0: wrote result.csv"
+                  << " in " << std::fixed << std::setprecision(3) << write_time << " sec"
+                  << std::endl;
+    }
+
+    // Print the total execution time
+    MPI_Barrier(MPI_COMM_WORLD);
+    double total_time = MPI_Wtime() - total_start;
+    if (rank == 0) {
+        std::cout << "Total execution time: "
+                  << std::fixed << std::setprecision(3)
+                  << total_time << " sec" << std::endl;
+    }
 
     MPI_Finalize();
     return 0;
 }

src/mpi_utils.cpp

@@ -1,11 +0,0 @@
#include "mpi_utils.hpp"
#include <mpi.h>
#include <iostream>
void mpi_print_basic() {
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
std::cout << "Hello from rank " << rank << " of " << size << std::endl;
}

src/mpi_utils.hpp

@@ -1,2 +0,0 @@
#pragma once
void mpi_print_basic();

src/period_stats.hpp Normal file

@@ -0,0 +1,15 @@
#pragma once
#include <cstdint>
using PeriodIndex = int64_t;
// Aggregated data for a single period
struct PeriodStats {
PeriodIndex period; // period index (timestamp / AGGREGATION_INTERVAL)
double avg; // mean of (Low + High) / 2 over all records
double open_min; // minimum Open within the period
double open_max; // maximum Open within the period
double close_min; // minimum Close within the period
double close_max; // maximum Close within the period
int64_t count; // number of records the aggregate is based on
};
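
Since `PeriodIndex` is just `timestamp / AGGREGATION_INTERVAL` with integer division, two records land in the same period exactly when their quotients match. A minimal self-contained check (the 86400-second daily interval is an example value; at run time it comes from the AGGREGATION_INTERVAL environment variable):

```cpp
#include <cassert>
#include <cstdint>

int main() {
    const int64_t interval = 86400;   // one day, example value only
    int64_t ts_morning = 1609459200;  // 2021-01-01 00:00:00 UTC
    int64_t ts_evening = 1609531199;  // 2021-01-01 19:59:59 UTC
    // Both records collapse into the same daily period, index 18628.
    assert(ts_morning / interval == ts_evening / interval);
    return 0;
}
```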

src/utils.cpp

@@ -1,25 +1,105 @@
#include "utils.hpp" #include "utils.hpp"
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <numeric>
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) { int get_num_cpu_threads() {
std::map<DayIndex, std::vector<Record>> days; const char* env_threads = std::getenv("NUM_CPU_THREADS");
int num_cpu_threads = 1;
for (const auto& r : recs) { if (env_threads) {
DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400; num_cpu_threads = std::atoi(env_threads);
days[day].push_back(r); if (num_cpu_threads < 1) num_cpu_threads = 1;
}
return num_cpu_threads;
} }
return days; std::string get_env(const char* name) {
const char* env = std::getenv(name);
if (!env) {
throw std::runtime_error(std::string("Environment variable not set: ") + name);
}
return std::string(env);
} }
std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts) { std::string get_data_path() {
std::vector<std::vector<DayIndex>> out(parts); return get_env("DATA_PATH");
int i = 0;
for (auto& kv : days) {
out[i % parts].push_back(kv.first);
i++;
} }
return out; std::vector<int> get_data_read_shares() {
std::vector<int> shares;
std::stringstream ss(get_env("DATA_READ_SHARES"));
std::string item;
while (std::getline(ss, item, ',')) {
shares.push_back(std::stoi(item));
}
return shares;
} }
int64_t get_read_overlap_bytes() {
return std::stoll(get_env("READ_OVERLAP_BYTES"));
}
int64_t get_aggregation_interval() {
return std::stoll(get_env("AGGREGATION_INTERVAL"));
}
bool get_use_cuda() {
return std::stoi(get_env("USE_CUDA")) != 0;
}
int64_t get_file_size(const std::string& path) {
std::ifstream file(path, std::ios::binary | std::ios::ate);
if (!file.is_open()) {
throw std::runtime_error("Cannot open file: " + path);
}
return static_cast<int64_t>(file.tellg());
}
ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
const std::vector<int>& shares, int64_t overlap_bytes) {
std::vector<int> effective_shares;
if (shares.size() == static_cast<size_t>(size)) {
effective_shares = shares;
} else {
effective_shares.assign(size, 1);
}
int total_shares = std::accumulate(effective_shares.begin(), effective_shares.end(), 0);
int64_t bytes_per_share = file_size / total_shares;
int64_t base_start = 0;
for (int i = 0; i < rank; i++) {
base_start += bytes_per_share * effective_shares[i];
}
int64_t base_end = base_start + bytes_per_share * effective_shares[rank];
ByteRange range;
if (rank == 0) {
range.start = 0;
range.end = std::min(base_end + overlap_bytes, file_size);
} else if (rank == size - 1) {
range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
range.end = file_size;
} else {
range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
range.end = std::min(base_end + overlap_bytes, file_size);
}
return range;
}
void trim_edge_periods(std::vector<PeriodStats>& periods, int rank, int size) {
if (periods.empty()) return;
if (rank == 0) {
periods.pop_back();
} else if (rank == size - 1) {
periods.erase(periods.begin());
} else {
periods.pop_back();
periods.erase(periods.begin());
}
}
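
A tiny driver makes the shares/overlap logic above concrete; the numbers are illustrative, in a real run they come from DATA_READ_SHARES and READ_OVERLAP_BYTES (the function has no MPI dependency, so this compiles standalone against utils.hpp):

```cpp
#include <cstdio>
#include <vector>
#include "utils.hpp"

int main() {
    // 1000-byte file, 3 ranks, shares 1:2:1, 16-byte overlap.
    // total_shares = 4, bytes_per_share = 250, base split [0,250) [250,750) [750,1000);
    // with the overlap applied the printed ranges are [0,266) [234,766) [734,1000).
    std::vector<int> shares{1, 2, 1};
    for (int r = 0; r < 3; r++) {
        ByteRange br = calculate_byte_range(r, 3, 1000, shares, 16);
        std::printf("rank %d: [%lld, %lld)\n", r,
                    static_cast<long long>(br.start), static_cast<long long>(br.end));
    }
    return 0;
}
```

The overlap ensures that a CSV line cut at a base boundary is fully visible to both neighbouring ranks; the possibly duplicated or incomplete edge periods this creates are presumably what trim_edge_periods discards.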

src/utils.hpp

@@ -1,11 +1,33 @@
 #pragma once
 #include "record.hpp"
+#include "period_stats.hpp"
 #include <map>
 #include <vector>
+#include <string>
+#include <cstdlib>
+#include <cstdint>
 
-using DayIndex = long long;
+// Reading configuration from environment variables
+int get_num_cpu_threads();
+std::string get_data_path();
+std::vector<int> get_data_read_shares();
+int64_t get_read_overlap_bytes();
+int64_t get_aggregation_interval();
+bool get_use_cuda();
 
-std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs);
-std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts);
+// Byte range that one rank reads from the input file
+struct ByteRange {
+    int64_t start;
+    int64_t end; // exclusive
+};
+
+// Computes the byte range for a specific rank
+ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
+                               const std::vector<int>& shares, int64_t overlap_bytes);
+
+// Returns the file size in bytes
+int64_t get_file_size(const std::string& path);
+
+// Removes the edge periods, which may be incomplete due to parallel reading
+void trim_edge_periods(std::vector<PeriodStats>& periods, int rank, int size);

upsample.py Normal file

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
import argparse
import sys
import time
def parse_row(line: str):
# Timestamp,Open,High,Low,Close,Volume
ts, o, h, l, c, v = line.split(',')
return int(float(ts)), float(o), float(h), float(l), float(c), float(v)
def fmt_row(ts, o, h, l, c, v):
return f"{ts},{o:.2f},{h:.2f},{l:.2f},{c:.2f},{v:.8f}\n"
def count_lines_fast(path: str) -> int:
with open(path, "rb") as f:
return sum(1 for _ in f) - 1  # minus the header row
def main(inp, out, step, flush_every):
# count the input rows so progress can be reported
total_lines = count_lines_fast(inp)
print(f"Total input rows: {total_lines:,}", file=sys.stderr)
start_time = time.time()
processed = 0
last_report = start_time
with open(inp, "r", buffering=8 * 1024 * 1024) as fin, \
open(out, "w", buffering=8 * 1024 * 1024) as fout:
fin.readline()  # skip the header
fout.write("Timestamp,Open,High,Low,Close,Volume\n")
first = fin.readline()
if not first:
return
prev = parse_row(first.strip())
out_buf = []
out_rows = 0
for line in fin:
line = line.strip()
if not line:
continue
cur = parse_row(line)
t1, o1, h1, l1, c1, v1 = prev
t2, o2, h2, l2, c2, v2 = cur
dt = t2 - t1
steps = dt // step
if steps > 0:
do = o2 - o1
dh = h2 - h1
dl = l2 - l1
dc = c2 - c1
dv = v2 - v1
inv = 1.0 / steps
for i in range(steps):
a = i * inv
out_buf.append(fmt_row(
t1 + i * step,
o1 + do * a,
h1 + dh * a,
l1 + dl * a,
c1 + dc * a,
v1 + dv * a
))
out_rows += steps
prev = cur
processed += 1
# progress reporting
if processed % 100_000 == 0:
now = time.time()
if now - last_report >= 0.5:
pct = processed * 100.0 / total_lines
elapsed = now - start_time
speed = processed / elapsed if elapsed > 0 else 0
eta = (total_lines - processed) / speed if speed > 0 else 0
print(
f"\rprocessed: {processed:,} / {total_lines:,} "
f"({pct:5.1f}%) | "
f"out ~ {out_rows:,} | "
f"{speed:,.0f} rows/s | "
f"ETA {eta/60:5.1f} min",
end="",
file=sys.stderr,
flush=True,
)
last_report = now
# flush the output buffer
if out_rows >= flush_every:
fout.write("".join(out_buf))
out_buf.clear()
out_rows = 0
# write any remaining buffered rows
if out_buf:
fout.write("".join(out_buf))
# write the last input row unchanged
t, o, h, l, c, v = prev
fout.write(fmt_row(t, o, h, l, c, v))
total_time = time.time() - start_time
print(
f"\nDone in {total_time/60:.1f} min",
file=sys.stderr
)
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--input", required=True)
ap.add_argument("-o", "--output", required=True)
ap.add_argument("-s", "--step", type=int, default=10)
ap.add_argument("--flush-every", type=int, default=200_000)
args = ap.parse_args()
if args.step <= 0:
raise SystemExit("step must be > 0")
main(args.input, args.output, args.step, args.flush_every)
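
As a worked example of the interpolation loop: for minute-spaced input and `-s 10`, each pair of neighbouring rows has dt = 60, so steps = 60 // 10 = 6, and the loop emits rows at t1, t1 + 10, ..., t1 + 50, blending every field linearly (Open becomes o1 + (o2 - o1) * i / 6 for i = 0..5). The next segment then starts exactly at t2, and only the final input row is written through unchanged, which is what yields the six-fold growth of the dataset.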