Files
cyber-security/lab5/main.py
2026-04-04 11:25:35 +03:00

368 lines
12 KiB
Python

from __future__ import annotations
import argparse
import hashlib
import hmac
import json
import os
import socket
import stat
import struct
import sys
import threading
from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES
from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives import padding as sym_padding
# Maximum number of characters in one outgoing message; the sender truncates
# longer input to this length before sending.
MAX_MSG_LEN = 80
# Key file path used when --key is not given on the command line.
DEFAULT_KEY_FILE = "key.txt"
def check_key_permissions(path: str) -> None:
    """Exit the process unless *path* is accessible to its owner only.

    Any read, write or execute bit set for group or others is treated as
    "too open": a hint is printed and the process exits with status 1.
    Returns ``None`` when the permissions are acceptable.
    """
    perms = os.stat(path).st_mode & 0o777
    # S_IRWXG | S_IRWXO covers every group/other permission bit (0o077).
    if not perms & (stat.S_IRWXG | stat.S_IRWXO):
        return
    current = oct(perms)
    print(f"[!] Key file permissions too open ({current}): {path}")
    print(f" Run: chmod 600 {path}")
    sys.exit(1)
def load_key(path: str) -> str:
    """Read the shared key from *path* and return it with whitespace stripped.

    The process exits with status 1 when the file does not exist, when its
    permissions are rejected by ``check_key_permissions``, or when the
    stripped key is shorter than 32 characters.
    """
    if os.path.isfile(path):
        check_key_permissions(path)
        with open(path) as handle:
            contents = handle.read()
        key = contents.strip()
        if len(key) >= 32:
            return key
        print(f"[!] Key must be at least 32 characters (got {len(key)})")
    else:
        print(f"[!] Key file not found: {path}")
    sys.exit(1)
def derive_3des_key(file_key: str, salt: bytes) -> bytes:
    """Derive a 24-byte 3DES key from the file key and a per-message salt.

    The key is the first 24 bytes of SHA-256 over the encoded file key
    followed by *salt*.
    """
    hasher = hashlib.sha256()
    hasher.update(file_key.encode())
    hasher.update(salt)
    full_digest = hasher.digest()
    return full_digest[:24]
def encrypt_message(plaintext: str, file_key: str) -> tuple[bytes, bytes, bytes]:
    """Encrypt *plaintext* with 3DES in CBC mode.

    A fresh random 16-byte salt (used to derive the key from *file_key*) and
    a fresh random 8-byte IV are generated for every call. The encoded
    plaintext is PKCS7-padded to the 64-bit block size before encryption.

    Returns:
        ``(salt, iv, ciphertext)``.
    """
    salt = os.urandom(16)
    iv = os.urandom(8)
    cipher = Cipher(TripleDES(derive_3des_key(file_key, salt)), modes.CBC(iv))

    pkcs7 = sym_padding.PKCS7(64).padder()
    block_aligned = pkcs7.update(plaintext.encode()) + pkcs7.finalize()

    enc = cipher.encryptor()
    ciphertext = enc.update(block_aligned) + enc.finalize()
    return salt, iv, ciphertext
def decrypt_message(salt: bytes, iv: bytes, ct: bytes, file_key: str) -> str:
    """Decrypt a 3DES-CBC ciphertext and return the decoded text.

    The key is re-derived from *file_key* and *salt*; PKCS7 padding
    (64-bit block) is removed after decryption and the result is decoded
    to ``str``. Errors raised by the cipher, the unpadder or the decode
    step propagate to the caller.
    """
    cipher = Cipher(TripleDES(derive_3des_key(file_key, salt)), modes.CBC(iv))
    dec = cipher.decryptor()
    block_aligned = dec.update(ct) + dec.finalize()

    pkcs7 = sym_padding.PKCS7(64).unpadder()
    raw = pkcs7.update(block_aligned) + pkcs7.finalize()
    return raw.decode()
def compute_hash(text: str) -> str:
    """Return the hexadecimal MD5 digest of the encoded *text*.

    MD5 is requested with ``usedforsecurity=False``; it serves only as the
    plain integrity checksum of the message.
    """
    digest = hashlib.md5(usedforsecurity=False)
    digest.update(text.encode())
    return digest.hexdigest()
