Compare commits

..

7 Commits

Author SHA1 Message Date
ab18d9770f Агрегация 2025-12-13 12:45:29 +00:00
6a22dc3ef7 Параллельное чтение данных 2025-12-13 12:13:23 +00:00
f90a641754 gpu_is_available 2025-12-13 11:07:31 +00:00
10bd6db2b8 Замечание про NFS 2025-12-13 11:06:41 +00:00
d82fde7116 Уточнил задание 2025-12-11 16:47:30 +03:00
7f16a5c17a Больше таймеров 2025-12-11 10:08:22 +00:00
44f297e55a Удалил бесполезные файлы 2025-12-11 09:06:48 +00:00
19 changed files with 778 additions and 645 deletions

2
.gitignore vendored
View File

@@ -1,4 +1,4 @@
data data
build build
out.txt out.txt
result.csv *.csv

View File

@@ -10,10 +10,30 @@
не менее чем на 10% от даты начала интервала, вместе с минимальными и максимальными не менее чем на 10% от даты начала интервала, вместе с минимальными и максимальными
значениями Open и Close за все дни внутри интервала. значениями Open и Close за все дни внутри интервала.
## Параллельное чтение данных
Нет смысла параллельно читать данные с NFS, так как в реальности файлы с данными
лежат только на NFS-сервере. То есть остальные узлы лишь отправляют сетевые запросы
NFS-серверу, который сам читает данные с диска и только затем пересылает
их этим узлам.
Чтобы этого избежать, нужно на каждой машине скопировать файлы с данными в её локальную
файловую систему, например в папку `/data`.
```sh
# На каждом узле создаем директорию /data
sudo mkdir /data
sudo chown $USER /data
# Копируем данные
cd /mnt/shared/supercomputers/data
cp data.csv /data/
```
## Сборка ## Сборка
Проект обязательно должен быть расположен в общей директории для всех узлов, Проект обязательно должен быть расположен в общей директории для всех узлов,
например, в `/mnt/shared/supercomputers/bitcoin-project/build`. например, в `/mnt/shared/supercomputers/build`.
Перед запуском указать актуальный путь в `run.slurm`. Перед запуском указать актуальный путь в `run.slurm`.
```sh ```sh

View File

