from __future__ import annotations import argparse import hashlib import hmac import json import os import socket import stat import struct import sys import threading from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES from cryptography.hazmat.primitives.ciphers import Cipher, modes from cryptography.hazmat.primitives import padding as sym_padding MAX_MSG_LEN = 80 DEFAULT_KEY_FILE = "key.txt" def check_key_permissions(path: str) -> None: st = os.stat(path) mode = st.st_mode bad = stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH if mode & bad: current = oct(mode & 0o777) print(f"[!] Key file permissions too open ({current}): {path}") print(f" Run: chmod 600 {path}") sys.exit(1) def load_key(path: str) -> str: if not os.path.isfile(path): print(f"[!] Key file not found: {path}") sys.exit(1) check_key_permissions(path) with open(path) as f: key = f.read().strip() if len(key) < 32: print(f"[!] Key must be at least 32 characters (got {len(key)})") sys.exit(1) return key def derive_3des_key(file_key: str, salt: bytes) -> bytes: return hashlib.sha256(file_key.encode() + salt).digest()[:24] def encrypt_message(plaintext: str, file_key: str) -> tuple[bytes, bytes, bytes]: salt = os.urandom(16) key = derive_3des_key(file_key, salt) iv = os.urandom(8) padder = sym_padding.PKCS7(64).padder() padded = padder.update(plaintext.encode()) + padder.finalize() encryptor = Cipher(TripleDES(key), modes.CBC(iv)).encryptor() ct = encryptor.update(padded) + encryptor.finalize() return salt, iv, ct def decrypt_message(salt: bytes, iv: bytes, ct: bytes, file_key: str) -> str: key = derive_3des_key(file_key, salt) decryptor = Cipher(TripleDES(key), modes.CBC(iv)).decryptor() padded = decryptor.update(ct) + decryptor.finalize() unpadder = sym_padding.PKCS7(64).unpadder() plaintext = unpadder.update(padded) + unpadder.finalize() return plaintext.decode() def compute_hash(text: str) -> str: return hashlib.md5(text.encode(), usedforsecurity=False).hexdigest() def compute_hmac(file_key: str, text: str, salt: bytes) -> str: return hmac.new(file_key.encode(), text.encode() + salt, hashlib.sha256).hexdigest() def recv_exactly(sock: socket.socket, n: int) -> bytes | None: buf = b"" while len(buf) < n: chunk = sock.recv(n - len(buf)) if not chunk: return None buf += chunk return buf def send_packet(sock: socket.socket, data: bytes) -> None: sock.sendall(struct.pack("!I", len(data)) + data) def recv_packet(sock: socket.socket) -> dict | None: header = recv_exactly(sock, 4) if header is None: return None length = struct.unpack("!I", header)[0] payload = recv_exactly(sock, length) if payload is None: return None return json.loads(payload.decode()) def do_send( sock: socket.socket, text: str, file_key: str | None, use_encrypt: bool, use_integrity: bool, use_hmac: bool, test_integrity: bool, ) -> None: msg: dict = {} print(f" [TX] plaintext: {text}") enc_salt: bytes | None = None if use_encrypt: assert file_key is not None salt, iv, ct = encrypt_message(text, file_key) enc_salt = salt derived_hex = derive_3des_key(file_key, salt).hex() msg["encrypted"] = True msg["salt"] = salt.hex() msg["iv"] = iv.hex() msg["data"] = ct.hex() print(f" [TX] salt: {salt.hex()}") print(f" [TX] 3DES key: {derived_hex}") print(f" [TX] IV: {iv.hex()}") print(f" [TX] ciphertext: {ct.hex()}") else: msg["encrypted"] = False msg["data"] = text if use_integrity: h = compute_hash(text) if test_integrity: h = hashlib.md5(b"CORRUPTED_" + os.urandom(4), usedforsecurity=False).hexdigest() print(f" [TX] MD5: {h} (CORRUPTED!)") else: print(f" [TX] MD5: {h}") msg["hash"] = h if use_hmac: assert file_key is not None hmac_salt = enc_salt if enc_salt is not None else os.urandom(16) h = compute_hmac(file_key, text, hmac_salt) msg["hmac_salt"] = hmac_salt.hex() if test_integrity: h = hmac.new(b"WRONG_KEY", os.urandom(8), hashlib.sha256).hexdigest() print(f" [TX] HMAC salt: {hmac_salt.hex()}") print(f" [TX] HMAC-256: {h} (CORRUPTED!)") else: print(f" [TX] HMAC salt: {hmac_salt.hex()}") print(f" [TX] HMAC-256: {h}") msg["hmac"] = h send_packet(sock, json.dumps(msg).encode()) print() def do_recv( msg: dict, file_key: str | None, ) -> str | None: encrypted = msg.get("encrypted", False) if encrypted: salt = bytes.fromhex(msg["salt"]) iv = bytes.fromhex(msg["iv"]) ct = bytes.fromhex(msg["data"]) print(f" [RX] ciphertext: {ct.hex()}") print(f" [RX] salt: {salt.hex()}") print(f" [RX] IV: {iv.hex()}") if file_key is None: print(" [RX] ERROR: message is encrypted but no key loaded") return None derived_hex = derive_3des_key(file_key, salt).hex() print(f" [RX] 3DES key: {derived_hex}") try: text = decrypt_message(salt, iv, ct, file_key) except Exception as e: print(f" [RX] decryption failed: {e}") return None print(f" [RX] decrypted: {text}") else: text = msg["data"] print(f" [RX] plaintext: {text}") if "hash" in msg: received_hash = msg["hash"] computed_hash = compute_hash(text) ok = received_hash == computed_hash print(f" [RX] recv hash: {received_hash}") print(f" [RX] calc hash: {computed_hash}") if ok: print(" [RX] integrity: OK") else: print(" [RX] integrity: FAIL - message may have been tampered with!") if "hmac" in msg: hmac_salt = bytes.fromhex(msg["hmac_salt"]) received_hmac = msg["hmac"] if file_key is None: print(" [RX] ERROR: HMAC present but no key loaded") else: computed_h = compute_hmac(file_key, text, hmac_salt) ok = hmac.compare_digest(received_hmac, computed_h) print(f" [RX] HMAC salt: {hmac_salt.hex()}") print(f" [RX] recv HMAC: {received_hmac}") print(f" [RX] calc HMAC: {computed_h}") if ok: print(" [RX] HMAC auth: OK - sender verified") else: print(" [RX] HMAC auth: FAIL - sender NOT verified!") print() return text def receiver_thread( sock: socket.socket, file_key: str | None, stop_event: threading.Event, ) -> None: try: while not stop_event.is_set(): msg = recv_packet(sock) if msg is None: print("\n[*] Connection closed by remote side.") stop_event.set() break do_recv(msg, file_key) except OSError: if not stop_event.is_set(): print("\n[*] Connection lost.") stop_event.set() def sender_thread( sock: socket.socket, file_key: str | None, use_encrypt: bool, use_integrity: bool, use_hmac: bool, test_integrity: bool, stop_event: threading.Event, ) -> None: try: while not stop_event.is_set(): try: text = input() except EOFError: stop_event.set() break if stop_event.is_set(): break if len(text) > MAX_MSG_LEN: print(f" [!] Message too long ({len(text)} > {MAX_MSG_LEN}), truncated.") text = text[:MAX_MSG_LEN] do_send(sock, text, file_key, use_encrypt, use_integrity, use_hmac, test_integrity) except OSError: if not stop_event.is_set(): stop_event.set() def run_session( sock: socket.socket, addr: tuple, file_key: str | None, use_encrypt: bool, use_integrity: bool, use_hmac: bool, test_integrity: bool, ) -> None: flags = [] if use_encrypt: flags.append("3DES") if use_integrity: flags.append("MD5") if use_hmac: flags.append("HMAC-SHA256") if test_integrity: flags.append("test-integrity") mode_str = ", ".join(flags) if flags else "plaintext" print(f"[*] Session with {addr[0]}:{addr[1]} [{mode_str}]") print("[*] Type messages and press Enter to send. Ctrl+C / Ctrl+D to quit.\n") stop = threading.Event() rx = threading.Thread(target=receiver_thread, args=(sock, file_key, stop), daemon=True) tx = threading.Thread(target=sender_thread, args=(sock, file_key, use_encrypt, use_integrity, use_hmac, test_integrity, stop), daemon=True) rx.start() tx.start() try: while not stop.is_set(): stop.wait(0.5) except KeyboardInterrupt: print("\n[*] Interrupted.") stop.set() sock.close() def run_server(args: argparse.Namespace) -> None: file_key = load_key(args.key) if (args.encrypt or args.hmac) else None srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) srv.bind(("0.0.0.0", args.port)) srv.listen(1) print(f"[*] Listening on 0.0.0.0:{args.port} ...") conn, addr = srv.accept() print(f"[*] Client connected: {addr[0]}:{addr[1]}") srv.close() run_session(conn, addr, file_key, args.encrypt, args.integrity, args.hmac, args.test_integrity) def run_client(args: argparse.Namespace) -> None: file_key = load_key(args.key) if (args.encrypt or args.hmac) else None sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((args.host, args.port)) print(f"[*] Connected to {args.host}:{args.port}") run_session(sock, (args.host, args.port), file_key, args.encrypt, args.integrity, args.hmac, args.test_integrity) def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Encrypted messaging (3DES-CBC; integrity MD5 per assignment; optional HMAC-SHA256)" ) sub = parser.add_subparsers(dest="role", required=True) for name, sp in [("server", sub.add_parser("server", help="Start server")), ("client", sub.add_parser("client", help="Start client"))]: if name == "server": sp.add_argument("--port", type=int, required=True) else: sp.add_argument("host") sp.add_argument("port", type=int) sp.add_argument("--encrypt", action="store_true", help="Enable 3DES-CBC encryption") sp.add_argument("--integrity", action="store_true", help="MD5 integrity hash (variant / lab 2)") sp.add_argument("--hmac", action="store_true", help="HMAC-SHA256 integrity + authentication") sp.add_argument("--test-integrity", action="store_true", help="Send corrupted hash/HMAC") sp.add_argument("--key", default=DEFAULT_KEY_FILE, help="Path to key file (default: key.txt)") return parser def main() -> None: args = build_parser().parse_args() if args.integrity and args.hmac: print("[!] --integrity and --hmac are mutually exclusive") sys.exit(1) if args.test_integrity and not (args.integrity or args.hmac): print("[!] --test-integrity requires --integrity or --hmac") sys.exit(1) if args.role == "server": run_server(args) else: run_client(args) if __name__ == "__main__": main()