Compare commits

...

5 Commits

SHA1 Message Date
ab18d9770f Aggregation 2025-12-13 12:45:29 +00:00
6a22dc3ef7 Parallel data reading 2025-12-13 12:13:23 +00:00
f90a641754 gpu_is_available 2025-12-13 11:07:31 +00:00
10bd6db2b8 Note about NFS 2025-12-13 11:06:41 +00:00
d82fde7116 Clarified the assignment 2025-12-11 16:47:30 +03:00
17 changed files with 648 additions and 640 deletions

.gitignore vendored

@@ -1,4 +1,4 @@
data data
build build
out.txt out.txt
result.csv *.csv

README.md

@@ -10,10 +10,30 @@
by at least 10% from the interval's start date, together with the minimum and maximum by at least 10% from the interval's start date, together with the minimum and maximum
Open and Close values over all days inside the interval. Open and Close values over all days inside the interval.
## Parallel data reading
There is no point in reading the data from NFS in parallel, because in reality the data files
live only on the NFS server. That is, the other nodes merely send network requests
to the NFS server, which reads the actual data from disk and only then sends
it on to the other nodes.
To avoid this, copy the data files to each machine's local file system,
for example into the `/data` directory.
```sh
# Create the /data directory on every node
sudo mkdir /data
sudo chown $USER /data
# Copy the data
cd /mnt/shared/supercomputers/data
cp data.csv /data/
```
## Build ## Build
The project must be located in a directory shared by all nodes, The project must be located in a directory shared by all nodes,
for example, in `/mnt/shared/supercomputers/bitcoin-project/build`. for example, in `/mnt/shared/supercomputers/build`.
Before running, set the correct path in `run.slurm`. Before running, set the correct path in `run.slurm`.
```sh ```sh

Jupyter notebook (.ipynb)

@@ -2,7 +2,7 @@
"cells": [ "cells": [
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 1, "execution_count": 20,
"id": "2acce44b", "id": "2acce44b",
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
@@ -12,7 +12,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 14, "execution_count": 21,
"id": "5ba70af7", "id": "5ba70af7",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
@@ -111,7 +111,7 @@
"7317758 0.410369 " "7317758 0.410369 "
] ]
}, },
"execution_count": 14, "execution_count": 21,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -124,8 +124,8 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 19, "execution_count": 23,
"id": "d4b22f3b", "id": "3b320537",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@@ -150,67 +150,190 @@
" <tr style=\"text-align: right;\">\n", " <tr style=\"text-align: right;\">\n",
" <th></th>\n", " <th></th>\n",
" <th>Timestamp</th>\n", " <th>Timestamp</th>\n",
" <th>Low</th>\n",
" <th>High</th>\n",
" <th>Open</th>\n", " <th>Open</th>\n",
" <th>High</th>\n",
" <th>Low</th>\n",
" <th>Close</th>\n", " <th>Close</th>\n",
" <th>Volume</th>\n",
" <th>Avg</th>\n",
" </tr>\n", " </tr>\n",
" </thead>\n", " </thead>\n",
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>5078</th>\n", " <th>7317754</th>\n",
" <td>2025-11-26</td>\n", " <td>2025-11-30 23:55:00+00:00</td>\n",
" <td>86304.0</td>\n", " <td>90405.0</td>\n",
" <td>90646.0</td>\n", " <td>90452.0</td>\n",
" <td>87331.0</td>\n", " <td>90403.0</td>\n",
" <td>90477.0</td>\n", " <td>90452.0</td>\n",
" <td>0.531700</td>\n",
" <td>90427.5</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5079</th>\n", " <th>7317755</th>\n",
" <td>2025-11-27</td>\n", " <td>2025-11-30 23:56:00+00:00</td>\n",
" <td>90091.0</td>\n", " <td>90452.0</td>\n",
" <td>91926.0</td>\n", " <td>90481.0</td>\n",
" <td>90476.0</td>\n", " <td>90420.0</td>\n",
" <td>91325.0</td>\n", " <td>90420.0</td>\n",
" <td>0.055547</td>\n",
" <td>90450.5</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5080</th>\n", " <th>7317756</th>\n",
" <td>2025-11-28</td>\n", " <td>2025-11-30 23:57:00+00:00</td>\n",
" <td>90233.0</td>\n", " <td>90412.0</td>\n",
" <td>93091.0</td>\n", " <td>90458.0</td>\n",
" <td>91326.0</td>\n", " <td>90396.0</td>\n",
" <td>90913.0</td>\n", " <td>90435.0</td>\n",
" <td>0.301931</td>\n",
" <td>90427.0</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>5081</th>\n", " <th>7317757</th>\n",
" <td>2025-11-29</td>\n", " <td>2025-11-30 23:58:00+00:00</td>\n",
" <td>90216.0</td>\n", " <td>90428.0</td>\n",
" <td>91179.0</td>\n", " <td>90428.0</td>\n",
" <td>90913.0</td>\n", " <td>90362.0</td>\n",
" <td>90832.0</td>\n", " <td>90362.0</td>\n",
" </tr>\n", " <td>4.591653</td>\n",
" <tr>\n", " <td>90395.0</td>\n",
" <th>5082</th>\n", " </tr>\n",
" <td>2025-11-30</td>\n", " <tr>\n",
" <th>7317758</th>\n",
" <td>2025-11-30 23:59:00+00:00</td>\n",
" <td>90363.0</td>\n",
" <td>90386.0</td>\n",
" <td>90362.0</td>\n", " <td>90362.0</td>\n",
" <td>91969.0</td>\n",
" <td>90832.0</td>\n",
" <td>90382.0</td>\n", " <td>90382.0</td>\n",
" <td>0.410369</td>\n",
" <td>90374.0</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" Timestamp Low High Open Close\n", " Timestamp Open High Low Close \\\n",
"5078 2025-11-26 86304.0 90646.0 87331.0 90477.0\n", "7317754 2025-11-30 23:55:00+00:00 90405.0 90452.0 90403.0 90452.0 \n",
"5079 2025-11-27 90091.0 91926.0 90476.0 91325.0\n", "7317755 2025-11-30 23:56:00+00:00 90452.0 90481.0 90420.0 90420.0 \n",
"5080 2025-11-28 90233.0 93091.0 91326.0 90913.0\n", "7317756 2025-11-30 23:57:00+00:00 90412.0 90458.0 90396.0 90435.0 \n",
"5081 2025-11-29 90216.0 91179.0 90913.0 90832.0\n", "7317757 2025-11-30 23:58:00+00:00 90428.0 90428.0 90362.0 90362.0 \n",
"5082 2025-11-30 90362.0 91969.0 90832.0 90382.0" "7317758 2025-11-30 23:59:00+00:00 90363.0 90386.0 90362.0 90382.0 \n",
"\n",
" Volume Avg \n",
"7317754 0.531700 90427.5 \n",
"7317755 0.055547 90450.5 \n",
"7317756 0.301931 90427.0 \n",
"7317757 4.591653 90395.0 \n",
"7317758 0.410369 90374.0 "
] ]
}, },
"execution_count": 19, "execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df['Avg'] = (df['Low'] + df['High']) / 2\n",
"df.tail()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "4b1cd63c",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Timestamp</th>\n",
" <th>Avg</th>\n",
" <th>OpenMin</th>\n",
" <th>OpenMax</th>\n",
" <th>CloseMin</th>\n",
" <th>CloseMax</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>5078</th>\n",
" <td>2025-11-26</td>\n",
" <td>88057.301736</td>\n",
" <td>86312.0</td>\n",
" <td>90574.0</td>\n",
" <td>86323.0</td>\n",
" <td>90574.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5079</th>\n",
" <td>2025-11-27</td>\n",
" <td>91245.092708</td>\n",
" <td>90126.0</td>\n",
" <td>91888.0</td>\n",
" <td>90126.0</td>\n",
" <td>91925.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5080</th>\n",
" <td>2025-11-28</td>\n",
" <td>91324.308681</td>\n",
" <td>90255.0</td>\n",
" <td>92970.0</td>\n",
" <td>90283.0</td>\n",
" <td>92966.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5081</th>\n",
" <td>2025-11-29</td>\n",
" <td>90746.479514</td>\n",
" <td>90265.0</td>\n",
" <td>91158.0</td>\n",
" <td>90279.0</td>\n",
" <td>91179.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5082</th>\n",
" <td>2025-11-30</td>\n",
" <td>91187.356250</td>\n",
" <td>90363.0</td>\n",
" <td>91940.0</td>\n",
" <td>90362.0</td>\n",
" <td>91940.0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Timestamp Avg OpenMin OpenMax CloseMin CloseMax\n",
"5078 2025-11-26 88057.301736 86312.0 90574.0 86323.0 90574.0\n",
"5079 2025-11-27 91245.092708 90126.0 91888.0 90126.0 91925.0\n",
"5080 2025-11-28 91324.308681 90255.0 92970.0 90283.0 92966.0\n",
"5081 2025-11-29 90746.479514 90265.0 91158.0 90279.0 91179.0\n",
"5082 2025-11-30 91187.356250 90363.0 91940.0 90362.0 91940.0"
]
},
"execution_count": 25,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -218,7 +341,13 @@
"source": [ "source": [
"df_days = (\n", "df_days = (\n",
" df.groupby(df[\"Timestamp\"].dt.date)\n", " df.groupby(df[\"Timestamp\"].dt.date)\n",
" .agg({\"Low\": \"min\", \"High\": \"max\", \"Open\": \"first\", \"Close\": \"last\"})\n", " .agg(\n",
" Avg=(\"Avg\", \"mean\"),\n",
" OpenMin=(\"Open\", \"min\"),\n",
" OpenMax=(\"Open\", \"max\"),\n",
" CloseMin=(\"Close\", \"min\"),\n",
" CloseMax=(\"Close\", \"max\"),\n",
" )\n",
" .reset_index()\n", " .reset_index()\n",
")\n", ")\n",
"df_days.tail()" "df_days.tail()"
@@ -226,111 +355,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 21, "execution_count": 26,
"id": "91823496",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>Timestamp</th>\n",
" <th>Low</th>\n",
" <th>High</th>\n",
" <th>Open</th>\n",
" <th>Close</th>\n",
" <th>Avg</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>5078</th>\n",
" <td>2025-11-26</td>\n",
" <td>86304.0</td>\n",
" <td>90646.0</td>\n",
" <td>87331.0</td>\n",
" <td>90477.0</td>\n",
" <td>88475.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5079</th>\n",
" <td>2025-11-27</td>\n",
" <td>90091.0</td>\n",
" <td>91926.0</td>\n",
" <td>90476.0</td>\n",
" <td>91325.0</td>\n",
" <td>91008.5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5080</th>\n",
" <td>2025-11-28</td>\n",
" <td>90233.0</td>\n",
" <td>93091.0</td>\n",
" <td>91326.0</td>\n",
" <td>90913.0</td>\n",
" <td>91662.0</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5081</th>\n",
" <td>2025-11-29</td>\n",
" <td>90216.0</td>\n",
" <td>91179.0</td>\n",
" <td>90913.0</td>\n",
" <td>90832.0</td>\n",
" <td>90697.5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5082</th>\n",
" <td>2025-11-30</td>\n",
" <td>90362.0</td>\n",
" <td>91969.0</td>\n",
" <td>90832.0</td>\n",
" <td>90382.0</td>\n",
" <td>91165.5</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" Timestamp Low High Open Close Avg\n",
"5078 2025-11-26 86304.0 90646.0 87331.0 90477.0 88475.0\n",
"5079 2025-11-27 90091.0 91926.0 90476.0 91325.0 91008.5\n",
"5080 2025-11-28 90233.0 93091.0 91326.0 90913.0 91662.0\n",
"5081 2025-11-29 90216.0 91179.0 90913.0 90832.0 90697.5\n",
"5082 2025-11-30 90362.0 91969.0 90832.0 90382.0 91165.5"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df_days[\"Avg\"] = (df_days[\"Low\"] + df_days[\"High\"]) / 2\n",
"df_days.tail()"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "9a7b3310", "id": "9a7b3310",
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
@@ -358,6 +383,8 @@
" <th>start_date</th>\n", " <th>start_date</th>\n",
" <th>end_date</th>\n", " <th>end_date</th>\n",
" <th>min_open</th>\n", " <th>min_open</th>\n",
" <th>max_open</th>\n",
" <th>min_close</th>\n",
" <th>max_close</th>\n", " <th>max_close</th>\n",
" <th>start_avg</th>\n", " <th>start_avg</th>\n",
" <th>end_avg</th>\n", " <th>end_avg</th>\n",
@@ -366,76 +393,86 @@
" </thead>\n", " </thead>\n",
" <tbody>\n", " <tbody>\n",
" <tr>\n", " <tr>\n",
" <th>335</th>\n", " <th>316</th>\n",
" <td>2025-02-27</td>\n", " <td>2025-02-27</td>\n",
" <td>2025-04-23</td>\n", " <td>2025-04-25</td>\n",
" <td>76252.0</td>\n", " <td>74509.0</td>\n",
" <td>94273.0</td>\n", " <td>95801.0</td>\n",
" <td>84801.5</td>\n", " <td>74515.0</td>\n",
" <td>93335.0</td>\n", " <td>95800.0</td>\n",
" <td>0.100629</td>\n", " <td>85166.063889</td>\n",
" <td>94303.907292</td>\n",
" <td>0.107294</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>336</th>\n", " <th>317</th>\n",
" <td>2025-04-24</td>\n", " <td>2025-04-26</td>\n",
" <td>2025-05-09</td>\n", " <td>2025-05-11</td>\n",
" <td>93730.0</td>\n", " <td>92877.0</td>\n",
" <td>103261.0</td>\n", " <td>104971.0</td>\n",
" <td>92867.5</td>\n", " <td>92872.0</td>\n",
" <td>103341.0</td>\n", " <td>104965.0</td>\n",
" <td>0.112779</td>\n", " <td>94500.950347</td>\n",
" <td>104182.167708</td>\n",
" <td>0.102446</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>337</th>\n", " <th>318</th>\n",
" <td>2025-05-10</td>\n", " <td>2025-05-12</td>\n",
" <td>2025-07-11</td>\n", " <td>2025-07-11</td>\n",
" <td>100990.0</td>\n", " <td>98384.0</td>\n",
" <td>117579.0</td>\n", " <td>118833.0</td>\n",
" <td>103915.0</td>\n", " <td>98382.0</td>\n",
" <td>117032.5</td>\n", " <td>118839.0</td>\n",
" <td>0.126233</td>\n", " <td>103569.791319</td>\n",
" <td>117463.666667</td>\n",
" <td>0.134150</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>338</th>\n", " <th>319</th>\n",
" <td>2025-07-12</td>\n", " <td>2025-07-12</td>\n",
" <td>2025-11-04</td>\n", " <td>2025-11-04</td>\n",
" <td>106470.0</td>\n", " <td>98944.0</td>\n",
" <td>124728.0</td>\n", " <td>126202.0</td>\n",
" <td>117599.0</td>\n", " <td>98943.0</td>\n",
" <td>103079.0</td>\n", " <td>126202.0</td>\n",
" <td>0.123470</td>\n", " <td>117640.026389</td>\n",
" <td>103712.985764</td>\n",
" <td>0.118387</td>\n",
" </tr>\n", " </tr>\n",
" <tr>\n", " <tr>\n",
" <th>339</th>\n", " <th>320</th>\n",
" <td>2025-11-05</td>\n", " <td>2025-11-05</td>\n",
" <td>2025-11-18</td>\n", " <td>2025-11-18</td>\n",
" <td>92112.0</td>\n", " <td>89291.0</td>\n",
" <td>105972.0</td>\n", " <td>107343.0</td>\n",
" <td>101737.5</td>\n", " <td>89286.0</td>\n",
" <td>91471.0</td>\n", " <td>107343.0</td>\n",
" <td>0.100912</td>\n", " <td>102514.621181</td>\n",
" <td>91705.833333</td>\n",
" <td>0.105437</td>\n",
" </tr>\n", " </tr>\n",
" </tbody>\n", " </tbody>\n",
"</table>\n", "</table>\n",
"</div>" "</div>"
], ],
"text/plain": [ "text/plain": [
" start_date end_date min_open max_close start_avg end_avg \\\n", " start_date end_date min_open max_open min_close max_close \\\n",
"335 2025-02-27 2025-04-23 76252.0 94273.0 84801.5 93335.0 \n", "316 2025-02-27 2025-04-25 74509.0 95801.0 74515.0 95800.0 \n",
"336 2025-04-24 2025-05-09 93730.0 103261.0 92867.5 103341.0 \n", "317 2025-04-26 2025-05-11 92877.0 104971.0 92872.0 104965.0 \n",
"337 2025-05-10 2025-07-11 100990.0 117579.0 103915.0 117032.5 \n", "318 2025-05-12 2025-07-11 98384.0 118833.0 98382.0 118839.0 \n",
"338 2025-07-12 2025-11-04 106470.0 124728.0 117599.0 103079.0 \n", "319 2025-07-12 2025-11-04 98944.0 126202.0 98943.0 126202.0 \n",
"339 2025-11-05 2025-11-18 92112.0 105972.0 101737.5 91471.0 \n", "320 2025-11-05 2025-11-18 89291.0 107343.0 89286.0 107343.0 \n",
"\n", "\n",
" change \n", " start_avg end_avg change \n",
"335 0.100629 \n", "316 85166.063889 94303.907292 0.107294 \n",
"336 0.112779 \n", "317 94500.950347 104182.167708 0.102446 \n",
"337 0.126233 \n", "318 103569.791319 117463.666667 0.134150 \n",
"338 0.123470 \n", "319 117640.026389 103712.985764 0.118387 \n",
"339 0.100912 " "320 102514.621181 91705.833333 0.105437 "
] ]
}, },
"execution_count": 25, "execution_count": 26,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@@ -455,8 +492,10 @@
" intervals.append({\n", " intervals.append({\n",
" \"start_date\": df_days.loc[start_idx, \"Timestamp\"],\n", " \"start_date\": df_days.loc[start_idx, \"Timestamp\"],\n",
" \"end_date\": df_days.loc[i, \"Timestamp\"],\n", " \"end_date\": df_days.loc[i, \"Timestamp\"],\n",
" \"min_open\": interval[\"Open\"].min(),\n", " \"min_open\": interval[\"OpenMin\"].min(),\n",
" \"max_close\": interval[\"Close\"].max(),\n", " \"max_open\": interval[\"OpenMax\"].max(),\n",
" \"min_close\": interval[\"CloseMin\"].min(),\n",
" \"max_close\": interval[\"CloseMax\"].max(),\n",
" \"start_avg\": price_base,\n", " \"start_avg\": price_base,\n",
" \"end_avg\": price_now,\n", " \"end_avg\": price_now,\n",
" \"change\": change,\n", " \"change\": change,\n",
@@ -470,6 +509,14 @@
"df_intervals = pd.DataFrame(intervals)\n", "df_intervals = pd.DataFrame(intervals)\n",
"df_intervals.tail()" "df_intervals.tail()"
] ]
},
{
"cell_type": "code",
"execution_count": null,
"id": "07f1cd58",
"metadata": {},
"outputs": [],
"source": []
} }
], ],
"metadata": { "metadata": {

run.slurm

@@ -5,8 +5,14 @@
#SBATCH --cpus-per-task=2 #SBATCH --cpus-per-task=2
#SBATCH --output=out.txt #SBATCH --output=out.txt
# Number of CPU threads per node (must match cpus-per-task) # Path to the data file (must exist on every node)
export NUM_CPU_THREADS=2 export DATA_PATH="/mnt/shared/supercomputers/data/data.csv"
# Shares of the data for each rank (the sum defines the proportions)
export DATA_READ_SHARES="10,12,13,13"
# Overlap size in bytes for handling line boundaries
export READ_OVERLAP_BYTES=131072
cd /mnt/shared/supercomputers/build cd /mnt/shared/supercomputers/build
mpirun -np $SLURM_NTASKS ./bitcoin_app mpirun -np $SLURM_NTASKS ./bitcoin_app

aggregation.cpp

@@ -1,87 +1,48 @@
#include "aggregation.hpp" #include "aggregation.hpp"
#include <map>
#include <algorithm> #include <algorithm>
#include <limits> #include <limits>
#include <cmath>
std::vector<DayStats> aggregate_days(const std::vector<Record>& records) { std::vector<DayStats> aggregate_days(const std::vector<Record>& records) {
// Group the records by day // Per-day accumulators
std::map<DayIndex, std::vector<const Record*>> day_records; struct DayAccumulator {
double avg_sum = 0.0;
double open_min = std::numeric_limits<double>::max();
double open_max = std::numeric_limits<double>::lowest();
double close_min = std::numeric_limits<double>::max();
double close_max = std::numeric_limits<double>::lowest();
int64_t count = 0;
};
std::map<DayIndex, DayAccumulator> days;
for (const auto& r : records) { for (const auto& r : records) {
DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400; DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400;
day_records[day].push_back(&r); auto& acc = days[day];
double avg = (r.low + r.high) / 2.0;
acc.avg_sum += avg;
acc.open_min = std::min(acc.open_min, r.open);
acc.open_max = std::max(acc.open_max, r.open);
acc.close_min = std::min(acc.close_min, r.close);
acc.close_max = std::max(acc.close_max, r.close);
acc.count++;
} }
std::vector<DayStats> result; std::vector<DayStats> result;
result.reserve(day_records.size()); result.reserve(days.size());
for (auto& [day, recs] : day_records) { for (const auto& [day, acc] : days) {
// Sort by timestamp to determine first/last
std::sort(recs.begin(), recs.end(),
[](const Record* a, const Record* b) {
return a->timestamp < b->timestamp;
});
DayStats stats; DayStats stats;
stats.day = day; stats.day = day;
stats.low = std::numeric_limits<double>::max(); stats.avg = acc.avg_sum / static_cast<double>(acc.count);
stats.high = std::numeric_limits<double>::lowest(); stats.open_min = acc.open_min;
stats.open = recs.front()->open; stats.open_max = acc.open_max;
stats.close = recs.back()->close; stats.close_min = acc.close_min;
stats.first_ts = recs.front()->timestamp; stats.close_max = acc.close_max;
stats.last_ts = recs.back()->timestamp; stats.count = acc.count;
for (const auto* r : recs) {
stats.low = std::min(stats.low, r->low);
stats.high = std::max(stats.high, r->high);
}
stats.avg = (stats.low + stats.high) / 2.0;
result.push_back(stats); result.push_back(stats);
} }
return result; return result;
} }
std::vector<DayStats> merge_day_stats(const std::vector<DayStats>& all_stats) {
// Merge statistics for identical days (if any)
std::map<DayIndex, DayStats> merged;
for (const auto& s : all_stats) {
auto it = merged.find(s.day);
if (it == merged.end()) {
merged[s.day] = s;
} else {
// Merge data for the same day
auto& m = it->second;
m.low = std::min(m.low, s.low);
m.high = std::max(m.high, s.high);
// take open from the record with the smaller timestamp
if (s.first_ts < m.first_ts) {
m.open = s.open;
m.first_ts = s.first_ts;
}
// take close from the record with the larger timestamp
if (s.last_ts > m.last_ts) {
m.close = s.close;
m.last_ts = s.last_ts;
}
m.avg = (m.low + m.high) / 2.0;
}
}
// Convert to a sorted vector
std::vector<DayStats> result;
result.reserve(merged.size());
for (auto& [day, stats] : merged) {
result.push_back(stats);
}
return result;
}
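
To make the new single-pass accumulator semantics concrete, here is a minimal standalone sketch (not part of this diff; it assumes `Record`'s members are declared in CSV column order: timestamp, open, high, low, close, volume):

```cpp
#include <cassert>
#include <vector>
#include "aggregation.hpp"

int main() {
    // Two synthetic minute records from the same day bucket (timestamp / 86400).
    std::vector<Record> recs = {
        {86400.0 * 20000 + 60,  100.0, 110.0,  90.0, 105.0, 1.0},
        {86400.0 * 20000 + 120, 105.0, 120.0, 100.0, 115.0, 2.0},
    };
    auto days = aggregate_days(recs);
    assert(days.size() == 1 && days[0].count == 2);
    // avg is the mean of per-record (low + high) / 2: (100.0 + 110.0) / 2 = 105.0
    assert(days[0].avg == 105.0);
    assert(days[0].open_min == 100.0 && days[0].open_max == 105.0);
    assert(days[0].close_min == 105.0 && days[0].close_max == 115.0);
}
```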

aggregation.hpp

@@ -3,12 +3,6 @@
#include "record.hpp" #include "record.hpp"
#include "day_stats.hpp" #include "day_stats.hpp"
#include <vector> #include <vector>
#include <map>
// Aggregate records by day on a single node // Aggregate records by day on a single node
std::vector<DayStats> aggregate_days(const std::vector<Record>& records); std::vector<DayStats> aggregate_days(const std::vector<Record>& records);
// Merge aggregated data from different nodes
// (in case one day ends up on different nodes - which should not happen in our scheme)
std::vector<DayStats> merge_day_stats(const std::vector<DayStats>& all_stats);

csv_loader.cpp

@@ -2,46 +2,134 @@
#include <fstream> #include <fstream>
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>
#include <stdexcept>
std::vector<Record> load_csv(const std::string& filename) { bool parse_csv_line(const std::string& line, Record& record) {
if (line.empty()) {
return false;
}
std::stringstream ss(line);
std::string item;
try {
// timestamp
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.timestamp = std::stod(item);
// open
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.open = std::stod(item);
// high
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.high = std::stod(item);
// low
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.low = std::stod(item);
// close
if (!std::getline(ss, item, ',') || item.empty()) return false;
record.close = std::stod(item);
// volume
if (!std::getline(ss, item, ',')) return false;
// Volume may be empty or contain data
if (item.empty()) {
record.volume = 0.0;
} else {
record.volume = std::stod(item);
}
return true;
} catch (const std::exception&) {
return false;
}
}
std::vector<Record> load_csv_parallel(int rank, int size) {
std::vector<Record> data; std::vector<Record> data;
std::ifstream file(filename);
// Read the settings from environment variables
std::string data_path = get_data_path();
std::vector<int> shares = get_data_read_shares();
int64_t overlap_bytes = get_read_overlap_bytes();
// Get the file size
int64_t file_size = get_file_size(data_path);
// Compute the byte range for this rank
ByteRange range = calculate_byte_range(rank, size, file_size, shares, overlap_bytes);
// Open the file and read the required range
std::ifstream file(data_path, std::ios::binary);
if (!file.is_open()) { if (!file.is_open()) {
throw std::runtime_error("Cannot open file: " + filename); throw std::runtime_error("Cannot open file: " + data_path);
} }
std::string line; // Seek to the start of the range
file.seekg(range.start);
// read the first line (the header)
std::getline(file, line); // Read the data into a buffer
int64_t bytes_to_read = range.end - range.start;
while (std::getline(file, line)) { std::vector<char> buffer(bytes_to_read);
std::stringstream ss(line); file.read(buffer.data(), bytes_to_read);
std::string item; int64_t bytes_read = file.gcount();
Record row; file.close();
std::getline(ss, item, ','); // Convert to a string for easier parsing
row.timestamp = std::stod(item); std::string content(buffer.data(), bytes_read);
std::getline(ss, item, ','); // Find where the first complete line starts
row.open = std::stod(item); size_t parse_start = 0;
if (rank == 0) {
std::getline(ss, item, ','); // First rank: skip the header (the first line)
row.high = std::stod(item); size_t header_end = content.find('\n');
if (header_end != std::string::npos) {
std::getline(ss, item, ','); parse_start = header_end + 1;
row.low = std::stod(item); }
} else {
std::getline(ss, item, ','); // Other ranks: start from the first \n (skip the partial line)
row.close = std::stod(item); size_t first_newline = content.find('\n');
if (first_newline != std::string::npos) {
std::getline(ss, item, ','); parse_start = first_newline + 1;
row.volume = std::stod(item); }
data.push_back(row);
} }
// Find where the last complete line ends
size_t parse_end = content.size();
if (rank != size - 1) {
// Not the last rank: look for the last \n
size_t last_newline = content.rfind('\n');
if (last_newline != std::string::npos && last_newline > parse_start) {
parse_end = last_newline;
}
}
// Parse the lines
size_t pos = parse_start;
while (pos < parse_end) {
size_t line_end = content.find('\n', pos);
if (line_end == std::string::npos || line_end > parse_end) {
line_end = parse_end;
}
std::string line = content.substr(pos, line_end - pos);
// Strip \r if present (Windows line endings)
if (!line.empty() && line.back() == '\r') {
line.pop_back();
}
Record record;
if (parse_csv_line(line, record)) {
data.push_back(record);
}
pos = line_end + 1;
}
return data; return data;
} }
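
The header-skip and newline-trimming rules above are easiest to check on a toy buffer. A self-contained sketch (illustrative only; it does not use the project headers):

```cpp
#include <iostream>
#include <string>

int main() {
    const std::string file = "ts,open\n1,10\n2,20\n3,30\n";
    // Pretend the last of two ranks was assigned bytes [10, 23):
    // its chunk starts in the middle of the line "1,10".
    std::string chunk = file.substr(10);
    size_t parse_start = chunk.find('\n') + 1; // skip the partial first line
    size_t parse_end = chunk.size();           // last rank reads to end of file
    // A non-final rank would instead stop at chunk.rfind('\n').
    std::cout << chunk.substr(parse_start, parse_end - parse_start);
    // Prints "2,20\n3,30\n" - exactly the complete lines this rank owns.
}
```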

csv_loader.hpp

@@ -2,5 +2,14 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "record.hpp" #include "record.hpp"
#include "utils.hpp"
std::vector<Record> load_csv(const std::string& filename); // Parallel CSV reading for MPI
// rank - the current rank number
// size - the total number of ranks
// Returns the records read by this rank
std::vector<Record> load_csv_parallel(int rank, int size);
// Parse a single CSV line into a Record
// Returns true if parsing succeeded
bool parse_csv_line(const std::string& line, Record& record);

day_stats.hpp

@@ -1,28 +1,15 @@
#pragma once #pragma once
#include <cstdint> #include <cstdint>
using DayIndex = long long; using DayIndex = int64_t;
// Aggregated data for a single day // Aggregated data for a single day
struct DayStats { struct DayStats {
DayIndex day; // day index (timestamp / 86400) DayIndex day; // day index (timestamp / 86400)
double low; // minimum Low for the day double avg; // mean of (Low + High) / 2 over all records
double high; // maximum High for the day double open_min; // minimum Open for the day
double open; // first Open of the day double open_max; // maximum Open for the day
double close; // last Close of the day double close_min; // minimum Close for the day
double avg; // mean = (low + high) / 2 double close_max; // maximum Close for the day
double first_ts; // timestamp of the first record (to determine open) int64_t count; // number of records aggregated
double last_ts; // timestamp of the last record (to determine close)
}; };
// Interval with a change >= 10%
struct Interval {
DayIndex start_day;
DayIndex end_day;
double min_open;
double max_close;
double start_avg;
double end_avg;
double change;
};

gpu_loader.cpp

@@ -19,6 +19,16 @@ gpu_is_available_fn load_gpu_is_available() {
return fn; return fn;
} }
bool gpu_is_available() {
auto gpu_is_available_fn = load_gpu_is_available();
if (gpu_is_available_fn && gpu_is_available_fn()) {
return true;
}
return false;
}
gpu_aggregate_days_fn load_gpu_aggregate_days() { gpu_aggregate_days_fn load_gpu_aggregate_days() {
void* h = get_gpu_lib_handle(); void* h = get_gpu_lib_handle();
if (!h) return nullptr; if (!h) return nullptr;
@@ -117,13 +127,12 @@ bool aggregate_days_gpu(
for (const auto& gs : gpu_stats) { for (const auto& gs : gpu_stats) {
DayStats ds; DayStats ds;
ds.day = gs.day; ds.day = gs.day;
ds.low = gs.low;
ds.high = gs.high;
ds.open = gs.open;
ds.close = gs.close;
ds.avg = gs.avg; ds.avg = gs.avg;
ds.first_ts = gs.first_ts; ds.open_min = gs.open_min;
ds.last_ts = gs.last_ts; ds.open_max = gs.open_max;
ds.close_min = gs.close_min;
ds.close_max = gs.close_max;
ds.count = gs.count;
out_stats.push_back(ds); out_stats.push_back(ds);
} }
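
For reference, a minimal sketch (not from this diff) of how a call site can use the new `gpu_is_available()` wrapper instead of loading the function pointer by hand:

```cpp
#include <iostream>
#include "gpu_loader.hpp"

// Hypothetical call site: choose the aggregation backend once at startup.
int main() {
    if (gpu_is_available()) {
        std::cout << "GPU plugin loaded, using GPU aggregation\n";
    } else {
        std::cout << "no GPU plugin, falling back to CPU aggregation\n";
    }
}
```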

gpu_loader.hpp

@@ -3,6 +3,8 @@
#include "record.hpp" #include "record.hpp"
#include <vector> #include <vector>
bool gpu_is_available();
// Function types from the GPU plugin // Function types from the GPU plugin
using gpu_is_available_fn = int (*)(); using gpu_is_available_fn = int (*)();
@@ -18,13 +20,12 @@ struct GpuRecord {
struct GpuDayStats { struct GpuDayStats {
long long day; long long day;
double low;
double high;
double open;
double close;
double avg; double avg;
double first_ts; double open_min;
double last_ts; double open_max;
double close_min;
double close_max;
long long count;
}; };
using gpu_aggregate_days_fn = int (*)( using gpu_aggregate_days_fn = int (*)(

GPU plugin (.cu)

@@ -23,13 +23,12 @@ struct GpuRecord {
struct GpuDayStats { struct GpuDayStats {
long long day; long long day;
double low;
double high;
double open;
double close;
double avg; double avg;
double first_ts; double open_min;
double last_ts; double open_max;
double close_min;
double close_max;
long long count;
}; };
extern "C" int gpu_is_available() { extern "C" int gpu_is_available() {
@@ -63,32 +62,30 @@ __global__ void aggregate_kernel(
GpuDayStats stats; GpuDayStats stats;
stats.day = day_indices[d]; stats.day = day_indices[d];
stats.low = DBL_MAX; stats.open_min = DBL_MAX;
stats.high = -DBL_MAX; stats.open_max = -DBL_MAX;
stats.first_ts = DBL_MAX; stats.close_min = DBL_MAX;
stats.last_ts = -DBL_MAX; stats.close_max = -DBL_MAX;
stats.open = 0; stats.count = count;
stats.close = 0;
double avg_sum = 0.0;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
const GpuRecord& r = records[offset + i]; const GpuRecord& r = records[offset + i];
// min/max // Accumulate avg = (low + high) / 2
if (r.low < stats.low) stats.low = r.low; avg_sum += (r.low + r.high) / 2.0;
if (r.high > stats.high) stats.high = r.high;
// first/last by timestamp // min/max Open
if (r.timestamp < stats.first_ts) { if (r.open < stats.open_min) stats.open_min = r.open;
stats.first_ts = r.timestamp; if (r.open > stats.open_max) stats.open_max = r.open;
stats.open = r.open;
} // min/max Close
if (r.timestamp > stats.last_ts) { if (r.close < stats.close_min) stats.close_min = r.close;
stats.last_ts = r.timestamp; if (r.close > stats.close_max) stats.close_max = r.close;
stats.close = r.close;
}
} }
stats.avg = (stats.low + stats.high) / 2.0; stats.avg = avg_sum / static_cast<double>(count);
out_stats[d] = stats; out_stats[d] = stats;
} }

intervals.cpp

@@ -28,13 +28,17 @@ std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double t
interval.end_avg = price_now; interval.end_avg = price_now;
interval.change = change; interval.change = change;
// Find min(Open) and max(Close) in the interval // Find min/max Open and Close in the interval
interval.min_open = days[start_idx].open; interval.open_min = days[start_idx].open_min;
interval.max_close = days[start_idx].close; interval.open_max = days[start_idx].open_max;
interval.close_min = days[start_idx].close_min;
interval.close_max = days[start_idx].close_max;
for (size_t j = start_idx; j <= i; j++) { for (size_t j = start_idx + 1; j <= i; j++) {
interval.min_open = std::min(interval.min_open, days[j].open); interval.open_min = std::min(interval.open_min, days[j].open_min);
interval.max_close = std::max(interval.max_close, days[j].close); interval.open_max = std::max(interval.open_max, days[j].open_max);
interval.close_min = std::min(interval.close_min, days[j].close_min);
interval.close_max = std::max(interval.close_max, days[j].close_max);
} }
intervals.push_back(interval); intervals.push_back(interval);
@@ -68,16 +72,17 @@ void write_intervals(const std::string& filename, const std::vector<Interval>& i
std::ofstream out(filename); std::ofstream out(filename);
out << std::fixed << std::setprecision(2); out << std::fixed << std::setprecision(2);
out << "start_date,end_date,min_open,max_close,start_avg,end_avg,change\n"; out << "start_date,end_date,open_min,open_max,close_min,close_max,start_avg,end_avg,change\n";
for (const auto& iv : intervals) { for (const auto& iv : intervals) {
out << day_index_to_date(iv.start_day) << "," out << day_index_to_date(iv.start_day) << ","
<< day_index_to_date(iv.end_day) << "," << day_index_to_date(iv.end_day) << ","
<< iv.min_open << "," << iv.open_min << ","
<< iv.max_close << "," << iv.open_max << ","
<< iv.close_min << ","
<< iv.close_max << ","
<< iv.start_avg << "," << iv.start_avg << ","
<< iv.end_avg << "," << iv.end_avg << ","
<< std::setprecision(6) << iv.change << "\n"; << std::setprecision(6) << iv.change << "\n";
} }
} }
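
A minimal sketch of driving the new interval fields end to end (not part of this diff; the two synthetic `DayStats` values are invented and use the field order day, avg, open_min, open_max, close_min, close_max, count):

```cpp
#include <vector>
#include "intervals.hpp"

int main() {
    // Day averages 100 -> 110 differ by 10%, so the default threshold
    // should yield one interval covering both days.
    std::vector<DayStats> days = {
        {20000, 100.0,  99.0, 101.0,  99.5, 100.5, 1440},
        {20001, 110.0, 109.0, 111.0, 109.5, 110.5, 1440},
    };
    auto intervals = find_intervals(days, 0.10);
    write_intervals("result.csv", intervals); // 9-column CSV header shown above
}
```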

intervals.hpp

@@ -4,6 +4,19 @@
#include <vector> #include <vector>
#include <string> #include <string>
// Interval with a change >= threshold
struct Interval {
DayIndex start_day;
DayIndex end_day;
double open_min; // minimum Open in the interval
double open_max; // maximum Open in the interval
double close_min; // minimum Close in the interval
double close_max; // maximum Close in the interval
double start_avg;
double end_avg;
double change;
};
// Compute intervals with a change >= threshold (10% by default) // Compute intervals with a change >= threshold (10% by default)
std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double threshold = 0.10); std::vector<Interval> find_intervals(const std::vector<DayStats>& days, double threshold = 0.10);
@@ -12,4 +25,3 @@ void write_intervals(const std::string& filename, const std::vector<Interval>& i
// Convert a DayIndex to a date string (YYYY-MM-DD) // Convert a DayIndex to a date string (YYYY-MM-DD)
std::string day_index_to_date(DayIndex day); std::string day_index_to_date(DayIndex day);

main.cpp

@@ -1,54 +1,12 @@
#include <mpi.h> #include <mpi.h>
#include <omp.h>
#include <iostream> #include <iostream>
#include <vector> #include <vector>
#include <map>
#include <iomanip> #include <iomanip>
#include <cstdlib>
#include "csv_loader.hpp" #include "csv_loader.hpp"
#include "utils.hpp"
#include "record.hpp" #include "record.hpp"
#include "day_stats.hpp" #include "day_stats.hpp"
#include "aggregation.hpp" #include "aggregation.hpp"
#include "intervals.hpp"
#include "gpu_loader.hpp"
// Function: select the records for a specific rank
std::vector<Record> select_records_for_rank(
const std::map<DayIndex, std::vector<Record>>& days,
const std::vector<DayIndex>& day_list)
{
std::vector<Record> out;
for (auto d : day_list) {
auto it = days.find(d);
if (it != days.end()) {
const auto& vec = it->second;
out.insert(out.end(), vec.begin(), vec.end());
}
}
return out;
}
// Split the records into N parts (by day)
std::vector<std::vector<Record>> split_records(const std::vector<Record>& records, int n_parts) {
// Group by day
std::map<DayIndex, std::vector<Record>> by_day;
for (const auto& r : records) {
DayIndex day = static_cast<DayIndex>(r.timestamp) / 86400;
by_day[day].push_back(r);
}
// Distribute the days across the parts
std::vector<std::vector<Record>> parts(n_parts);
int i = 0;
for (auto& [day, recs] : by_day) {
parts[i % n_parts].insert(parts[i % n_parts].end(), recs.begin(), recs.end());
i++;
}
return parts;
}
int main(int argc, char** argv) { int main(int argc, char** argv) {
MPI_Init(&argc, &argv); MPI_Init(&argc, &argv);
@@ -57,225 +15,27 @@ int main(int argc, char** argv) {
MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Comm_size(MPI_COMM_WORLD, &size);
// Read the number of CPU threads from an environment variable // Parallel data reading
int num_cpu_threads = 2; double read_start = MPI_Wtime();
const char* env_threads = std::getenv("NUM_CPU_THREADS"); std::vector<Record> records = load_csv_parallel(rank, size);
if (env_threads) { double read_time = MPI_Wtime() - read_start;
num_cpu_threads = std::atoi(env_threads);
if (num_cpu_threads < 1) num_cpu_threads = 1;
}
omp_set_num_threads(num_cpu_threads + 1); // +1 for the GPU thread if present
// ====== LOAD GPU FUNCTIONS ====== std::cout << "Rank " << rank
auto gpu_is_available = load_gpu_is_available(); << ": read " << records.size() << " records"
auto gpu_aggregate = load_gpu_aggregate_days(); << " in " << std::fixed << std::setprecision(3) << read_time << " sec"
<< std::endl;
bool have_gpu = false;
if (gpu_is_available && gpu_is_available()) {
have_gpu = true;
std::cout << "Rank " << rank << ": GPU available + " << num_cpu_threads << " CPU threads" << std::endl;
} else {
std::cout << "Rank " << rank << ": " << num_cpu_threads << " CPU threads only" << std::endl;
}
std::vector<Record> local_records; // Aggregate by day
double agg_start = MPI_Wtime();
std::vector<DayStats> days = aggregate_days(records);
double agg_time = MPI_Wtime() - agg_start;
// ====== TIMERS ====== std::cout << "Rank " << rank
double time_load_data = 0.0; << ": aggregated " << days.size() << " days"
double time_distribute = 0.0; << " [" << (days.empty() ? 0 : days.front().day)
<< ".." << (days.empty() ? 0 : days.back().day) << "]"
if (rank == 0) { << " in " << std::fixed << std::setprecision(3) << agg_time << " sec"
std::cout << "Rank 0 loading CSV..." << std::endl; << std::endl;
// Data loading timer
double t_load_start = MPI_Wtime();
// Launched from the build directory
auto records = load_csv("../data/data.csv");
auto days = group_by_day(records);
auto parts = split_days(days, size);
time_load_data = MPI_Wtime() - t_load_start;
std::cout << "Rank 0: Data loading time: " << std::fixed << std::setprecision(3)
<< time_load_data << "s" << std::endl;
// Data distribution timer
double t_distribute_start = MPI_Wtime();
// Distribute the data
for (int r = 0; r < size; r++) {
auto vec = select_records_for_rank(days, parts[r]);
if (r == 0) {
// no send to self - keep the data directly
local_records = vec;
continue;
}
int count = static_cast<int>(vec.size());
MPI_Send(&count, 1, MPI_INT, r, 0, MPI_COMM_WORLD);
MPI_Send(vec.data(), count * sizeof(Record), MPI_BYTE, r, 1, MPI_COMM_WORLD);
}
time_distribute = MPI_Wtime() - t_distribute_start;
}
else {
// Data receive timer
double t_receive_start = MPI_Wtime();
// Receive the data
int count = 0;
MPI_Recv(&count, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
local_records.resize(count);
MPI_Recv(local_records.data(), count * sizeof(Record),
MPI_BYTE, 0, 1, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
time_distribute = MPI_Wtime() - t_receive_start;
}
MPI_Barrier(MPI_COMM_WORLD);
// Print the data distribution/receive time
std::cout << "Rank " << rank << ": Data distribution time: " << std::fixed
<< std::setprecision(3) << time_distribute << "s" << std::endl;
std::cout << "Rank " << rank << " received "
<< local_records.size() << " records" << std::endl;
// ====== AGGREGATION ON EACH NODE ======
std::vector<DayStats> local_stats;
double time_start = omp_get_wtime();
// Worker times: [0] = GPU (if present), [1..n] = CPU threads
std::vector<double> worker_times(num_cpu_threads + 1, 0.0);
if (have_gpu && gpu_aggregate) {
// GPU node: split into (1 + num_cpu_threads) parts
int n_workers = 1 + num_cpu_threads;
auto parts = split_records(local_records, n_workers);
std::vector<std::vector<DayStats>> results(n_workers);
std::vector<bool> success(n_workers, true);
#pragma omp parallel
{
int tid = omp_get_thread_num();
if (tid < n_workers) {
double t0 = omp_get_wtime();
if (tid == 0) {
// GPU thread
success[0] = aggregate_days_gpu(parts[0], results[0], gpu_aggregate);
} else {
// CPU threads
results[tid] = aggregate_days(parts[tid]);
}
worker_times[tid] = omp_get_wtime() - t0;
}
}
// Combine the results
for (int i = 0; i < n_workers; i++) {
if (i == 0 && !success[0]) {
// GPU failed - process on the CPU
std::cout << "Rank " << rank << ": GPU failed, processing on CPU" << std::endl;
double t0 = omp_get_wtime();
results[0] = aggregate_days(parts[0]);
worker_times[0] = omp_get_wtime() - t0;
}
local_stats.insert(local_stats.end(), results[i].begin(), results[i].end());
}
} else {
// CPU-only node
auto parts = split_records(local_records, num_cpu_threads);
std::vector<std::vector<DayStats>> results(num_cpu_threads);
#pragma omp parallel
{
int tid = omp_get_thread_num();
if (tid < num_cpu_threads) {
double t0 = omp_get_wtime();
results[tid] = aggregate_days(parts[tid]);
worker_times[tid + 1] = omp_get_wtime() - t0; // +1 because [0] is reserved for the GPU
}
}
for (int i = 0; i < num_cpu_threads; i++) {
local_stats.insert(local_stats.end(), results[i].begin(), results[i].end());
}
}
double time_total = omp_get_wtime() - time_start;
// Print timings
std::cout << std::fixed << std::setprecision(3);
std::cout << "Rank " << rank << " aggregated " << local_stats.size() << " days in "
<< time_total << "s (";
if (have_gpu) {
std::cout << "GPU: " << worker_times[0] << "s, ";
}
for (int i = 0; i < num_cpu_threads; i++) {
int idx = have_gpu ? (i + 1) : (i + 1);
std::cout << "CPU" << i << ": " << worker_times[idx] << "s";
if (i < num_cpu_threads - 1) std::cout << ", ";
}
std::cout << ")" << std::endl;
// ====== GATHER AGGREGATED DATA ON RANK 0 ======
std::vector<DayStats> all_stats;
if (rank == 0) {
// Add our own data
all_stats.insert(all_stats.end(), local_stats.begin(), local_stats.end());
// Receive data from the other nodes
for (int r = 1; r < size; r++) {
int count = 0;
MPI_Recv(&count, 1, MPI_INT, r, 2, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::vector<DayStats> remote_stats(count);
MPI_Recv(remote_stats.data(), count * sizeof(DayStats),
MPI_BYTE, r, 3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
all_stats.insert(all_stats.end(), remote_stats.begin(), remote_stats.end());
}
} else {
// Send our aggregated data to rank 0
int count = static_cast<int>(local_stats.size());
MPI_Send(&count, 1, MPI_INT, 0, 2, MPI_COMM_WORLD);
MPI_Send(local_stats.data(), count * sizeof(DayStats), MPI_BYTE, 0, 3, MPI_COMM_WORLD);
}
// ====== COMPUTE INTERVALS ON RANK 0 ======
if (rank == 0) {
std::cout << "Rank 0: merging " << all_stats.size() << " day stats..." << std::endl;
// Merge and sort
auto merged_stats = merge_day_stats(all_stats);
std::cout << "Rank 0: total " << merged_stats.size() << " unique days" << std::endl;
// Compute the intervals
auto intervals = find_intervals(merged_stats, 0.10);
std::cout << "Found " << intervals.size() << " intervals with >=10% change" << std::endl;
// Write the result
write_intervals("../result.csv", intervals);
std::cout << "Results written to result.csv" << std::endl;
// Print the first few intervals
std::cout << "\nFirst 5 intervals:\n";
std::cout << "start_date,end_date,min_open,max_close,change\n";
for (size_t i = 0; i < std::min(intervals.size(), size_t(5)); i++) {
const auto& iv = intervals[i];
std::cout << day_index_to_date(iv.start_day) << ","
<< day_index_to_date(iv.end_day) << ","
<< iv.min_open << ","
<< iv.max_close << ","
<< iv.change << "\n";
}
}
MPI_Finalize(); MPI_Finalize();
return 0; return 0;

utils.cpp

@@ -1,4 +1,8 @@
#include "utils.hpp" #include "utils.hpp"
#include <fstream>
#include <sstream>
#include <stdexcept>
#include <numeric>
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) { std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs) {
std::map<DayIndex, std::vector<Record>> days; std::map<DayIndex, std::vector<Record>> days;
@@ -23,3 +27,88 @@ std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vect
return out; return out;
} }
int get_num_cpu_threads() {
const char* env_threads = std::getenv("NUM_CPU_THREADS");
int num_cpu_threads = 1;
if (env_threads) {
num_cpu_threads = std::atoi(env_threads);
if (num_cpu_threads < 1) num_cpu_threads = 1;
}
return num_cpu_threads;
}
std::string get_env(const char* name) {
const char* env = std::getenv(name);
if (!env) {
throw std::runtime_error(std::string("Environment variable not set: ") + name);
}
return std::string(env);
}
std::string get_data_path() {
return get_env("DATA_PATH");
}
std::vector<int> get_data_read_shares() {
std::vector<int> shares;
std::stringstream ss(get_env("DATA_READ_SHARES"));
std::string item;
while (std::getline(ss, item, ',')) {
shares.push_back(std::stoi(item));
}
return shares;
}
int64_t get_read_overlap_bytes() {
return std::stoll(get_env("READ_OVERLAP_BYTES"));
}
int64_t get_file_size(const std::string& path) {
std::ifstream file(path, std::ios::binary | std::ios::ate);
if (!file.is_open()) {
throw std::runtime_error("Cannot open file: " + path);
}
return static_cast<int64_t>(file.tellg());
}
ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
const std::vector<int>& shares, int64_t overlap_bytes) {
// If shares is empty or does not match size, fall back to equal shares
std::vector<int> effective_shares;
if (shares.size() == static_cast<size_t>(size)) {
effective_shares = shares;
} else {
effective_shares.assign(size, 1);
}
int total_shares = std::accumulate(effective_shares.begin(), effective_shares.end(), 0);
// Compute the base boundaries for each rank
int64_t bytes_per_share = file_size / total_shares;
int64_t base_start = 0;
for (int i = 0; i < rank; i++) {
base_start += bytes_per_share * effective_shares[i];
}
int64_t base_end = base_start + bytes_per_share * effective_shares[rank];
// Apply the overlap
ByteRange range;
if (rank == 0) {
// Первый ранк: начинаем с 0, добавляем overlap в конце
range.start = 0;
range.end = std::min(base_end + overlap_bytes, file_size);
} else if (rank == size - 1) {
// Последний ранк: вычитаем overlap в начале, читаем до конца файла
range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
range.end = file_size;
} else {
// Промежуточные ранки: overlap с обеих сторон
range.start = std::max(base_start - overlap_bytes, static_cast<int64_t>(0));
range.end = std::min(base_end + overlap_bytes, file_size);
}
return range;
}
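
A worked example of the arithmetic above (numbers invented for illustration): with `DATA_READ_SHARES=10,12,13,13` (total 48), a 480,000-byte file and a 4,096-byte overlap, `bytes_per_share` is 10,000, and the ranges come out as follows:

```cpp
#include <cstdio>
#include <vector>
#include "utils.hpp"

int main() {
    std::vector<int> shares = {10, 12, 13, 13}; // total = 48
    for (int rank = 0; rank < 4; rank++) {
        ByteRange r = calculate_byte_range(rank, 4, 480000, shares, 4096);
        std::printf("rank %d: [%lld, %lld)\n", rank,
                    static_cast<long long>(r.start), static_cast<long long>(r.end));
    }
    // rank 0: [0, 104096)        rank 1: [95904, 224096)
    // rank 2: [215904, 354096)   rank 3: [345904, 480000)
}
```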

utils.hpp

@@ -4,6 +4,29 @@
#include "day_stats.hpp" #include "day_stats.hpp"
#include <map> #include <map>
#include <vector> #include <vector>
#include <string>
#include <cstdlib>
#include <cstdint>
// Group records by day
std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs); std::map<DayIndex, std::vector<Record>> group_by_day(const std::vector<Record>& recs);
std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts); std::vector<std::vector<DayIndex>> split_days(const std::map<DayIndex, std::vector<Record>>& days, int parts);
// Read environment variables
int get_num_cpu_threads();
std::string get_data_path();
std::vector<int> get_data_read_shares();
int64_t get_read_overlap_bytes();
// Byte range to read from the file
struct ByteRange {
int64_t start;
int64_t end; // exclusive
};
// Computes the byte range for a specific rank
ByteRange calculate_byte_range(int rank, int size, int64_t file_size,
const std::vector<int>& shares, int64_t overlap_bytes);
// Get the file size
int64_t get_file_size(const std::string& path);