Compare commits

...

15 Commits

21 changed files with 1621 additions and 322 deletions

.gitignore vendored

@@ -1,3 +1,4 @@
 data
 build
 out.txt
+*.csv

Makefile

@@ -1,8 +1,8 @@
 CXX = mpic++
-CXXFLAGS = -std=c++17 -O2 -Wall -Wextra -Wno-cast-function-type
+CXXFLAGS = -std=c++17 -O2 -Wall -Wextra -Wno-cast-function-type -fopenmp
 NVCC = nvcc
-NVCCFLAGS = -O2 -Xcompiler -fPIC
+NVCCFLAGS = -O3 -std=c++17 -arch=sm_86 -Xcompiler -fPIC
 SRC_DIR = src
 BUILD_DIR = build

README.md

@@ -10,10 +10,30 @@
 by at least 10% relative to the start date of the interval, together with the minimum and maximum
 Open and Close values over all days inside the interval.
+## Parallel data reading
+There is no point in reading the data from NFS in parallel: in reality the data files
+live only on the NFS server, so the other nodes merely send network requests to the
+NFS server, which reads the actual data from disk and only then ships it back to them.
+To avoid this, copy the data files into the local file system of every machine, for
+example into `/data`.
+```sh
+# On every node, create the /data directory
+sudo mkdir /data
+sudo chown $USER /data
+# Copy the data
+cd /mnt/shared/supercomputers/data
+cp data.csv /data/
+```
 ## Build
 The project must be located in a directory shared by all nodes,
-for example in `/mnt/shared/supercomputers/bitcoin-project/build`.
+for example in `/mnt/shared/supercomputers/build`.
 Before running, set the actual path in `run.slurm`.
 ```sh

benchmark.py Normal file

@@ -0,0 +1,115 @@
"""
Запускает make run <number_of_runs> раз и считает статистику по времени выполнения.
Тупо парсит out.txt и берём значение из строки "Total execution time: <time> sec".
python benchmark.py <number_of_runs>
"""
import os
import re
import sys
import time
import subprocess
import statistics
N = int(sys.argv[1]) if len(sys.argv) > 1 else 10
OUT = "out.txt"
TIME_RE = re.compile(r"Total execution time:\s*([0-9]*\.?[0-9]+)\s*sec")
JOB_RE = re.compile(r"Submitted batch job\s+(\d+)")
APPEAR_TIMEOUT = 300.0 # ждать появления out.txt
FINISH_TIMEOUT = 3600.0 # ждать появления Total execution time (сек)
POLL = 0.2 # частота проверки файла
def wait_for_exists(path: str, timeout: float):
t0 = time.time()
while not os.path.exists(path):
if time.time() - t0 > timeout:
raise TimeoutError(f"{path} did not appear within {timeout} seconds")
time.sleep(POLL)
def try_read(path: str) -> str:
try:
with open(path, "r", encoding="utf-8", errors="replace") as f:
return f.read()
except FileNotFoundError:
return ""
except OSError:
# бывает, что файл на NFS в момент записи недоступен на чтение
return ""
def wait_for_time_line(path: str, timeout: float) -> float:
t0 = time.time()
last_report = 0.0
while True:
txt = try_read(path)
matches = TIME_RE.findall(txt)
if matches:
return float(matches[-1]) # последняя встреченная строка
now = time.time()
if now - t0 > timeout:
tail = txt[-800:] if txt else "<empty>"
raise TimeoutError("Timed out waiting for 'Total execution time' line.\n"
f"Last 800 chars of out.txt:\n{tail}")
# иногда полезно печатать прогресс раз в ~5 сек
if now - last_report > 5.0:
last_report = now
if txt:
# показать последнюю непустую строку
lines = [l for l in txt.splitlines() if l.strip()]
if lines:
print(f" waiting... last line: {lines[-1][:120]}", flush=True)
else:
print(" waiting... (out.txt empty)", flush=True)
else:
print(" waiting... (out.txt not readable yet)", flush=True)
time.sleep(POLL)
times = []
for i in range(N):
print(f"Run {i+1}/{N} ...", flush=True)
# удаляем out.txt перед запуском
try:
os.remove(OUT)
except FileNotFoundError:
pass
# запускаем make run и забираем stdout (там будет Submitted batch job XXX)
res = subprocess.run(["make", "run"], capture_output=True, text=True)
out = (res.stdout or "") + "\n" + (res.stderr or "")
job_id = None
m = JOB_RE.search(out)
if m:
job_id = m.group(1)
print(f" submitted job {job_id}", flush=True)
else:
print(" (job id not detected; will only watch out.txt)", flush=True)
# ждём появления out.txt и появления строки с Total execution time
wait_for_exists(OUT, APPEAR_TIMEOUT)
t = wait_for_time_line(OUT, FINISH_TIMEOUT)
times.append(t)
print(f" time = {t:.3f} sec", flush=True)
# опционально удалить out.txt после парсинга
try:
os.remove(OUT)
except FileNotFoundError:
pass
print("\n=== RESULTS ===")
print(f"Runs: {len(times)}")
print(f"Mean: {statistics.mean(times):.3f} sec")
print(f"Median: {statistics.median(times):.3f} sec")
print(f"Min: {min(times):.3f} sec")
print(f"Max: {max(times):.3f} sec")
if len(times) > 1:
print(f"Stddev: {statistics.stdev(times):.3f} sec")

analysis notebook (.ipynb)

@@ -2,7 +2,7 @@
"cells": [ "cells": [
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 1, "execution_count": 20,
"id": "2acce44b", "id": "2acce44b",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
@@ -12,7 +12,7 @@
  },
  {
   "cell_type": "code",
-   "execution_count": 14,
+   "execution_count": 21,
   "id": "5ba70af7",
   "metadata": {},
   "outputs": [
@@ -111,7 +111,7 @@
"7317758 0.410369 " "7317758 0.410369 "
] ]
}, },
"execution_count": 14, "execution_count": 21,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -124,8 +124,8 @@
  },
  {
   "cell_type": "code",
-   "execution_count": 19,
-   "id": "d4b22f3b",
+   "execution_count": 23,
+   "id": "3b320537",
   "metadata": {},
   "outputs": [
    {
@@ -150,67 +150,190 @@
" <tr style=\"text-align: right;\">\n", " <tr style=\"text-align: right;\">\n",
" <th></th>\n", " <th></th>\n",
" <th>Timestamp</th>\n", " <th>Timestamp</th>\n",
" <th>Low</th>\n",
" <th>High</th>\n",
" <th>Open</th>\n", " <th>Open</th>\n",
" <th>High</th>\n",
" <th>Low</th>\n",
" <th>Close</th>\n", " <th>Close</th>\n",
" <th>Volume</th>\n",
" <th>Avg</th>\n",
" </tr>\n", " </tr>\n",
" </thead>\n", " </thead>\n",
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>5078</th>\n", " <th>7317754</th>\n",
" <td>2025-11-26</td>\n", " <td>2025-11-30 23:55:00+00:00</td>\n",
" <td>86304.0</td>\n", " <td>90405.0</td>\n",
" <td>90646.0</td>\n", " <td>90452.0</td>\n",
" <td>87331.0</td>\n", " <td>90403.0</td>\n",
" <td>90477.0</td>\n", " <td>90452.0</td>\n",
" <td>0.531700</td>\n",
" <td>90427.5</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5079</th>\n", " <th>7317755</th>\n",
" <td>2025-11-27</td>\n", " <td>2025-11-30 23:56:00+00:00</td>\n",
" <td>90091.0</td>\n", " <td>90452.0</td>\n",
" <td>91926.0</td>\n", " <td>90481.0</td>\n",
" <td>90476.0</td>\n", " <td>90420.0</td>\n",
" <td>91325.0</td>\n", " <td>90420.0</td>\n",
" <td>0.055547</td>\n",
" <td>90450.5</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5080</th>\n", " <th>7317756</th>\n",
" <td>2025-11-28</td>\n", " <td>2025-11-30 23:57:00+00:00</td>\n",
" <td>90233.0</td>\n", " <td>90412.0</td>\n",
" <td>93091.0</td>\n", " <td>90458.0</td>\n",
" <td>91326.0</td>\n", " <td>90396.0</td>\n",
" <td>90913.0</td>\n", " <td>90435.0</td>\n",
" <td>0.301931</td>\n",
" <td>90427.0</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5081</th>\n", " <th>7317757</th>\n",
" <td>2025-11-29</td>\n", " <td>2025-11-30 23:58:00+00:00</td>\n",
" <td>90216.0</td>\n", " <td>90428.0</td>\n",
" <td>91179.0</td>\n", " <td>90428.0</td>\n",
" <td>90913.0</td>\n", " <td>90362.0</td>\n",
" <td>90832.0</td>\n", " <td>90362.0</td>\n",
" </tr>\n", " <td>4.591653</td>\n",
" <tr>\n", " <td>90395.0</td>\n",
" <th>5082</th>\n", " </tr>\n",
" <td>2025-11-30</td>\n", " <tr>\n",
" <th>7317758</th>\n",
" <td>2025-11-30 23:59:00+00:00</td>\n",
" <td>90363.0</td>\n",
" <td>90386.0</td>\n",
" <td>90362.0</td>\n", " <td>90362.0</td>\n",
" <td>91969.0</td>\n",
" <td>90832.0</td>\n",
" <td>90382.0</td>\n", " <td>90382.0</td>\n",
" <td>0.410369</td>\n",
" <td>90374.0</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" Timestamp Low High Open Close\n", " Timestamp Open High Low Close \\\n",
"5078 2025-11-26 86304.0 90646.0 87331.0 90477.0\n", "7317754 2025-11-30 23:55:00+00:00 90405.0 90452.0 90403.0 90452.0 \n",
"5079 2025-11-27 90091.0 91926.0 90476.0 91325.0\n", "7317755 2025-11-30 23:56:00+00:00 90452.0 90481.0 90420.0 90420.0 \n",
"5080 2025-11-28 90233.0 93091.0 91326.0 90913.0\n", "7317756 2025-11-30 23:57:00+00:00 90412.0 90458.0 90396.0 90435.0 \n",
"5081 2025-11-29 90216.0 91179.0 90913.0 90832.0\n", "7317757 2025-11-30 23:58:00+00:00 90428.0 90428.0 90362.0 90362.0 \n",
"5082 2025-11-30 90362.0 91969.0 90832.0 90382.0" "7317758 2025-11-30 23:59:00+00:00 90363.0 90386.0 90362.0 90382.0 \n",
"\n",
" Volume Avg \n",
"7317754 0.531700 90427.5 \n",
"7317755 0.055547 90450.5 \n",
"7317756 0.301931 90427.0 \n",
"7317757 4.591653 90395.0 \n",
"7317758 0.410369 90374.0 "
] ]
}, },
"execution_count": 19, "execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df['Avg'] = (df['Low'] + df['High']) / 2\n",
"df.tail()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "4b1cd63c",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Timestamp</th>\n",
" <th>Avg</th>\n",
" <th>OpenMin</th>\n",
" <th>OpenMax</th>\n",
" <th>CloseMin</th>\n",
" <th>CloseMax</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>5078</th>\n",
" <td>2025-11-26</td>\n",
" <td>88057.301736</td>\n",
" <td>86312.0</td>\n",
" <td>90574.0</td>\n",
" <td>86323.0</td>\n",
" <td>90574.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5079</th>\n",
" <td>2025-11-27</td>\n",
" <td>91245.092708</td>\n",
" <td>90126.0</td>\n",
" <td>91888.0</td>\n",
" <td>90126.0</td>\n",
" <td>91925.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5080</th>\n",
" <td>2025-11-28</td>\n",
" <td>91324.308681</td>\n",
" <td>90255.0</td>\n",
" <td>92970.0</td>\n",
" <td>90283.0</td>\n",
" <td>92966.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5081</th>\n",
" <td>2025-11-29</td>\n",
" <td>90746.479514</td>\n",
" <td>90265.0</td>\n",
" <td>91158.0</td>\n",
" <td>90279.0</td>\n",
" <td>91179.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5082</th>\n",
" <td>2025-11-30</td>\n",
" <td>91187.356250</td>\n",
" <td>90363.0</td>\n",
" <td>91940.0</td>\n",
" <td>90362.0</td>\n",
" <td>91940.0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Timestamp Avg OpenMin OpenMax CloseMin CloseMax\n",
"5078 2025-11-26 88057.301736 86312.0 90574.0 86323.0 90574.0\n",
"5079 2025-11-27 91245.092708 90126.0 91888.0 90126.0 91925.0\n",
"5080 2025-11-28 91324.308681 90255.0 92970.0 90283.0 92966.0\n",
"5081 2025-11-29 90746.479514 90265.0 91158.0 90279.0 91179.0\n",
"5082 2025-11-30 91187.356250 90363.0 91940.0 90362.0 91940.0"
]
},
"execution_count": 25,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -218,7 +341,13 @@
"source": [ "source": [
"df_days = (\n", "df_days = (\n",
" df.groupby(df[\"Timestamp\"].dt.date)\n", " df.groupby(df[\"Timestamp\"].dt.date)\n",
" .agg({\"Low\": \"min\", \"High\": \"max\", \"Open\": \"first\", \"Close\": \"last\"})\n", " .agg(\n",
" Avg=(\"Avg\", \"mean\"),\n",
" OpenMin=(\"Open\", \"min\"),\n",
" OpenMax=(\"Open\", \"max\"),\n",
" CloseMin=(\"Close\", \"min\"),\n",
" CloseMax=(\"Close\", \"max\"),\n",
" )\n",
" .reset_index()\n", " .reset_index()\n",
")\n", ")\n",
"df_days.tail()" "df_days.tail()"
@@ -226,111 +355,7 @@
  },
  {
   "cell_type": "code",
-   "execution_count": 21,
-   "id": "91823496",
-   "metadata": {},
-   "outputs": [
-    {
-     "data": {
-      "text/html": [
-       "<div>\n",
-       "<style scoped>\n",
-       " .dataframe tbody tr th:only-of-type {\n",
-       " vertical-align: middle;\n",
-       " }\n",
-       "\n",
-       " .dataframe tbody tr th {\n",
-       " vertical-align: top;\n",
-       " }\n",
-       "\n",
-       " .dataframe thead th {\n",
-       " text-align: right;\n",
-       " }\n",
-       "</style>\n",
-       "<table border=\"1\" class=\"dataframe\">\n",
-       " <thead>\n",
-       " <tr style=\"text-align: right;\">\n",
-       " <th></th>\n",
-       " <th>Timestamp</th>\n",
-       " <th>Low</th>\n",
-       " <th>High</th>\n",
-       " <th>Open</th>\n",
-       " <th>Close</th>\n",
-       " <th>Avg</th>\n",
-       " </tr>\n",
-       " </thead>\n",
-       " <tbody>\n",
-       " <tr>\n",
-       " <th>5078</th>\n",
-       " <td>2025-11-26</td>\n",
-       " <td>86304.0</td>\n",
-       " <td>90646.0</td>\n",
-       " <td>87331.0</td>\n",
-       " <td>90477.0</td>\n",
-       " <td>88475.0</td>\n",
-       " </tr>\n",
-       " <tr>\n",
-       " <th>5079</th>\n",
-       " <td>2025-11-27</td>\n",
-       " <td>90091.0</td>\n",
-       " <td>91926.0</td>\n",
-       " <td>90476.0</td>\n",
-       " <td>91325.0</td>\n",
-       " <td>91008.5</td>\n",
-       " </tr>\n",
-       " <tr>\n",
-       " <th>5080</th>\n",
-       " <td>2025-11-28</td>\n",
-       " <td>90233.0</td>\n",
-       " <td>93091.0</td>\n",
-       " <td>91326.0</td>\n",
-       " <td>90913.0</td>\n",
-       " <td>91662.0</td>\n",
-       " </tr>\n",
-       " <tr>\n",
-       " <th>5081</th>\n",
-       " <td>2025-11-29</td>\n",
-       " <td>90216.0</td>\n",
-       " <td>91179.0</td>\n",
-       " <td>90913.0</td>\n",
-       " <td>90832.0</td>\n",
-       " <td>90697.5</td>\n",
-       " </tr>\n",
-       " <tr>\n",
-       " <th>5082</th>\n",
-       " <td>2025-11-30</td>\n",
-       " <td>90362.0</td>\n",
-       " <td>91969.0</td>\n",
-       " <td>90832.0</td>\n",
-       " <td>90382.0</td>\n",
-       " <td>91165.5</td>\n",
-       " </tr>\n",
-       " </tbody>\n",
-       "</table>\n",
-       "</div>"
-      ],
-      "text/plain": [
-       " Timestamp Low High Open Close Avg\n",
-       "5078 2025-11-26 86304.0 90646.0 87331.0 90477.0 88475.0\n",
-       "5079 2025-11-27 90091.0 91926.0 90476.0 91325.0 91008.5\n",
-       "5080 2025-11-28 90233.0 93091.0 91326.0 90913.0 91662.0\n",
-       "5081 2025-11-29 90216.0 91179.0 90913.0 90832.0 90697.5\n",
-       "5082 2025-11-30 90362.0 91969.0 90832.0 90382.0 91165.5"
-      ]
-     },
-     "execution_count": 21,
-     "metadata": {},
-     "output_type": "execute_result"
-    }
-   ],
-   "source": [
-    "df_days[\"Avg\"] = (df_days[\"Low\"] + df_days[\"High\"]) / 2\n",
-    "df_days.tail()"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": 25,
+   "execution_count": 26,
   "id": "9a7b3310",
   "metadata": {},
   "outputs": [
@@ -358,6 +383,8 @@
" <th>start_date</th>\n", " <th>start_date</th>\n",
" <th>end_date</th>\n", " <th>end_date</th>\n",
" <th>min_open</th>\n", " <th>min_open</th>\n",
" <th>max_open</th>\n",
" <th>min_close</th>\n",
" <th>max_close</th>\n", " <th>max_close</th>\n",
" <th>start_avg</th>\n", " <th>start_avg</th>\n",
" <th>end_avg</th>\n", " <th>end_avg</th>\n",
@@ -366,76 +393,86 @@
" </thead>\n", " </thead>\n",
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>335</th>\n", " <th>316</th>\n",
" <td>2025-02-27</td>\n", " <td>2025-02-27</td>\n",
" <td>2025-04-23</td>\n", " <td>2025-04-25</td>\n",
" <td>76252.0</td>\n", " <td>74509.0</td>\n",
" <td>94273.0</td>\n", " <td>95801.0</td>\n",
" <td>84801.5</td>\n", " <td>74515.0</td>\n",
" <td>93335.0</td>\n", " <td>95800.0</td>\n",
" <td>0.100629</td>\n", " <td>85166.063889</td>\n",
" <td>94303.907292</td>\n",
" <td>0.107294</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>336</th>\n", " <th>317</th>\n",
" <td>2025-04-24</td>\n", " <td>2025-04-26</td>\n",
" <td>2025-05-09</td>\n", " <td>2025-05-11</td>\n",
" <td>93730.0</td>\n", " <td>92877.0</td>\n",
" <td>103261.0</td>\n", " <td>104971.0</td>\n",
" <td>92867.5</td>\n", " <td>92872.0</td>\n",
" <td>103341.0</td>\n", " <td>104965.0</td>\n",
" <td>0.112779</td>\n", " <td>94500.950347</td>\n",
" <td>104182.167708</td>\n",
" <td>0.102446</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>337</th>\n", " <th>318</th>\n",
" <td>2025-05-10</td>\n", " <td>2025-05-12</td>\n",
" <td>2025-07-11</td>\n", " <td>2025-07-11</td>\n",
" <td>100990.0</td>\n", " <td>98384.0</td>\n",
" <td>117579.0</td>\n", " <td>118833.0</td>\n",
" <td>103915.0</td>\n", " <td>98382.0</td>\n",
" <td>117032.5</td>\n", " <td>118839.0</td>\n",
" <td>0.126233</td>\n", " <td>103569.791319</td>\n",
" <td>117463.666667</td>\n",
" <td>0.134150</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>338</th>\n", " <th>319</th>\n",
" <td>2025-07-12</td>\n", " <td>2025-07-12</td>\n",
" <td>2025-11-04</td>\n", " <td>2025-11-04</td>\n",
" <td>106470.0</td>\n", " <td>98944.0</td>\n",
" <td>124728.0</td>\n", " <td>126202.0</td>\n",
" <td>117599.0</td>\n", " <td>98943.0</td>\n",
" <td>103079.0</td>\n", " <td>126202.0</td>\n",
" <td>0.123470</td>\n", " <td>117640.026389</td>\n",
" <td>103712.985764</td>\n",
" <td>0.118387</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>339</th>\n", " <th>320</th>\n",
" <td>2025-11-05</td>\n", " <td>2025-11-05</td>\n",
" <td>2025-11-18</td>\n", " <td>2025-11-18</td>\n",
" <td>92112.0</td>\n", " <td>89291.0</td>\n",
" <td>105972.0</td>\n", " <td>107343.0</td>\n",
" <td>101737.5</td>\n", " <td>89286.0</td>\n",
" <td>91471.0</td>\n", " <td>107343.0</td>\n",
" <td>0.100912</td>\n", " <td>102514.621181</td>\n",
" <td>91705.833333</td>\n",
" <td>0.105437</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" start_date end_date min_open max_close start_avg end_avg \\\n", " start_date end_date min_open max_open min_close max_close \\\n",
"335 2025-02-27 2025-04-23 76252.0 94273.0 84801.5 93335.0 \n", "316 2025-02-27 2025-04-25 74509.0 95801.0 74515.0 95800.0 \n",
"336 2025-04-24 2025-05-09 93730.0 103261.0 92867.5 103341.0 \n", "317 2025-04-26 2025-05-11 92877.0 104971.0 92872.0 104965.0 \n",
"337 2025-05-10 2025-07-11 100990.0 117579.0 103915.0 117032.5 \n", "318 2025-05-12 2025-07-11 98384.0 118833.0 98382.0 118839.0 \n",
"338 2025-07-12 2025-11-04 106470.0 124728.0 117599.0 103079.0 \n", "319 2025-07-12 2025-11-04 98944.0 126202.0 98943.0 126202.0 \n",
"339 2025-11-05 2025-11-18 92112.0 105972.0 101737.5 91471.0 \n", "320 2025-11-05 2025-11-18 89291.0 107343.0 89286.0 107343.0 \n",
"\n", "\n",
" change \n", " start_avg end_avg change \n",
"335 0.100629 \n", "316 85166.063889 94303.907292 0.107294 \n",
"336 0.112779 \n", "317 94500.950347 104182.167708 0.102446 \n",
"337 0.126233 \n", "318 103569.791319 117463.666667 0.134150 \n",
"338 0.123470 \n", "319 117640.026389 103712.985764 0.118387 \n",
"339 0.100912 " "320 102514.621181 91705.833333 0.105437 "
] ]
}, },
"execution_count": 25, "execution_count": 26,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -455,8 +492,10 @@
" intervals.append({\n", " intervals.append({\n",
" \"start_date\": df_days.loc[start_idx, \"Timestamp\"],\n", " \"start_date\": df_days.loc[start_idx, \"Timestamp\"],\n",
" \"end_date\": df_days.loc[i, \"Timestamp\"],\n", " \"end_date\": df_days.loc[i, \"Timestamp\"],\n",
" \"min_open\": interval[\"Open\"].min(),\n", " \"min_open\": interval[\"OpenMin\"].min(),\n",
" \"max_close\": interval[\"Close\"].max(),\n", " \"max_open\": interval[\"OpenMax\"].max(),\n",
" \"min_close\": interval[\"CloseMin\"].min(),\n",
" \"max_close\": interval[\"CloseMax\"].max(),\n",
" \"start_avg\": price_base,\n", " \"start_avg\": price_base,\n",
" \"end_avg\": price_now,\n", " \"end_avg\": price_now,\n",
" \"change\": change,\n", " \"change\": change,\n",
@@ -470,6 +509,14 @@
"df_intervals = pd.DataFrame(intervals)\n", "df_intervals = pd.DataFrame(intervals)\n",
"df_intervals.tail()" "df_intervals.tail()"
] ]
},
{
"cell_type": "code",
"execution_count": null,
"id": "07f1cd58",
"metadata": {},
"outputs": [],
"source": []
} }
], ],
"metadata": { "metadata": {

run.slurm

@@ -2,9 +2,18 @@
 #SBATCH --job-name=btc
 #SBATCH --nodes=4
 #SBATCH --ntasks=4
+#SBATCH --cpus-per-task=2
 #SBATCH --output=out.txt
-# mpirun -np $SLURM_NTASKS ./build/bitcoin_app
+# Path to the data file (must exist on every node)
+export DATA_PATH="/mnt/shared/supercomputers/data/data.csv"
+# export DATA_PATH="/data/data.csv"
+# Data share per rank (the sum of the shares defines the proportions)
+export DATA_READ_SHARES="10,14,18,22"
+# Overlap size in bytes for handling line boundaries
+export READ_OVERLAP_BYTES=131072
 cd /mnt/shared/supercomputers/build
 mpirun -np $SLURM_NTASKS ./bitcoin_app
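
For intuition: the shares `10,14,18,22` sum to 64, so rank 0 is assigned the first 10/64 of the file's bytes, rank 1 the next 14/64, and so on; each boundary is then widened by `READ_OVERLAP_BYTES` so that no CSV line is lost on a split. Below is a minimal standalone sketch of that arithmetic, mirroring `calculate_byte_range` from `src/utils.cpp` (the 1 GB file size is a made-up example):

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <numeric>
#include <vector>

int main() {
    const int64_t file_size = 1000000000;              // hypothetical 1 GB file
    const int64_t overlap = 131072;                    // READ_OVERLAP_BYTES
    const std::vector<int> shares = {10, 14, 18, 22};  // DATA_READ_SHARES
    const int size = static_cast<int>(shares.size());

    const int64_t per_share =
        file_size / std::accumulate(shares.begin(), shares.end(), 0);
    int64_t start = 0;
    for (int rank = 0; rank < size; ++rank) {
        const int64_t end = start + per_share * shares[rank];
        // Widen by the overlap and clamp to the file, as the loader does.
        int64_t s = (rank == 0) ? 0 : std::max<int64_t>(start - overlap, 0);
        int64_t e = (rank == size - 1) ? file_size
                                       : std::min<int64_t>(end + overlap, file_size);
        std::printf("rank %d reads bytes [%lld, %lld)\n",
                    rank, (long long)s, (long long)e);
        start = end;
    }
    return 0;
}
```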

src/aggregation.cpp Normal file

@@ -0,0 +1,48 @@
#include "aggregation.hpp"
#include <map>
#include <algorithm>
#include <limits>
std::vector<DayStats> aggregate_days(const std::vector<Record>& records) {
// Накопители для каждого дня
struct DayAccumulator {
double avg_sum = 0.0;
double open_min = std::numeric_limits<double>::max();
double open_max = std::numeric_limits<double>::lowest();
double close_min = std::numeric_limits<double>::max();
double close_max = std::numeric_limits<double>::lowest();
int64_t count = 0;
};
std::map<DayIndex, DayAccumulator> days;
for (const auto& r : records) {
DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400;
auto& acc = days[day];
double avg = (r.low + r.high) / 2.0;
acc.avg_sum += avg;
acc.open_min = std::min(acc.open_min, r.open);
acc.open_max = std::max(acc.open_max, r.open);
acc.close_min = std::min(acc.close_min, r.close);
acc.close_max = std::max(acc.close_max, r.close);
acc.count++;
}
std::vector<DayStats> result;
result.reserve(days.size());
for (const auto& [day, acc] : days) {
DayStats stats;
stats.day = day;
stats.avg = acc.avg_sum / static_cast<double>(acc.count);
stats.open_min = acc.open_min;
stats.open_max = acc.open_max;
stats.close_min = acc.close_min;
stats.close_max = acc.close_max;
stats.count = acc.count;
result.push_back(stats);
}
return result;
}

src/aggregation.hpp Normal file

@@ -0,0 +1,8 @@
#pragma once
#include "record.hpp"
#include "day_stats.hpp"
#include <vector>

// Per-day aggregation of records on a single node
std::vector<DayStats> aggregate_days(const std::vector<Record>& records);

src/csv_loader.cpp

@@ -2,46 +2,134 @@
 #include <fstream>
 #include <sstream>
 #include <iostream>
+#include <stdexcept>
-std::vector<Record> load_csv(const std::string& filename) {
-    std::vector<Record> data;
-    std::ifstream file(filename);
-    if (!file.is_open()) {
-        throw std::runtime_error("Cannot open file: " + filename);
-    }
-    std::string line;
-    // read the first line (the header)
-    std::getline(file, line);
-    while (std::getline(file, line)) {
-        std::stringstream ss(line);
-        std::string item;
-        Record row;
-        std::getline(ss, item, ',');
-        row.timestamp = std::stod(item);
-        std::getline(ss, item, ',');
-        row.open = std::stod(item);
-        std::getline(ss, item, ',');
-        row.high = std::stod(item);
-        std::getline(ss, item, ',');
-        row.low = std::stod(item);
-        std::getline(ss, item, ',');
-        row.close = std::stod(item);
-        std::getline(ss, item, ',');
-        row.volume = std::stod(item);
-        data.push_back(row);
-    }
-    return data;
-}
+bool parse_csv_line(const std::string& line, Record& record) {
+    if (line.empty()) {
+        return false;
+    }
+    std::stringstream ss(line);
+    std::string item;
+    try {
+        // timestamp
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.timestamp = std::stod(item);
+        // open
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.open = std::stod(item);
+        // high
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.high = std::stod(item);
+        // low
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.low = std::stod(item);
+        // close
+        if (!std::getline(ss, item, ',') || item.empty()) return false;
+        record.close = std::stod(item);
+        // volume
+        if (!std::getline(ss, item, ',')) return false;
+        // Volume may be empty or contain data
+        if (item.empty()) {
+            record.volume = 0.0;
+        } else {
+            record.volume = std::stod(item);
+        }
+        return true;
+    } catch (const std::exception&) {
+        return false;
+    }
+}
+
+std::vector<Record> load_csv_parallel(int rank, int size) {
+    std::vector<Record> data;
+    // Read the settings from environment variables
+    std::string data_path = get_data_path();
+    std::vector<int> shares = get_data_read_shares();
+    int64_t overlap_bytes = get_read_overlap_bytes();
+    // Get the file size
+    int64_t file_size = get_file_size(data_path);
+    // Compute the byte range for this rank
+    ByteRange range = calculate_byte_range(rank, size, file_size, shares, overlap_bytes);
+    // Open the file and read the required range
+    std::ifstream file(data_path, std::ios::binary);
+    if (!file.is_open()) {
+        throw std::runtime_error("Cannot open file: " + data_path);
+    }
+    // Seek to the start of the range
+    file.seekg(range.start);
+    // Read the data into a buffer
+    int64_t bytes_to_read = range.end - range.start;
+    std::vector<char> buffer(bytes_to_read);
+    file.read(buffer.data(), bytes_to_read);
+    int64_t bytes_read = file.gcount();
+    file.close();
+    // Convert to a string for convenient parsing
+    std::string content(buffer.data(), bytes_read);
+    // Find the position where the first complete line starts
+    size_t parse_start = 0;
+    if (rank == 0) {
+        // First rank: skip the header (the first line)
+        size_t header_end = content.find('\n');
+        if (header_end != std::string::npos) {
+            parse_start = header_end + 1;
+        }
+    } else {
+        // Other ranks: start after the first \n (skip the partial line)
+        size_t first_newline = content.find('\n');
+        if (first_newline != std::string::npos) {
+            parse_start = first_newline + 1;
+        }
+    }
+    // Find the position where the last complete line ends
+    size_t parse_end = content.size();
+    if (rank != size - 1) {
+        // Not the last rank: look for the last \n
+        size_t last_newline = content.rfind('\n');
+        if (last_newline != std::string::npos && last_newline > parse_start) {
+            parse_end = last_newline;
+        }
+    }
+    // Parse the lines
+    size_t pos = parse_start;
+    while (pos < parse_end) {
+        size_t line_end = content.find('\n', pos);
+        if (line_end == std::string::npos || line_end > parse_end) {
+            line_end = parse_end;
+        }
+        std::string line = content.substr(pos, line_end - pos);
+        // Strip \r if present (Windows line endings)
+        if (!line.empty() && line.back() == '\r') {
+            line.pop_back();
+        }
+        Record record;
+        if (parse_csv_line(line, record)) {
+            data.push_back(record);
+        }
+        pos = line_end + 1;
+    }
+    return data;
+}

src/csv_loader.hpp

@@ -2,5 +2,14 @@
 #include <string>
 #include <vector>
 #include "record.hpp"
+#include "utils.hpp"
-std::vector<Record> load_csv(const std::string& filename);
+// Parallel CSV reading for MPI
+// rank - index of the current rank
+// size - total number of ranks
+// Returns the vector of records read by this rank
+std::vector<Record> load_csv_parallel(int rank, int size);
+// Parse a single CSV line into a Record
+// Returns true if parsing succeeded
+bool parse_csv_line(const std::string& line, Record& record);

src/day_stats.hpp Normal file

@@ -0,0 +1,15 @@
#pragma once
#include <cstdint>

using DayIndex = int64_t;

// Aggregated data for a single day
struct DayStats {
    DayIndex day;     // day index (timestamp / 86400)
    double avg;       // mean of (Low + High) / 2 over all records
    double open_min;  // minimum Open for the day
    double open_max;  // maximum Open for the day
    double close_min; // minimum Close for the day
    double close_max; // maximum Close for the day
    int64_t count;    // number of records aggregated
};
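
Because `day` is simply the Unix timestamp divided by 86400, converting it back to a calendar date is a short `gmtime` call (the same conversion `day_index_to_date` in `src/intervals.cpp` performs). A small sketch; the day value 20422 is a made-up example:

```cpp
#include <cstdint>
#include <cstdio>
#include <ctime>

int main() {
    int64_t day = 20422;                           // hypothetical DayIndex (days since 1970-01-01)
    time_t ts = static_cast<time_t>(day) * 86400;  // back to seconds since the epoch
    struct tm* tm_info = gmtime(&ts);
    std::printf("%04d-%02d-%02d\n",                // prints 2025-11-30 for day 20422
                tm_info->tm_year + 1900, tm_info->tm_mon + 1, tm_info->tm_mday);
    return 0;
}
```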

src/gpu_loader.cpp

@@ -1,12 +1,145 @@
#include "gpu_loader.hpp" #include "gpu_loader.hpp"
#include <dlfcn.h> #include <dlfcn.h>
#include <map>
#include <algorithm>
#include <iostream>
#include <iomanip>
#include <omp.h>
static void* get_gpu_lib_handle() {
static void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL);
return h;
}
gpu_is_available_fn load_gpu_is_available() { gpu_is_available_fn load_gpu_is_available() {
void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL); void* h = get_gpu_lib_handle();
if (!h) return nullptr; if (!h) return nullptr;
auto fn = (gpu_is_available_fn)dlsym(h, "gpu_is_available"); auto fn = (gpu_is_available_fn)dlsym(h, "gpu_is_available");
if (!fn) return nullptr;
return fn; return fn;
} }
bool gpu_is_available() {
auto gpu_is_available_fn = load_gpu_is_available();
if (gpu_is_available_fn && gpu_is_available_fn()) {
return true;
}
return false;
}
gpu_aggregate_days_fn load_gpu_aggregate_days() {
void* h = get_gpu_lib_handle();
if (!h) return nullptr;
auto fn = (gpu_aggregate_days_fn)dlsym(h, "gpu_aggregate_days");
return fn;
}
bool aggregate_days_gpu(
const std::vector<Record>& records,
std::vector<DayStats>& out_stats,
gpu_aggregate_days_fn gpu_fn)
{
if (!gpu_fn || records.empty()) {
return false;
}
// Общий таймер всей функции
double t_total_start = omp_get_wtime();
// Таймер CPU preprocessing
double t_preprocess_start = omp_get_wtime();
// Группируем записи по дням и подготавливаем данные для GPU
std::map<DayIndex, std::vector<size_t>> day_record_indices;
for (size_t i = 0; i < records.size(); i++) {
DayIndex day = static_cast<DayIndex>(records[i].timestamp) / 86400;
day_record_indices[day].push_back(i);
}
int num_days = static_cast<int>(day_record_indices.size());
// Подготавливаем массивы для GPU
std::vector<GpuRecord> gpu_records;
std::vector<int> day_offsets;
std::vector<int> day_counts;
std::vector<long long> day_indices;
gpu_records.reserve(records.size());
day_offsets.reserve(num_days);
day_counts.reserve(num_days);
day_indices.reserve(num_days);
int current_offset = 0;
for (auto& [day, indices] : day_record_indices) {
day_indices.push_back(day);
day_offsets.push_back(current_offset);
day_counts.push_back(static_cast<int>(indices.size()));
// Добавляем записи этого дня
for (size_t idx : indices) {
const auto& r = records[idx];
GpuRecord gr;
gr.timestamp = r.timestamp;
gr.open = r.open;
gr.high = r.high;
gr.low = r.low;
gr.close = r.close;
gr.volume = r.volume;
gpu_records.push_back(gr);
}
current_offset += static_cast<int>(indices.size());
}
// Выделяем память для результата
std::vector<GpuDayStats> gpu_stats(num_days);
double t_preprocess_ms = (omp_get_wtime() - t_preprocess_start) * 1000.0;
std::cout << " GPU CPU preprocessing: " << std::fixed << std::setprecision(3)
<< std::setw(7) << t_preprocess_ms << " ms" << std::endl << std::flush;
// Вызываем GPU функцию (включает: malloc, memcpy H->D, kernel, memcpy D->H, free)
// Детальные тайминги выводятся внутри GPU функции
int result = gpu_fn(
gpu_records.data(),
static_cast<int>(gpu_records.size()),
day_offsets.data(),
day_counts.data(),
day_indices.data(),
num_days,
gpu_stats.data()
);
if (result != 0) {
std::cout << " GPU: Function returned error code " << result << std::endl;
return false;
}
// Конвертируем результат в DayStats
out_stats.clear();
out_stats.reserve(num_days);
for (const auto& gs : gpu_stats) {
DayStats ds;
ds.day = gs.day;
ds.avg = gs.avg;
ds.open_min = gs.open_min;
ds.open_max = gs.open_max;
ds.close_min = gs.close_min;
ds.close_max = gs.close_max;
ds.count = gs.count;
out_stats.push_back(ds);
}
// Общее время всей GPU функции (включая preprocessing)
double t_total_ms = (omp_get_wtime() - t_total_start) * 1000.0;
std::cout << " GPU TOTAL (with prep): " << std::fixed << std::setprecision(3)
<< std::setw(7) << t_total_ms << " ms" << std::endl << std::flush;
return true;
}

src/gpu_loader.hpp

@@ -1,4 +1,50 @@
 #pragma once
+#include "day_stats.hpp"
+#include "record.hpp"
+#include <vector>
+
+bool gpu_is_available();
+
+// Function types exported by the GPU plugin
 using gpu_is_available_fn = int (*)();
+
+// GPU-side structures (must match gpu_plugin.cu)
+struct GpuRecord {
+    double timestamp;
+    double open;
+    double high;
+    double low;
+    double close;
+    double volume;
+};
+
+struct GpuDayStats {
+    long long day;
+    double avg;
+    double open_min;
+    double open_max;
+    double close_min;
+    double close_max;
+    long long count;
+};
+
+using gpu_aggregate_days_fn = int (*)(
+    const GpuRecord* h_records,
+    int num_records,
+    const int* h_day_offsets,
+    const int* h_day_counts,
+    const long long* h_day_indices,
+    int num_days,
+    GpuDayStats* h_out_stats
+);
+
+// Loading the functions from the plugin
 gpu_is_available_fn load_gpu_is_available();
+gpu_aggregate_days_fn load_gpu_aggregate_days();
+
+// Wrapper for aggregation on the GPU (returns true on success)
+bool aggregate_days_gpu(
+    const std::vector<Record>& records,
+    std::vector<DayStats>& out_stats,
+    gpu_aggregate_days_fn gpu_fn
+);
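
Taken together, these declarations support a GPU-first path with a CPU fallback. A minimal usage sketch, not taken verbatim from this PR: it only wires up the declared API, using `aggregate_days` from `src/aggregation.hpp` as the fallback, and the wrapper name `aggregate_anywhere` is hypothetical:

```cpp
#include "aggregation.hpp"
#include "gpu_loader.hpp"

// Aggregate on the GPU when the plugin is present and working; fall back to CPU.
std::vector<DayStats> aggregate_anywhere(const std::vector<Record>& records) {
    std::vector<DayStats> days;
    if (gpu_is_available()) {
        gpu_aggregate_days_fn fn = load_gpu_aggregate_days();
        if (aggregate_days_gpu(records, days, fn)) {
            return days;                // GPU path succeeded
        }
    }
    return aggregate_days(records);     // CPU fallback
}
```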

src/gpu_plugin.cu

@@ -1,8 +1,262 @@
 #include <cuda_runtime.h>
+#include <cstdint>
+#include <cfloat>
+#include <cstdio>
+#include <ctime>
+
+// CPU timer in milliseconds
+static double get_time_ms() {
+    struct timespec ts;
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+    return ts.tv_sec * 1000.0 + ts.tv_nsec / 1000000.0;
+}
+
+// Data structures (must match the C++ code)
+struct GpuRecord {
+    double timestamp;
+    double open;
+    double high;
+    double low;
+    double close;
+    double volume;
+};
+
+struct GpuDayStats {
+    long long day;
+    double avg;
+    double open_min;
+    double open_max;
+    double close_min;
+    double close_max;
+    long long count;
+};

 extern "C" int gpu_is_available() {
     int n = 0;
     cudaError_t err = cudaGetDeviceCount(&n);
     if (err != cudaSuccess) return 0;
+    if (n > 0) {
+        // Initialize the CUDA context up front (cudaFree(0) forces initialization)
+        cudaFree(0);
+    }
     return (n > 0) ? 1 : 0;
 }
+
+// Aggregation kernel (each thread processes one day)
+__global__ void aggregate_kernel(
+    const GpuRecord* records,
+    int num_records,
+    const int* day_offsets,        // start of each day in the records array
+    const int* day_counts,         // number of records in each day
+    const long long* day_indices,  // day indices
+    int num_days,
+    GpuDayStats* out_stats)
+{
+    // Global thread index = day index
+    int d = blockIdx.x * blockDim.x + threadIdx.x;
+    if (d >= num_days) return;
+    int offset = day_offsets[d];
+    int count = day_counts[d];
+    GpuDayStats stats;
+    stats.day = day_indices[d];
+    stats.open_min = DBL_MAX;
+    stats.open_max = -DBL_MAX;
+    stats.close_min = DBL_MAX;
+    stats.close_max = -DBL_MAX;
+    stats.count = count;
+    double avg_sum = 0.0;
+    for (int i = 0; i < count; i++) {
+        const GpuRecord& r = records[offset + i];
+        // Accumulate avg = (low + high) / 2
+        avg_sum += (r.low + r.high) / 2.0;
+        // min/max Open
+        if (r.open < stats.open_min) stats.open_min = r.open;
+        if (r.open > stats.open_max) stats.open_max = r.open;
+        // min/max Close
+        if (r.close < stats.close_min) stats.close_min = r.close;
+        if (r.close > stats.close_max) stats.close_max = r.close;
+    }
+    stats.avg = avg_sum / static_cast<double>(count);
+    out_stats[d] = stats;
+}
+
+// Aggregation entry point called from C++
+extern "C" int gpu_aggregate_days(
+    const GpuRecord* h_records,
+    int num_records,
+    const int* h_day_offsets,
+    const int* h_day_counts,
+    const long long* h_day_indices,
+    int num_days,
+    GpuDayStats* h_out_stats)
+{
+    double cpu_total_start = get_time_ms();
+    // === Create CUDA events for timing ===
+    double cpu_event_create_start = get_time_ms();
+    cudaEvent_t start_malloc, stop_malloc;
+    cudaEvent_t start_transfer, stop_transfer;
+    cudaEvent_t start_kernel, stop_kernel;
+    cudaEvent_t start_copy_back, stop_copy_back;
+    cudaEvent_t start_free, stop_free;
+    cudaEventCreate(&start_malloc);
+    cudaEventCreate(&stop_malloc);
+    cudaEventCreate(&start_transfer);
+    cudaEventCreate(&stop_transfer);
+    cudaEventCreate(&start_kernel);
+    cudaEventCreate(&stop_kernel);
+    cudaEventCreate(&start_copy_back);
+    cudaEventCreate(&stop_copy_back);
+    cudaEventCreate(&start_free);
+    cudaEventCreate(&stop_free);
+    double cpu_event_create_ms = get_time_ms() - cpu_event_create_start;
+    // === TIMING: cudaMalloc ===
+    cudaEventRecord(start_malloc);
+    GpuRecord* d_records = nullptr;
+    int* d_day_offsets = nullptr;
+    int* d_day_counts = nullptr;
+    long long* d_day_indices = nullptr;
+    GpuDayStats* d_out_stats = nullptr;
+    cudaError_t err;
+    err = cudaMalloc(&d_records, num_records * sizeof(GpuRecord));
+    if (err != cudaSuccess) return -1;
+    err = cudaMalloc(&d_day_offsets, num_days * sizeof(int));
+    if (err != cudaSuccess) { cudaFree(d_records); return -2; }
+    err = cudaMalloc(&d_day_counts, num_days * sizeof(int));
+    if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); return -3; }
+    err = cudaMalloc(&d_day_indices, num_days * sizeof(long long));
+    if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); return -4; }
+    err = cudaMalloc(&d_out_stats, num_days * sizeof(GpuDayStats));
+    if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); cudaFree(d_day_indices); return -5; }
+    cudaEventRecord(stop_malloc);
+    cudaEventSynchronize(stop_malloc);
+    float time_malloc_ms = 0;
+    cudaEventElapsedTime(&time_malloc_ms, start_malloc, stop_malloc);
+    // === TIMING: memcpy H->D ===
+    cudaEventRecord(start_transfer);
+    err = cudaMemcpy(d_records, h_records, num_records * sizeof(GpuRecord), cudaMemcpyHostToDevice);
+    if (err != cudaSuccess) return -10;
+    err = cudaMemcpy(d_day_offsets, h_day_offsets, num_days * sizeof(int), cudaMemcpyHostToDevice);
+    if (err != cudaSuccess) return -11;
+    err = cudaMemcpy(d_day_counts, h_day_counts, num_days * sizeof(int), cudaMemcpyHostToDevice);
+    if (err != cudaSuccess) return -12;
+    err = cudaMemcpy(d_day_indices, h_day_indices, num_days * sizeof(long long), cudaMemcpyHostToDevice);
+    if (err != cudaSuccess) return -13;
+    cudaEventRecord(stop_transfer);
+    cudaEventSynchronize(stop_transfer);
+    float time_transfer_ms = 0;
+    cudaEventElapsedTime(&time_transfer_ms, start_transfer, stop_transfer);
+    // === TIMING: kernel ===
+    const int THREADS_PER_BLOCK = 256;
+    int num_blocks = (num_days + THREADS_PER_BLOCK - 1) / THREADS_PER_BLOCK;
+    cudaEventRecord(start_kernel);
+    aggregate_kernel<<<num_blocks, THREADS_PER_BLOCK>>>(
+        d_records, num_records,
+        d_day_offsets, d_day_counts, d_day_indices,
+        num_days, d_out_stats
+    );
+    err = cudaGetLastError();
+    if (err != cudaSuccess) {
+        cudaFree(d_records);
+        cudaFree(d_day_offsets);
+        cudaFree(d_day_counts);
+        cudaFree(d_day_indices);
+        cudaFree(d_out_stats);
+        return -7;
+    }
+    cudaEventRecord(stop_kernel);
+    cudaEventSynchronize(stop_kernel);
+    float time_kernel_ms = 0;
+    cudaEventElapsedTime(&time_kernel_ms, start_kernel, stop_kernel);
+    // === TIMING: memcpy D->H ===
+    cudaEventRecord(start_copy_back);
+    cudaMemcpy(h_out_stats, d_out_stats, num_days * sizeof(GpuDayStats), cudaMemcpyDeviceToHost);
+    cudaEventRecord(stop_copy_back);
+    cudaEventSynchronize(stop_copy_back);
+    float time_copy_back_ms = 0;
+    cudaEventElapsedTime(&time_copy_back_ms, start_copy_back, stop_copy_back);
+    // === TIMING: cudaFree ===
+    cudaEventRecord(start_free);
+    cudaFree(d_records);
+    cudaFree(d_day_offsets);
+    cudaFree(d_day_counts);
+    cudaFree(d_day_indices);
+    cudaFree(d_out_stats);
+    cudaEventRecord(stop_free);
+    cudaEventSynchronize(stop_free);
+    float time_free_ms = 0;
+    cudaEventElapsedTime(&time_free_ms, start_free, stop_free);
+    // Total GPU time
+    float time_total_ms = time_malloc_ms + time_transfer_ms + time_kernel_ms + time_copy_back_ms + time_free_ms;
+    // === Destroy the events ===
+    double cpu_event_destroy_start = get_time_ms();
+    cudaEventDestroy(start_malloc);
+    cudaEventDestroy(stop_malloc);
+    cudaEventDestroy(start_transfer);
+    cudaEventDestroy(stop_transfer);
+    cudaEventDestroy(start_kernel);
+    cudaEventDestroy(stop_kernel);
+    cudaEventDestroy(start_copy_back);
+    cudaEventDestroy(stop_copy_back);
+    cudaEventDestroy(start_free);
+    cudaEventDestroy(stop_free);
+    double cpu_event_destroy_ms = get_time_ms() - cpu_event_destroy_start;
+    double cpu_total_ms = get_time_ms() - cpu_total_start;
+    // Print the detailed statistics
+    printf("  GPU Timings (%d records, %d days):\n", num_records, num_days);
+    printf("    cudaMalloc:       %7.3f ms\n", time_malloc_ms);
+    printf("    memcpy H->D:      %7.3f ms\n", time_transfer_ms);
+    printf("    kernel execution: %7.3f ms\n", time_kernel_ms);
+    printf("    memcpy D->H:      %7.3f ms\n", time_copy_back_ms);
+    printf("    cudaFree:         %7.3f ms\n", time_free_ms);
+    printf("    GPU TOTAL:        %7.3f ms\n", cpu_total_ms);
+    fflush(stdout);
+    return 0;
+}

src/intervals.cpp Normal file

@@ -0,0 +1,339 @@
#include "intervals.hpp"
#include <mpi.h>
#include <algorithm>
#include <cmath>
#include <fstream>
#include <iomanip>
#include <sstream>
#include <ctime>
#include <limits>
// Вспомогательная структура для накопления min/max в интервале
struct IntervalAccumulator {
DayIndex start_day;
double start_avg;
double open_min;
double open_max;
double close_min;
double close_max;
void init(const DayStats& day) {
start_day = day.day;
start_avg = day.avg;
open_min = day.open_min;
open_max = day.open_max;
close_min = day.close_min;
close_max = day.close_max;
}
void update(const DayStats& day) {
open_min = std::min(open_min, day.open_min);
open_max = std::max(open_max, day.open_max);
close_min = std::min(close_min, day.close_min);
close_max = std::max(close_max, day.close_max);
}
Interval finalize(const DayStats& end_day, double change) const {
Interval iv;
iv.start_day = start_day;
iv.end_day = end_day.day;
iv.start_avg = start_avg;
iv.end_avg = end_day.avg;
iv.change = change;
iv.open_min = std::min(open_min, end_day.open_min);
iv.open_max = std::max(open_max, end_day.open_max);
iv.close_min = std::min(close_min, end_day.close_min);
iv.close_max = std::max(close_max, end_day.close_max);
return iv;
}
};
// Упакованная структура DayStats для MPI передачи (8 doubles)
struct PackedDayStats {
double day; // DayIndex as double
double avg;
double open_min;
double open_max;
double close_min;
double close_max;
double count; // int64_t as double
double valid; // флаг валидности (1.0 = valid, 0.0 = invalid)
void pack(const DayStats& ds) {
day = static_cast<double>(ds.day);
avg = ds.avg;
open_min = ds.open_min;
open_max = ds.open_max;
close_min = ds.close_min;
close_max = ds.close_max;
count = static_cast<double>(ds.count);
valid = 1.0;
}
DayStats unpack() const {
DayStats ds;
ds.day = static_cast<DayIndex>(day);
ds.avg = avg;
ds.open_min = open_min;
ds.open_max = open_max;
ds.close_min = close_min;
ds.close_max = close_max;
ds.count = static_cast<int64_t>(count);
return ds;
}
bool is_valid() const { return valid > 0.5; }
void set_invalid() { valid = 0.0; }
};
IntervalResult find_intervals_parallel(
const std::vector<DayStats>& days,
int rank, int size,
double threshold)
{
IntervalResult result;
result.compute_time = 0.0;
result.wait_time = 0.0;
if (days.empty()) {
// Передаём невалидный DayStats следующему ранку
if (rank < size - 1) {
PackedDayStats invalid;
invalid.set_invalid();
MPI_Send(&invalid, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
}
return result;
}
double compute_start = MPI_Wtime();
// Определяем, до какого индекса обрабатывать
// Для последнего ранка - до конца, для остальных - до предпоследнего дня
size_t process_until = (rank == size - 1) ? days.size() : days.size() - 1;
IntervalAccumulator acc;
size_t start_idx = 0;
bool have_pending_interval = false;
// Если не первый ранк - ждём данные от предыдущего
if (rank > 0) {
double wait_start = MPI_Wtime();
PackedDayStats received;
MPI_Recv(&received, 8, MPI_DOUBLE, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
result.wait_time = MPI_Wtime() - wait_start;
compute_start = MPI_Wtime();
if (received.is_valid()) {
DayStats prev_day = received.unpack();
// Ищем первый день с индексом > prev_day.day
for (start_idx = 0; start_idx < days.size(); start_idx++) {
if (days[start_idx].day > prev_day.day) {
break;
}
}
if (start_idx < process_until) {
// Инициализируем аккумулятор данными от предыдущего ранка
acc.init(prev_day);
have_pending_interval = true;
// Продолжаем строить интервал
for (size_t i = start_idx; i < process_until; i++) {
acc.update(days[i]);
double change = std::abs(days[i].avg - acc.start_avg) / acc.start_avg;
if (change >= threshold) {
result.intervals.push_back(acc.finalize(days[i], change));
have_pending_interval = false;
// Начинаем новый интервал
start_idx = i + 1;
if (start_idx < process_until) {
acc.init(days[start_idx]);
have_pending_interval = true;
}
}
}
}
} else {
// Предыдущий ранк не передал валидные данные, начинаем с начала
if (process_until > 0) {
acc.init(days[0]);
have_pending_interval = true;
start_idx = 0;
}
}
} else {
// Первый ранк - начинаем с первого дня
if (process_until > 0) {
acc.init(days[0]);
have_pending_interval = true;
start_idx = 0;
}
}
// Обрабатываем дни (если ещё не обработали выше)
if (rank == 0 && have_pending_interval) {
for (size_t i = 1; i < process_until; i++) {
acc.update(days[i]);
double change = std::abs(days[i].avg - acc.start_avg) / acc.start_avg;
if (change >= threshold) {
result.intervals.push_back(acc.finalize(days[i], change));
have_pending_interval = false;
// Начинаем новый интервал
start_idx = i + 1;
if (start_idx < process_until) {
acc.init(days[start_idx]);
have_pending_interval = true;
}
}
}
}
// Для последнего ранка: завершаем последний интервал на последнем дне
if (rank == size - 1 && have_pending_interval && !days.empty()) {
const auto& last_day = days.back();
double change = std::abs(last_day.avg - acc.start_avg) / acc.start_avg;
result.intervals.push_back(acc.finalize(last_day, change));
}
result.compute_time = MPI_Wtime() - compute_start;
// Передаём данные следующему ранку
if (rank < size - 1) {
PackedDayStats to_send;
if (have_pending_interval) {
// Передаём день, с которого начался незавершённый интервал
DayStats start_day;
start_day.day = acc.start_day;
start_day.avg = acc.start_avg;
start_day.open_min = acc.open_min;
start_day.open_max = acc.open_max;
start_day.close_min = acc.close_min;
start_day.close_max = acc.close_max;
start_day.count = 0;
to_send.pack(start_day);
} else if (!days.empty()) {
// Интервал завершился, передаём предпоследний день
to_send.pack(days[days.size() - 2]);
} else {
to_send.set_invalid();
}
MPI_Send(&to_send, 8, MPI_DOUBLE, rank + 1, 0, MPI_COMM_WORLD);
}
return result;
}
double collect_intervals(
std::vector<Interval>& local_intervals,
int rank, int size)
{
double wait_time = 0.0;
// Упакованный Interval для MPI (9 doubles)
// start_day, end_day, open_min, open_max, close_min, close_max, start_avg, end_avg, change
if (rank == 0) {
// Собираем интервалы со всех остальных ранков
for (int r = 1; r < size; r++) {
double wait_start = MPI_Wtime();
// Сначала получаем количество интервалов
int count;
MPI_Recv(&count, 1, MPI_INT, r, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if (count > 0) {
std::vector<double> buffer(count * 9);
MPI_Recv(buffer.data(), count * 9, MPI_DOUBLE, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// Распаковываем
for (int i = 0; i < count; i++) {
Interval iv;
iv.start_day = static_cast<DayIndex>(buffer[i * 9 + 0]);
iv.end_day = static_cast<DayIndex>(buffer[i * 9 + 1]);
iv.open_min = buffer[i * 9 + 2];
iv.open_max = buffer[i * 9 + 3];
iv.close_min = buffer[i * 9 + 4];
iv.close_max = buffer[i * 9 + 5];
iv.start_avg = buffer[i * 9 + 6];
iv.end_avg = buffer[i * 9 + 7];
iv.change = buffer[i * 9 + 8];
local_intervals.push_back(iv);
}
}
wait_time += MPI_Wtime() - wait_start;
}
// Сортируем по start_day
std::sort(local_intervals.begin(), local_intervals.end(),
[](const Interval& a, const Interval& b) {
return a.start_day < b.start_day;
});
} else {
// Отправляем свои интервалы на ранк 0
int count = static_cast<int>(local_intervals.size());
MPI_Send(&count, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
if (count > 0) {
std::vector<double> buffer(count * 9);
for (int i = 0; i < count; i++) {
const auto& iv = local_intervals[i];
buffer[i * 9 + 0] = static_cast<double>(iv.start_day);
buffer[i * 9 + 1] = static_cast<double>(iv.end_day);
buffer[i * 9 + 2] = iv.open_min;
buffer[i * 9 + 3] = iv.open_max;
buffer[i * 9 + 4] = iv.close_min;
buffer[i * 9 + 5] = iv.close_max;
buffer[i * 9 + 6] = iv.start_avg;
buffer[i * 9 + 7] = iv.end_avg;
buffer[i * 9 + 8] = iv.change;
}
MPI_Send(buffer.data(), count * 9, MPI_DOUBLE, 0, 2, MPI_COMM_WORLD);
}
}
return wait_time;
}
std::string day_index_to_date(DayIndex day) {
time_t ts = static_cast<time_t>(day) * 86400;
struct tm* tm_info = gmtime(&ts);
std::ostringstream oss;
oss << std::setfill('0')
<< (tm_info->tm_year + 1900) << "-"
<< std::setw(2) << (tm_info->tm_mon + 1) << "-"
<< std::setw(2) << tm_info->tm_mday;
return oss.str();
}
void write_intervals(const std::string& filename, const std::vector<Interval>& intervals) {
std::ofstream out(filename);
out << std::fixed << std::setprecision(2);
out << "start_date,end_date,open_min,open_max,close_min,close_max,start_avg,end_avg,change\n";
for (const auto& iv : intervals) {
out << day_index_to_date(iv.start_day) << ","
<< day_index_to_date(iv.end_day) << ","
<< iv.open_min << ","
<< iv.open_max << ","
<< iv.close_min << ","
<< iv.close_max << ","
<< iv.start_avg << ","
<< iv.end_avg << ","
<< std::setprecision(6) << iv.change << "\n";
}
}
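
`PackedDayStats` flattens every field to `double` so that a single `MPI_Send(&x, 8, MPI_DOUBLE, ...)` moves a whole struct. An alternative that is not used in this PR is a committed custom MPI datatype, which keeps the integer fields as integers; a hedged sketch, assuming the field layout above:

```cpp
#include <mpi.h>
#include <cstddef>
#include <cstdint>

// Hypothetical alternative: describe the struct to MPI instead of packing
// everything into doubles. Mirrors the DayStats layout from day_stats.hpp.
struct DayStatsWire {
    int64_t day;
    double avg, open_min, open_max, close_min, close_max;
    int64_t count;
};

MPI_Datatype make_day_stats_type() {
    MPI_Datatype t;
    const int          blocks[3] = {1, 5, 1};
    const MPI_Aint     offs[3]   = {offsetof(DayStatsWire, day),
                                    offsetof(DayStatsWire, avg),
                                    offsetof(DayStatsWire, count)};
    const MPI_Datatype types[3]  = {MPI_INT64_T, MPI_DOUBLE, MPI_INT64_T};
    MPI_Type_create_struct(3, blocks, offs, types, &t);
    MPI_Type_commit(&t);
    return t;  // usable as: MPI_Send(&ds, 1, t, dest, tag, MPI_COMM_WORLD);
}
```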

src/intervals.hpp Normal file

@@ -0,0 +1,46 @@
#pragma once
#include "day_stats.hpp"
#include <vector>
#include <string>

// An interval whose change is >= threshold
struct Interval {
    DayIndex start_day;
    DayIndex end_day;
    double open_min;
    double open_max;
    double close_min;
    double close_max;
    double start_avg;
    double end_avg;
    double change;
};

// Result of the parallel interval construction
struct IntervalResult {
    std::vector<Interval> intervals;
    double compute_time;  // computation time
    double wait_time;     // time spent waiting for data from the previous rank
};

// Parallel interval construction using MPI
// Each rank processes its own slice of days and hands any unfinished interval to the next rank
IntervalResult find_intervals_parallel(
    const std::vector<DayStats>& days,
    int rank, int size,
    double threshold = 0.10
);

// Gather the intervals from all ranks onto rank 0
// Returns the time spent waiting for data
double collect_intervals(
    std::vector<Interval>& local_intervals,
    int rank, int size
);

// Write the intervals to a file
void write_intervals(const std::string& filename, const std::vector<Interval>& intervals);

// Convert a DayIndex to a date string (YYYY-MM-DD)
std::string day_index_to_date(DayIndex day);

src/main.cpp

@@ -1,87 +1,92 @@
 #include <mpi.h>
 #include <iostream>
 #include <vector>
-#include <map>
+#include <iomanip>
 #include "csv_loader.hpp"
-#include "utils.hpp"
 #include "record.hpp"
-#include "gpu_loader.hpp"
+#include "day_stats.hpp"
+#include "aggregation.hpp"
+#include "intervals.hpp"
+#include "utils.hpp"

-// Function: select the records for a specific rank
-std::vector<Record> select_records_for_rank(
-    const std::map<long long, std::vector<Record>>& days,
-    const std::vector<long long>& day_list)
-{
-    std::vector<Record> out;
-    for (auto d : day_list) {
-        auto it = days.find(d);
-        if (it != days.end()) {
-            const auto& vec = it->second;
-            out.insert(out.end(), vec.begin(), vec.end());
-        }
-    }
-    return out;
-}

 int main(int argc, char** argv) {
     MPI_Init(&argc, &argv);
+    double total_start = MPI_Wtime();
     int rank, size;
     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
     MPI_Comm_size(MPI_COMM_WORLD, &size);
-    std::vector<Record> local_records;
-    if (rank == 0) {
-        std::cout << "Rank 0 loading CSV..." << std::endl;
-        // We run from build
-        auto records = load_csv("../data/data.csv");
-        auto days = group_by_day(records);
-        auto parts = split_days(days, size);
-        // Distribute the data
-        for (int r = 0; r < size; r++) {
-            auto vec = select_records_for_rank(days, parts[r]);
-            if (r == 0) {
-                // don't send to ourselves - store it right away
-                local_records = vec;
-                continue;
-            }
-            int count = vec.size();
-            MPI_Send(&count, 1, MPI_INT, r, 0, MPI_COMM_WORLD);
-            MPI_Send(vec.data(), count * sizeof(Record), MPI_BYTE, r, 1, MPI_COMM_WORLD);
-        }
-    }
-    else {
-        // Receive the data
-        int count = 0;
-        MPI_Recv(&count, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-        local_records.resize(count);
-        MPI_Recv(local_records.data(), count * sizeof(Record),
-                 MPI_BYTE, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
-    }
-    MPI_Barrier(MPI_COMM_WORLD);
-    std::cout << "Rank " << rank << " received "
-              << local_records.size() << " records" << std::endl;
-    auto gpu_is_available = load_gpu_is_available();
-    int have_gpu = 0;
-    if (gpu_is_available) {
-        std::cout << "Rank " << rank << " dll loaded" << std::endl;
-        have_gpu = gpu_is_available();
-    }
-    std::cout << "Rank " << rank << ": gpu_available=" << have_gpu << "\n";
+    // Parallel data reading
+    double read_start = MPI_Wtime();
+    std::vector<Record> records = load_csv_parallel(rank, size);
+    double read_time = MPI_Wtime() - read_start;
+    std::cout << "Rank " << rank
+              << ": read " << records.size() << " records"
+              << " in " << std::fixed << std::setprecision(3) << read_time << " sec"
+              << std::endl;
+    // Per-day aggregation
+    double agg_start = MPI_Wtime();
+    std::vector<DayStats> days = aggregate_days(records);
+    double agg_time = MPI_Wtime() - agg_start;
+    std::cout << "Rank " << rank
+              << ": aggregated " << days.size() << " days"
+              << " [" << (days.empty() ? 0 : days.front().day)
+              << ".." << (days.empty() ? 0 : days.back().day) << "]"
+              << " in " << std::fixed << std::setprecision(3) << agg_time << " sec"
+              << std::endl;
+    // Drop the edge days (they may be incomplete because of the parallel read)
+    trim_edge_days(days, rank, size);
+    std::cout << "Rank " << rank
+              << ": after trim " << days.size() << " days"
+              << " [" << (days.empty() ? 0 : days.front().day)
+              << ".." << (days.empty() ? 0 : days.back().day) << "]"
+              << std::endl;
+    // Parallel interval construction
+    IntervalResult iv_result = find_intervals_parallel(days, rank, size);
+    std::cout << "Rank " << rank
+              << ": found " << iv_result.intervals.size() << " intervals"
+              << ", compute " << std::fixed << std::setprecision(6) << iv_result.compute_time << " sec"
+              << ", wait " << iv_result.wait_time << " sec"
+              << std::endl;
+    // Gather the intervals on rank 0
+    double collect_wait = collect_intervals(iv_result.intervals, rank, size);
+    if (rank == 0) {
+        std::cout << "Rank 0: collected " << iv_result.intervals.size() << " total intervals"
+                  << ", wait " << std::fixed << std::setprecision(3) << collect_wait << " sec"
+                  << std::endl;
+    }
+    // Write the results to a file (rank 0 only)
+    if (rank == 0) {
+        double write_start = MPI_Wtime();
+        write_intervals("result.csv", iv_result.intervals);
+        double write_time = MPI_Wtime() - write_start;
+        std::cout << "Rank 0: wrote result.csv"
+                  << " in " << std::fixed << std::setprecision(3) << write_time << " sec"
+                  << std::endl;
+    }
+    // Print the total execution time
+    MPI_Barrier(MPI_COMM_WORLD);
+    double total_time = MPI_Wtime() - total_start;
+    if (rank == 0) {
+        std::cout << "Total execution time: "
+                  << std::fixed << std::setprecision(3)
+                  << total_time << " sec" << std::endl;
+    }
     MPI_Finalize();
     return 0;
 }

src/mpi_utils.cpp (deleted)

@@ -1,11 +0,0 @@
#include "mpi_utils.hpp"
#include <mpi.h>
#include <iostream>
void mpi_print_basic() {
int rank, size;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
std::cout << "Hello from rank " << rank << " of " << size << std::endl;
}

src/mpi_utils.hpp (deleted)

@@ -1,2 +0,0 @@
#pragma once

void mpi_print_basic();

src/utils.cpp

@@ -1,4 +1,8 @@
#include "utils.hpp" #include "utils.hpp"
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <numeric>
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) { std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) {
std::map<DayIndex, std::vector<Record>> days; std::map<DayIndex, std::vector<Record>> days;
@@ -23,3 +27,101 @@ std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vect
     return out;
 }
+
+int get_num_cpu_threads() {
+    const char* env_threads = std::getenv("NUM_CPU_THREADS");
+    int num_cpu_threads = 1;
+    if (env_threads) {
+        num_cpu_threads = std::atoi(env_threads);
+        if (num_cpu_threads < 1) num_cpu_threads = 1;
+    }
+    return num_cpu_threads;
+}
+
+std::string get_env(const char* name) {
+    const char* env = std::getenv(name);
+    if (!env) {
+        throw std::runtime_error(std::string("Environment variable not set: ") + name);
+    }
+    return std::string(env);
+}
+
+std::string get_data_path() {
+    return get_env("DATA_PATH");
+}
+
+std::vector<int> get_data_read_shares() {
+    std::vector<int> shares;
+    std::stringstream ss(get_env("DATA_READ_SHARES"));
+    std::string item;
+    while (std::getline(ss, item, ',')) {
+        shares.push_back(std::stoi(item));
+    }
+    return shares;
+}
+
+int64_t get_read_overlap_bytes() {
+    return std::stoll(get_env("READ_OVERLAP_BYTES"));
+}
+
+int64_t get_file_size(const std::string& path) {
+    std::ifstream file(path, std::ios::binary | std::ios::ate);
+    if (!file.is_open()) {
+        throw std::runtime_error("Cannot open file: " + path);
+    }
+    return static_cast<int64_t>(file.tellg());
+}
+
+ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
+                               const std::vector<int>& shares, int64_t overlap_bytes) {
+    // If shares is empty or does not match size, fall back to equal shares
+    std::vector<int> effective_shares;
+    if (shares.size() == static_cast<size_t>(size)) {
+        effective_shares = shares;
+    } else {
+        effective_shares.assign(size, 1);
+    }
+    int total_shares = std::accumulate(effective_shares.begin(), effective_shares.end(), 0);
+    // Compute the base boundaries for each rank
+    int64_t bytes_per_share = file_size / total_shares;
+    int64_t base_start = 0;
+    for (int i = 0; i < rank; i++) {
+        base_start += bytes_per_share * effective_shares[i];
+    }
+    int64_t base_end = base_start + bytes_per_share * effective_shares[rank];
+    // Apply the overlap
+    ByteRange range;
+    if (rank == 0) {
+        // First rank: start at 0, add the overlap at the end
+        range.start = 0;
+        range.end = std::min(base_end + overlap_bytes, file_size);
+    } else if (rank == size - 1) {
+        // Last rank: subtract the overlap at the start, read to the end of the file
+        range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
+        range.end = file_size;
+    } else {
+        // Middle ranks: overlap on both sides
+        range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
+        range.end = std::min(base_end + overlap_bytes, file_size);
+    }
+    return range;
+}
+
+void trim_edge_days(std::vector<DayStats>& days, int rank, int size) {
+    if (days.empty()) return;
+    if (rank == 0) {
+        days.pop_back();
+    } else if (rank == size - 1) {
+        days.erase(days.begin());
+    } else {
+        days.pop_back();
+        days.erase(days.begin());
+    }
+}

src/utils.hpp

@@ -1,11 +1,38 @@
 #pragma once
 #include "record.hpp"
+#include "day_stats.hpp"
 #include <map>
 #include <vector>
+#include <string>
+#include <cstdlib>
+#include <cstdint>

-using DayIndex = long long;
+// Group records by day
 std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs);
 std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts);
+
+// Reading environment variables
+int get_num_cpu_threads();
+std::string get_data_path();
+std::vector<int> get_data_read_shares();
+int64_t get_read_overlap_bytes();
+
+// Byte range to read
+struct ByteRange {
+    int64_t start;
+    int64_t end;  // exclusive
+};
+
+// Computes the byte range for a specific rank
+ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
+                               const std::vector<int>& shares, int64_t overlap_bytes);
+
+// Get the size of a file
+int64_t get_file_size(const std::string& path);
+
+// Removes the edge days, which may be incomplete because of the parallel read
+// rank 0: drops the last day
+// last rank: drops the first day
+// middle ranks: drop both the first and the last day
+void trim_edge_days(std::vector<DayStats>& days, int rank, int size);