@@ -2,7 +2,7 @@
"cells": [ "cells": [
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 1, "execution_count": 20,
"id": "2acce44b", "id": "2acce44b",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
@@ -12,7 +12,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 14, "execution_count": 21,
"id": "5ba70af7", "id": "5ba70af7",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
@@ -111,7 +111,7 @@
"7317758 0.410369 " "7317758 0.410369 "
] ]
}, },
"execution_count": 14, "execution_count": 21,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -124,8 +124,8 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 19, "execution_count": 23,
"id": "d4b22f3b", "id": "3b320537",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@@ -150,67 +150,190 @@
" <tr style=\"text-align: right;\">\n", " <tr style=\"text-align: right;\">\n",
" <th></th>\n", " <th></th>\n",
" <th>Timestamp</th>\n", " <th>Timestamp</th>\n",
" <th>Low</th>\n",
" <th>High</th>\n",
" <th>Open</th>\n", " <th>Open</th>\n",
" <th>High</th>\n",
" <th>Low</th>\n",
" <th>Close</th>\n", " <th>Close</th>\n",
" <th>Volume</th>\n",
" <th>Avg</th>\n",
" </tr>\n", " </tr>\n",
" </thead>\n", " </thead>\n",
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>5078</th>\n", " <th>7317754</th>\n",
" <td>2025-11-26</td>\n", " <td>2025-11-30 23:55:00+00:00</td>\n",
" <td>86304.0</td>\n", " <td>90405.0</td>\n",
" <td>90646.0</td>\n", " <td>90452.0</td>\n",
" <td>87331.0</td>\n", " <td>90403.0</td>\n",
" <td>90477.0</td>\n", " <td>90452.0</td>\n",
" <td>0.531700</td>\n",
" <td>90427.5</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5079</th>\n", " <th>7317755</th>\n",
" <td>2025-11-27</td>\n", " <td>2025-11-30 23:56:00+00:00</td>\n",
" <td>90091.0</td>\n", " <td>90452.0</td>\n",
" <td>91926.0</td>\n", " <td>90481.0</td>\n",
" <td>90476.0</td>\n", " <td>90420.0</td>\n",
" <td>91325.0</td>\n", " <td>90420.0</td>\n",
" <td>0.055547</td>\n",
" <td>90450.5</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5080</th>\n", " <th>7317756</th>\n",
" <td>2025-11-28</td>\n", " <td>2025-11-30 23:57:00+00:00</td>\n",
" <td>90233.0</td>\n", " <td>90412.0</td>\n",
" <td>93091.0</td>\n", " <td>90458.0</td>\n",
" <td>91326.0</td>\n", " <td>90396.0</td>\n",
" <td>90913.0</td>\n", " <td>90435.0</td>\n",
" <td>0.301931</td>\n",
" <td>90427.0</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5081</th>\n", " <th>7317757</th>\n",
" <td>2025-11-29</td>\n", " <td>2025-11-30 23:58:00+00:00</td>\n",
" <td>90216.0</td>\n", " <td>90428.0</td>\n",
" <td>91179.0</td>\n", " <td>90428.0</td>\n",
" <td>90913.0</td>\n", " <td>90362.0</td>\n",
" <td>90832.0</td>\n", " <td>90362.0</td>\n",
" </tr>\n", " <td>4.591653</td>\n",
" <tr>\n", " <td>90395.0</td>\n",
" <th>5082</th>\n", " </tr>\n",
" <td>2025-11-30</td>\n", " <tr>\n",
" <th>7317758</th>\n",
" <td>2025-11-30 23:59:00+00:00</td>\n",
" <td>90363.0</td>\n",
" <td>90386.0</td>\n",
" <td>90362.0</td>\n", " <td>90362.0</td>\n",
" <td>91969.0</td>\n",
" <td>90832.0</td>\n",
" <td>90382.0</td>\n", " <td>90382.0</td>\n",
" <td>0.410369</td>\n",
" <td>90374.0</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" Timestamp Low High Open Close\n", " Timestamp Open High Low Close \\\n",
"5078 2025-11-26 86304.0 90646.0 87331.0 90477.0\n", "7317754 2025-11-30 23:55:00+00:00 90405.0 90452.0 90403.0 90452.0 \n",
"5079 2025-11-27 90091.0 91926.0 90476.0 91325.0\n", "7317755 2025-11-30 23:56:00+00:00 90452.0 90481.0 90420.0 90420.0 \n",
"5080 2025-11-28 90233.0 93091.0 91326.0 90913.0\n", "7317756 2025-11-30 23:57:00+00:00 90412.0 90458.0 90396.0 90435.0 \n",
"5081 2025-11-29 90216.0 91179.0 90913.0 90832.0\n", "7317757 2025-11-30 23:58:00+00:00 90428.0 90428.0 90362.0 90362.0 \n",
"5082 2025-11-30 90362.0 91969.0 90832.0 90382.0" "7317758 2025-11-30 23:59:00+00:00 90363.0 90386.0 90362.0 90382.0 \n",
"\n",
" Volume Avg \n",
"7317754 0.531700 90427.5 \n",
"7317755 0.055547 90450.5 \n",
"7317756 0.301931 90427.0 \n",
"7317757 4.591653 90395.0 \n",
"7317758 0.410369 90374.0 "
] ]
}, },
"execution_count": 19, "execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df['Avg'] = (df['Low'] + df['High']) / 2\n",
"df.tail()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "4b1cd63c",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Timestamp</th>\n",
" <th>Avg</th>\n",
" <th>OpenMin</th>\n",
" <th>OpenMax</th>\n",
" <th>CloseMin</th>\n",
" <th>CloseMax</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>5078</th>\n",
" <td>2025-11-26</td>\n",
" <td>88057.301736</td>\n",
" <td>86312.0</td>\n",
" <td>90574.0</td>\n",
" <td>86323.0</td>\n",
" <td>90574.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5079</th>\n",
" <td>2025-11-27</td>\n",
" <td>91245.092708</td>\n",
" <td>90126.0</td>\n",
" <td>91888.0</td>\n",
" <td>90126.0</td>\n",
" <td>91925.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5080</th>\n",
" <td>2025-11-28</td>\n",
" <td>91324.308681</td>\n",
" <td>90255.0</td>\n",
" <td>92970.0</td>\n",
" <td>90283.0</td>\n",
" <td>92966.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5081</th>\n",
" <td>2025-11-29</td>\n",
" <td>90746.479514</td>\n",
" <td>90265.0</td>\n",
" <td>91158.0</td>\n",
" <td>90279.0</td>\n",
" <td>91179.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5082</th>\n",
" <td>2025-11-30</td>\n",
" <td>91187.356250</td>\n",
" <td>90363.0</td>\n",
" <td>91940.0</td>\n",
" <td>90362.0</td>\n",
" <td>91940.0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Timestamp Avg OpenMin OpenMax CloseMin CloseMax\n",
"5078 2025-11-26 88057.301736 86312.0 90574.0 86323.0 90574.0\n",
"5079 2025-11-27 91245.092708 90126.0 91888.0 90126.0 91925.0\n",
"5080 2025-11-28 91324.308681 90255.0 92970.0 90283.0 92966.0\n",
"5081 2025-11-29 90746.479514 90265.0 91158.0 90279.0 91179.0\n",
"5082 2025-11-30 91187.356250 90363.0 91940.0 90362.0 91940.0"
]
},
"execution_count": 25,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -218,7 +341,13 @@
"source": [ "source": [
"df_days = (\n", "df_days = (\n",
" df.groupby(df[\"Timestamp\"].dt.date)\n", " df.groupby(df[\"Timestamp\"].dt.date)\n",
" .agg({\"Low\": \"min\", \"High\": \"max\", \"Open\": \"first\", \"Close\": \"last\"})\n", " .agg(\n",
" Avg=(\"Avg\", \"mean\"),\n",
" OpenMin=(\"Open\", \"min\"),\n",
" OpenMax=(\"Open\", \"max\"),\n",
" CloseMin=(\"Close\", \"min\"),\n",
" CloseMax=(\"Close\", \"max\"),\n",
" )\n",
" .reset_index()\n", " .reset_index()\n",
")\n", ")\n",
"df_days.tail()" "df_days.tail()"
@@ -226,111 +355,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 21, "execution_count": 26,
"id": "91823496",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Timestamp</th>\n",
" <th>Low</th>\n",
" <th>High</th>\n",
" <th>Open</th>\n",
" <th>Close</th>\n",
" <th>Avg</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>5078</th>\n",
" <td>2025-11-26</td>\n",
" <td>86304.0</td>\n",
" <td>90646.0</td>\n",
" <td>87331.0</td>\n",
" <td>90477.0</td>\n",
" <td>88475.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5079</th>\n",
" <td>2025-11-27</td>\n",
" <td>90091.0</td>\n",
" <td>91926.0</td>\n",
" <td>90476.0</td>\n",
" <td>91325.0</td>\n",
" <td>91008.5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5080</th>\n",
" <td>2025-11-28</td>\n",
" <td>90233.0</td>\n",
" <td>93091.0</td>\n",
" <td>91326.0</td>\n",
" <td>90913.0</td>\n",
" <td>91662.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5081</th>\n",
" <td>2025-11-29</td>\n",
" <td>90216.0</td>\n",
" <td>91179.0</td>\n",
" <td>90913.0</td>\n",
" <td>90832.0</td>\n",
" <td>90697.5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5082</th>\n",
" <td>2025-11-30</td>\n",
" <td>90362.0</td>\n",
" <td>91969.0</td>\n",
" <td>90832.0</td>\n",
" <td>90382.0</td>\n",
" <td>91165.5</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Timestamp Low High Open Close Avg\n",
"5078 2025-11-26 86304.0 90646.0 87331.0 90477.0 88475.0\n",
"5079 2025-11-27 90091.0 91926.0 90476.0 91325.0 91008.5\n",
"5080 2025-11-28 90233.0 93091.0 91326.0 90913.0 91662.0\n",
"5081 2025-11-29 90216.0 91179.0 90913.0 90832.0 90697.5\n",
"5082 2025-11-30 90362.0 91969.0 90832.0 90382.0 91165.5"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_days[\"Avg\"] = (df_days[\"Low\"] + df_days[\"High\"]) / 2\n",
"df_days.tail()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "9a7b3310", "id": "9a7b3310",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
@@ -358,6 +383,8 @@
" <th>start_date</th>\n", " <th>start_date</th>\n",
" <th>end_date</th>\n", " <th>end_date</th>\n",
" <th>min_open</th>\n", " <th>min_open</th>\n",
" <th>max_open</th>\n",
" <th>min_close</th>\n",
" <th>max_close</th>\n", " <th>max_close</th>\n",
" <th>start_avg</th>\n", " <th>start_avg</th>\n",
" <th>end_avg</th>\n", " <th>end_avg</th>\n",
@@ -366,76 +393,86 @@
" </thead>\n", " </thead>\n",
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>335</th>\n", " <th>316</th>\n",
" <td>2025-02-27</td>\n", " <td>2025-02-27</td>\n",
" <td>2025-04-23</td>\n", " <td>2025-04-25</td>\n",
" <td>76252.0</td>\n", " <td>74509.0</td>\n",
" <td>94273.0</td>\n", " <td>95801.0</td>\n",
" <td>84801.5</td>\n", " <td>74515.0</td>\n",
" <td>93335.0</td>\n", " <td>95800.0</td>\n",
" <td>0.100629</td>\n", " <td>85166.063889</td>\n",
" <td>94303.907292</td>\n",
" <td>0.107294</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>336</th>\n", " <th>317</th>\n",
" <td>2025-04-24</td>\n", " <td>2025-04-26</td>\n",
" <td>2025-05-09</td>\n", " <td>2025-05-11</td>\n",
" <td>93730.0</td>\n", " <td>92877.0</td>\n",
" <td>103261.0</td>\n", " <td>104971.0</td>\n",
" <td>92867.5</td>\n", " <td>92872.0</td>\n",
" <td>103341.0</td>\n", " <td>104965.0</td>\n",
" <td>0.112779</td>\n", " <td>94500.950347</td>\n",
" <td>104182.167708</td>\n",
" <td>0.102446</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>337</th>\n", " <th>318</th>\n",
" <td>2025-05-10</td>\n", " <td>2025-05-12</td>\n",
" <td>2025-07-11</td>\n", " <td>2025-07-11</td>\n",
" <td>100990.0</td>\n", " <td>98384.0</td>\n",
" <td>117579.0</td>\n", " <td>118833.0</td>\n",
" <td>103915.0</td>\n", " <td>98382.0</td>\n",
" <td>117032.5</td>\n", " <td>118839.0</td>\n",
" <td>0.126233</td>\n", " <td>103569.791319</td>\n",
" <td>117463.666667</td>\n",
" <td>0.134150</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>338</th>\n", " <th>319</th>\n",
" <td>2025-07-12</td>\n", " <td>2025-07-12</td>\n",
" <td>2025-11-04</td>\n", " <td>2025-11-04</td>\n",
" <td>106470.0</td>\n", " <td>98944.0</td>\n",
" <td>124728.0</td>\n", " <td>126202.0</td>\n",
" <td>117599.0</td>\n", " <td>98943.0</td>\n",
" <td>103079.0</td>\n", " <td>126202.0</td>\n",
" <td>0.123470</td>\n", " <td>117640.026389</td>\n",
" <td>103712.985764</td>\n",
" <td>0.118387</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>339</th>\n", " <th>320</th>\n",
" <td>2025-11-05</td>\n", " <td>2025-11-05</td>\n",
" <td>2025-11-18</td>\n", " <td>2025-11-18</td>\n",
" <td>92112.0</td>\n", " <td>89291.0</td>\n",
" <td>105972.0</td>\n", " <td>107343.0</td>\n",
" <td>101737.5</td>\n", " <td>89286.0</td>\n",
" <td>91471.0</td>\n", " <td>107343.0</td>\n",
" <td>0.100912</td>\n", " <td>102514.621181</td>\n",
" <td>91705.833333</td>\n",
" <td>0.105437</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" start_date end_date min_open max_close start_avg end_avg \\\n", " start_date end_date min_open max_open min_close max_close \\\n",
"335 2025-02-27 2025-04-23 76252.0 94273.0 84801.5 93335.0 \n", "316 2025-02-27 2025-04-25 74509.0 95801.0 74515.0 95800.0 \n",
"336 2025-04-24 2025-05-09 93730.0 103261.0 92867.5 103341.0 \n", "317 2025-04-26 2025-05-11 92877.0 104971.0 92872.0 104965.0 \n",
"337 2025-05-10 2025-07-11 100990.0 117579.0 103915.0 117032.5 \n", "318 2025-05-12 2025-07-11 98384.0 118833.0 98382.0 118839.0 \n",
"338 2025-07-12 2025-11-04 106470.0 124728.0 117599.0 103079.0 \n", "319 2025-07-12 2025-11-04 98944.0 126202.0 98943.0 126202.0 \n",
"339 2025-11-05 2025-11-18 92112.0 105972.0 101737.5 91471.0 \n", "320 2025-11-05 2025-11-18 89291.0 107343.0 89286.0 107343.0 \n",
"\n", "\n",
" change \n", " start_avg end_avg change \n",
"335 0.100629 \n", "316 85166.063889 94303.907292 0.107294 \n",
"336 0.112779 \n", "317 94500.950347 104182.167708 0.102446 \n",
"337 0.126233 \n", "318 103569.791319 117463.666667 0.134150 \n",
"338 0.123470 \n", "319 117640.026389 103712.985764 0.118387 \n",
"339 0.100912 " "320 102514.621181 91705.833333 0.105437 "
] ]
}, },
"execution_count": 25, "execution_count": 26,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -455,8 +492,10 @@
" intervals.append({\n", " intervals.append({\n",
" \"start_date\": df_days.loc[start_idx, \"Timestamp\"],\n", " \"start_date\": df_days.loc[start_idx, \"Timestamp\"],\n",
" \"end_date\": df_days.loc[i, \"Timestamp\"],\n", " \"end_date\": df_days.loc[i, \"Timestamp\"],\n",
" \"min_open\": interval[\"Open\"].min(),\n", " \"min_open\": interval[\"OpenMin\"].min(),\n",
" \"max_close\": interval[\"Close\"].max(),\n", " \"max_open\": interval[\"OpenMax\"].max(),\n",
" \"min_close\": interval[\"CloseMin\"].min(),\n",
" \"max_close\": interval[\"CloseMax\"].max(),\n",
" \"start_avg\": price_base,\n", " \"start_avg\": price_base,\n",
" \"end_avg\": price_now,\n", " \"end_avg\": price_now,\n",
" \"change\": change,\n", " \"change\": change,\n",
@@ -470,6 +509,14 @@
"df_intervals = pd.DataFrame(intervals)\n", "df_intervals = pd.DataFrame(intervals)\n",
"df_intervals.tail()" "df_intervals.tail()"
] ]
},
{
"cell_type": "code",
"execution_count": null,
"id": "07f1cd58",
"metadata": {},
"outputs": [],
"source": []
} }
], ],
"metadata": { "metadata": {

View File

@@ -5,8 +5,14 @@
#SBATCH --cpus-per-task=2 #SBATCH --cpus-per-task=2
#SBATCH --output=out.txt #SBATCH --output=out.txt
# Количество CPU потоков на узел (должно соответствовать cpus-per-task) # Путь к файлу данных (должен существовать на всех узлах)
export NUM_CPU_THREADS=2 export DATA_PATH="/mnt/shared/supercomputers/data/data.csv"
# Доли данных для каждого ранка (сумма определяет пропорции)
export DATA_READ_SHARES="10,12,13,13"
# Размер перекрытия в байтах для обработки границ строк
export READ_OVERLAP_BYTES=131072
cd /mnt/shared/supercomputers/build cd /mnt/shared/supercomputers/build
mpirun -np $SLURM_NTASKS ./bitcoin_app mpirun -np $SLURM_NTASKS ./bitcoin_app

View File

@@ -1,87 +1,48 @@
#include "aggregation.hpp" #include "aggregation.hpp"
#include <map>
#include <algorithm> #include <algorithm>
#include <limits> #include <limits>
#include <cmath>
std::vector<DayStats> aggregate_days(const std::vector<Record>& records) { std::vector<DayStats> aggregate_days(const std::vector<Record>& records) {
// Группируем записи по дням // Накопители для каждого дня
std::map<DayIndex, std::vector<const Record*>> day_records; struct DayAccumulator {
double avg_sum = 0.0;
double open_min = std::numeric_limits<double>::max();
double open_max = std::numeric_limits<double>::lowest();
double close_min = std::numeric_limits<double>::max();
double close_max = std::numeric_limits<double>::lowest();
int64_t count = 0;
};
std::map<DayIndex, DayAccumulator> days;
for (const auto& r : records) { for (const auto& r : records) {
DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400; DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400;
day_records[day].push_back(&r); auto& acc = days[day];
double avg = (r.low + r.high) / 2.0;
acc.avg_sum += avg;
acc.open_min = std::min(acc.open_min, r.open);
acc.open_max = std::max(acc.open_max, r.open);
acc.close_min = std::min(acc.close_min, r.close);
acc.close_max = std::max(acc.close_max, r.close);
acc.count++;
} }
std::vector<DayStats> result; std::vector<DayStats> result;
result.reserve(day_records.size()); result.reserve(days.size());
for (auto& [day, recs] : day_records) { for (const auto& [day, acc] : days) {
// Сортируем по timestamp для определения first/last
std::sort(recs.begin(), recs.end(),
[](const Record* a, const Record* b) {
return a->timestamp < b->timestamp;
});
DayStats stats; DayStats stats;
stats.day = day; stats.day = day;
stats.low = std::numeric_limits<double>::max(); stats.avg = acc.avg_sum / static_cast<double>(acc.count);
stats.high = std::numeric_limits<double>::lowest(); stats.open_min = acc.open_min;
stats.open = recs.front()->open; stats.open_max = acc.open_max;
stats.close = recs.back()->close; stats.close_min = acc.close_min;
stats.first_ts = recs.front()->timestamp; stats.close_max = acc.close_max;
stats.last_ts = recs.back()->timestamp; stats.count = acc.count;
for (const auto* r : recs) {
stats.low = std::min(stats.low, r->low);
stats.high = std::max(stats.high, r->high);
}
stats.avg = (stats.low + stats.high) / 2.0;
result.push_back(stats); result.push_back(stats);
} }
return result; return result;
} }
std::vector<DayStats> merge_day_stats(const std::vector<DayStats>& all_stats) {
// Объединяем статистику по одинаковым дням (если такие есть)
std::map<DayIndex, DayStats> merged;
for (const auto& s : all_stats) {
auto it = merged.find(s.day);
if (it == merged.end()) {
merged[s.day] = s;
} else {
// Объединяем данные за один день
auto& m = it->second;
m.low = std::min(m.low, s.low);
m.high = std::max(m.high, s.high);
// open берём от записи с меньшим timestamp
if (s.first_ts < m.first_ts) {
m.open = s.open;
m.first_ts = s.first_ts;
}
// close берём от записи с большим timestamp
if (s.last_ts > m.last_ts) {
m.close = s.close;
m.last_ts = s.last_ts;
}
m.avg = (m.low + m.high) / 2.0;
}
}
// Преобразуем в отсортированный вектор
std::vector<DayStats> result;
result.reserve(merged.size());
for (auto& [day, stats] : merged) {
result.push_back(stats);
}
return result;
}

View File

@@ -3,12 +3,6 @@
#include "record.hpp" #include "record.hpp"
#include "day_stats.hpp" #include "day_stats.hpp"
#include <vector> #include <vector>
#include <map>
// Агрегация записей по дням на одном узле // Агрегация записей по дням на одном узле
std::vector<DayStats> aggregate_days(const std::vector<Record>& records); std::vector<DayStats> aggregate_days(const std::vector<Record>& records);
// Объединение агрегированных данных с разных узлов
// (на случай если один день попал на разные узлы - но в нашей схеме это не должно случиться)
std::vector<DayStats> merge_day_stats(const std::vector<DayStats>& all_stats);

View File

@@ -2,46 +2,134 @@
#include <fstream> #include <fstream>
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>
#include <stdexcept>
std::vector<Record> load_csv(const std::string& filename) { bool parse_csv_line(const std::string& line, Record& record) {
if (line.empty()) {
return false;
}
std::stringstream ss(line);
std::string item;
try {
// timestamp
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.timestamp = std::stod(item);
// open
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.open = std::stod(item);
// high
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.high = std::stod(item);
// low
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.low = std::stod(item);
// close
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.close = std::stod(item);
// volume
if (!std::getline(ss, item, ',')) return false;
// Volume может быть пустым или содержать данные
if (item.empty()) {
record.volume = 0.0;
} else {
record.volume = std::stod(item);
}
return true;
} catch (const std::exception&) {
return false;
}
}
std::vector<Record> load_csv_parallel(int rank, int size) {
std::vector<Record> data; std::vector<Record> data;
std::ifstream file(filename);
// Читаем настройки из переменных окружения
std::string data_path = get_data_path();
std::vector<int> shares = get_data_read_shares();
int64_t overlap_bytes = get_read_overlap_bytes();
// Получаем размер файла
int64_t file_size = get_file_size(data_path);
// Вычисляем диапазон байт для этого ранка
ByteRange range = calculate_byte_range(rank, size, file_size, shares, overlap_bytes);
// Открываем файл и читаем нужный диапазон
std::ifstream file(data_path, std::ios::binary);
if (!file.is_open()) { if (!file.is_open()) {
throw std::runtime_error("Cannot open file: " + filename); throw std::runtime_error("Cannot open file: " + data_path);
} }
std::string line; // Переходим к началу диапазона
file.seekg(range.start);
// читаем первую строку (заголовок)
std::getline(file, line); // Читаем данные в буфер
int64_t bytes_to_read = range.end - range.start;
while (std::getline(file, line)) { std::vector<char> buffer(bytes_to_read);
std::stringstream ss(line); file.read(buffer.data(), bytes_to_read);
std::string item; int64_t bytes_read = file.gcount();
Record row; file.close();
std::getline(ss, item, ','); // Преобразуем в строку для удобства парсинга
row.timestamp = std::stod(item); std::string content(buffer.data(), bytes_read);
std::getline(ss, item, ','); // Находим позицию начала первой полной строки
row.open = std::stod(item); size_t parse_start = 0;
if (rank == 0) {
std::getline(ss, item, ','); // Первый ранк: пропускаем заголовок (первую строку)
row.high = std::stod(item); size_t header_end = content.find('\n');
if (header_end != std::string::npos) {
std::getline(ss, item, ','); parse_start = header_end + 1;
row.low = std::stod(item); }
} else {
std::getline(ss, item, ','); // Остальные ранки: начинаем с первого \n (пропускаем неполную строку)
row.close = std::stod(item); size_t first_newline = content.find('\n');
if (first_newline != std::string::npos) {
std::getline(ss, item, ','); parse_start = first_newline + 1;
row.volume = std::stod(item); }
data.push_back(row);
} }
// Находим позицию конца последней полной строки
size_t parse_end = content.size();
if (rank != size - 1) {
// Не последний ранк: ищем последний \n
size_t last_newline = content.rfind('\n');
if (last_newline != std::string::npos && last_newline > parse_start) {
parse_end = last_newline;
}
}
// Парсим строки
size_t pos = parse_start;
while (pos < parse_end) {
size_t line_end = content.find('\n', pos);
if (line_end == std::string::npos || line_end > parse_end) {
line_end = parse_end;
}
std::string line = content.substr(pos, line_end - pos);
// Убираем \r если есть (Windows line endings)
if (!line.empty() && line.back() == '\r') {
line.pop_back();
}
Record record;
if (parse_csv_line(line, record)) {
data.push_back(record);
}
pos = line_end + 1;
}
return data; return data;
} }

View File

@@ -2,5 +2,14 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "record.hpp" #include "record.hpp"
#include "utils.hpp"
std::vector<Record> load_csv(const std::string& filename); // Параллельное чтение CSV файла для MPI
// rank - номер текущего ранка
// size - общее количество ранков
// Возвращает вектор записей, прочитанных этим ранком
std::vector<Record> load_csv_parallel(int rank, int size);
// Парсинг одной строки CSV в Record
// Возвращает true если парсинг успешен
bool parse_csv_line(const std::string& line, Record& record);

View File

@@ -1,28 +1,15 @@
#pragma once #pragma once
#include <cstdint> #include <cstdint>
using DayIndex = long long; using DayIndex = int64_t;
// Агрегированные данные за один день // Агрегированные данные за один день
struct DayStats { struct DayStats {
DayIndex day; // индекс дня (timestamp / 86400) DayIndex day; // индекс дня (timestamp / 86400)
double low; // минимальный Low за день double avg; // среднее значение (Low + High) / 2 по всем записям
double high; // максимальный High за день double open_min; // минимальный Open за день
double open; // первый Open за день double open_max; // максимальный Open за день
double close; // последний Close за день double close_min; // минимальный Close за день
double avg; // среднее = (low + high) / 2 double close_max; // максимальный Close за день
double first_ts; // timestamp первой записи (для определения порядка open) int64_t count; // количество записей, по которым агрегировали
double last_ts; // timestamp последней записи (для определения close)
}; };
// Интервал с изменением >= 10%
struct Interval {
DayIndex start_day;
DayIndex end_day;
double min_open;
double max_close;
double start_avg;
double end_avg;
double change;
};

View File

@@ -2,6 +2,9 @@
#include <dlfcn.h> #include <dlfcn.h>
#include <map> #include <map>
#include <algorithm> #include <algorithm>
#include <iostream>
#include <iomanip>
#include <omp.h>
static void* get_gpu_lib_handle() { static void* get_gpu_lib_handle() {
static void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL); static void* h = dlopen("./libgpu_compute.so", RTLD_NOW | RTLD_LOCAL);
@@ -16,6 +19,16 @@ gpu_is_available_fn load_gpu_is_available() {
return fn; return fn;
} }
// Reports whether the GPU plugin can be used: the availability probe must be
// obtainable through load_gpu_is_available() and must return a non-zero value.
bool gpu_is_available() {
    const auto probe = load_gpu_is_available();
    // A null probe is treated as "no GPU"; otherwise the probe's verdict decides.
    return probe != nullptr && probe() != 0;
}
gpu_aggregate_days_fn load_gpu_aggregate_days() { gpu_aggregate_days_fn load_gpu_aggregate_days() {
void* h = get_gpu_lib_handle(); void* h = get_gpu_lib_handle();
if (!h) return nullptr; if (!h) return nullptr;
@@ -33,6 +46,12 @@ bool aggregate_days_gpu(
return false; return false;
} }
// Общий таймер всей функции
double t_total_start = omp_get_wtime();
// Таймер CPU preprocessing
double t_preprocess_start = omp_get_wtime();
// Группируем записи по дням и подготавливаем данные для GPU // Группируем записи по дням и подготавливаем данные для GPU
std::map<DayIndex, std::vector<size_t>> day_record_indices; std::map<DayIndex, std::vector<size_t>> day_record_indices;
@@ -80,7 +99,12 @@ bool aggregate_days_gpu(
// Выделяем память для результата // Выделяем память для результата
std::vector<GpuDayStats> gpu_stats(num_days); std::vector<GpuDayStats> gpu_stats(num_days);
// Вызываем GPU функцию double t_preprocess_ms = (omp_get_wtime() - t_preprocess_start) * 1000.0;
std::cout << " GPU CPU preprocessing: " << std::fixed << std::setprecision(3)
<< std::setw(7) << t_preprocess_ms << " ms" << std::endl << std::flush;
// Вызываем GPU функцию (включает: malloc, memcpy H->D, kernel, memcpy D->H, free)
// Детальные тайминги выводятся внутри GPU функции
int result = gpu_fn( int result = gpu_fn(
gpu_records.data(), gpu_records.data(),
static_cast<int>(gpu_records.size()), static_cast<int>(gpu_records.size()),
@@ -92,6 +116,7 @@ bool aggregate_days_gpu(
); );
if (result != 0) { if (result != 0) {
std::cout << " GPU: Function returned error code " << result << std::endl;
return false; return false;
} }
@@ -102,15 +127,19 @@ bool aggregate_days_gpu(
for (const auto& gs : gpu_stats) { for (const auto& gs : gpu_stats) {
DayStats ds; DayStats ds;
ds.day = gs.day; ds.day = gs.day;
ds.low = gs.low;
ds.high = gs.high;
ds.open = gs.open;
ds.close = gs.close;
ds.avg = gs.avg; ds.avg = gs.avg;
ds.first_ts = gs.first_ts; ds.open_min = gs.open_min;
ds.last_ts = gs.last_ts; ds.open_max = gs.open_max;
ds.close_min = gs.close_min;
ds.close_max = gs.close_max;
ds.count = gs.count;
out_stats.push_back(ds); out_stats.push_back(ds);
} }
// Общее время всей GPU функции (включая preprocessing)
double t_total_ms = (omp_get_wtime() - t_total_start) * 1000.0;
std::cout << " GPU TOTAL (with prep): " << std::fixed << std::setprecision(3)
<< std::setw(7) << t_total_ms << " ms" << std::endl << std::flush;
return true; return true;
} }

View File

@@ -3,6 +3,8 @@
#include "record.hpp" #include "record.hpp"
#include <vector> #include <vector>
bool gpu_is_available();
// Типы функций из GPU плагина // Типы функций из GPU плагина
using gpu_is_available_fn = int (*)(); using gpu_is_available_fn = int (*)();
@@ -18,13 +20,12 @@ struct GpuRecord {
struct GpuDayStats { struct GpuDayStats {
long long day; long long day;
double low;
double high;
double open;
double close;
double avg; double avg;
double first_ts; double open_min;
double last_ts; double open_max;
double close_min;
double close_max;
long long count;
}; };
using gpu_aggregate_days_fn = int (*)( using gpu_aggregate_days_fn = int (*)(

View File

@@ -1,6 +1,15 @@
#include <cuda_runtime.h> #include <cuda_runtime.h>
#include <cstdint> #include <cstdint>
#include <cfloat> #include <cfloat>
#include <cstdio>
#include <ctime>
// CPU-side timer: current CLOCK_MONOTONIC reading converted to milliseconds.
static double get_time_ms() {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    // Seconds and nanoseconds are scaled to milliseconds separately, then summed.
    const double whole_ms = static_cast<double>(now.tv_sec) * 1000.0;
    const double frac_ms = static_cast<double>(now.tv_nsec) / 1000000.0;
    return whole_ms + frac_ms;
}
// Структуры данных (должны совпадать с C++ кодом) // Структуры данных (должны совпадать с C++ кодом)
struct GpuRecord { struct GpuRecord {
@@ -14,19 +23,22 @@ struct GpuRecord {
struct GpuDayStats { struct GpuDayStats {
long long day; long long day;
double low;
double high;
double open;
double close;
double avg; double avg;
double first_ts; double open_min;
double last_ts; double open_max;
double close_min;
double close_max;
long long count;
}; };
extern "C" int gpu_is_available() { extern "C" int gpu_is_available() {
int n = 0; int n = 0;
cudaError_t err = cudaGetDeviceCount(&n); cudaError_t err = cudaGetDeviceCount(&n);
if (err != cudaSuccess) return 0; if (err != cudaSuccess) return 0;
if (n > 0) {
// Инициализируем CUDA контекст заранее (cudaFree(0) форсирует инициализацию)
cudaFree(0);
}
return (n > 0) ? 1 : 0; return (n > 0) ? 1 : 0;
} }
@@ -50,32 +62,30 @@ __global__ void aggregate_kernel(
GpuDayStats stats; GpuDayStats stats;
stats.day = day_indices[d]; stats.day = day_indices[d];
stats.low = DBL_MAX; stats.open_min = DBL_MAX;
stats.high = -DBL_MAX; stats.open_max = -DBL_MAX;
stats.first_ts = DBL_MAX; stats.close_min = DBL_MAX;
stats.last_ts = -DBL_MAX; stats.close_max = -DBL_MAX;
stats.open = 0; stats.count = count;
stats.close = 0;
double avg_sum = 0.0;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
const GpuRecord& r = records[offset + i]; const GpuRecord& r = records[offset + i];
// min/max // Accumulate avg = (low + high) / 2
if (r.low < stats.low) stats.low = r.low; avg_sum += (r.low + r.high) / 2.0;
if (r.high > stats.high) stats.high = r.high;
// first/last по timestamp // min/max Open
if (r.timestamp < stats.first_ts) { if (r.open < stats.open_min) stats.open_min = r.open;
stats.first_ts = r.timestamp; if (r.open > stats.open_max) stats.open_max = r.open;
stats.open = r.open;
} // min/max Close
if (r.timestamp > stats.last_ts) { if (r.close < stats.close_min) stats.close_min = r.close;
stats.last_ts = r.timestamp; if (r.close > stats.close_max) stats.close_max = r.close;
stats.close = r.close;
}
} }
stats.avg = (stats.low + stats.high) / 2.0; stats.avg = avg_sum / static_cast<double>(count);
out_stats[d] = stats; out_stats[d] = stats;
} }
@@ -89,7 +99,33 @@ extern "C" int gpu_aggregate_days(
int num_days, int num_days,
GpuDayStats* h_out_stats) GpuDayStats* h_out_stats)
{ {
// Выделяем память на GPU double cpu_total_start = get_time_ms();
// === Создаём CUDA события для измерения времени ===
double cpu_event_create_start = get_time_ms();
cudaEvent_t start_malloc, stop_malloc;
cudaEvent_t start_transfer, stop_transfer;
cudaEvent_t start_kernel, stop_kernel;
cudaEvent_t start_copy_back, stop_copy_back;
cudaEvent_t start_free, stop_free;
cudaEventCreate(&start_malloc);
cudaEventCreate(&stop_malloc);
cudaEventCreate(&start_transfer);
cudaEventCreate(&stop_transfer);
cudaEventCreate(&start_kernel);
cudaEventCreate(&stop_kernel);
cudaEventCreate(&start_copy_back);
cudaEventCreate(&stop_copy_back);
cudaEventCreate(&start_free);
cudaEventCreate(&stop_free);
double cpu_event_create_ms = get_time_ms() - cpu_event_create_start;
// === ИЗМЕРЕНИЕ cudaMalloc ===
cudaEventRecord(start_malloc);
GpuRecord* d_records = nullptr; GpuRecord* d_records = nullptr;
int* d_day_offsets = nullptr; int* d_day_offsets = nullptr;
int* d_day_counts = nullptr; int* d_day_counts = nullptr;
@@ -113,7 +149,15 @@ extern "C" int gpu_aggregate_days(
err = cudaMalloc(&d_out_stats, num_days * sizeof(GpuDayStats)); err = cudaMalloc(&d_out_stats, num_days * sizeof(GpuDayStats));
if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); cudaFree(d_day_indices); return -5; } if (err != cudaSuccess) { cudaFree(d_records); cudaFree(d_day_offsets); cudaFree(d_day_counts); cudaFree(d_day_indices); return -5; }
// Копируем данные на GPU cudaEventRecord(stop_malloc);
cudaEventSynchronize(stop_malloc);
float time_malloc_ms = 0;
cudaEventElapsedTime(&time_malloc_ms, start_malloc, stop_malloc);
// === ИЗМЕРЕНИЕ memcpy H->D ===
cudaEventRecord(start_transfer);
err = cudaMemcpy(d_records, h_records, num_records * sizeof(GpuRecord), cudaMemcpyHostToDevice); err = cudaMemcpy(d_records, h_records, num_records * sizeof(GpuRecord), cudaMemcpyHostToDevice);
if (err != cudaSuccess) return -10; if (err != cudaSuccess) return -10;
@@ -126,17 +170,24 @@ extern "C" int gpu_aggregate_days(
err = cudaMemcpy(d_day_indices, h_day_indices, num_days * sizeof(long long), cudaMemcpyHostToDevice); err = cudaMemcpy(d_day_indices, h_day_indices, num_days * sizeof(long long), cudaMemcpyHostToDevice);
if (err != cudaSuccess) return -13; if (err != cudaSuccess) return -13;
// Запускаем kernel: каждый поток обрабатывает один день cudaEventRecord(stop_transfer);
cudaEventSynchronize(stop_transfer);
float time_transfer_ms = 0;
cudaEventElapsedTime(&time_transfer_ms, start_transfer, stop_transfer);
// === ИЗМЕРЕНИЕ kernel ===
const int THREADS_PER_BLOCK = 256; const int THREADS_PER_BLOCK = 256;
int num_blocks = (num_days + THREADS_PER_BLOCK - 1) / THREADS_PER_BLOCK; int num_blocks = (num_days + THREADS_PER_BLOCK - 1) / THREADS_PER_BLOCK;
cudaEventRecord(start_kernel);
aggregate_kernel<<<num_blocks, THREADS_PER_BLOCK>>>( aggregate_kernel<<<num_blocks, THREADS_PER_BLOCK>>>(
d_records, num_records, d_records, num_records,
d_day_offsets, d_day_counts, d_day_indices, d_day_offsets, d_day_counts, d_day_indices,
num_days, d_out_stats num_days, d_out_stats
); );
// Проверяем ошибку запуска kernel
err = cudaGetLastError(); err = cudaGetLastError();
if (err != cudaSuccess) { if (err != cudaSuccess) {
cudaFree(d_records); cudaFree(d_records);
@@ -147,26 +198,65 @@ extern "C" int gpu_aggregate_days(
return -7; return -7;
} }
// Ждём завершения cudaEventRecord(stop_kernel);
err = cudaDeviceSynchronize(); cudaEventSynchronize(stop_kernel);
if (err != cudaSuccess) {
cudaFree(d_records);
cudaFree(d_day_offsets);
cudaFree(d_day_counts);
cudaFree(d_day_indices);
cudaFree(d_out_stats);
return -6;
}
// Копируем результат обратно float time_kernel_ms = 0;
cudaEventElapsedTime(&time_kernel_ms, start_kernel, stop_kernel);
// === ИЗМЕРЕНИЕ memcpy D->H ===
cudaEventRecord(start_copy_back);
cudaMemcpy(h_out_stats, d_out_stats, num_days * sizeof(GpuDayStats), cudaMemcpyDeviceToHost); cudaMemcpy(h_out_stats, d_out_stats, num_days * sizeof(GpuDayStats), cudaMemcpyDeviceToHost);
cudaEventRecord(stop_copy_back);
cudaEventSynchronize(stop_copy_back);
float time_copy_back_ms = 0;
cudaEventElapsedTime(&time_copy_back_ms, start_copy_back, stop_copy_back);
// === ИЗМЕРЕНИЕ cudaFree ===
cudaEventRecord(start_free);
// Освобождаем память
cudaFree(d_records); cudaFree(d_records);
cudaFree(d_day_offsets); cudaFree(d_day_offsets);
cudaFree(d_day_counts); cudaFree(d_day_counts);
cudaFree(d_day_indices); cudaFree(d_day_indices);
cudaFree(d_out_stats); cudaFree(d_out_stats);
cudaEventRecord(stop_free);
cudaEventSynchronize(stop_free);
float time_free_ms = 0;
cudaEventElapsedTime(&time_free_ms, start_free, stop_free);
// Общее время GPU
float time_total_ms = time_malloc_ms + time_transfer_ms + time_kernel_ms + time_copy_back_ms + time_free_ms;
// === Освобождаем события ===
double cpu_event_destroy_start = get_time_ms();
cudaEventDestroy(start_malloc);
cudaEventDestroy(stop_malloc);
cudaEventDestroy(start_transfer);
cudaEventDestroy(stop_transfer);
cudaEventDestroy(start_kernel);
cudaEventDestroy(stop_kernel);
cudaEventDestroy(start_copy_back);
cudaEventDestroy(stop_copy_back);
cudaEventDestroy(start_free);
cudaEventDestroy(stop_free);
double cpu_event_destroy_ms = get_time_ms() - cpu_event_destroy_start;
double cpu_total_ms = get_time_ms() - cpu_total_start;
// Выводим детальную статистику
printf(" GPU Timings (%d records, %d days):\n", num_records, num_days);
printf(" cudaMalloc: %7.3f ms\n", time_malloc_ms);
printf(" memcpy H->D: %7.3f ms\n", time_transfer_ms);
printf(" kernel execution: %7.3f ms\n", time_kernel_ms);
printf(" memcpy D->H: %7.3f ms\n", time_copy_back_ms);
printf(" cudaFree: %7.3f ms\n", time_free_ms);
printf(" GPU TOTAL: %7.3f ms\n", cpu_total_ms);
fflush(stdout);
return 0; return 0;
} }

View File

@@ -28,13 +28,17 @@ std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double t
interval.end_avg = price_now; interval.end_avg = price_now;
interval.change = change; interval.change = change;
// Находим min(Open) и max(Close) в интервале // Находим min/max Open и Close в интервале
interval.min_open = days[start_idx].open; interval.open_min = days[start_idx].open_min;
interval.max_close = days[start_idx].close; interval.open_max = days[start_idx].open_max;
interval.close_min = days[start_idx].close_min;
interval.close_max = days[start_idx].close_max;
for (size_t j = start_idx; j <= i; j++) { for (size_t j = start_idx + 1; j <= i; j++) {
interval.min_open = std::min(interval.min_open, days[j].open); interval.open_min = std::min(interval.open_min, days[j].open_min);
interval.max_close = std::max(interval.max_close, days[j].close); interval.open_max = std::max(interval.open_max, days[j].open_max);
interval.close_min = std::min(interval.close_min, days[j].close_min);
interval.close_max = std::max(interval.close_max, days[j].close_max);
} }
intervals.push_back(interval); intervals.push_back(interval);
@@ -68,16 +72,17 @@ void write_intervals(const std::string& filename, const std::vector<Interval>& i
std::ofstream out(filename); std::ofstream out(filename);
out << std::fixed << std::setprecision(2); out << std::fixed << std::setprecision(2);
out << "start_date,end_date,min_open,max_close,start_avg,end_avg,change\n"; out << "start_date,end_date,open_min,open_max,close_min,close_max,start_avg,end_avg,change\n";
for (const auto& iv : intervals) { for (const auto& iv : intervals) {
out << day_index_to_date(iv.start_day) << "," out << day_index_to_date(iv.start_day) << ","
<< day_index_to_date(iv.end_day) << "," << day_index_to_date(iv.end_day) << ","
<< iv.min_open << "," << iv.open_min << ","
<< iv.max_close << "," << iv.open_max << ","
<< iv.close_min << ","
<< iv.close_max << ","
<< iv.start_avg << "," << iv.start_avg << ","
<< iv.end_avg << "," << iv.end_avg << ","
<< std::setprecision(6) << iv.change << "\n"; << std::setprecision(6) << iv.change << "\n";
} }
} }

View File

@@ -4,6 +4,19 @@
#include <vector> #include <vector>
#include <string> #include <string>
// An interval of days whose price change is >= threshold (produced by find_intervals).
struct Interval {
DayIndex start_day; // day index of the interval start (rendered with day_index_to_date on output)
DayIndex end_day; // day index of the interval end (rendered with day_index_to_date on output)
double open_min; // minimum Open within the interval
double open_max; // maximum Open within the interval
double close_min; // minimum Close within the interval
double close_max; // maximum Close within the interval
double start_avg; // presumably the average price on start_day — confirm against find_intervals
double end_avg; // average price at the day that closes the interval
double change; // change value that was compared against the threshold
};
// Вычисление интервалов с изменением >= threshold (по умолчанию 10%) // Вычисление интервалов с изменением >= threshold (по умолчанию 10%)
std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double threshold = 0.10); std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double threshold = 0.10);
@@ -12,4 +25,3 @@ void write_intervals(const std::string& filename, const std::vector<Interval>& i
// Преобразование DayIndex в строку даты (YYYY-MM-DD) // Преобразование DayIndex в строку даты (YYYY-MM-DD)
std::string day_index_to_date(DayIndex day); std::string day_index_to_date(DayIndex day);

View File

@@ -1,54 +1,12 @@
#include <mpi.h> #include <mpi.h>
#include <omp.h>
#include <iostream> #include <iostream>
#include <vector> #include <vector>
#include <map>
#include <iomanip> #include <iomanip>
#include <cstdlib>
#include "csv_loader.hpp" #include "csv_loader.hpp"
#include "utils.hpp"
#include "record.hpp" #include "record.hpp"
#include "day_stats.hpp" #include "day_stats.hpp"
#include "aggregation.hpp" #include "aggregation.hpp"
#include "intervals.hpp"
#include "gpu_loader.hpp"
// Функция: отобрать записи для конкретного ранга
std::vector<Record> select_records_for_rank(
const std::map<DayIndex, std::vector<Record>>& days,
const std::vector<DayIndex>& day_list)
{
std::vector<Record> out;
for (auto d : day_list) {
auto it = days.find(d);
if (it != days.end()) {
const auto& vec = it->second;
out.insert(out.end(), vec.begin(), vec.end());
}
}
return out;
}
// Разделить записи на N частей (по дням)
std::vector<std::vector<Record>> split_records(const std::vector<Record>& records, int n_parts) {
// Группируем по дням
std::map<DayIndex, std::vector<Record>> by_day;
for (const auto& r : records) {
DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400;
by_day[day].push_back(r);
}
// Распределяем дни по частям
std::vector<std::vector<Record>> parts(n_parts);
int i = 0;
for (auto& [day, recs] : by_day) {
parts[i % n_parts].insert(parts[i % n_parts].end(), recs.begin(), recs.end());
i++;
}
return parts;
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
MPI_Init(&argc, &argv); MPI_Init(&argc, &argv);
@@ -57,200 +15,27 @@ int main(int argc, char** argv) {
MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Comm_size(MPI_COMM_WORLD, &size);
// Читаем количество CPU потоков из переменной окружения (по умолчанию 2) // Параллельное чтение данных
int num_cpu_threads = 2; double read_start = MPI_Wtime();
const char* env_threads = std::getenv("NUM_CPU_THREADS"); std::vector<Record> records = load_csv_parallel(rank, size);
if (env_threads) { double read_time = MPI_Wtime() - read_start;
num_cpu_threads = std::atoi(env_threads);
if (num_cpu_threads < 1) num_cpu_threads = 1;
}
omp_set_num_threads(num_cpu_threads + 1); // +1 для GPU потока если есть
// ====== ЗАГРУЗКА GPU ФУНКЦИЙ ====== std::cout << "Rank " << rank
auto gpu_is_available = load_gpu_is_available(); << ": read " << records.size() << " records"
auto gpu_aggregate = load_gpu_aggregate_days(); << " in " << std::fixed << std::setprecision(3) << read_time << " sec"
<< std::endl;
bool have_gpu = false;
if (gpu_is_available && gpu_is_available()) {
have_gpu = true;
std::cout << "Rank " << rank << ": GPU available + " << num_cpu_threads << " CPU threads" << std::endl;
} else {
std::cout << "Rank " << rank << ": " << num_cpu_threads << " CPU threads only" << std::endl;
}
std::vector<Record> local_records; // Агрегация по дням
double agg_start = MPI_Wtime();
std::vector<DayStats> days = aggregate_days(records);
double agg_time = MPI_Wtime() - agg_start;
if (rank == 0) { std::cout << "Rank " << rank
std::cout << "Rank 0 loading CSV..." << std::endl; << ": aggregated " << days.size() << " days"
<< " [" << (days.empty() ? 0 : days.front().day)
// Запускаем из build << ".." << (days.empty() ? 0 : days.back().day) << "]"
auto records = load_csv("../data/data.csv"); << " in " << std::fixed << std::setprecision(3) << agg_time << " sec"
<< std::endl;
auto days = group_by_day(records);
auto parts = split_days(days, size);
// Рассылаем данные
for (int r = 0; r < size; r++) {
auto vec = select_records_for_rank(days, parts[r]);
if (r == 0) {
// себе не отправляем — сразу сохраняем
local_records = vec;
continue;
}
int count = static_cast<int>(vec.size());
MPI_Send(&count, 1, MPI_INT, r, 0, MPI_COMM_WORLD);
MPI_Send(vec.data(), count * sizeof(Record), MPI_BYTE, r, 1, MPI_COMM_WORLD);
}
}
else {
// Принимает данные
int count = 0;
MPI_Recv(&count, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
local_records.resize(count);
MPI_Recv(local_records.data(), count * sizeof(Record),
MPI_BYTE, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
MPI_Barrier(MPI_COMM_WORLD);
std::cout << "Rank " << rank << " received "
<< local_records.size() << " records" << std::endl;
// ====== АГРЕГАЦИЯ НА КАЖДОМ УЗЛЕ ======
std::vector<DayStats> local_stats;
double time_start = omp_get_wtime();
// Время работы: [0] = GPU (если есть), [1..n] = CPU потоки
std::vector<double> worker_times(num_cpu_threads + 1, 0.0);
if (have_gpu && gpu_aggregate) {
// GPU узел: делим на (1 + num_cpu_threads) частей
int n_workers = 1 + num_cpu_threads;
auto parts = split_records(local_records, n_workers);
std::vector<std::vector<DayStats>> results(n_workers);
std::vector<bool> success(n_workers, true);
#pragma omp parallel
{
int tid = omp_get_thread_num();
if (tid < n_workers) {
double t0 = omp_get_wtime();
if (tid == 0) {
// GPU поток
success[0] = aggregate_days_gpu(parts[0], results[0], gpu_aggregate);
} else {
// CPU потоки
results[tid] = aggregate_days(parts[tid]);
}
worker_times[tid] = omp_get_wtime() - t0;
}
}
// Объединяем результаты
for (int i = 0; i < n_workers; i++) {
if (i == 0 && !success[0]) {
// GPU failed - обработаем на CPU
std::cout << "Rank " << rank << ": GPU failed, processing on CPU" << std::endl;
double t0 = omp_get_wtime();
results[0] = aggregate_days(parts[0]);
worker_times[0] = omp_get_wtime() - t0;
}
local_stats.insert(local_stats.end(), results[i].begin(), results[i].end());
}
} else {
// CPU-only узел
auto parts = split_records(local_records, num_cpu_threads);
std::vector<std::vector<DayStats>> results(num_cpu_threads);
#pragma omp parallel
{
int tid = omp_get_thread_num();
if (tid < num_cpu_threads) {
double t0 = omp_get_wtime();
results[tid] = aggregate_days(parts[tid]);
worker_times[tid + 1] = omp_get_wtime() - t0; // +1 т.к. [0] для GPU
}
}
for (int i = 0; i < num_cpu_threads; i++) {
local_stats.insert(local_stats.end(), results[i].begin(), results[i].end());
}
}
double time_total = omp_get_wtime() - time_start;
// Вывод времени
std::cout << std::fixed << std::setprecision(3);
std::cout << "Rank " << rank << " aggregated " << local_stats.size() << " days in "
<< time_total << "s (";
if (have_gpu) {
std::cout << "GPU: " << worker_times[0] << "s, ";
}
for (int i = 0; i < num_cpu_threads; i++) {
int idx = have_gpu ? (i + 1) : (i + 1);
std::cout << "CPU" << i << ": " << worker_times[idx] << "s";
if (i < num_cpu_threads - 1) std::cout << ", ";
}
std::cout << ")" << std::endl;
// ====== СБОР АГРЕГИРОВАННЫХ ДАННЫХ НА RANK 0 ======
std::vector<DayStats> all_stats;
if (rank == 0) {
// Добавляем свои данные
all_stats.insert(all_stats.end(), local_stats.begin(), local_stats.end());
// Получаем данные от других узлов
for (int r = 1; r < size; r++) {
int count = 0;
MPI_Recv(&count, 1, MPI_INT, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::vector<DayStats> remote_stats(count);
MPI_Recv(remote_stats.data(), count * sizeof(DayStats),
MPI_BYTE, r, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
all_stats.insert(all_stats.end(), remote_stats.begin(), remote_stats.end());
}
} else {
// Отправляем свои агрегированные данные на rank 0
int count = static_cast<int>(local_stats.size());
MPI_Send(&count, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
MPI_Send(local_stats.data(), count * sizeof(DayStats), MPI_BYTE, 0, 3, MPI_COMM_WORLD);
}
// ====== ВЫЧИСЛЕНИЕ ИНТЕРВАЛОВ НА RANK 0 ======
if (rank == 0) {
std::cout << "Rank 0: merging " << all_stats.size() << " day stats..." << std::endl;
// Объединяем и сортируем
auto merged_stats = merge_day_stats(all_stats);
std::cout << "Rank 0: total " << merged_stats.size() << " unique days" << std::endl;
// Вычисляем интервалы
auto intervals = find_intervals(merged_stats, 0.10);
std::cout << "Found " << intervals.size() << " intervals with >=10% change" << std::endl;
// Записываем результат
write_intervals("../result.csv", intervals);
std::cout << "Results written to result.csv" << std::endl;
// Выводим первые несколько интервалов
std::cout << "\nFirst 5 intervals:\n";
std::cout << "start_date,end_date,min_open,max_close,change\n";
for (size_t i = 0; i < std::min(intervals.size(), size_t(5)); i++) {
const auto& iv = intervals[i];
std::cout << day_index_to_date(iv.start_day) << ","
<< day_index_to_date(iv.end_day) << ","
<< iv.min_open << ","
<< iv.max_close << ","
<< iv.change << "\n";
}
}
MPI_Finalize(); MPI_Finalize();
return 0; return 0;

View File

@@ -1,11 +0,0 @@
#include "mpi_utils.hpp"
#include <mpi.h>
#include <iostream>
// Print a one-line greeting with this process's rank and the size of
// MPI_COMM_WORLD. Must be called between MPI_Init and MPI_Finalize.
void mpi_print_basic() {
    int world_rank;
    int world_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    std::cout << "Hello from rank " << world_rank
              << " of " << world_size << std::endl;
}

View File

@@ -1,2 +0,0 @@
#pragma once
void mpi_print_basic();

View File

@@ -1,4 +1,8 @@
#include "utils.hpp" #include "utils.hpp"
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <numeric>
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) { std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) {
std::map<DayIndex, std::vector<Record>> days; std::map<DayIndex, std::vector<Record>> days;
@@ -23,3 +27,88 @@ std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vect
return out; return out;
} }
// Number of CPU worker threads, read from the NUM_CPU_THREADS environment
// variable. Returns 1 when the variable is unset or parses to a value below 1.
int get_num_cpu_threads() {
    const char* raw = std::getenv("NUM_CPU_THREADS");
    if (raw == nullptr) {
        return 1;
    }

    const int parsed = std::atoi(raw);
    return (parsed < 1) ? 1 : parsed;
}
// Read a required environment variable.
// Returns its value (possibly empty); throws std::runtime_error when the
// variable is not set at all.
std::string get_env(const char* name) {
    if (const char* value = std::getenv(name)) {
        return std::string(value);
    }

    std::string message("Environment variable not set: ");
    message += name;
    throw std::runtime_error(message);
}
// Path of the input data file, taken from the required DATA_PATH environment
// variable (get_env throws if it is not set).
std::string get_data_path() {
    std::string data_path = get_env("DATA_PATH");
    return data_path;
}
// Parse the required DATA_READ_SHARES environment variable: a comma-separated
// list of integers, returned in the order given. std::stoi throws
// (std::invalid_argument / std::out_of_range) for items it cannot parse.
std::vector<int> get_data_read_shares() {
    std::stringstream stream(get_env("DATA_READ_SHARES"));
    std::vector<int> result;

    for (std::string token; std::getline(stream, token, ','); ) {
        result.push_back(std::stoi(token));
    }

    return result;
}
// Overlap between neighbouring read ranges, in bytes, parsed from the required
// READ_OVERLAP_BYTES environment variable (std::stoll throws on bad input).
int64_t get_read_overlap_bytes() {
    const std::string raw = get_env("READ_OVERLAP_BYTES");
    return std::stoll(raw);
}
// Size of the file at `path`, in bytes.
//
// The file is opened positioned at its end and the stream position is read
// back. Throws std::runtime_error when the file cannot be opened or when the
// position cannot be determined (tellg() reports failure as -1, which must not
// be handed to callers as a size).
int64_t get_file_size(const std::string& path) {
    std::ifstream file(path, std::ios::binary | std::ios::ate);
    if (!file.is_open()) {
        throw std::runtime_error("Cannot open file: " + path);
    }

    const std::streamoff size = file.tellg();
    if (size < 0) {
        throw std::runtime_error("Cannot determine size of file: " + path);
    }
    return static_cast<int64_t>(size);
}
// Compute the byte range of the input file that `rank` (out of `size` ranks)
// should read.
//
// The file is divided proportionally to `shares` (one weight per rank). If
// `shares` does not contain exactly `size` entries, every rank gets an equal
// share. Each range is then widened by `overlap_bytes` towards its neighbours
// and clamped to [0, file_size]. The first rank always starts at 0 and the
// last rank always ends at file_size, so bytes lost to integer truncation of
// the per-share size are still covered — including when a single rank is both
// first and last.
//
// Throws std::invalid_argument when `rank` is outside [0, size), when a share
// is negative, or when the shares sum to zero (which would divide by zero).
ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
                               const std::vector<int>& shares, int64_t overlap_bytes) {
    if (size <= 0 || rank < 0 || rank >= size) {
        throw std::invalid_argument("calculate_byte_range: rank must be in [0, size)");
    }

    // Fall back to equal shares when the configured list does not match the rank count.
    std::vector<int> effective_shares;
    if (shares.size() == static_cast<size_t>(size)) {
        effective_shares = shares;
    } else {
        effective_shares.assign(size, 1);
    }

    int64_t total_shares = 0;
    for (const int share : effective_shares) {
        if (share < 0) {
            throw std::invalid_argument("calculate_byte_range: shares must be non-negative");
        }
        total_shares += share;
    }
    if (total_shares == 0) {
        throw std::invalid_argument("calculate_byte_range: shares must not sum to zero");
    }

    // Base (non-overlapping) boundaries of this rank's slice.
    const int64_t bytes_per_share = file_size / total_shares;
    int64_t base_start = 0;
    for (int i = 0; i < rank; i++) {
        base_start += bytes_per_share * effective_shares[i];
    }
    const int64_t base_end = base_start + bytes_per_share * effective_shares[rank];

    // Apply the overlap on every side that has a neighbour.
    const bool is_first = (rank == 0);
    const bool is_last = (rank == size - 1);

    ByteRange range;
    range.start = is_first
        ? 0
        : std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
    range.end = is_last
        ? file_size
        : std::min(base_end + overlap_bytes, file_size);

    return range;
}

View File

@@ -4,6 +4,29 @@
#include "day_stats.hpp" #include "day_stats.hpp"
#include <map> #include <map>
#include <vector> #include <vector>
#include <string>
#include <cstdlib>
#include <cstdint>
// Группировка записей по дням
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs); std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs);
std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts); std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts);
// Чтение переменных окружения
int get_num_cpu_threads();
std::string get_data_path();
std::vector<int> get_data_read_shares();
int64_t get_read_overlap_bytes();
// Byte range of the input file to read, as the half-open interval [start, end).
struct ByteRange {
int64_t start; // offset of the first byte of the range
int64_t end; // exclusive
};
// Вычисляет диапазон байт для конкретного ранка
ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
const std::vector<int>& shares, int64_t overlap_bytes);
// Получение размера файла
int64_t get_file_size(const std::string& path);