137 lines
3.9 KiB
Python
137 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
|
||
import sys
|
||
import time
|
||
|
||
|
||
def parse_row(line: str):
    """Parse one CSV data row into a typed tuple.

    The expected column order is ``Timestamp,Open,High,Low,Close,Volume``.

    Args:
        line: A single data line, already stripped of its trailing newline.

    Returns:
        A 6-tuple ``(timestamp, open, high, low, close, volume)`` where the
        timestamp is an ``int`` and the remaining values are ``float``.

    Raises:
        ValueError: If the line does not contain exactly six comma-separated
            fields, or if any field is not a valid number.
    """
    fields = line.split(",")
    if len(fields) != 6:
        raise ValueError(
            f"expected 6 comma-separated fields, got {len(fields)}: {line!r}"
        )
    ts, open_, high, low, close, volume = fields
    # The timestamp may be written as a float (e.g. "1325317920.0"), so it is
    # parsed as a float first and then truncated to an int.
    return (
        int(float(ts)),
        float(open_),
        float(high),
        float(low),
        float(close),
        float(volume),
    )
|
||
|
||
|
||
def fmt_row(ts: int, o: float, h: float, l: float, c: float, v: float) -> str:
    """Format one output CSV line, including the trailing newline.

    Args:
        ts: Timestamp, written as-is.
        o: Open value, written with 2 decimal places.
        h: High value, written with 2 decimal places.
        l: Low value, written with 2 decimal places.
        c: Close value, written with 2 decimal places.
        v: Volume, written with 8 decimal places.

    Returns:
        The comma-separated line ``ts,o,h,l,c,v`` terminated by ``"\\n"``.
    """
    return f"{ts},{o:.2f},{h:.2f},{l:.2f},{c:.2f},{v:.8f}\n"
|
||
|
||
|
||
def count_lines_fast(path: str) -> int:
    """Count the data lines of a file, excluding its single header line.

    The file is read in binary mode so no text decoding is performed. Blank
    lines are counted like any other line.

    Args:
        path: Path of the file to scan.

    Returns:
        The number of lines minus one for the header, never less than 0
        (an empty file yields 0 rather than -1).
    """
    with open(path, "rb") as f:
        line_count = sum(1 for _ in f)
    # Subtract the header, but do not go negative for an empty file.
    return max(line_count - 1, 0)
|
||
|
||
|
||
def _interpolate_gap(prev, cur, step):
    """Yield formatted rows that fill the gap between two parsed input rows.

    ``(t2 - t1) // step`` rows are produced, starting at ``prev`` itself
    (fraction 0) and spaced ``step`` apart; ``cur`` is never emitted here.
    Every column, volume included, is interpolated linearly. Nothing is
    yielded when the gap is shorter than ``step`` or the timestamps do not
    increase.
    """
    t1, o1, h1, lo1, c1, v1 = prev
    t2, o2, h2, lo2, c2, v2 = cur

    steps = (t2 - t1) // step
    if steps <= 0:
        return

    d_open = o2 - o1
    d_high = h2 - h1
    d_low = lo2 - lo1
    d_close = c2 - c1
    d_volume = v2 - v1

    # NOTE(review): the fraction is i / steps, not elapsed time / gap length.
    # When the gap is not an exact multiple of `step`, values are spread
    # evenly over the emitted rows rather than being time-exact.
    inv = 1.0 / steps
    for i in range(steps):
        a = i * inv
        yield fmt_row(
            t1 + i * step,
            o1 + d_open * a,
            h1 + d_high * a,
            lo1 + d_low * a,
            c1 + d_close * a,
            v1 + d_volume * a,
        )


