From 2833d2f7b46d2fc2db045ce43eda5b1849cb6871 Mon Sep 17 00:00:00 2001 From: Arity-T Date: Tue, 16 Dec 2025 13:25:38 +0000 Subject: [PATCH] =?UTF-8?q?upsampling=20=D0=B4=D0=B0=D0=BD=D0=BD=D1=8B?= =?UTF-8?q?=D1=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 10 ++++ upsample.py | 136 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 upsample.py diff --git a/README.md b/README.md index d375f95..1cd549c 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,16 @@ [Kaggle Bitcoin Historical Data](https://www.kaggle.com/datasets/mczielinski/bitcoin-historical-data) +Исходные данные хранят информацию по каждой минуте. Чтобы увеличить объём данных +для более наглядной демонстрации эффективности параллельных вычислений +и вычислений на GPU, с помощью линейной интерполяции данные были преобразованы +из данных о каждой минуте в данные о каждых 10 секундах, то есть объём данных увеличился +в 6 раз. 
#!/usr/bin/env python3
"""Upsample a time-series CSV by linear interpolation.

The input is a CSV with the columns ``Timestamp,Open,High,Low,Close,Volume``.
Between every pair of consecutive input rows the script emits linearly
interpolated rows on a grid of ``step`` seconds, starting at the earlier row.

Example:
    python3 upsample.py -i ./data/data.csv -o ./data/data_10s.csv -s 10
"""
import argparse
import sys
import time

# Header line written to the output file.
_HEADER = "Timestamp,Open,High,Low,Close,Volume\n"
# Buffer size passed to open() for both the input and the output file.
_IO_BUFFER_SIZE = 8 * 1024 * 1024
# The progress line is considered once per this many processed input rows.
_PROGRESS_EVERY = 100_000


def parse_row(line: str):
    """Parse one CSV data line into ``(ts, open, high, low, close, volume)``.

    The timestamp is converted through ``float`` first so that values such as
    ``"1325317920.0"`` are accepted, then truncated to ``int``.

    Raises:
        ValueError: if the line does not have exactly six comma-separated
            fields or a field is not numeric.
    """
    # Timestamp,Open,High,Low,Close,Volume
    ts, o, h, l, c, v = line.split(',')
    return int(float(ts)), float(o), float(h), float(l), float(c), float(v)


def fmt_row(ts, o, h, l, c, v):
    """Format one row as a CSV line (prices: 2 decimals, volume: 8 decimals)."""
    return f"{ts},{o:.2f},{h:.2f},{l:.2f},{c:.2f},{v:.8f}\n"


def count_lines_fast(path: str) -> int:
    """Return the number of lines in *path* excluding the header line.

    Returns 0 (not -1) for an empty file.
    """
    with open(path, "rb") as f:
        return max(sum(1 for _ in f) - 1, 0)  # minus the header


def _interpolate(prev, cur, step):
    """Return formatted rows for every grid timestamp in ``[prev.ts, cur.ts)``.

    Grid points are ``prev.ts + i * step``. Each value is interpolated by the
    true time fraction ``i * step / dt``, so gaps that are not a multiple of
    *step* still produce values that lie on the straight line between the two
    rows. A gap shorter than *step* yields just the ``prev`` row unchanged.
    Non-increasing timestamps (``dt <= 0``) yield nothing.
    """
    t1, o1, h1, l1, c1, v1 = prev
    t2, o2, h2, l2, c2, v2 = cur

    dt = t2 - t1
    if dt <= 0:
        return []

    d_open = o2 - o1
    d_high = h2 - h1
    d_low = l2 - l1
    d_close = c2 - c1
    d_vol = v2 - v1

    count = -(-dt // step)  # ceil(dt / step): grid points strictly before t2
    rows = []
    for i in range(count):
        offset = i * step
        frac = offset / dt
        rows.append(fmt_row(
            t1 + offset,
            o1 + d_open * frac,
            h1 + d_high * frac,
            l1 + d_low * frac,
            c1 + d_close * frac,
            v1 + d_vol * frac,
        ))
    return rows


def _report_progress(processed, total_lines, out_rows, start_time, now):
    """Print a single-line progress report (rewritten in place) to stderr."""
    pct = processed * 100.0 / total_lines if total_lines > 0 else 0.0
    elapsed = now - start_time
    speed = processed / elapsed if elapsed > 0 else 0
    eta = (total_lines - processed) / speed if speed > 0 else 0

    print(
        f"\rprocessed: {processed:,} / {total_lines:,} "
        f"({pct:5.1f}%) | "
        f"out ~ {out_rows:,} | "
        f"{speed:,.0f} rows/s | "
        f"ETA {eta/60:5.1f} min",
        end="",
        file=sys.stderr,
        flush=True,
    )


def main(inp, out, step, flush_every):
    """Upsample the CSV at *inp* into *out* on a grid of *step* seconds.

    Args:
        inp: path of the input CSV; its first line is treated as a header.
        out: path of the output CSV (overwritten).
        step: spacing of the output rows in seconds; must be positive.
        flush_every: number of buffered output rows that triggers a write.

    Blank lines are skipped. The last input row is written unchanged. If the
    input has no data rows, only the header is written.
    """
    # Count the input lines up front so progress can show a percentage.
    total_lines = count_lines_fast(inp)
    print(f"Total input rows: {total_lines:,}", file=sys.stderr)

    start_time = time.time()
    last_report = start_time
    processed = 0
    out_rows = 0  # total rows produced so far (not reset on flush)
    out_buf = []

    with open(inp, "r", buffering=_IO_BUFFER_SIZE) as fin, \
            open(out, "w", buffering=_IO_BUFFER_SIZE) as fout:

        fin.readline()  # skip the header
        fout.write(_HEADER)

        prev = None
        for line in fin:
            line = line.strip()
            if not line:
                continue

            cur = parse_row(line)
            if prev is None:
                # First data row: nothing to interpolate against yet.
                prev = cur
                continue

            rows = _interpolate(prev, cur, step)
            out_buf.extend(rows)
            out_rows += len(rows)

            prev = cur
            processed += 1

            # Progress: check the clock only every _PROGRESS_EVERY rows.
            if processed % _PROGRESS_EVERY == 0:
                now = time.time()
                if now - last_report >= 0.5:
                    _report_progress(
                        processed, total_lines, out_rows, start_time, now
                    )
                    last_report = now

            # Flush the buffer once it is large enough.
            if len(out_buf) >= flush_every:
                fout.write("".join(out_buf))
                out_buf.clear()

        if prev is None:
            # Header-only (or empty) input: nothing more to write.
            return

        # Remaining buffered rows.
        if out_buf:
            fout.write("".join(out_buf))

        # The last input row is written as is.
        fout.write(fmt_row(*prev))

    total_time = time.time() - start_time
    print(
        f"\nDone in {total_time/60:.1f} min",
        file=sys.stderr
    )


if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    ap.add_argument("-i", "--input", required=True)
    ap.add_argument("-o", "--output", required=True)
    ap.add_argument("-s", "--step", type=int, default=10)
    ap.add_argument("--flush-every", type=int, default=200_000)
    args = ap.parse_args()

    if args.step <= 0:
        raise SystemExit("step must be > 0")

    main(args.input, args.output, args.step, args.flush_every)