def compute_hmac(file_key: str, text: str, salt: bytes) -> str:
    """Return the hexadecimal HMAC-SHA256 of *text* followed by *salt*.

    The encoded *file_key* is the MAC key; the authenticated data is the
    encoded text with the salt bytes appended.
    """
    mac = hmac.new(file_key.encode(), digestmod=hashlib.sha256)
    mac.update(text.encode())
    mac.update(salt)
    return mac.hexdigest()
def recv_exactly(sock: socket.socket, n: int) -> bytes | None:
    """Read exactly *n* bytes from *sock*.

    Returns the bytes read, or ``None`` if the peer closes the connection
    before *n* bytes have arrived. A request for zero bytes returns
    ``b""`` without touching the socket.
    """
    pieces = []
    remaining = n
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            # An empty read means the remote side closed the connection.
            return None
        pieces.append(piece)
        remaining -= len(piece)
    return b"".join(pieces)
def send_packet(sock: socket.socket, data: bytes) -> None:
    """Send *data* on *sock* as one length-prefixed frame.

    The frame is a 4-byte unsigned big-endian length followed by the
    payload, written with a single ``sendall`` call.
    """
    header = struct.pack("!I", len(data))
    frame = header + data
    sock.sendall(frame)
def recv_packet(sock: socket.socket, max_len: int = 1 << 20) -> dict | None:
    """Read one length-prefixed JSON message from *sock*.

    The wire format is a 4-byte unsigned big-endian length followed by that
    many bytes of JSON text.

    Args:
        sock: Connected socket to read from.
        max_len: Largest payload length accepted, in bytes. The length
            header comes from the remote peer, so it is bounded before any
            payload is read.

    Returns:
        The decoded JSON object, or ``None`` if the connection was closed
        before a complete packet arrived.

    Raises:
        ValueError: If the announced length exceeds *max_len*, the payload
            is not valid JSON text (``json.JSONDecodeError`` and
            ``UnicodeDecodeError`` are ``ValueError`` subclasses), or the
            decoded JSON value is not an object.
    """
    header = recv_exactly(sock, 4)
    if header is None:
        return None
    (length,) = struct.unpack("!I", header)
    # Refuse oversized frames instead of waiting for up to 4 GiB of data.
    if length > max_len:
        raise ValueError(f"packet length {length} exceeds limit of {max_len} bytes")
    payload = recv_exactly(sock, length)
    if payload is None:
        return None
    msg = json.loads(payload.decode())
    # Callers treat the result as a dict, so other JSON types are rejected here.
    if not isinstance(msg, dict):
        raise ValueError(f"packet payload must be a JSON object, got {type(msg).__name__}")
    return msg
def do_send(
    sock: socket.socket,
    text: str,
    file_key: str | None,
    use_encrypt: bool,
    use_integrity: bool,
    use_hmac: bool,
    test_integrity: bool,
) -> None:
    """Build the JSON message for *text*, log each part, and send it.

    Args:
        sock: Connected socket the packet is written to.
        text: Message text to transmit.
        file_key: Shared key; must not be ``None`` when *use_encrypt* or
            *use_hmac* is set.
        use_encrypt: Send the text 3DES-encrypted (hex fields ``salt``,
            ``iv``, ``data``) instead of as plain ``data``.
        use_integrity: Attach the MD5 digest of the text as ``hash``.
        use_hmac: Attach ``hmac_salt`` and the HMAC-SHA256 of the text as
            ``hmac``. The encryption salt is reused when one was generated,
            otherwise a random 16-byte salt is created.
        test_integrity: Replace the hash/HMAC with a deliberately wrong
            value so the receiving side reports a failure.
    """
    print(f" [TX] plaintext: {text}")

    packet: dict
    cipher_salt: bytes | None = None
    if not use_encrypt:
        packet = {"encrypted": False, "data": text}
    else:
        assert file_key is not None
        cipher_salt, iv, ciphertext = encrypt_message(text, file_key)
        key_hex = derive_3des_key(file_key, cipher_salt).hex()
        packet = {
            "encrypted": True,
            "salt": cipher_salt.hex(),
            "iv": iv.hex(),
            "data": ciphertext.hex(),
        }
        print(f" [TX] salt: {cipher_salt.hex()}")
        print(f" [TX] 3DES key: {key_hex}")
        print(f" [TX] IV: {iv.hex()}")
        print(f" [TX] ciphertext: {ciphertext.hex()}")

    if use_integrity:
        md5_hex = compute_hash(text)
        if not test_integrity:
            print(f" [TX] MD5: {md5_hex}")
        else:
            # Swap in the digest of random data so verification must fail.
            md5_hex = hashlib.md5(b"CORRUPTED_" + os.urandom(4), usedforsecurity=False).hexdigest()
            print(f" [TX] MD5: {md5_hex} (CORRUPTED!)")
        packet["hash"] = md5_hex

    if use_hmac:
        assert file_key is not None
        mac_salt = os.urandom(16) if cipher_salt is None else cipher_salt
        mac_hex = compute_hmac(file_key, text, mac_salt)
        packet["hmac_salt"] = mac_salt.hex()
        print(f" [TX] HMAC salt: {mac_salt.hex()}")
        if not test_integrity:
            print(f" [TX] HMAC-256: {mac_hex}")
        else:
            # MAC over random data under a wrong key: guaranteed mismatch.
            mac_hex = hmac.new(b"WRONG_KEY", os.urandom(8), hashlib.sha256).hexdigest()
            print(f" [TX] HMAC-256: {mac_hex} (CORRUPTED!)")
        packet["hmac"] = mac_hex

    wire_bytes = json.dumps(packet).encode()
    send_packet(sock, wire_bytes)
    print()