def main(inp, out, step, flush_every):
    """Resample an OHLCV CSV onto a fixed time step using linear interpolation.

    Reads ``inp`` (one header line, then ``Timestamp,Open,High,Low,Close,Volume``
    rows), writes a fresh header to ``out``, and for each pair of consecutive
    input rows writes interpolated rows spaced ``step`` apart. The last input
    row is written unchanged. Progress is reported on stderr.

    Args:
        inp: Path of the input CSV.
        out: Path of the output CSV (overwritten).
        step: Output spacing, in the same units as the Timestamp column.
        flush_every: Number of buffered output rows that triggers a write.

    Raises:
        ValueError: If ``step`` is not positive, or if a row cannot be parsed.
    """
    if step <= 0:
        raise ValueError("step must be > 0")

    # Count the rows up front so the progress line can show percent and ETA.
    total_lines = max(count_lines_fast(inp), 0)
    print(f"Total input rows: {total_lines:,}", file=sys.stderr)

    start_time = time.time()
    processed = 0
    out_rows = 0  # running total of interpolated rows produced so far
    last_report = start_time

    with open(inp, "r", buffering=8 * 1024 * 1024) as fin, \
            open(out, "w", buffering=8 * 1024 * 1024) as fout:

        fin.readline()  # skip the input header
        fout.write("Timestamp,Open,High,Low,Close,Volume\n")

        # Locate the first data row, tolerating blank lines after the header.
        prev = None
        for line in fin:
            line = line.strip()
            if line:
                prev = parse_row(line)
                break
        if prev is None:
            # No data rows: the output contains only the header.
            return

        out_buf = []

        for line in fin:
            line = line.strip()
            if not line:
                continue

            cur = parse_row(line)

            for row in _interpolate_gap(prev, cur, step):
                out_buf.append(row)
                out_rows += 1
                # Flush inside the gap loop so that a single very large gap
                # cannot grow the buffer without bound.
                if len(out_buf) >= flush_every:
                    fout.write("".join(out_buf))
                    out_buf.clear()

            # NOTE(review): when the gap to `cur` is shorter than `step`,
            # nothing was emitted for `prev`, so that row is dropped here.
            prev = cur
            processed += 1

            # Progress report: checked every 100,000 rows, printed at most
            # once per 0.5 s.
            if processed % 100_000 == 0:
                now = time.time()
                if now - last_report >= 0.5:
                    pct = processed * 100.0 / total_lines if total_lines > 0 else 0.0
                    elapsed = now - start_time
                    speed = processed / elapsed if elapsed > 0 else 0
                    eta = (total_lines - processed) / speed if speed > 0 else 0

                    print(
                        f"\rprocessed: {processed:,} / {total_lines:,} "
                        f"({pct:5.1f}%) | "
                        f"out ~ {out_rows:,} | "
                        f"{speed:,.0f} rows/s | "
                        f"ETA {eta/60:5.1f} min",
                        end="",
                        file=sys.stderr,
                        flush=True,
                    )
                    last_report = now

        # Write whatever is still buffered.
        if out_buf:
            fout.write("".join(out_buf))

        # The last input row is written as-is.
        fout.write(fmt_row(*prev))

    total_time = time.time() - start_time
    print(
        f"\nDone in {total_time/60:.1f} min",
        file=sys.stderr
    )
|
||
|
||
|
||
if __name__ == "__main__":
    # Command-line entry point: parse the options, validate the step, and run
    # the resampler.
    ap = argparse.ArgumentParser(
        description=(
            "Resample an OHLCV CSV (Timestamp,Open,High,Low,Close,Volume) "
            "onto a fixed time step using linear interpolation."
        ),
    )
    ap.add_argument(
        "-i", "--input", required=True,
        help="path of the input CSV; its first line is treated as a header",
    )
    ap.add_argument(
        "-o", "--output", required=True,
        help="path of the output CSV (overwritten)",
    )
    ap.add_argument(
        "-s", "--step", type=int, default=10,
        help="output spacing, in the units of the Timestamp column "
             "(default: %(default)s)",
    )
    ap.add_argument(
        "--flush-every", type=int, default=200_000,
        help="number of buffered output rows that triggers a write "
             "(default: %(default)s)",
    )
    args = ap.parse_args()

    # A non-positive step would make the step division in main() meaningless.
    if args.step <= 0:
        raise SystemExit("step must be > 0")

    main(args.input, args.output, args.step, args.flush_every)
|