Compare commits


18 Commits

Author SHA1 Message Date
07dcda12a5 Cuda 2025-12-16 15:19:50 +00:00
e84d1e9fe3 Optimized aggregation on the CPU 2025-12-16 14:16:40 +00:00
a5aadbc774 Switched to arbitrary aggregation periods 2025-12-16 13:53:36 +00:00
2833d2f7b4 Data upsampling 2025-12-16 13:25:38 +00:00
e4e01e1df3 Outdated info 2025-12-16 12:45:11 +00:00
f5b6f0fc73 Script for averaging the results 2025-12-15 14:46:27 +00:00
1cc9840d60 Total execution time 2025-12-15 13:04:26 +00:00
dea7940e29 Parallel interval search 2025-12-15 12:57:56 +00:00
f4ade418d6 Drop the boundary days 2025-12-15 11:30:50 +00:00
ab18d9770f Aggregation 2025-12-13 12:45:29 +00:00
6a22dc3ef7 Parallel data reading 2025-12-13 12:13:23 +00:00
f90a641754 gpu_is_available 2025-12-13 11:07:31 +00:00
10bd6db2b8 Note about NFS 2025-12-13 11:06:41 +00:00
d82fde7116 Clarified the task 2025-12-11 16:47:30 +03:00
7f16a5c17a More timers 2025-12-11 10:08:22 +00:00
44f297e55a Removed useless files 2025-12-11 09:06:48 +00:00
68ea345a35 CUDA-level parallelism 2025-12-02 12:58:49 +00:00
143e01b2dd OpenMP within a single machine 2025-12-02 12:55:16 +00:00
23 changed files with 1776 additions and 837 deletions

.gitignore

@@ -1,4 +1,4 @@
 data
 build
 out.txt
-result.csv
+*.csv

Makefile

@@ -1,5 +1,5 @@
 CXX = mpic++
-CXXFLAGS = -std=c++17 -O2 -Wall -Wextra -Wno-cast-function-type
+CXXFLAGS = -std=c++17 -O2 -Wall -Wextra -Wno-cast-function-type -fopenmp
 NVCC = nvcc
 NVCCFLAGS = -O3 -std=c++17 -arch=sm_86 -Xcompiler -fPIC

README.md

@@ -2,6 +2,16 @@
 [Kaggle Bitcoin Historical Data](https://www.kaggle.com/datasets/mczielinski/bitcoin-historical-data)
+The source data contains one record per minute. To increase the data volume
+for a clearer demonstration of the efficiency of parallel computation
+and GPU computation, the data was converted by linear interpolation
+from per-minute records to records every 10 seconds, i.e. the data volume grew
+by a factor of 6.
+```
+python3 upsample.py -i ./data/data.csv -o ./data/data_10s.csv -s 10
+```
 ## Task
 We group the data by day (Timestamp); for each day we compute the average price
@@ -13,7 +23,7 @@
 ## Build
 The project must be located in a directory shared by all nodes,
-for example, in `/mnt/shared/supercomputers/bitcoin-project/build`.
+for example, in `/mnt/shared/supercomputers/build`.
 Before running, set the actual path in `run.slurm`.
 ```sh
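
A minimal sketch of the upsampling idea described above (the actual upsample.py is not part of this diff, so this is an illustration only; it assumes a pandas DataFrame with a Unix-seconds Timestamp column and the Kaggle OHLCV columns):

```python
# Hypothetical sketch of the upsampling step; the real upsample.py may differ.
import pandas as pd

def upsample(in_path: str, out_path: str, step_seconds: int = 10) -> None:
    df = pd.read_csv(in_path)
    # Kaggle data: Timestamp is Unix seconds, one row per minute
    df["Timestamp"] = pd.to_datetime(df["Timestamp"], unit="s")
    df = df.set_index("Timestamp").sort_index()
    # insert a row every `step_seconds` and fill it by linear interpolation
    up = df.resample(f"{step_seconds}s").asfreq().interpolate(method="linear")
    up.index = up.index.astype("int64") // 10**9  # back to Unix seconds
    up.reset_index().to_csv(out_path, index=False)

upsample("./data/data.csv", "./data/data_10s.csv", 10)
```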

benchmark.py (new file)

@@ -0,0 +1,115 @@
"""
Runs `make run` <number_of_runs> times and collects execution-time statistics.
Simply parses out.txt and takes the value from the "Total execution time: <time> sec" line.
python benchmark.py <number_of_runs>
"""
import os
import re
import sys
import time
import subprocess
import statistics
N = int(sys.argv[1]) if len(sys.argv) > 1 else 10
OUT = "out.txt"
TIME_RE = re.compile(r"Total execution time:\s*([0-9]*\.?[0-9]+)\s*sec")
JOB_RE = re.compile(r"Submitted batch job\s+(\d+)")
APPEAR_TIMEOUT = 300.0 # how long to wait for out.txt to appear (sec)
FINISH_TIMEOUT = 3600.0 # how long to wait for the "Total execution time" line (sec)
POLL = 0.2 # file polling interval (sec)
def wait_for_exists(path: str, timeout: float):
t0 = time.time()
while not os.path.exists(path):
if time.time() - t0 > timeout:
raise TimeoutError(f"{path} did not appear within {timeout} seconds")
time.sleep(POLL)
def try_read(path: str) -> str:
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
return f.read()
except FileNotFoundError:
return ""
except OSError:
# on NFS the file can occasionally be unreadable while it is being written
return ""
def wait_for_time_line(path: str, timeout: float) -> float:
t0 = time.time()
last_report = 0.0
while True:
txt = try_read(path)
matches = TIME_RE.findall(txt)
if matches:
return float(matches[-1]) # take the last occurrence of the line
now = time.time()
if now - t0 > timeout:
tail = txt[-800:] if txt else "<empty>"
raise TimeoutError("Timed out waiting for 'Total execution time' line.\n"
f"Last 800 chars of out.txt:\n{tail}")
# print progress roughly every 5 seconds
if now - last_report > 5.0:
last_report = now
if txt:
# show the last non-empty line
lines = [l for l in txt.splitlines() if l.strip()]
if lines:
print(f" waiting... last line: {lines[-1][:120]}", flush=True)
else:
print(" waiting... (out.txt empty)", flush=True)
else:
print(" waiting... (out.txt not readable yet)", flush=True)
time.sleep(POLL)
times = []
for i in range(N):
print(f"Run {i+1}/{N} ...", flush=True)
# remove out.txt before the run
try:
os.remove(OUT)
except FileNotFoundError:
pass
# run `make run` and capture stdout (it contains "Submitted batch job XXX")
res = subprocess.run(["make", "run"], capture_output=True, text=True)
out = (res.stdout or "") + "\n" + (res.stderr or "")
job_id = None
m = JOB_RE.search(out)
if m:
job_id = m.group(1)
print(f" submitted job {job_id}", flush=True)
else:
print(" (job id not detected; will only watch out.txt)", flush=True)
# wait for out.txt to appear and for the "Total execution time" line
wait_for_exists(OUT, APPEAR_TIMEOUT)
t = wait_for_time_line(OUT, FINISH_TIMEOUT)
times.append(t)
print(f" time = {t:.3f} sec", flush=True)
# optionally remove out.txt after parsing
try:
os.remove(OUT)
except FileNotFoundError:
pass
print("\n=== RESULTS ===")
print(f"Runs: {len(times)}")
print(f"Mean: {statistics.mean(times):.3f} sec")
print(f"Median: {statistics.median(times):.3f} sec")
print(f"Min: {min(times):.3f} sec")
print(f"Max: {max(times):.3f} sec")
if len(times) > 1:
print(f"Stddev: {statistics.stdev(times):.3f} sec")

(Jupyter notebook)

@@ -2,7 +2,7 @@
"cells": [ "cells": [
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 1, "execution_count": 20,
"id": "2acce44b", "id": "2acce44b",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
@@ -12,7 +12,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 14, "execution_count": 21,
"id": "5ba70af7", "id": "5ba70af7",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
@@ -111,7 +111,7 @@
"7317758 0.410369 " "7317758 0.410369 "
] ]
}, },
"execution_count": 14, "execution_count": 21,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -124,8 +124,8 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 19, "execution_count": 23,
"id": "d4b22f3b", "id": "3b320537",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@@ -150,67 +150,190 @@
" <tr style=\"text-align: right;\">\n", " <tr style=\"text-align: right;\">\n",
" <th></th>\n", " <th></th>\n",
" <th>Timestamp</th>\n", " <th>Timestamp</th>\n",
" <th>Low</th>\n",
" <th>High</th>\n",
" <th>Open</th>\n", " <th>Open</th>\n",
" <th>High</th>\n",
" <th>Low</th>\n",
" <th>Close</th>\n", " <th>Close</th>\n",
" <th>Volume</th>\n",
" <th>Avg</th>\n",
" </tr>\n", " </tr>\n",
" </thead>\n", " </thead>\n",
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>5078</th>\n", " <th>7317754</th>\n",
" <td>2025-11-26</td>\n", " <td>2025-11-30 23:55:00+00:00</td>\n",
" <td>86304.0</td>\n", " <td>90405.0</td>\n",
" <td>90646.0</td>\n", " <td>90452.0</td>\n",
" <td>87331.0</td>\n", " <td>90403.0</td>\n",
" <td>90477.0</td>\n", " <td>90452.0</td>\n",
" <td>0.531700</td>\n",
" <td>90427.5</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5079</th>\n", " <th>7317755</th>\n",
" <td>2025-11-27</td>\n", " <td>2025-11-30 23:56:00+00:00</td>\n",
" <td>90091.0</td>\n", " <td>90452.0</td>\n",
" <td>91926.0</td>\n", " <td>90481.0</td>\n",
" <td>90476.0</td>\n", " <td>90420.0</td>\n",
" <td>91325.0</td>\n", " <td>90420.0</td>\n",
" <td>0.055547</td>\n",
" <td>90450.5</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5080</th>\n", " <th>7317756</th>\n",
" <td>2025-11-28</td>\n", " <td>2025-11-30 23:57:00+00:00</td>\n",
" <td>90233.0</td>\n", " <td>90412.0</td>\n",
" <td>93091.0</td>\n", " <td>90458.0</td>\n",
" <td>91326.0</td>\n", " <td>90396.0</td>\n",
" <td>90913.0</td>\n", " <td>90435.0</td>\n",
" <td>0.301931</td>\n",
" <td>90427.0</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5081</th>\n", " <th>7317757</th>\n",
" <td>2025-11-29</td>\n", " <td>2025-11-30 23:58:00+00:00</td>\n",
" <td>90216.0</td>\n", " <td>90428.0</td>\n",
" <td>91179.0</td>\n", " <td>90428.0</td>\n",
" <td>90913.0</td>\n", " <td>90362.0</td>\n",
" <td>90832.0</td>\n", " <td>90362.0</td>\n",
" </tr>\n", " <td>4.591653</td>\n",
" <tr>\n", " <td>90395.0</td>\n",
" <th>5082</th>\n", " </tr>\n",
" <td>2025-11-30</td>\n", " <tr>\n",
" <th>7317758</th>\n",
" <td>2025-11-30 23:59:00+00:00</td>\n",
" <td>90363.0</td>\n",
" <td>90386.0</td>\n",
" <td>90362.0</td>\n", " <td>90362.0</td>\n",
" <td>91969.0</td>\n",
" <td>90832.0</td>\n",
" <td>90382.0</td>\n", " <td>90382.0</td>\n",
" <td>0.410369</td>\n",
" <td>90374.0</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" Timestamp Low High Open Close\n", " Timestamp Open High Low Close \\\n",
"5078 2025-11-26 86304.0 90646.0 87331.0 90477.0\n", "7317754 2025-11-30 23:55:00+00:00 90405.0 90452.0 90403.0 90452.0 \n",
"5079 2025-11-27 90091.0 91926.0 90476.0 91325.0\n", "7317755 2025-11-30 23:56:00+00:00 90452.0 90481.0 90420.0 90420.0 \n",
"5080 2025-11-28 90233.0 93091.0 91326.0 90913.0\n", "7317756 2025-11-30 23:57:00+00:00 90412.0 90458.0 90396.0 90435.0 \n",
"5081 2025-11-29 90216.0 91179.0 90913.0 90832.0\n", "7317757 2025-11-30 23:58:00+00:00 90428.0 90428.0 90362.0 90362.0 \n",
"5082 2025-11-30 90362.0 91969.0 90832.0 90382.0" "7317758 2025-11-30 23:59:00+00:00 90363.0 90386.0 90362.0 90382.0 \n",
"\n",
" Volume Avg \n",
"7317754 0.531700 90427.5 \n",
"7317755 0.055547 90450.5 \n",
"7317756 0.301931 90427.0 \n",
"7317757 4.591653 90395.0 \n",
"7317758 0.410369 90374.0 "
] ]
}, },
"execution_count": 19, "execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df['Avg'] = (df['Low'] + df['High']) / 2\n",
"df.tail()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "4b1cd63c",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Timestamp</th>\n",
" <th>Avg</th>\n",
" <th>OpenMin</th>\n",
" <th>OpenMax</th>\n",
" <th>CloseMin</th>\n",
" <th>CloseMax</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>5078</th>\n",
" <td>2025-11-26</td>\n",
" <td>88057.301736</td>\n",
" <td>86312.0</td>\n",
" <td>90574.0</td>\n",
" <td>86323.0</td>\n",
" <td>90574.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5079</th>\n",
" <td>2025-11-27</td>\n",
" <td>91245.092708</td>\n",
" <td>90126.0</td>\n",
" <td>91888.0</td>\n",
" <td>90126.0</td>\n",
" <td>91925.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5080</th>\n",
" <td>2025-11-28</td>\n",
" <td>91324.308681</td>\n",
" <td>90255.0</td>\n",
" <td>92970.0</td>\n",
" <td>90283.0</td>\n",
" <td>92966.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5081</th>\n",
" <td>2025-11-29</td>\n",
" <td>90746.479514</td>\n",
" <td>90265.0</td>\n",
" <td>91158.0</td>\n",
" <td>90279.0</td>\n",
" <td>91179.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5082</th>\n",
" <td>2025-11-30</td>\n",
" <td>91187.356250</td>\n",
" <td>90363.0</td>\n",
" <td>91940.0</td>\n",
" <td>90362.0</td>\n",
" <td>91940.0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Timestamp Avg OpenMin OpenMax CloseMin CloseMax\n",
"5078 2025-11-26 88057.301736 86312.0 90574.0 86323.0 90574.0\n",
"5079 2025-11-27 91245.092708 90126.0 91888.0 90126.0 91925.0\n",
"5080 2025-11-28 91324.308681 90255.0 92970.0 90283.0 92966.0\n",
"5081 2025-11-29 90746.479514 90265.0 91158.0 90279.0 91179.0\n",
"5082 2025-11-30 91187.356250 90363.0 91940.0 90362.0 91940.0"
]
},
"execution_count": 25,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -218,7 +341,13 @@
"source": [ "source": [
"df_days = (\n", "df_days = (\n",
" df.groupby(df[\"Timestamp\"].dt.date)\n", " df.groupby(df[\"Timestamp\"].dt.date)\n",
" .agg({\"Low\": \"min\", \"High\": \"max\", \"Open\": \"first\", \"Close\": \"last\"})\n", " .agg(\n",
" Avg=(\"Avg\", \"mean\"),\n",
" OpenMin=(\"Open\", \"min\"),\n",
" OpenMax=(\"Open\", \"max\"),\n",
" CloseMin=(\"Close\", \"min\"),\n",
" CloseMax=(\"Close\", \"max\"),\n",
" )\n",
" .reset_index()\n", " .reset_index()\n",
")\n", ")\n",
"df_days.tail()" "df_days.tail()"
@@ -226,111 +355,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 21, "execution_count": 26,
"id": "91823496",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Timestamp</th>\n",
" <th>Low</th>\n",
" <th>High</th>\n",
" <th>Open</th>\n",
" <th>Close</th>\n",
" <th>Avg</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>5078</th>\n",
" <td>2025-11-26</td>\n",
" <td>86304.0</td>\n",
" <td>90646.0</td>\n",
" <td>87331.0</td>\n",
" <td>90477.0</td>\n",
" <td>88475.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5079</th>\n",
" <td>2025-11-27</td>\n",
" <td>90091.0</td>\n",
" <td>91926.0</td>\n",
" <td>90476.0</td>\n",
" <td>91325.0</td>\n",
" <td>91008.5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5080</th>\n",
" <td>2025-11-28</td>\n",
" <td>90233.0</td>\n",
" <td>93091.0</td>\n",
" <td>91326.0</td>\n",
" <td>90913.0</td>\n",
" <td>91662.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5081</th>\n",
" <td>2025-11-29</td>\n",
" <td>90216.0</td>\n",
" <td>91179.0</td>\n",
" <td>90913.0</td>\n",
" <td>90832.0</td>\n",
" <td>90697.5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5082</th>\n",
" <td>2025-11-30</td>\n",
" <td>90362.0</td>\n",
" <td>91969.0</td>\n",
" <td>90832.0</td>\n",
" <td>90382.0</td>\n",
" <td>91165.5</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Timestamp Low High Open Close Avg\n",
"5078 2025-11-26 86304.0 90646.0 87331.0 90477.0 88475.0\n",
"5079 2025-11-27 90091.0 91926.0 90476.0 91325.0 91008.5\n",
"5080 2025-11-28 90233.0 93091.0 91326.0 90913.0 91662.0\n",
"5081 2025-11-29 90216.0 91179.0 90913.0 90832.0 90697.5\n",
"5082 2025-11-30 90362.0 91969.0 90832.0 90382.0 91165.5"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_days[\"Avg\"] = (df_days[\"Low\"] + df_days[\"High\"]) / 2\n",
"df_days.tail()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "9a7b3310", "id": "9a7b3310",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
@@ -358,6 +383,8 @@
" <th>start_date</th>\n", " <th>start_date</th>\n",
" <th>end_date</th>\n", " <th>end_date</th>\n",
" <th>min_open</th>\n", " <th>min_open</th>\n",
" <th>max_open</th>\n",
" <th>min_close</th>\n",
" <th>max_close</th>\n", " <th>max_close</th>\n",
" <th>start_avg</th>\n", " <th>start_avg</th>\n",
" <th>end_avg</th>\n", " <th>end_avg</th>\n",
@@ -366,76 +393,86 @@
" </thead>\n", " </thead>\n",
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>335</th>\n", " <th>316</th>\n",
" <td>2025-02-27</td>\n", " <td>2025-02-27</td>\n",
" <td>2025-04-23</td>\n", " <td>2025-04-25</td>\n",
" <td>76252.0</td>\n", " <td>74509.0</td>\n",
" <td>94273.0</td>\n", " <td>95801.0</td>\n",
" <td>84801.5</td>\n", " <td>74515.0</td>\n",
" <td>93335.0</td>\n", " <td>95800.0</td>\n",
" <td>0.100629</td>\n", " <td>85166.063889</td>\n",
" <td>94303.907292</td>\n",
" <td>0.107294</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>336</th>\n", " <th>317</th>\n",
" <td>2025-04-24</td>\n", " <td>2025-04-26</td>\n",
" <td>2025-05-09</td>\n", " <td>2025-05-11</td>\n",
" <td>93730.0</td>\n", " <td>92877.0</td>\n",
" <td>103261.0</td>\n", " <td>104971.0</td>\n",
" <td>92867.5</td>\n", " <td>92872.0</td>\n",
" <td>103341.0</td>\n", " <td>104965.0</td>\n",
" <td>0.112779</td>\n", " <td>94500.950347</td>\n",
" <td>104182.167708</td>\n",
" <td>0.102446</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>337</th>\n", " <th>318</th>\n",
" <td>2025-05-10</td>\n", " <td>2025-05-12</td>\n",
" <td>2025-07-11</td>\n", " <td>2025-07-11</td>\n",
" <td>100990.0</td>\n", " <td>98384.0</td>\n",
" <td>117579.0</td>\n", " <td>118833.0</td>\n",
" <td>103915.0</td>\n", " <td>98382.0</td>\n",
" <td>117032.5</td>\n", " <td>118839.0</td>\n",
" <td>0.126233</td>\n", " <td>103569.791319</td>\n",
" <td>117463.666667</td>\n",
" <td>0.134150</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>338</th>\n", " <th>319</th>\n",
" <td>2025-07-12</td>\n", " <td>2025-07-12</td>\n",
" <td>2025-11-04</td>\n", " <td>2025-11-04</td>\n",
" <td>106470.0</td>\n", " <td>98944.0</td>\n",
" <td>124728.0</td>\n", " <td>126202.0</td>\n",
" <td>117599.0</td>\n", " <td>98943.0</td>\n",
" <td>103079.0</td>\n", " <td>126202.0</td>\n",
" <td>0.123470</td>\n", " <td>117640.026389</td>\n",
" <td>103712.985764</td>\n",
" <td>0.118387</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>339</th>\n", " <th>320</th>\n",
" <td>2025-11-05</td>\n", " <td>2025-11-05</td>\n",
" <td>2025-11-18</td>\n", " <td>2025-11-18</td>\n",
" <td>92112.0</td>\n", " <td>89291.0</td>\n",
" <td>105972.0</td>\n", " <td>107343.0</td>\n",
" <td>101737.5</td>\n", " <td>89286.0</td>\n",
" <td>91471.0</td>\n", " <td>107343.0</td>\n",
" <td>0.100912</td>\n", " <td>102514.621181</td>\n",
" <td>91705.833333</td>\n",
" <td>0.105437</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" start_date end_date min_open max_close start_avg end_avg \\\n", " start_date end_date min_open max_open min_close max_close \\\n",
"335 2025-02-27 2025-04-23 76252.0 94273.0 84801.5 93335.0 \n", "316 2025-02-27 2025-04-25 74509.0 95801.0 74515.0 95800.0 \n",
"336 2025-04-24 2025-05-09 93730.0 103261.0 92867.5 103341.0 \n", "317 2025-04-26 2025-05-11 92877.0 104971.0 92872.0 104965.0 \n",
"337 2025-05-10 2025-07-11 100990.0 117579.0 103915.0 117032.5 \n", "318 2025-05-12 2025-07-11 98384.0 118833.0 98382.0 118839.0 \n",
"338 2025-07-12 2025-11-04 106470.0 124728.0 117599.0 103079.0 \n", "319 2025-07-12 2025-11-04 98944.0 126202.0 98943.0 126202.0 \n",
"339 2025-11-05 2025-11-18 92112.0 105972.0 101737.5 91471.0 \n", "320 2025-11-05 2025-11-18 89291.0 107343.0 89286.0 107343.0 \n",
"\n", "\n",
" change \n", " start_avg end_avg change \n",
"335 0.100629 \n", "316 85166.063889 94303.907292 0.107294 \n",
"336 0.112779 \n", "317 94500.950347 104182.167708 0.102446 \n",
"337 0.126233 \n", "318 103569.791319 117463.666667 0.134150 \n",
"338 0.123470 \n", "319 117640.026389 103712.985764 0.118387 \n",
"339 0.100912 " "320 102514.621181 91705.833333 0.105437 "
] ]
}, },
"execution_count": 25, "execution_count": 26,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -455,8 +492,10 @@
" intervals.append({\n", " intervals.append({\n",
" \"start_date\": df_days.loc[start_idx, \"Timestamp\"],\n", " \"start_date\": df_days.loc[start_idx, \"Timestamp\"],\n",
" \"end_date\": df_days.loc[i, \"Timestamp\"],\n", " \"end_date\": df_days.loc[i, \"Timestamp\"],\n",
" \"min_open\": interval[\"Open\"].min(),\n", " \"min_open\": interval[\"OpenMin\"].min(),\n",
" \"max_close\": interval[\"Close\"].max(),\n", " \"max_open\": interval[\"OpenMax\"].max(),\n",
" \"min_close\": interval[\"CloseMin\"].min(),\n",
" \"max_close\": interval[\"CloseMax\"].max(),\n",
" \"start_avg\": price_base,\n", " \"start_avg\": price_base,\n",
" \"end_avg\": price_now,\n", " \"end_avg\": price_now,\n",
" \"change\": change,\n", " \"change\": change,\n",
@@ -470,6 +509,14 @@
"df_intervals = pd.DataFrame(intervals)\n", "df_intervals = pd.DataFrame(intervals)\n",
"df_intervals.tail()" "df_intervals.tail()"
] ]
},
{
"cell_type": "code",
"execution_count": null,
"id": "07f1cd58",
"metadata": {},
"outputs": [],
"source": []
} }
], ],
"metadata": { "metadata": {

run.slurm

@@ -2,9 +2,23 @@
 #SBATCH --job-name=btc
 #SBATCH --nodes=4
 #SBATCH --ntasks=4
+#SBATCH --cpus-per-task=2
 #SBATCH --output=out.txt
-# mpirun -np $SLURM_NTASKS ./build/bitcoin_app
+# Path to the data file (must exist on all nodes)
+export DATA_PATH="/mnt/shared/supercomputers/data/data_10s.csv"
+# Data share of each rank (the sum defines the proportions)
+export DATA_READ_SHARES="10,11,13,14"
+# Overlap size in bytes for handling line boundaries
+export READ_OVERLAP_BYTES=131072
+# Aggregation interval in seconds (60 = minutes, 600 = 10 minutes, 86400 = days)
+export AGGREGATION_INTERVAL=60
+# Whether to use CUDA for aggregation (0 = no, 1 = yes)
+export USE_CUDA=1
 cd /mnt/shared/supercomputers/build
 mpirun -np $SLURM_NTASKS ./bitcoin_app
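
DATA_READ_SHARES controls how much of the input file each MPI rank reads. The actual splitting is done by calculate_byte_range() in the C++ utilities, which is not shown in this diff; the following is only a hypothetical sketch of how proportional byte ranges with an overlap could be derived from these settings:

```python
# Hypothetical illustration only: the real logic lives in calculate_byte_range()
# (utils.*), which is not part of this diff. Each rank gets a slice of the file
# proportional to its share, extended by READ_OVERLAP_BYTES so that a CSV line
# cut at a slice boundary can still be completed by a neighbouring rank.
def byte_ranges(file_size: int, shares: list[int], overlap: int) -> list[tuple[int, int]]:
    total = sum(shares)
    ranges, start = [], 0
    for i, share in enumerate(shares):
        end = file_size if i == len(shares) - 1 else start + file_size * share // total
        ranges.append((start, min(file_size, end + overlap)))  # read a bit past the slice
        start = end
    return ranges

# shares "10,11,13,14" from run.slurm, a 1 GB file, 128 KiB overlap
print(byte_ranges(1_000_000_000, [10, 11, 13, 14], 131072))
```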

aggregation.cpp

@@ -1,87 +1,75 @@
#include "aggregation.hpp" #include "aggregation.hpp"
#include "utils.hpp"
#include <algorithm> #include <algorithm>
#include <cstdint>
#include <limits> #include <limits>
#include <cmath> #include <vector>
std::vector<DayStats> aggregate_days(const std::vector<Record>& records) { std::vector<PeriodStats> aggregate_periods(const std::vector<Record>& records) {
// Группируем записи по дням const int64_t interval = get_aggregation_interval();
std::map<DayIndex, std::vector<const Record*>> day_records;
std::vector<PeriodStats> result;
for (const auto& r : records) { if (records.empty()) return result;
DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400;
day_records[day].push_back(&r); struct PeriodAccumulator {
} double avg_sum = 0.0;
double open_min = std::numeric_limits<double>::max();
std::vector<DayStats> result; double open_max = std::numeric_limits<double>::lowest();
result.reserve(day_records.size()); double close_min = std::numeric_limits<double>::max();
double close_max = std::numeric_limits<double>::lowest();
for (auto& [day, recs] : day_records) { int64_t count = 0;
// Sort by timestamp to determine first/last
std::sort(recs.begin(), recs.end(), void add(const Record& r) {
[](const Record* a, const Record* b) { const double avg = (r.low + r.high) / 2.0;
return a->timestamp < b->timestamp; avg_sum += avg;
}); open_min = std::min(open_min, r.open);
open_max = std::max(open_max, r.open);
DayStats stats; close_min = std::min(close_min, r.close);
stats.day = day; close_max = std::max(close_max, r.close);
stats.low = std::numeric_limits<double>::max(); ++count;
stats.high = std::numeric_limits<double>::lowest();
stats.open = recs.front()->open;
stats.close = recs.back()->close;
stats.first_ts = recs.front()->timestamp;
stats.last_ts = recs.back()->timestamp;
for (const auto* r : recs) {
stats.low = std::min(stats.low, r->low);
stats.high = std::max(stats.high, r->high);
} }
};
stats.avg = (stats.low + stats.high) / 2.0;
PeriodIndex current_period =
result.push_back(stats); static_cast<PeriodIndex>(records[0].timestamp) / interval;
PeriodAccumulator acc;
acc.add(records[0]);
for (size_t i = 1; i < records.size(); ++i) {
const Record& r = records[i];
const PeriodIndex period =
static_cast<PeriodIndex>(r.timestamp) / interval;
if (period != current_period) {
PeriodStats stats;
stats.period = current_period;
stats.avg = acc.avg_sum / static_cast<double>(acc.count);
stats.open_min = acc.open_min;
stats.open_max = acc.open_max;
stats.close_min = acc.close_min;
stats.close_max = acc.close_max;
stats.count = acc.count;
result.push_back(stats);
current_period = period;
acc = PeriodAccumulator{};
}
acc.add(r);
} }
// flush the last period
PeriodStats stats;
stats.period = current_period;
stats.avg = acc.avg_sum / static_cast<double>(acc.count);
stats.open_min = acc.open_min;
stats.open_max = acc.open_max;
stats.close_min = acc.close_min;
stats.close_max = acc.close_max;
stats.count = acc.count;
result.push_back(stats);
return result; return result;
} }
std::vector<DayStats> merge_day_stats(const std::vector<DayStats>& all_stats) {
// Merge statistics for identical days (if any)
std::map<DayIndex, DayStats> merged;
for (const auto& s : all_stats) {
auto it = merged.find(s.day);
if (it == merged.end()) {
merged[s.day] = s;
} else {
// Merge data for the same day
auto& m = it->second;
m.low = std::min(m.low, s.low);
m.high = std::max(m.high, s.high);
// take open from the record with the smaller timestamp
if (s.first_ts < m.first_ts) {
m.open = s.open;
m.first_ts = s.first_ts;
}
// take close from the record with the larger timestamp
if (s.last_ts > m.last_ts) {
m.close = s.close;
m.last_ts = s.last_ts;
}
m.avg = (m.low + m.high) / 2.0;
}
}
// Convert to a sorted vector
std::vector<DayStats> result;
result.reserve(merged.size());
for (auto& [day, stats] : merged) {
result.push_back(stats);
}
return result;
}
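
The rewritten aggregation assumes each rank's records are already sorted by timestamp and buckets them by integer division of the timestamp by the aggregation interval. A small Python sketch (illustration only, field names follow the C++ PeriodStats) of the same per-period statistics:

```python
# Python mirror of the bucketing rule used by aggregate_periods (illustration only)
from collections import defaultdict

def aggregate_periods(records, interval=60):
    """records: iterable of (timestamp, open, high, low, close) tuples."""
    buckets = defaultdict(list)
    for ts, o, h, lo, c in records:
        buckets[int(ts) // interval].append((o, h, lo, c))  # period index
    result = []
    for period in sorted(buckets):
        rows = buckets[period]
        result.append({
            "period": period,
            "avg": sum((lo + h) / 2 for _, h, lo, _ in rows) / len(rows),
            "open_min": min(o for o, *_ in rows),
            "open_max": max(o for o, *_ in rows),
            "close_min": min(c for *_, c in rows),
            "close_max": max(c for *_, c in rows),
            "count": len(rows),
        })
    return result

# one-minute periods: ticks at t=0 and t=10 fall into period 0, the tick at t=65 into period 1
print(aggregate_periods([(0, 10, 12, 8, 11), (10, 11, 13, 9, 12), (65, 12, 14, 10, 13)]))
```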

aggregation.hpp

@@ -1,14 +1,8 @@
#pragma once #pragma once
#include "record.hpp" #include "record.hpp"
#include "day_stats.hpp" #include "period_stats.hpp"
#include <vector> #include <vector>
#include <map>
// Агрегация записей по дням на одном узле
std::vector<DayStats> aggregate_days(const std::vector<Record>& records);
// Объединение агрегированных данных с разных узлов
// (на случай если один день попал на разные узлы - но в нашей схеме это не должно случиться)
std::vector<DayStats> merge_day_stats(const std::vector<DayStats>& all_stats);
// Агрегация записей по периодам на одном узле
std::vector<PeriodStats> aggregate_periods(const std::vector<Record>& records);

(CSV loader source file)

@@ -2,46 +2,134 @@
#include <fstream> #include <fstream>
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>
#include <stdexcept>
std::vector<Record> load_csv(const std::string& filename) { bool parse_csv_line(const std::string& line, Record& record) {
if (line.empty()) {
return false;
}
std::stringstream ss(line);
std::string item;
try {
// timestamp
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.timestamp = std::stod(item);
// open
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.open = std::stod(item);
// high
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.high = std::stod(item);
// low
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.low = std::stod(item);
// close
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.close = std::stod(item);
// volume
if (!std::getline(ss, item, ',')) return false;
// Volume may be empty or contain a value
if (item.empty()) {
record.volume = 0.0;
} else {
record.volume = std::stod(item);
}
return true;
} catch (const std::exception&) {
return false;
}
}
std::vector<Record> load_csv_parallel(int rank, int size) {
std::vector<Record> data; std::vector<Record> data;
std::ifstream file(filename);
// Read settings from environment variables
std::string data_path = get_data_path();
std::vector<int> shares = get_data_read_shares();
int64_t overlap_bytes = get_read_overlap_bytes();
// Get the file size
int64_t file_size = get_file_size(data_path);
// Compute the byte range for this rank
ByteRange range = calculate_byte_range(rank, size, file_size, shares, overlap_bytes);
// Open the file and read the required range
std::ifstream file(data_path, std::ios::binary);
if (!file.is_open()) { if (!file.is_open()) {
throw std::runtime_error("Cannot open file: " + filename); throw std::runtime_error("Cannot open file: " + data_path);
} }
std::string line; // Seek to the start of the range
file.seekg(range.start);
// read the first line (the header)
std::getline(file, line); // Read the data into a buffer
int64_t bytes_to_read = range.end - range.start;
while (std::getline(file, line)) { std::vector<char> buffer(bytes_to_read);
std::stringstream ss(line); file.read(buffer.data(), bytes_to_read);
std::string item; int64_t bytes_read = file.gcount();
Record row; file.close();
std::getline(ss, item, ','); // Convert to a string for easier parsing
row.timestamp = std::stod(item); std::string content(buffer.data(), bytes_read);
std::getline(ss, item, ','); // Find the position where the first complete line starts
row.open = std::stod(item); size_t parse_start = 0;
if (rank == 0) {
std::getline(ss, item, ','); // First rank: skip the header (the first line)
row.high = std::stod(item); size_t header_end = content.find('\n');
if (header_end != std::string::npos) {
std::getline(ss, item, ','); parse_start = header_end + 1;
row.low = std::stod(item); }
} else {
std::getline(ss, item, ','); // Other ranks: start after the first \n (skip the partial line)
row.close = std::stod(item); size_t first_newline = content.find('\n');
if (first_newline != std::string::npos) {
std::getline(ss, item, ','); parse_start = first_newline + 1;
row.volume = std::stod(item); }
data.push_back(row);
} }
// Find the position where the last complete line ends
size_t parse_end = content.size();
if (rank != size - 1) {
// Not the last rank: look for the last \n
size_t last_newline = content.rfind('\n');
if (last_newline != std::string::npos && last_newline > parse_start) {
parse_end = last_newline;
}
}
// Parse the lines
size_t pos = parse_start;
while (pos < parse_end) {
size_t line_end = content.find('\n', pos);
if (line_end == std::string::npos || line_end > parse_end) {
line_end = parse_end;
}
std::string line = content.substr(pos, line_end - pos);
// Strip \r if present (Windows line endings)
if (!line.empty() && line.back() == '\r') {
line.pop_back();
}
Record record;
if (parse_csv_line(line, record)) {
data.push_back(record);
}
pos = line_end + 1;
}
return data; return data;
} }
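
For clarity, a rough Python analogue of the validation rules in parse_csv_line above (illustration only; edge cases such as trailing commas may differ from the C++ version): six comma-separated fields in Kaggle column order, an empty Volume becomes 0.0, and malformed lines are skipped.

```python
# Approximate Python analogue of parse_csv_line (illustration only)
def parse_csv_line(line: str):
    parts = line.strip().split(",")
    if len(parts) < 6 or any(p == "" for p in parts[:5]):
        return None  # a required field is missing -> skip the line
    try:
        ts, o, h, lo, c = map(float, parts[:5])
        volume = float(parts[5]) if parts[5] else 0.0  # an empty Volume field becomes 0.0
    except ValueError:
        return None  # not a number (e.g. the header line) -> skip
    return {"timestamp": ts, "open": o, "high": h, "low": lo, "close": c, "volume": volume}

print(parse_csv_line("1733961600,90100.0,90200.0,90050.0,90150.0,4.59"))  # a valid record
print(parse_csv_line("Timestamp,Open,High,Low,Close,Volume"))             # header -> None
```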

(CSV loader header)

@@ -2,5 +2,14 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "record.hpp" #include "record.hpp"
#include "utils.hpp"
std::vector<Record> load_csv(const std::string& filename); // Parallel CSV reading for MPI
// rank - the current rank number
// size - the total number of ranks
// Returns the vector of records read by this rank
std::vector<Record> load_csv_parallel(int rank, int size);
// Parse a single CSV line into a Record
// Returns true if parsing succeeded
bool parse_csv_line(const std::string& line, Record& record);

day_stats.hpp (deleted)

@@ -1,28 +0,0 @@
#pragma once
#include <cstdint>
using DayIndex = long long;
// Aggregated data for one day
struct DayStats {
DayIndex day; // day index (timestamp / 86400)
double low; // minimum Low for the day
double high; // maximum High for the day
double open; // first Open of the day
double close; // last Close of the day
double avg; // average = (low + high) / 2
double first_ts; // timestamp of the first record (to determine open)
double last_ts; // timestamp of the last record (to determine close)
};
// Interval with a change >= 10%
struct Interval {
DayIndex start_day;
DayIndex end_day;
double min_open;
double max_close;
double start_avg;
double end_avg;
double change;
};

gpu_loader.cpp

@@ -1,116 +1,133 @@
#include "gpu_loader.hpp" #include "gpu_loader.hpp"
#include <dlfcn.h> #include <dlfcn.h>
#include <map> #include <iostream>
#include <algorithm> #include <cstdint>
// GPU result structure (must match gpu_plugin.cu)
struct GpuPeriodStats {
int64_t period;
double avg;
double open_min;
double open_max;
double close_min;
double close_max;
int64_t count;
};
// Function types from the GPU plugin
using gpu_is_available_fn = int (*)();
using gpu_aggregate_periods_fn = int (*)(
const double* h_timestamps,
const double* h_open,
const double* h_high,
const double* h_low,
const double* h_close,
int num_ticks,
int64_t interval,
GpuPeriodStats** h_out_stats,
int* out_num_periods
);
using gpu_free_results_fn = void (*)(GpuPeriodStats*);
static void* get_gpu_lib_handle() { static void* get_gpu_lib_handle() {
static void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL); static void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL);
return h; return h;
} }
gpu_is_available_fn load_gpu_is_available() { bool gpu_is_available() {
void* h = get_gpu_lib_handle(); void* h = get_gpu_lib_handle();
if (!h) return nullptr; if (!h) return false;
auto fn = (gpu_is_available_fn)dlsym(h, "gpu_is_available"); auto fn = reinterpret_cast<gpu_is_available_fn>(dlsym(h, "gpu_is_available"));
return fn; if (!fn) return false;
return fn() != 0;
} }
gpu_aggregate_days_fn load_gpu_aggregate_days() { bool aggregate_periods_gpu(
void* h = get_gpu_lib_handle();
if (!h) return nullptr;
auto fn = (gpu_aggregate_days_fn)dlsym(h, "gpu_aggregate_days");
return fn;
}
bool aggregate_days_gpu(
const std::vector<Record>& records, const std::vector<Record>& records,
std::vector<DayStats>& out_stats, int64_t aggregation_interval,
gpu_aggregate_days_fn gpu_fn) std::vector<PeriodStats>& out_stats)
{ {
if (!gpu_fn || records.empty()) { if (records.empty()) {
out_stats.clear();
return true;
}
void* h = get_gpu_lib_handle();
if (!h) {
std::cerr << "GPU: Failed to load libgpu_compute.so" << std::endl;
return false; return false;
} }
// Группируем записи по дням и подготавливаем данные для GPU
std::map<DayIndex, std::vector<size_t>> day_record_indices;
for (size_t i = 0; i < records.size(); i++) { auto aggregate_fn = reinterpret_cast<gpu_aggregate_periods_fn>(
DayIndex day = static_cast<DayIndex>(records[i].timestamp) / 86400; dlsym(h, "gpu_aggregate_periods"));
day_record_indices[day].push_back(i); auto free_fn = reinterpret_cast<gpu_free_results_fn>(
} dlsym(h, "gpu_free_results"));
int num_days = static_cast<int>(day_record_indices.size());
// Подготавливаем массивы для GPU if (!aggregate_fn || !free_fn) {
std::vector<GpuRecord> gpu_records; std::cerr << "GPU: Failed to load functions from plugin" << std::endl;
std::vector<int> day_offsets; return false;
std::vector<int> day_counts;
std::vector<long long> day_indices;
gpu_records.reserve(records.size());
day_offsets.reserve(num_days);
day_counts.reserve(num_days);
day_indices.reserve(num_days);
int current_offset = 0;
for (auto& [day, indices] : day_record_indices) {
day_indices.push_back(day);
day_offsets.push_back(current_offset);
day_counts.push_back(static_cast<int>(indices.size()));
// Добавляем записи этого дня
for (size_t idx : indices) {
const auto& r = records[idx];
GpuRecord gr;
gr.timestamp = r.timestamp;
gr.open = r.open;
gr.high = r.high;
gr.low = r.low;
gr.close = r.close;
gr.volume = r.volume;
gpu_records.push_back(gr);
}
current_offset += static_cast<int>(indices.size());
} }
// Выделяем память для результата int num_ticks = static_cast<int>(records.size());
std::vector<GpuDayStats> gpu_stats(num_days);
// Convert AoS to SoA
std::vector<double> timestamps(num_ticks);
std::vector<double> open(num_ticks);
std::vector<double> high(num_ticks);
std::vector<double> low(num_ticks);
std::vector<double> close(num_ticks);
for (int i = 0; i < num_ticks; i++) {
timestamps[i] = records[i].timestamp;
open[i] = records[i].open;
high[i] = records[i].high;
low[i] = records[i].low;
close[i] = records[i].close;
}
// Call the GPU function // Call the GPU function
int result = gpu_fn( GpuPeriodStats* gpu_stats = nullptr;
gpu_records.data(), int num_periods = 0;
static_cast<int>(gpu_records.size()),
day_offsets.data(), int result = aggregate_fn(
day_counts.data(), timestamps.data(),
day_indices.data(), open.data(),
num_days, high.data(),
gpu_stats.data() low.data(),
close.data(),
num_ticks,
aggregation_interval,
&gpu_stats,
&num_periods
); );
if (result != 0) { if (result != 0) {
std::cerr << "GPU: Aggregation failed with code " << result << std::endl;
return false; return false;
} }
// Convert the result to DayStats // Convert the result to PeriodStats
out_stats.clear(); out_stats.clear();
out_stats.reserve(num_days); out_stats.reserve(num_periods);
for (const auto& gs : gpu_stats) { for (int i = 0; i < num_periods; i++) {
DayStats ds; PeriodStats ps;
ds.day = gs.day; ps.period = gpu_stats[i].period;
ds.low = gs.low; ps.avg = gpu_stats[i].avg;
ds.high = gs.high; ps.open_min = gpu_stats[i].open_min;
ds.open = gs.open; ps.open_max = gpu_stats[i].open_max;
ds.close = gs.close; ps.close_min = gpu_stats[i].close_min;
ds.avg = gs.avg; ps.close_max = gpu_stats[i].close_max;
ds.first_ts = gs.first_ts; ps.count = gpu_stats[i].count;
ds.last_ts = gs.last_ts; out_stats.push_back(ps);
out_stats.push_back(ds);
} }
// Free the memory
free_fn(gpu_stats);
return true; return true;
} }

gpu_loader.hpp

@@ -1,49 +1,15 @@
#pragma once #pragma once
#include "day_stats.hpp" #include "period_stats.hpp"
#include "record.hpp" #include "record.hpp"
#include <vector> #include <vector>
// Function types from the GPU plugin // Check CUDA availability
using gpu_is_available_fn = int (*)(); bool gpu_is_available();
// GPU structures (must match gpu_plugin.cu) // Period aggregation on the GPU
struct GpuRecord { // Returns true on success, false if the GPU is unavailable or an error occurred
double timestamp; bool aggregate_periods_gpu(
double open;
double high;
double low;
double close;
double volume;
};
struct GpuDayStats {
long long day;
double low;
double high;
double open;
double close;
double avg;
double first_ts;
double last_ts;
};
using gpu_aggregate_days_fn = int (*)(
const GpuRecord* h_records,
int num_records,
const int* h_day_offsets,
const int* h_day_counts,
const long long* h_day_indices,
int num_days,
GpuDayStats* h_out_stats
);
// Load functions from the plugin
gpu_is_available_fn load_gpu_is_available();
gpu_aggregate_days_fn load_gpu_aggregate_days();
// Wrapper for GPU aggregation (returns true on success)
bool aggregate_days_gpu(
const std::vector<Record>& records, const std::vector<Record>& records,
std::vector<DayStats>& out_stats, int64_t aggregation_interval,
gpu_aggregate_days_fn gpu_fn std::vector<PeriodStats>& out_stats
); );

gpu_plugin.cu

@@ -1,167 +1,430 @@
#include <cuda_runtime.h> #include <cuda_runtime.h>
#include <cub/cub.cuh>
#include <cstdint> #include <cstdint>
#include <cfloat> #include <cfloat>
#include <cstdio>
#include <ctime>
#include <string>
#include <sstream>
#include <iomanip>
// Структуры данных (должны совпадать с C++ кодом) // ============================================================================
struct GpuRecord { // Структуры данных
double timestamp; // ============================================================================
double open;
double high; // SoA (Structure of Arrays) for the input data on the GPU
double low; struct GpuTicksSoA {
double close; double* timestamp;
double volume; double* open;
double* high;
double* low;
double* close;
int n;
}; };
struct GpuDayStats { // Aggregation result for a single period
long long day; struct GpuPeriodStats {
double low; int64_t period;
double high;
double open;
double close;
double avg; double avg;
double first_ts; double open_min;
double last_ts; double open_max;
double close_min;
double close_max;
int64_t count;
}; };
// ============================================================================
// Helper functions
// ============================================================================
static double get_time_ms() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0;
}
#define CUDA_CHECK(call) do { \
cudaError_t err = call; \
if (err != cudaSuccess) { \
printf("CUDA error at %s:%d: %s\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
return -1; \
} \
} while(0)
// ============================================================================
// Kernel: compute the period_id for each tick
// ============================================================================
__global__ void compute_period_ids_kernel(
const double* __restrict__ timestamps,
int64_t* __restrict__ period_ids,
int n,
int64_t interval)
{
int idx = blockIdx.x * blockDim.x + threadIdx.x;
if (idx < n) {
period_ids[idx] = static_cast<int64_t>(timestamps[idx]) / interval;
}
}
// ============================================================================
// Kernel: aggregate one period (one block per period)
// ============================================================================
__global__ void aggregate_periods_kernel(
const double* __restrict__ open,
const double* __restrict__ high,
const double* __restrict__ low,
const double* __restrict__ close,
const int64_t* __restrict__ unique_periods,
const int* __restrict__ offsets,
const int* __restrict__ counts,
int num_periods,
GpuPeriodStats* __restrict__ out_stats)
{
int period_idx = blockIdx.x;
if (period_idx >= num_periods) return;
int offset = offsets[period_idx];
int count = counts[period_idx];
// Use shared memory for the reduction within a block
__shared__ double s_avg_sum;
__shared__ double s_open_min;
__shared__ double s_open_max;
__shared__ double s_close_min;
__shared__ double s_close_max;
// The first thread initializes the shared memory
if (threadIdx.x == 0) {
s_avg_sum = 0.0;
s_open_min = DBL_MAX;
s_open_max = -DBL_MAX;
s_close_min = DBL_MAX;
s_close_max = -DBL_MAX;
}
__syncthreads();
// Per-thread local accumulators
double local_avg_sum = 0.0;
double local_open_min = DBL_MAX;
double local_open_max = -DBL_MAX;
double local_close_min = DBL_MAX;
double local_close_max = -DBL_MAX;
// Each thread processes its share of the ticks
for (int i = threadIdx.x; i < count; i += blockDim.x) {
int tick_idx = offset + i;
double avg = (low[tick_idx] + high[tick_idx]) / 2.0;
local_avg_sum += avg;
local_open_min = min(local_open_min, open[tick_idx]);
local_open_max = max(local_open_max, open[tick_idx]);
local_close_min = min(local_close_min, close[tick_idx]);
local_close_max = max(local_close_max, close[tick_idx]);
}
// Reduction using atomic operations
atomicAdd(&s_avg_sum, local_avg_sum);
atomicMin(reinterpret_cast<unsigned long long*>(&s_open_min),
__double_as_longlong(local_open_min));
atomicMax(reinterpret_cast<unsigned long long*>(&s_open_max),
__double_as_longlong(local_open_max));
atomicMin(reinterpret_cast<unsigned long long*>(&s_close_min),
__double_as_longlong(local_close_min));
atomicMax(reinterpret_cast<unsigned long long*>(&s_close_max),
__double_as_longlong(local_close_max));
__syncthreads();
// The first thread writes out the result
if (threadIdx.x == 0) {
GpuPeriodStats stats;
stats.period = unique_periods[period_idx];
stats.avg = s_avg_sum / static_cast<double>(count);
stats.open_min = s_open_min;
stats.open_max = s_open_max;
stats.close_min = s_close_min;
stats.close_max = s_close_max;
stats.count = count;
out_stats[period_idx] = stats;
}
}
// ============================================================================
// Simple aggregation kernel (one thread per period)
// Used when there are many periods and few ticks in each
// ============================================================================
__global__ void aggregate_periods_simple_kernel(
const double* __restrict__ open,
const double* __restrict__ high,
const double* __restrict__ low,
const double* __restrict__ close,
const int64_t* __restrict__ unique_periods,
const int* __restrict__ offsets,
const int* __restrict__ counts,
int num_periods,
GpuPeriodStats* __restrict__ out_stats)
{
int period_idx = blockIdx.x * blockDim.x + threadIdx.x;
if (period_idx >= num_periods) return;
int offset = offsets[period_idx];
int count = counts[period_idx];
double avg_sum = 0.0;
double open_min = DBL_MAX;
double open_max = -DBL_MAX;
double close_min = DBL_MAX;
double close_max = -DBL_MAX;
for (int i = 0; i < count; i++) {
int tick_idx = offset + i;
double avg = (low[tick_idx] + high[tick_idx]) / 2.0;
avg_sum += avg;
open_min = min(open_min, open[tick_idx]);
open_max = max(open_max, open[tick_idx]);
close_min = min(close_min, close[tick_idx]);
close_max = max(close_max, close[tick_idx]);
}
GpuPeriodStats stats;
stats.period = unique_periods[period_idx];
stats.avg = avg_sum / static_cast<double>(count);
stats.open_min = open_min;
stats.open_max = open_max;
stats.close_min = close_min;
stats.close_max = close_max;
stats.count = count;
out_stats[period_idx] = stats;
}
// ============================================================================
// Check GPU availability
// ============================================================================
extern "C" int gpu_is_available() { extern "C" int gpu_is_available() {
int n = 0; int n = 0;
cudaError_t err = cudaGetDeviceCount(&n); cudaError_t err = cudaGetDeviceCount(&n);
if (err != cudaSuccess) return 0; if (err != cudaSuccess) return 0;
if (n > 0) {
cudaFree(0); // Force context initialization
}
return (n > 0) ? 1 : 0; return (n > 0) ? 1 : 0;
} }
// Kernel для агрегации (один поток обрабатывает все данные) // ============================================================================
__global__ void aggregate_kernel( // Главная функция агрегации на GPU
const GpuRecord* records, // ============================================================================
int num_records,
const int* day_offsets, // начало каждого дня в массиве records
const int* day_counts, // количество записей в каждом дне
const long long* day_indices, // индексы дней
int num_days,
GpuDayStats* out_stats)
{
// Один поток обрабатывает все дни последовательно
for (int d = 0; d < num_days; d++) {
int offset = day_offsets[d];
int count = day_counts[d];
GpuDayStats stats;
stats.day = day_indices[d];
stats.low = DBL_MAX;
stats.high = -DBL_MAX;
stats.first_ts = DBL_MAX;
stats.last_ts = -DBL_MAX;
stats.open = 0;
stats.close = 0;
for (int i = 0; i < count; i++) {
const GpuRecord& r = records[offset + i];
// min/max
if (r.low < stats.low) stats.low = r.low;
if (r.high > stats.high) stats.high = r.high;
// first/last по timestamp
if (r.timestamp < stats.first_ts) {
stats.first_ts = r.timestamp;
stats.open = r.open;
}
if (r.timestamp > stats.last_ts) {
stats.last_ts = r.timestamp;
stats.close = r.close;
}
}
stats.avg = (stats.low + stats.high) / 2.0;
out_stats[d] = stats;
}
}
// Функция агрегации, вызываемая из C++ extern "C" int gpu_aggregate_periods(
extern "C" int gpu_aggregate_days( const double* h_timestamps,
const GpuRecord* h_records, const double* h_open,
int num_records, const double* h_high,
const int* h_day_offsets, const double* h_low,
const int* h_day_counts, const double* h_close,
const long long* h_day_indices, int num_ticks,
int num_days, int64_t interval,
GpuDayStats* h_out_stats) GpuPeriodStats** h_out_stats,
int* out_num_periods)
{ {
// Выделяем память на GPU if (num_ticks == 0) {
GpuRecord* d_records = nullptr; *h_out_stats = nullptr;
int* d_day_offsets = nullptr; *out_num_periods = 0;
int* d_day_counts = nullptr; return 0;
long long* d_day_indices = nullptr;
GpuDayStats* d_out_stats = nullptr;
cudaError_t err;
err = cudaMalloc(&d_records, num_records * sizeof(GpuRecord));
if (err != cudaSuccess) return -1;
err = cudaMalloc(&d_day_offsets, num_days * sizeof(int));
if (err != cudaSuccess) { cudaFree(d_records); return -2; }
err = cudaMalloc(&d_day_counts, num_days * sizeof(int));
if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); return -3; }
err = cudaMalloc(&d_day_indices, num_days * sizeof(long long));
if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); return -4; }
err = cudaMalloc(&d_out_stats, num_days * sizeof(GpuDayStats));
if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); cudaFree(d_day_indices); return -5; }
// Копируем данные на GPU
err = cudaMemcpy(d_records, h_records, num_records * sizeof(GpuRecord), cudaMemcpyHostToDevice);
if (err != cudaSuccess) return -10;
err = cudaMemcpy(d_day_offsets, h_day_offsets, num_days * sizeof(int), cudaMemcpyHostToDevice);
if (err != cudaSuccess) return -11;
err = cudaMemcpy(d_day_counts, h_day_counts, num_days * sizeof(int), cudaMemcpyHostToDevice);
if (err != cudaSuccess) return -12;
err = cudaMemcpy(d_day_indices, h_day_indices, num_days * sizeof(long long), cudaMemcpyHostToDevice);
if (err != cudaSuccess) return -13;
// Запускаем kernel (1 блок, 1 поток)
aggregate_kernel<<<1, 1>>>(
d_records, num_records,
d_day_offsets, d_day_counts, d_day_indices,
num_days, d_out_stats
);
// Проверяем ошибку запуска kernel
err = cudaGetLastError();
if (err != cudaSuccess) {
cudaFree(d_records);
cudaFree(d_day_offsets);
cudaFree(d_day_counts);
cudaFree(d_day_indices);
cudaFree(d_out_stats);
return -7;
} }
// Ждём завершения std::ostringstream output;
err = cudaDeviceSynchronize(); double total_start = get_time_ms();
if (err != cudaSuccess) {
cudaFree(d_records);
cudaFree(d_day_offsets);
cudaFree(d_day_counts);
cudaFree(d_day_indices);
cudaFree(d_out_stats);
return -6;
}
// Копируем результат обратно // ========================================================================
cudaMemcpy(h_out_stats, d_out_stats, num_days * sizeof(GpuDayStats), cudaMemcpyDeviceToHost); // Шаг 1: Выделение памяти и копирование данных на GPU
// ========================================================================
double step1_start = get_time_ms();
// Освобождаем память double* d_timestamps = nullptr;
cudaFree(d_records); double* d_open = nullptr;
cudaFree(d_day_offsets); double* d_high = nullptr;
cudaFree(d_day_counts); double* d_low = nullptr;
cudaFree(d_day_indices); double* d_close = nullptr;
int64_t* d_period_ids = nullptr;
size_t ticks_bytes = num_ticks * sizeof(double);
CUDA_CHECK(cudaMalloc(&d_timestamps, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_open, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_high, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_low, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_close, ticks_bytes));
CUDA_CHECK(cudaMalloc(&d_period_ids, num_ticks * sizeof(int64_t)));
CUDA_CHECK(cudaMemcpy(d_timestamps, h_timestamps, ticks_bytes, cudaMemcpyHostToDevice));
CUDA_CHECK(cudaMemcpy(d_open, h_open, ticks_bytes, cudaMemcpyHostToDevice));
CUDA_CHECK(cudaMemcpy(d_high, h_high, ticks_bytes, cudaMemcpyHostToDevice));
CUDA_CHECK(cudaMemcpy(d_low, h_low, ticks_bytes, cudaMemcpyHostToDevice));
CUDA_CHECK(cudaMemcpy(d_close, h_close, ticks_bytes, cudaMemcpyHostToDevice));
double step1_ms = get_time_ms() - step1_start;
// ========================================================================
// Step 2: Compute the period_id for each tick
// ========================================================================
double step2_start = get_time_ms();
const int BLOCK_SIZE = 256;
int num_blocks = (num_ticks + BLOCK_SIZE - 1) / BLOCK_SIZE;
compute_period_ids_kernel<<<num_blocks, BLOCK_SIZE>>>(
d_timestamps, d_period_ids, num_ticks, interval);
CUDA_CHECK(cudaGetLastError());
CUDA_CHECK(cudaDeviceSynchronize());
double step2_ms = get_time_ms() - step2_start;
// ========================================================================
// Step 3: RLE (run-length encode) to find the unique periods
// ========================================================================
double step3_start = get_time_ms();
int64_t* d_unique_periods = nullptr;
int* d_counts = nullptr;
int* d_num_runs = nullptr;
CUDA_CHECK(cudaMalloc(&d_unique_periods, num_ticks * sizeof(int64_t)));
CUDA_CHECK(cudaMalloc(&d_counts, num_ticks * sizeof(int)));
CUDA_CHECK(cudaMalloc(&d_num_runs, sizeof(int)));
// Determine the temporary buffer size for CUB
void* d_temp_storage = nullptr;
size_t temp_storage_bytes = 0;
cub::DeviceRunLengthEncode::Encode(
d_temp_storage, temp_storage_bytes,
d_period_ids, d_unique_periods, d_counts, d_num_runs,
num_ticks);
CUDA_CHECK(cudaMalloc(&d_temp_storage, temp_storage_bytes));
cub::DeviceRunLengthEncode::Encode(
d_temp_storage, temp_storage_bytes,
d_period_ids, d_unique_periods, d_counts, d_num_runs,
num_ticks);
CUDA_CHECK(cudaGetLastError());
// Copy back the number of unique periods
int num_periods = 0;
CUDA_CHECK(cudaMemcpy(&num_periods, d_num_runs, sizeof(int), cudaMemcpyDeviceToHost));
cudaFree(d_temp_storage);
d_temp_storage = nullptr;
double step3_ms = get_time_ms() - step3_start;
// ========================================================================
// Step 4: Exclusive scan to compute the offsets
// ========================================================================
double step4_start = get_time_ms();
int* d_offsets = nullptr;
CUDA_CHECK(cudaMalloc(&d_offsets, num_periods * sizeof(int)));
temp_storage_bytes = 0;
cub::DeviceScan::ExclusiveSum(
d_temp_storage, temp_storage_bytes,
d_counts, d_offsets, num_periods);
CUDA_CHECK(cudaMalloc(&d_temp_storage, temp_storage_bytes));
cub::DeviceScan::ExclusiveSum(
d_temp_storage, temp_storage_bytes,
d_counts, d_offsets, num_periods);
CUDA_CHECK(cudaGetLastError());
cudaFree(d_temp_storage);
double step4_ms = get_time_ms() - step4_start;
// ========================================================================
// Step 5: Aggregate the periods
// ========================================================================
double step5_start = get_time_ms();
GpuPeriodStats* d_out_stats = nullptr;
CUDA_CHECK(cudaMalloc(&d_out_stats, num_periods * sizeof(GpuPeriodStats)));
// Use the simple kernel (one thread per period),
// since there are usually few ticks per period
int agg_blocks = (num_periods + BLOCK_SIZE - 1) / BLOCK_SIZE;
aggregate_periods_simple_kernel<<<agg_blocks, BLOCK_SIZE>>>(
d_open, d_high, d_low, d_close,
d_unique_periods, d_offsets, d_counts,
num_periods, d_out_stats);
CUDA_CHECK(cudaGetLastError());
CUDA_CHECK(cudaDeviceSynchronize());
double step5_ms = get_time_ms() - step5_start;
// ========================================================================
// Step 6: Copy the results back to the CPU
// ========================================================================
double step6_start = get_time_ms();
GpuPeriodStats* h_stats = new GpuPeriodStats[num_periods];
CUDA_CHECK(cudaMemcpy(h_stats, d_out_stats, num_periods * sizeof(GpuPeriodStats),
cudaMemcpyDeviceToHost));
double step6_ms = get_time_ms() - step6_start;
// ========================================================================
// Step 7: Free the GPU memory
// ========================================================================
double step7_start = get_time_ms();
cudaFree(d_timestamps);
cudaFree(d_open);
cudaFree(d_high);
cudaFree(d_low);
cudaFree(d_close);
cudaFree(d_period_ids);
cudaFree(d_unique_periods);
cudaFree(d_counts);
cudaFree(d_offsets);
cudaFree(d_num_runs);
cudaFree(d_out_stats); cudaFree(d_out_stats);
double step7_ms = get_time_ms() - step7_start;
// ========================================================================
// Totals
// ========================================================================
double total_ms = get_time_ms() - total_start;
// Build the whole output as a single string
output << " GPU aggregation (" << num_ticks << " ticks, interval=" << interval << " sec):\n";
output << " 1. Malloc + H->D copy: " << std::fixed << std::setprecision(3) << std::setw(7) << step1_ms << " ms\n";
output << " 2. Compute period_ids: " << std::setw(7) << step2_ms << " ms\n";
output << " 3. RLE (CUB): " << std::setw(7) << step3_ms << " ms (" << num_periods << " periods)\n";
output << " 4. Exclusive scan: " << std::setw(7) << step4_ms << " ms\n";
output << " 5. Aggregation kernel: " << std::setw(7) << step5_ms << " ms\n";
output << " 6. D->H copy: " << std::setw(7) << step6_ms << " ms\n";
output << " 7. Free GPU memory: " << std::setw(7) << step7_ms << " ms\n";
output << " GPU TOTAL: " << std::setw(7) << total_ms << " ms\n";
// Print everything with a single printf
printf("%s", output.str().c_str());
fflush(stdout);
*h_out_stats = h_stats;
*out_num_periods = num_periods;
return 0; return 0;
} }
// ============================================================================
// Free the result memory
// ============================================================================
extern "C" void gpu_free_results(GpuPeriodStats* stats) {
delete[] stats;
}
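
The CUB calls in steps 3 and 4 above can be hard to visualize; the following NumPy sketch (hypothetical, not part of the repository) shows what DeviceRunLengthEncode and DeviceScan::ExclusiveSum produce when the period ids are already grouped consecutively, i.e. when the ticks are sorted by timestamp:

```python
# Illustration of steps 2-4 of gpu_aggregate_periods using NumPy
import numpy as np

timestamps = np.array([0, 20, 50, 61, 70, 130], dtype=np.int64)
interval = 60
period_ids = timestamps // interval                 # step 2: [0 0 0 1 1 2]

# step 3: run-length encode consecutive equal ids (CUB DeviceRunLengthEncode)
change = np.r_[True, period_ids[1:] != period_ids[:-1]]
unique_periods = period_ids[change]                 # [0 1 2]
counts = np.diff(np.r_[np.flatnonzero(change), period_ids.size])  # [3 2 1]

# step 4: exclusive prefix sum of the counts (CUB DeviceScan::ExclusiveSum)
offsets = np.concatenate(([0], np.cumsum(counts)[:-1]))           # [0 3 5]

# the aggregation kernel then reduces ticks [offsets[i], offsets[i] + counts[i])
# for each unique period i
print(unique_periods, counts, offsets)
```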

intervals.cpp

@@ -1,65 +1,301 @@
#include "intervals.hpp" #include "intervals.hpp"
#include "utils.hpp"
#include <mpi.h>
#include <algorithm> #include <algorithm>
#include <cmath> #include <cmath>
#include <fstream> #include <fstream>
#include <iomanip> #include <iomanip>
#include <sstream> #include <sstream>
#include <ctime> #include <ctime>
#include <limits>
std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double threshold) { // Helper structure for accumulating min/max within an interval
if (days.empty()) { struct IntervalAccumulator {
return {}; PeriodIndex start_period;
double start_avg;
double open_min;
double open_max;
double close_min;
double close_max;
void init(const PeriodStats& p) {
start_period = p.period;
start_avg = p.avg;
open_min = p.open_min;
open_max = p.open_max;
close_min = p.close_min;
close_max = p.close_max;
} }
std::vector<Interval> intervals; void update(const PeriodStats& p) {
open_min = std::min(open_min, p.open_min);
open_max = std::max(open_max, p.open_max);
close_min = std::min(close_min, p.close_min);
close_max = std::max(close_max, p.close_max);
}
Interval finalize(const PeriodStats& end_period, double change) const {
Interval iv;
iv.start_period = start_period;
iv.end_period = end_period.period;
iv.start_avg = start_avg;
iv.end_avg = end_period.avg;
iv.change = change;
iv.open_min = std::min(open_min, end_period.open_min);
iv.open_max = std::max(open_max, end_period.open_max);
iv.close_min = std::min(close_min, end_period.close_min);
iv.close_max = std::max(close_max, end_period.close_max);
return iv;
}
};
// PeriodStats packed for MPI transfer (8 doubles)
struct PackedPeriodStats {
double period; // PeriodIndex as double
double avg;
double open_min;
double open_max;
double close_min;
double close_max;
double count; // int64_t as double
double valid; // validity flag (1.0 = valid, 0.0 = invalid)
void pack(const PeriodStats& ps) {
period = static_cast<double>(ps.period);
avg = ps.avg;
open_min = ps.open_min;
open_max = ps.open_max;
close_min = ps.close_min;
close_max = ps.close_max;
count = static_cast<double>(ps.count);
valid = 1.0;
}
PeriodStats unpack() const {
PeriodStats ps;
ps.period = static_cast<PeriodIndex>(period);
ps.avg = avg;
ps.open_min = open_min;
ps.open_max = open_max;
ps.close_min = close_min;
ps.close_max = close_max;
ps.count = static_cast<int64_t>(count);
return ps;
}
bool is_valid() const { return valid > 0.5; }
void set_invalid() { valid = 0.0; }
};
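
The struct is deliberately eight contiguous doubles, which is why every hop below is sent as `8, MPI_DOUBLE`. Purely as an illustration (not used in this change), the same layout could be registered once as a committed MPI datatype:

```cpp
#include <mpi.h>

// Hypothetical helper: one committed datatype describing PackedPeriodStats.
MPI_Datatype make_packed_period_type() {
    MPI_Datatype t;
    MPI_Type_contiguous(8, MPI_DOUBLE, &t);  // 8 contiguous doubles
    MPI_Type_commit(&t);
    return t;
    // Then: MPI_Send(&packed, 1, t, rank + 1, 0, MPI_COMM_WORLD);
}
```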
IntervalResult find_intervals_parallel(
const std::vector<PeriodStats>& periods,
int rank, int size,
double threshold)
{
IntervalResult result;
result.compute_time = 0.0;
result.wait_time = 0.0;
if (periods.empty()) {
if (rank < size - 1) {
PackedPeriodStats invalid;
invalid.set_invalid();
MPI_Send(&invalid, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
}
return result;
}
double compute_start = MPI_Wtime();
size_t process_until = (rank == size - 1) ? periods.size() : periods.size() - 1;
IntervalAccumulator acc;
size_t start_idx = 0; size_t start_idx = 0;
double price_base = days[start_idx].avg; bool have_pending_interval = false;
for (size_t i = 1; i < days.size(); i++) { if (rank > 0) {
double price_now = days[i].avg; double wait_start = MPI_Wtime();
double change = std::abs(price_now - price_base) / price_base;
if (change >= threshold) { PackedPeriodStats received;
Interval interval; MPI_Recv(&received, 8, MPI_DOUBLE, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
interval.start_day = days[start_idx].day;
interval.end_day = days[i].day; result.wait_time = MPI_Wtime() - wait_start;
interval.start_avg = price_base; compute_start = MPI_Wtime();
interval.end_avg = price_now;
interval.change = change; if (received.is_valid()) {
PeriodStats prev_period = received.unpack();
// Find min(Open) and max(Close) within the interval for (start_idx = 0; start_idx < periods.size(); start_idx++) {
interval.min_open = days[start_idx].open; if (periods[start_idx].period > prev_period.period) {
interval.max_close = days[start_idx].close; break;
}
for (size_t j = start_idx; j <= i; j++) {
interval.min_open = std::min(interval.min_open, days[j].open);
interval.max_close = std::max(interval.max_close, days[j].close);
} }
intervals.push_back(interval); if (start_idx < process_until) {
acc.init(prev_period);
// Start a new interval have_pending_interval = true;
start_idx = i + 1;
if (start_idx >= days.size()) { for (size_t i = start_idx; i < process_until; i++) {
break; acc.update(periods[i]);
double change = std::abs(periods[i].avg - acc.start_avg) / acc.start_avg;
if (change >= threshold) {
result.intervals.push_back(acc.finalize(periods[i], change));
have_pending_interval = false;
start_idx = i + 1;
if (start_idx < process_until) {
acc.init(periods[start_idx]);
have_pending_interval = true;
}
}
}
} }
price_base = days[start_idx].avg; } else {
if (process_until > 0) {
acc.init(periods[0]);
have_pending_interval = true;
start_idx = 0;
}
}
} else {
if (process_until > 0) {
acc.init(periods[0]);
have_pending_interval = true;
start_idx = 0;
} }
} }
return intervals; if (rank == 0 && have_pending_interval) {
for (size_t i = 1; i < process_until; i++) {
acc.update(periods[i]);
double change = std::abs(periods[i].avg - acc.start_avg) / acc.start_avg;
if (change >= threshold) {
result.intervals.push_back(acc.finalize(periods[i], change));
have_pending_interval = false;
start_idx = i + 1;
if (start_idx < process_until) {
acc.init(periods[start_idx]);
have_pending_interval = true;
}
}
}
}
if (rank == size - 1 && have_pending_interval && !periods.empty()) {
const auto& last_period = periods.back();
double change = std::abs(last_period.avg - acc.start_avg) / acc.start_avg;
result.intervals.push_back(acc.finalize(last_period, change));
}
result.compute_time = MPI_Wtime() - compute_start;
if (rank < size - 1) {
PackedPeriodStats to_send;
if (have_pending_interval) {
PeriodStats start_period;
start_period.period = acc.start_period;
start_period.avg = acc.start_avg;
start_period.open_min = acc.open_min;
start_period.open_max = acc.open_max;
start_period.close_min = acc.close_min;
start_period.close_max = acc.close_max;
start_period.count = 0;
to_send.pack(start_period);
} else if (periods.size() >= 2) {
to_send.pack(periods[periods.size() - 2]);
} else {
to_send.set_invalid();
}
MPI_Send(&to_send, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
}
return result;
} }
std::string day_index_to_date(DayIndex day) { double collect_intervals(
time_t ts = static_cast<time_t>(day) * 86400; std::vector<Interval>& local_intervals,
int rank, int size)
{
double wait_time = 0.0;
if (rank == 0) {
for (int r = 1; r < size; r++) {
double wait_start = MPI_Wtime();
int count;
MPI_Recv(&count, 1, MPI_INT, r, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if (count > 0) {
std::vector<double> buffer(count * 9);
MPI_Recv(buffer.data(), count * 9, MPI_DOUBLE, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for (int i = 0; i < count; i++) {
Interval iv;
iv.start_period = static_cast<PeriodIndex>(buffer[i * 9 + 0]);
iv.end_period = static_cast<PeriodIndex>(buffer[i * 9 + 1]);
iv.open_min = buffer[i * 9 + 2];
iv.open_max = buffer[i * 9 + 3];
iv.close_min = buffer[i * 9 + 4];
iv.close_max = buffer[i * 9 + 5];
iv.start_avg = buffer[i * 9 + 6];
iv.end_avg = buffer[i * 9 + 7];
iv.change = buffer[i * 9 + 8];
local_intervals.push_back(iv);
}
}
wait_time += MPI_Wtime() - wait_start;
}
std::sort(local_intervals.begin(), local_intervals.end(),
[](const Interval& a, const Interval& b) {
return a.start_period < b.start_period;
});
} else {
int count = static_cast<int>(local_intervals.size());
MPI_Send(&count, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
if (count > 0) {
std::vector<double> buffer(count * 9);
for (int i = 0; i < count; i++) {
const auto& iv = local_intervals[i];
buffer[i * 9 + 0] = static_cast<double>(iv.start_period);
buffer[i * 9 + 1] = static_cast<double>(iv.end_period);
buffer[i * 9 + 2] = iv.open_min;
buffer[i * 9 + 3] = iv.open_max;
buffer[i * 9 + 4] = iv.close_min;
buffer[i * 9 + 5] = iv.close_max;
buffer[i * 9 + 6] = iv.start_avg;
buffer[i * 9 + 7] = iv.end_avg;
buffer[i * 9 + 8] = iv.change;
}
MPI_Send(buffer.data(), count * 9, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD);
}
}
return wait_time;
}
std::string period_index_to_datetime(PeriodIndex period) {
int64_t interval = get_aggregation_interval();
time_t ts = static_cast<time_t>(period) * interval;
struct tm* tm_info = gmtime(&ts); struct tm* tm_info = gmtime(&ts);
std::ostringstream oss; std::ostringstream oss;
oss << std::setfill('0') oss << std::setfill('0')
<< (tm_info->tm_year + 1900) << "-" << (tm_info->tm_year + 1900) << "-"
<< std::setw(2) << (tm_info->tm_mon + 1) << "-" << std::setw(2) << (tm_info->tm_mon + 1) << "-"
<< std::setw(2) << tm_info->tm_mday; << std::setw(2) << tm_info->tm_mday << " "
<< std::setw(2) << tm_info->tm_hour << ":"
<< std::setw(2) << tm_info->tm_min << ":"
<< std::setw(2) << tm_info->tm_sec;
return oss.str(); return oss.str();
} }
@@ -68,16 +304,17 @@ void write_intervals(const std::string& filename, const std::vector<Interval>& i
std::ofstream out(filename); std::ofstream out(filename);
out << std::fixed << std::setprecision(2); out << std::fixed << std::setprecision(2);
out << "start_date,end_date,min_open,max_close,start_avg,end_avg,change\n"; out << "start_datetime,end_datetime,open_min,open_max,close_min,close_max,start_avg,end_avg,change\n";
for (const auto& iv : intervals) { for (const auto& iv : intervals) {
out << day_index_to_date(iv.start_day) << "," out << period_index_to_datetime(iv.start_period) << ","
<< day_index_to_date(iv.end_day) << "," << period_index_to_datetime(iv.end_period) << ","
<< iv.min_open << "," << iv.open_min << ","
<< iv.max_close << "," << iv.open_max << ","
<< iv.close_min << ","
<< iv.close_max << ","
<< iv.start_avg << "," << iv.start_avg << ","
<< iv.end_avg << "," << iv.end_avg << ","
<< std::setprecision(6) << iv.change << "\n"; << std::setprecision(6) << iv.change << "\n";
} }
} }

View File

@@ -1,15 +1,44 @@
#pragma once #pragma once
#include "day_stats.hpp" #include "period_stats.hpp"
#include <vector> #include <vector>
#include <string> #include <string>
// Computing intervals with change >= threshold (10% by default) // An interval with change >= threshold
std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double threshold = 0.10); struct Interval {
PeriodIndex start_period;
PeriodIndex end_period;
double open_min;
double open_max;
double close_min;
double close_max;
double start_avg;
double end_avg;
double change;
};
// Result of the parallel interval construction
struct IntervalResult {
std::vector<Interval> intervals;
double compute_time; // computation time
double wait_time; // time spent waiting for data from the previous rank
};
// Parallel interval construction using MPI
IntervalResult find_intervals_parallel(
const std::vector<PeriodStats>& periods,
int rank, int size,
double threshold = 0.10
);
// Gather intervals from all ranks onto rank 0
double collect_intervals(
std::vector<Interval>& local_intervals,
int rank, int size
);
// Write intervals to a file // Write intervals to a file
void write_intervals(const std::string& filename, const std::vector<Interval>& intervals); void write_intervals(const std::string& filename, const std::vector<Interval>& intervals);
// Convert a DayIndex to a date string (YYYY-MM-DD) // Convert a PeriodIndex to a date/time string
std::string day_index_to_date(DayIndex day); std::string period_index_to_datetime(PeriodIndex period);
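
A minimal usage sketch of this header (a hypothetical driver; the actual wiring lives in main.cpp below): each rank builds its local intervals, they are gathered onto rank 0, and rank 0 writes the CSV.

```cpp
#include <mpi.h>
#include <vector>
#include "intervals.hpp"
#include "period_stats.hpp"

// periods: this rank's aggregated, trimmed periods in ascending order.
void build_and_write_intervals(const std::vector<PeriodStats>& periods,
                               int rank, int size) {
    IntervalResult res = find_intervals_parallel(periods, rank, size, 0.10);
    collect_intervals(res.intervals, rank, size);  // rank 0 receives everything
    if (rank == 0) {
        write_intervals("result.csv", res.intervals);
    }
}
```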

View File

@@ -1,165 +1,113 @@
#include <mpi.h> #include <mpi.h>
#include <iostream> #include <iostream>
#include <vector> #include <vector>
#include <map> #include <iomanip>
#include "csv_loader.hpp" #include "csv_loader.hpp"
#include "utils.hpp"
#include "record.hpp" #include "record.hpp"
#include "day_stats.hpp" #include "period_stats.hpp"
#include "aggregation.hpp" #include "aggregation.hpp"
#include "intervals.hpp" #include "intervals.hpp"
#include "utils.hpp"
#include "gpu_loader.hpp" #include "gpu_loader.hpp"
// Helper: select the records that belong to a given rank
std::vector<Record> select_records_for_rank(
const std::map<DayIndex, std::vector<Record>>& days,
const std::vector<DayIndex>& day_list)
{
std::vector<Record> out;
for (auto d : day_list) {
auto it = days.find(d);
if (it != days.end()) {
const auto& vec = it->second;
out.insert(out.end(), vec.begin(), vec.end());
}
}
return out;
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
MPI_Init(&argc, &argv); MPI_Init(&argc, &argv);
double total_start = MPI_Wtime();
int rank, size; int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Comm_size(MPI_COMM_WORLD, &size);
// ====== LOADING GPU FUNCTIONS ====== // Check whether a GPU is available
auto gpu_is_available = load_gpu_is_available(); bool use_cuda = get_use_cuda();
auto gpu_aggregate = load_gpu_aggregate_days(); bool have_gpu = gpu_is_available();
bool use_gpu = use_cuda && have_gpu;
bool have_gpu = false; std::cout << "Rank " << rank
if (gpu_is_available && gpu_is_available()) { << ": USE_CUDA=" << use_cuda
have_gpu = true; << ", GPU available=" << have_gpu
std::cout << "Rank " << rank << ": GPU available" << std::endl; << ", using " << (use_gpu ? "GPU" : "CPU")
} else { << std::endl;
std::cout << "Rank " << rank << ": GPU not available, using CPU" << std::endl;
}
std::vector<Record> local_records; // Parallel data reading
double read_start = MPI_Wtime();
std::vector<Record> records = load_csv_parallel(rank, size);
double read_time = MPI_Wtime() - read_start;
std::cout << "Rank " << rank
<< ": read " << records.size() << " records"
<< " in " << std::fixed << std::setprecision(3) << read_time << " sec"
<< std::endl;
// Aggregation by period
double agg_start = MPI_Wtime();
std::vector<PeriodStats> periods;
if (use_gpu) {
int64_t interval = get_aggregation_interval();
if (!aggregate_periods_gpu(records, interval, periods)) {
std::cerr << "Rank " << rank << ": GPU aggregation failed, falling back to CPU" << std::endl;
periods = aggregate_periods(records);
}
} else {
periods = aggregate_periods(records);
}
double agg_time = MPI_Wtime() - agg_start;
std::cout << "Rank " << rank
<< ": aggregated " << periods.size() << " periods"
<< " [" << (periods.empty() ? 0 : periods.front().period)
<< ".." << (periods.empty() ? 0 : periods.back().period) << "]"
<< " in " << std::fixed << std::setprecision(3) << agg_time << " sec"
<< std::endl;
// Drop the edge periods (they may be incomplete because of the parallel read)
trim_edge_periods(periods, rank, size);
std::cout << "Rank " << rank
<< ": after trim " << periods.size() << " periods"
<< " [" << (periods.empty() ? 0 : periods.front().period)
<< ".." << (periods.empty() ? 0 : periods.back().period) << "]"
<< std::endl;
// Parallel interval construction
IntervalResult iv_result = find_intervals_parallel(periods, rank, size);
std::cout << "Rank " << rank
<< ": found " << iv_result.intervals.size() << " intervals"
<< ", compute " << std::fixed << std::setprecision(6) << iv_result.compute_time << " sec"
<< ", wait " << iv_result.wait_time << " sec"
<< std::endl;
// Gather intervals on rank 0
double collect_wait = collect_intervals(iv_result.intervals, rank, size);
if (rank == 0) { if (rank == 0) {
std::cout << "Rank 0 loading CSV..." << std::endl; std::cout << "Rank 0: collected " << iv_result.intervals.size() << " total intervals"
<< ", wait " << std::fixed << std::setprecision(3) << collect_wait << " sec"
// We run from the build directory << std::endl;
auto records = load_csv("../data/data.csv");
auto days = group_by_day(records);
auto parts = split_days(days, size);
// Distribute the data
for (int r = 0; r < size; r++) {
auto vec = select_records_for_rank(days, parts[r]);
if (r == 0) {
// do not send to ourselves; keep it locally
local_records = vec;
continue;
}
int count = static_cast<int>(vec.size());
MPI_Send(&count, 1, MPI_INT, r, 0, MPI_COMM_WORLD);
MPI_Send(vec.data(), count * sizeof(Record), MPI_BYTE, r, 1, MPI_COMM_WORLD);
}
}
else {
// Receive the data
int count = 0;
MPI_Recv(&count, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
local_records.resize(count);
MPI_Recv(local_records.data(), count * sizeof(Record),
MPI_BYTE, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
} }
// Write results to a file (rank 0 only)
if (rank == 0) {
double write_start = MPI_Wtime();
write_intervals("result.csv", iv_result.intervals);
double write_time = MPI_Wtime() - write_start;
std::cout << "Rank 0: wrote result.csv"
<< " in " << std::fixed << std::setprecision(3) << write_time << " sec"
<< std::endl;
}
// Print the total execution time
MPI_Barrier(MPI_COMM_WORLD); MPI_Barrier(MPI_COMM_WORLD);
double total_time = MPI_Wtime() - total_start;
std::cout << "Rank " << rank << " received "
<< local_records.size() << " records" << std::endl;
// ====== AGGREGATION ON EACH NODE (GPU or CPU) ======
std::vector<DayStats> local_stats;
if (have_gpu && gpu_aggregate) {
bool gpu_success = aggregate_days_gpu(local_records, local_stats, gpu_aggregate);
if (gpu_success) {
std::cout << "Rank " << rank << " aggregated "
<< local_stats.size() << " days (GPU)" << std::endl;
} else {
// Fall back to the CPU on GPU failure
std::cout << "Rank " << rank << ": GPU aggregation failed, falling back to CPU" << std::endl;
local_stats = aggregate_days(local_records);
std::cout << "Rank " << rank << " aggregated "
<< local_stats.size() << " days (CPU)" << std::endl;
}
} else {
local_stats = aggregate_days(local_records);
std::cout << "Rank " << rank << " aggregated "
<< local_stats.size() << " days (CPU)" << std::endl;
}
// ====== GATHERING AGGREGATED DATA ON RANK 0 ======
std::vector<DayStats> all_stats;
if (rank == 0) { if (rank == 0) {
// Add our own data std::cout << "Total execution time: "
all_stats.insert(all_stats.end(), local_stats.begin(), local_stats.end()); << std::fixed << std::setprecision(3)
<< total_time << " sec" << std::endl;
// Receive data from the other nodes
for (int r = 1; r < size; r++) {
int count = 0;
MPI_Recv(&count, 1, MPI_INT, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::vector<DayStats> remote_stats(count);
MPI_Recv(remote_stats.data(), count * sizeof(DayStats),
MPI_BYTE, r, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
all_stats.insert(all_stats.end(), remote_stats.begin(), remote_stats.end());
}
} else {
// Send our aggregated data to rank 0
int count = static_cast<int>(local_stats.size());
MPI_Send(&count, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
MPI_Send(local_stats.data(), count * sizeof(DayStats), MPI_BYTE, 0, 3, MPI_COMM_WORLD);
}
// ====== COMPUTING INTERVALS ON RANK 0 ======
if (rank == 0) {
std::cout << "Rank 0: merging " << all_stats.size() << " day stats..." << std::endl;
// Merge and sort
auto merged_stats = merge_day_stats(all_stats);
std::cout << "Rank 0: total " << merged_stats.size() << " unique days" << std::endl;
// Compute the intervals
auto intervals = find_intervals(merged_stats, 0.10);
std::cout << "Found " << intervals.size() << " intervals with >=10% change" << std::endl;
// Write the result
write_intervals("../result.csv", intervals);
std::cout << "Results written to result.csv" << std::endl;
// Print the first few intervals
std::cout << "\nFirst 5 intervals:\n";
std::cout << "start_date,end_date,min_open,max_close,change\n";
for (size_t i = 0; i < std::min(intervals.size(), size_t(5)); i++) {
const auto& iv = intervals[i];
std::cout << day_index_to_date(iv.start_day) << ","
<< day_index_to_date(iv.end_day) << ","
<< iv.min_open << ","
<< iv.max_close << ","
<< iv.change << "\n";
}
} }
MPI_Finalize(); MPI_Finalize();

View File

@@ -1,11 +0,0 @@
#include "mpi_utils.hpp"
#include <mpi.h>
#include <iostream>
void mpi_print_basic() {
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
std::cout << "Hello from rank " << rank << " of " << size << std::endl;
}

View File

@@ -1,2 +0,0 @@
#pragma once
void mpi_print_basic();

15
src/period_stats.hpp Normal file
View File

@@ -0,0 +1,15 @@
#pragma once
#include <cstdint>
using PeriodIndex = int64_t;
// Aggregated data for a single period
struct PeriodStats {
PeriodIndex period; // period index (timestamp / AGGREGATION_INTERVAL)
double avg; // mean of (Low + High) / 2 over all records
double open_min; // minimum Open within the period
double open_max; // maximum Open within the period
double close_min; // minimum Close within the period
double close_max; // maximum Close within the period
int64_t count; // number of records aggregated
};
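
A worked example of the period indexing, with AGGREGATION_INTERVAL assumed to be 3600: every tick whose timestamp falls in [7200, 10800) lands in period 2, whose start time is 1970-01-01 02:00:00 UTC.

```cpp
#include <cassert>
#include <cstdint>

int main() {
    const int64_t interval = 3600;                   // assumed AGGREGATION_INTERVAL
    const int64_t timestamp = 7384;                  // some tick inside the third hour
    const int64_t period = timestamp / interval;     // integer division -> 2
    const int64_t period_start = period * interval;  // 7200 = 1970-01-01 02:00:00 UTC
    assert(period == 2 && period_start == 7200);
    return 0;
}
```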

View File

@@ -1,25 +1,105 @@
#include "utils.hpp" #include "utils.hpp"
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <numeric>
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) { int get_num_cpu_threads() {
std::map<DayIndex, std::vector<Record>> days; const char* env_threads = std::getenv("NUM_CPU_THREADS");
int num_cpu_threads = 1;
for (const auto& r : recs) { if (env_threads) {
DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400; num_cpu_threads = std::atoi(env_threads);
days[day].push_back(r); if (num_cpu_threads < 1) num_cpu_threads = 1;
} }
return num_cpu_threads;
return days;
} }
std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts) { std::string get_env(const char* name) {
std::vector<std::vector<DayIndex>> out(parts); const char* env = std::getenv(name);
if (!env) {
int i = 0; throw std::runtime_error(std::string("Environment variable not set: ") + name);
for (auto& kv : days) {
out[i % parts].push_back(kv.first);
i++;
} }
return std::string(env);
return out;
} }
std::string get_data_path() {
return get_env("DATA_PATH");
}
std::vector<int> get_data_read_shares() {
std::vector<int> shares;
std::stringstream ss(get_env("DATA_READ_SHARES"));
std::string item;
while (std::getline(ss, item, ',')) {
shares.push_back(std::stoi(item));
}
return shares;
}
int64_t get_read_overlap_bytes() {
return std::stoll(get_env("READ_OVERLAP_BYTES"));
}
int64_t get_aggregation_interval() {
return std::stoll(get_env("AGGREGATION_INTERVAL"));
}
bool get_use_cuda() {
return std::stoi(get_env("USE_CUDA")) != 0;
}
int64_t get_file_size(const std::string& path) {
std::ifstream file(path, std::ios::binary | std::ios::ate);
if (!file.is_open()) {
throw std::runtime_error("Cannot open file: " + path);
}
return static_cast<int64_t>(file.tellg());
}
ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
const std::vector<int>& shares, int64_t overlap_bytes) {
std::vector<int> effective_shares;
if (shares.size() == static_cast<size_t>(size)) {
effective_shares = shares;
} else {
effective_shares.assign(size, 1);
}
int total_shares = std::accumulate(effective_shares.begin(), effective_shares.end(), 0);
int64_t bytes_per_share = file_size / total_shares;
int64_t base_start = 0;
for (int i = 0; i < rank; i++) {
base_start += bytes_per_share * effective_shares[i];
}
int64_t base_end = base_start + bytes_per_share * effective_shares[rank];
ByteRange range;
if (rank == 0) {
range.start = 0;
range.end = std::min(base_end + overlap_bytes, file_size);
} else if (rank == size - 1) {
range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
range.end = file_size;
} else {
range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
range.end = std::min(base_end + overlap_bytes, file_size);
}
return range;
}
void trim_edge_periods(std::vector<PeriodStats>& periods, int rank, int size) {
if (periods.empty()) return;
if (rank == 0) {
periods.pop_back();
} else if (rank == size - 1) {
periods.erase(periods.begin());
} else {
periods.pop_back();
periods.erase(periods.begin());
}
}
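
A small sketch of how the shares/overlap splitting behaves (hypothetical numbers; compile it together with utils.cpp): with file_size = 1000, shares {2,1,1} and a 10-byte overlap, the ranks get [0,510), [490,760) and [740,1000). Neighbouring ranges overlap, so records cut at a boundary are seen by both ranks, and the resulting incomplete edge periods are later removed by trim_edge_periods().

```cpp
#include <cstdint>
#include <iostream>
#include <vector>
#include "utils.hpp"

int main() {
    const int64_t file_size = 1000;
    const std::vector<int> shares = {2, 1, 1};  // rank 0 reads twice as much
    const int64_t overlap = 10;                 // bytes re-read across boundaries
    const int size = 3;
    for (int rank = 0; rank < size; rank++) {
        ByteRange r = calculate_byte_range(rank, size, file_size, shares, overlap);
        std::cout << "rank " << rank << ": [" << r.start << ", " << r.end << ")\n";
    }
    return 0;
}
```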

View File

@@ -1,9 +1,33 @@
#pragma once #pragma once
#include "record.hpp" #include "record.hpp"
#include "day_stats.hpp" #include "period_stats.hpp"
#include <map> #include <map>
#include <vector> #include <vector>
#include <string>
#include <cstdlib>
#include <cstdint>
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs); // Reading environment variables
std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts); int get_num_cpu_threads();
std::string get_data_path();
std::vector<int> get_data_read_shares();
int64_t get_read_overlap_bytes();
int64_t get_aggregation_interval();
bool get_use_cuda();
// Structure holding a byte range to read
struct ByteRange {
int64_t start;
int64_t end; // exclusive
};
// Computes the byte range for a given rank
ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
const std::vector<int>& shares, int64_t overlap_bytes);
// Get the file size
int64_t get_file_size(const std::string& path);
// Removes the edge periods, which may be incomplete because of the parallel read
void trim_edge_periods(std::vector<PeriodStats>& periods, int rank, int size);
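
The getters above only parse environment variables. A hedged illustration of the expected formats follows; the concrete values here are hypothetical, and in real runs the variables are set outside the program (e.g. via run.slurm or the shell):

```cpp
#include <cstdlib>
#include "utils.hpp"

int main() {
    setenv("DATA_PATH", "./data/data_10s.csv", 1);
    setenv("DATA_READ_SHARES", "1,1,1,1", 1);    // one integer share per rank
    setenv("READ_OVERLAP_BYTES", "4096", 1);
    setenv("AGGREGATION_INTERVAL", "86400", 1);  // seconds per period (86400 = daily)
    setenv("USE_CUDA", "1", 1);
    setenv("NUM_CPU_THREADS", "8", 1);

    std::vector<int> shares = get_data_read_shares();  // {1, 1, 1, 1}
    bool use_cuda = get_use_cuda();                     // true
    (void)shares; (void)use_cuda;
    return 0;
}
```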

136
upsample.py Normal file
View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
import argparse
import sys
import time
def parse_row(line: str):
# Timestamp,Open,High,Low,Close,Volume
ts, o, h, l, c, v = line.split(',')
return int(float(ts)), float(o), float(h), float(l), float(c), float(v)
def fmt_row(ts, o, h, l, c, v):
return f"{ts},{o:.2f},{h:.2f},{l:.2f},{c:.2f},{v:.8f}\n"
def count_lines_fast(path: str) -> int:
with open(path, "rb") as f:
return sum(1 for _ in f) - 1  # minus the header
def main(inp, out, step, flush_every):
# count the input rows for the progress report
total_lines = count_lines_fast(inp)
print(f"Total input rows: {total_lines:,}", file=sys.stderr)
start_time = time.time()
processed = 0
last_report = start_time
with open(inp, "r", buffering=8 * 1024 * 1024) as fin, \
open(out, "w", buffering=8 * 1024 * 1024) as fout:
fin.readline()  # skip the header
fout.write("Timestamp,Open,High,Low,Close,Volume\n")
first = fin.readline()
if not first:
return
prev = parse_row(first.strip())
out_buf = []
out_rows = 0
for line in fin:
line = line.strip()
if not line:
continue
cur = parse_row(line)
t1, o1, h1, l1, c1, v1 = prev
t2, o2, h2, l2, c2, v2 = cur
dt = t2 - t1
steps = dt // step
if steps > 0:
do = o2 - o1
dh = h2 - h1
dl = l2 - l1
dc = c2 - c1
dv = v2 - v1
inv = 1.0 / steps
for i in range(steps):
a = i * inv
out_buf.append(fmt_row(
t1 + i * step,
o1 + do * a,
h1 + dh * a,
l1 + dl * a,
c1 + dc * a,
v1 + dv * a
))
out_rows += steps
prev = cur
processed += 1
# progress report
if processed % 100_000 == 0:
now = time.time()
if now - last_report >= 0.5:
pct = processed * 100.0 / total_lines
elapsed = now - start_time
speed = processed / elapsed if elapsed > 0 else 0
eta = (total_lines - processed) / speed if speed > 0 else 0
print(
f"\rprocessed: {processed:,} / {total_lines:,} "
f"({pct:5.1f}%) | "
f"out ~ {out_rows:,} | "
f"{speed:,.0f} rows/s | "
f"ETA {eta/60:5.1f} min",
end="",
file=sys.stderr,
flush=True,
)
last_report = now
# flush the output buffer
if out_rows >= flush_every:
fout.write("".join(out_buf))
out_buf.clear()
out_rows = 0
# flush whatever is left
if out_buf:
fout.write("".join(out_buf))
# write the last row as-is
t, o, h, l, c, v = prev
fout.write(fmt_row(t, o, h, l, c, v))
total_time = time.time() - start_time
print(
f"\nDone in {total_time/60:.1f} min",
file=sys.stderr
)
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--input", required=True)
ap.add_argument("-o", "--output", required=True)
ap.add_argument("-s", "--step", type=int, default=10)
ap.add_argument("--flush-every", type=int, default=200_000)
args = ap.parse_args()
if args.step <= 0:
raise SystemExit("step must be > 0")
main(args.input, args.output, args.step, args.flush_every)
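
For intuition: with `-s 10`, two consecutive input rows at timestamps 60 and 120 yield six output rows at 60, 70, ..., 110, with Open/High/Low/Close/Volume linearly interpolated between the pair (the row at 60 reproduces the first row exactly); the very last input row is then written unchanged.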