def _hex_field(msg: dict, name: str) -> bytes | None:
    """Return ``msg[name]`` decoded from hex, or ``None`` if missing or not valid hex."""
    value = msg.get(name)
    if not isinstance(value, str):
        return None
    try:
        return bytes.fromhex(value)
    except ValueError:
        return None


def do_recv(
    msg: dict,
    file_key: str | None,
) -> str | None:
    """Decode one received message, log every step, and verify hash/HMAC.

    The message comes from the remote peer, so every field is validated
    before use; malformed input is reported instead of raising.

    Args:
        msg: Decoded JSON object received from the peer.
        file_key: Shared key, or ``None`` when no key file was loaded.

    Returns:
        The message text, or ``None`` when the message is malformed, is
        encrypted but no key is loaded, or cannot be decrypted. A failed
        hash/HMAC check is reported on stdout but the text is still
        returned.
    """
    if not isinstance(msg, dict):
        print(" [RX] ERROR: malformed message (not a JSON object)")
        return None

    if msg.get("encrypted", False):
        salt = _hex_field(msg, "salt")
        iv = _hex_field(msg, "iv")
        ct = _hex_field(msg, "data")
        if salt is None or iv is None or ct is None:
            print(" [RX] ERROR: malformed encrypted message (salt/iv/data missing or not hex)")
            return None
        print(f" [RX] ciphertext: {ct.hex()}")
        print(f" [RX] salt: {salt.hex()}")
        print(f" [RX] IV: {iv.hex()}")
        if file_key is None:
            print(" [RX] ERROR: message is encrypted but no key loaded")
            return None
        derived_hex = derive_3des_key(file_key, salt).hex()
        print(f" [RX] 3DES key: {derived_hex}")
        try:
            text = decrypt_message(salt, iv, ct, file_key)
        except Exception as e:
            # Deliberately broad: a wrong key, bad padding, bad IV/ciphertext
            # length or undecodable bytes must be reported, not crash the
            # receiver.
            print(f" [RX] decryption failed: {e}")
            return None
        print(f" [RX] decrypted: {text}")
    else:
        text = msg.get("data")
        if not isinstance(text, str):
            print(" [RX] ERROR: malformed message (data missing or not a string)")
            return None
        print(f" [RX] plaintext: {text}")

    if "hash" in msg:
        received_hash = msg["hash"]
        computed_hash = compute_hash(text)
        ok = received_hash == computed_hash
        print(f" [RX] recv hash: {received_hash}")
        print(f" [RX] calc hash: {computed_hash}")
        if ok:
            print(" [RX] integrity: OK")
        else:
            print(" [RX] integrity: FAIL - message may have been tampered with!")

    if "hmac" in msg:
        if file_key is None:
            print(" [RX] ERROR: HMAC present but no key loaded")
        else:
            hmac_salt = _hex_field(msg, "hmac_salt")
            received_hmac = msg["hmac"]
            # compare_digest only accepts ASCII strings; anything else (or a
            # bad salt) cannot match and is reported as a failed check.
            well_formed = (
                hmac_salt is not None
                and isinstance(received_hmac, str)
                and received_hmac.isascii()
            )
            if not well_formed:
                print(" [RX] HMAC auth: FAIL - malformed HMAC fields, sender NOT verified!")
            else:
                computed_h = compute_hmac(file_key, text, hmac_salt)
                ok = hmac.compare_digest(received_hmac, computed_h)
                print(f" [RX] HMAC salt: {hmac_salt.hex()}")
                print(f" [RX] recv HMAC: {received_hmac}")
                print(f" [RX] calc HMAC: {computed_h}")
                if ok:
                    print(" [RX] HMAC auth: OK - sender verified")
                else:
                    print(" [RX] HMAC auth: FAIL - sender NOT verified!")

    print()
    return text
