upsampling данных

This commit is contained in:
2025-12-16 13:25:38 +00:00
parent e4e01e1df3
commit 2833d2f7b4
2 changed files with 146 additions and 0 deletions

View File

@@ -2,6 +2,16 @@
[Kaggle Bitcoin Historical Data](https://www.kaggle.com/datasets/mczielinski/bitcoin-historical-data)
Исходные данные хранят информацию по каждой минуте. Чтобы увеличить объём данных
для более наглядной демонстрации эффективности параллельных вычислений
и вычислений на GPU, с помощью линейной интерполяции данные были преобразованы
из данных о каждой минуте в данные о каждых 10 секундах, то есть объём данных увеличился
в 6 раз.
```
python3 upsample.py -i ./data/data.csv -o ./data/data_10s.csv -s 10
```
## Задание
Группируем данные по дням (Timestamp), за каждый день вычисляем среднюю цену

136
upsample.py Normal file
View File

@@ -0,0 +1,136 @@
#!/usr/bin/env python3
import argparse
import sys
import time
def parse_row(line: str):
# Timestamp,Open,High,Low,Close,Volume
ts, o, h, l, c, v = line.split(',')
return int(float(ts)), float(o), float(h), float(l), float(c), float(v)
def fmt_row(ts, o, h, l, c, v):
return f"{ts},{o:.2f},{h:.2f},{l:.2f},{c:.2f},{v:.8f}\n"
def count_lines_fast(path: str) -> int:
with open(path, "rb") as f:
return sum(1 for _ in f) - 1 # минус header
def main(inp, out, step, flush_every):
# считаем количество строк для прогресса
total_lines = count_lines_fast(inp)
print(f"Total input rows: {total_lines:,}", file=sys.stderr)
start_time = time.time()
processed = 0
last_report = start_time
with open(inp, "r", buffering=8 * 1024 * 1024) as fin, \
open(out, "w", buffering=8 * 1024 * 1024) as fout:
fin.readline() # пропускаем header
fout.write("Timestamp,Open,High,Low,Close,Volume\n")
first = fin.readline()
if not first:
return
prev = parse_row(first.strip())
out_buf = []
out_rows = 0
for line in fin:
line = line.strip()
if not line:
continue
cur = parse_row(line)
t1, o1, h1, l1, c1, v1 = prev
t2, o2, h2, l2, c2, v2 = cur
dt = t2 - t1
steps = dt // step
if steps > 0:
do = o2 - o1
dh = h2 - h1
dl = l2 - l1
dc = c2 - c1
dv = v2 - v1
inv = 1.0 / steps
for i in range(steps):
a = i * inv
out_buf.append(fmt_row(
t1 + i * step,
o1 + do * a,
h1 + dh * a,
l1 + dl * a,
c1 + dc * a,
v1 + dv * a
))
out_rows += steps
prev = cur
processed += 1
# прогресс
if processed % 100_000 == 0:
now = time.time()
if now - last_report >= 0.5:
pct = processed * 100.0 / total_lines
elapsed = now - start_time
speed = processed / elapsed if elapsed > 0 else 0
eta = (total_lines - processed) / speed if speed > 0 else 0
print(
f"\rprocessed: {processed:,} / {total_lines:,} "
f"({pct:5.1f}%) | "
f"out ~ {out_rows:,} | "
f"{speed:,.0f} rows/s | "
f"ETA {eta/60:5.1f} min",
end="",
file=sys.stderr,
flush=True,
)
last_report = now
# сброс буфера
if out_rows >= flush_every:
fout.write("".join(out_buf))
out_buf.clear()
out_rows = 0
# остатки
if out_buf:
fout.write("".join(out_buf))
# последнюю строку пишем как есть
t, o, h, l, c, v = prev
fout.write(fmt_row(t, o, h, l, c, v))
total_time = time.time() - start_time
print(
f"\nDone in {total_time/60:.1f} min",
file=sys.stderr
)
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument("-i", "--input", required=True)
ap.add_argument("-o", "--output", required=True)
ap.add_argument("-s", "--step", type=int, default=10)
ap.add_argument("--flush-every", type=int, default=200_000)
args = ap.parse_args()
if args.step <= 0:
raise SystemExit("step must be > 0")
main(args.input, args.output, args.step, args.flush_every)