def receiver_thread(
    sock: socket.socket,
    file_key: str | None,
    stop_event: threading.Event,
) -> None:
    """Receive and process packets until the session ends.

    Each packet read from *sock* is passed to ``do_recv``. The loop ends
    when *stop_event* is set, the peer closes the connection, the socket
    fails, or a malformed packet is received.

    *stop_event* is always set on exit, including when an unexpected
    exception escapes, so the session loop waiting on it cannot hang after
    this thread dies.
    """
    try:
        while not stop_event.is_set():
            msg = recv_packet(sock)
            if msg is None:
                print("\n[*] Connection closed by remote side.")
                break
            do_recv(msg, file_key)
    except OSError:
        # Stay quiet when the session is already shutting down: the error
        # is then just our own socket being closed.
        if not stop_event.is_set():
            print("\n[*] Connection lost.")
    except (ValueError, KeyError, TypeError, AttributeError) as exc:
        # Invalid JSON/encoding, missing fields or wrong field types sent by
        # the peer. The stream can no longer be trusted, so end the session.
        print(f"\n[!] Malformed packet from remote side: {exc}")
    finally:
        stop_event.set()
def sender_thread(
    sock: socket.socket,
    file_key: str | None,
    use_encrypt: bool,
    use_integrity: bool,
    use_hmac: bool,
    test_integrity: bool,
    stop_event: threading.Event,
) -> None:
    """Read lines from stdin and send each one until the session ends.

    Input longer than ``MAX_MSG_LEN`` characters is truncated with a
    warning. End of input (EOF) or an ``OSError`` sets *stop_event* and
    ends the loop; the loop also ends once *stop_event* is set elsewhere.
    """
    send_options = (file_key, use_encrypt, use_integrity, use_hmac, test_integrity)
    try:
        while True:
            if stop_event.is_set():
                return
            try:
                line = input()
            except EOFError:
                stop_event.set()
                return
            # The session may have ended while we were blocked on input().
            if stop_event.is_set():
                return
            if len(line) > MAX_MSG_LEN:
                print(f" [!] Message too long ({len(line)} > {MAX_MSG_LEN}), truncated.")
                line = line[:MAX_MSG_LEN]
            do_send(sock, line, *send_options)
    except OSError:
        stop_event.set()
def run_session(
    sock: socket.socket,
    addr: tuple,
    file_key: str | None,
    use_encrypt: bool,
    use_integrity: bool,
    use_hmac: bool,
    test_integrity: bool,
) -> None:
    """Run an interactive two-way session over a connected socket.

    Prints a banner with the enabled protection modes, starts the receiver
    and sender as daemon threads, then waits until either of them sets the
    shared stop event or the user presses Ctrl+C. The socket is closed
    before returning.
    """
    enabled = [
        label
        for label, active in (
            ("3DES", use_encrypt),
            ("MD5", use_integrity),
            ("HMAC-SHA256", use_hmac),
            ("test-integrity", test_integrity),
        )
        if active
    ]
    mode_str = ", ".join(enabled) or "plaintext"
    print(f"[*] Session with {addr[0]}:{addr[1]} [{mode_str}]")
    print("[*] Type messages and press Enter to send. Ctrl+C / Ctrl+D to quit.\n")

    stop = threading.Event()
    receiver = threading.Thread(
        target=receiver_thread,
        args=(sock, file_key, stop),
        daemon=True,
    )
    sender = threading.Thread(
        target=sender_thread,
        args=(sock, file_key, use_encrypt, use_integrity, use_hmac, test_integrity, stop),
        daemon=True,
    )
    receiver.start()
    sender.start()

    try:
        # Wait in short slices; wait() returns True once the event is set.
        while not stop.wait(0.5):
            pass
    except KeyboardInterrupt:
        print("\n[*] Interrupted.")
        stop.set()
    sock.close()
def run_server(args: argparse.Namespace) -> None:
    """Listen on ``args.port``, accept one client, and run a session with it.

    The key file is loaded only when encryption or HMAC is requested. The
    listening socket is managed by a ``with`` block so it is closed on
    every path, including errors and Ctrl+C while waiting for a client.
    If the port cannot be bound, an error is printed and the process exits
    with status 1.
    """
    file_key = load_key(args.key) if (args.encrypt or args.hmac) else None
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            srv.bind(("0.0.0.0", args.port))
            srv.listen(1)
        except OSError as exc:
            print(f"[!] Cannot listen on port {args.port}: {exc}")
            sys.exit(1)
        print(f"[*] Listening on 0.0.0.0:{args.port} ...")
        conn, addr = srv.accept()
        print(f"[*] Client connected: {addr[0]}:{addr[1]}")
    # Only one client is served, so the listener is already closed here.
    run_session(conn, addr, file_key, args.encrypt, args.integrity, args.hmac, args.test_integrity)
def run_client(args: argparse.Namespace) -> None:
    """Connect to ``args.host:args.port`` and run a session with the server.

    The key file is loaded only when encryption or HMAC is requested. If
    the connection cannot be established, the socket is closed, an error
    is printed and the process exits with status 1.
    """
    file_key = load_key(args.key) if (args.encrypt or args.hmac) else None
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((args.host, args.port))
    except OSError as exc:
        # Covers refused connections, timeouts and name-resolution errors.
        sock.close()
        print(f"[!] Cannot connect to {args.host}:{args.port}: {exc}")
        sys.exit(1)
    print(f"[*] Connected to {args.host}:{args.port}")
    run_session(sock, (args.host, args.port), file_key, args.encrypt, args.integrity, args.hmac, args.test_integrity)
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with ``server`` and ``client`` roles.

    ``server`` takes a required ``--port`` option; ``client`` takes
    positional ``host`` and ``port``. Both roles share the protection
    flags and the ``--key`` option.
    """
    parser = argparse.ArgumentParser(
        description="Encrypted messaging (3DES-CBC; integrity MD5 per assignment; optional HMAC-SHA256)"
    )
    roles = parser.add_subparsers(dest="role", required=True)
    server = roles.add_parser("server", help="Start server")
    client = roles.add_parser("client", help="Start client")

    # Role-specific addressing arguments.
    server.add_argument("--port", type=int, required=True)
    client.add_argument("host")
    client.add_argument("port", type=int)

    # Options shared by both roles.
    for role_parser in (server, client):
        role_parser.add_argument("--encrypt", action="store_true", help="Enable 3DES-CBC encryption")
        role_parser.add_argument("--integrity", action="store_true", help="MD5 integrity hash (variant / lab 2)")
        role_parser.add_argument("--hmac", action="store_true", help="HMAC-SHA256 integrity + authentication")
        role_parser.add_argument("--test-integrity", action="store_true", help="Send corrupted hash/HMAC")
        role_parser.add_argument("--key", default=DEFAULT_KEY_FILE, help="Path to key file (default: key.txt)")
    return parser
def main() -> None:
    """Parse the command line, validate flag combinations, and start the role.

    Exits with status 1 when ``--integrity`` and ``--hmac`` are combined,
    or when ``--test-integrity`` is given without either of them.
    """
    args = build_parser().parse_args()

    error = None
    if args.integrity and args.hmac:
        error = "[!] --integrity and --hmac are mutually exclusive"
    elif args.test_integrity and not (args.integrity or args.hmac):
        error = "[!] --test-integrity requires --integrity or --hmac"
    if error is not None:
        print(error)
        sys.exit(1)

    start_role = run_server if args.role == "server" else run_client
    start_role(args)
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()