366 lines
12 KiB
Python
366 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import socket
|
|
import stat
|
|
import struct
|
|
import sys
|
|
import threading
|
|
|
|
from cryptography.hazmat.decrepit.ciphers.algorithms import TripleDES
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, modes
|
|
from cryptography.hazmat.primitives import padding as sym_padding
|
|
|
|
# Key file used when the --key option is not given on the command line.
DEFAULT_KEY_FILE: str = "key.txt"
# Longest outgoing message, in characters; longer input is truncated by the sender.
MAX_MSG_LEN: int = 80
|
|
|
|
|
|
def check_key_permissions(path: str) -> None:
    """Exit the program unless the key file is private to its owner.

    Any read, write, or execute bit granted to group or others counts as
    too permissive. In that case the current mode and a suggested
    ``chmod`` command are printed and the process exits with status 1.

    Args:
        path: Filesystem path of the key file to inspect.

    Raises:
        SystemExit: If any group or other permission bit is set.
        OSError: If ``os.stat`` fails for ``path``.
    """
    file_mode = os.stat(path).st_mode
    # S_IRWXG and S_IRWXO are the combined rwx masks for group and others.
    exposed_bits = file_mode & (stat.S_IRWXG | stat.S_IRWXO)
    if not exposed_bits:
        return

    current = oct(file_mode & 0o777)
    print(f"[!] Key file permissions too open ({current}): {path}")
    print(f" Run: chmod 600 {path}")
    sys.exit(1)
|
|
|
|
|
|
def load_key(path: str) -> str:
    """Read and validate the shared secret stored in a key file.

    The file must exist, must pass ``check_key_permissions``, and must
    contain at least 32 characters once surrounding whitespace is stripped.
    On any violation a message is printed and the process exits with
    status 1.

    Args:
        path: Filesystem path of the key file.

    Returns:
        The stripped key text.

    Raises:
        SystemExit: If the file is missing, too permissive, or too short.
    """
    if not os.path.isfile(path):
        print(f"[!] Key file not found: {path}")
        sys.exit(1)
    check_key_permissions(path)
    # Decode explicitly as UTF-8: the key is turned back into bytes with
    # str.encode() (UTF-8) when keys and MACs are computed, so a
    # locale-dependent decode here could make two peers derive different
    # key bytes from the very same file.
    with open(path, encoding="utf-8") as f:
        key = f.read().strip()
    if len(key) < 32:
        print(f"[!] Key must be at least 32 characters (got {len(key)})")
        sys.exit(1)
    return key
|
|
|
|
|
|
def derive_3des_key(file_key: str, salt: bytes) -> bytes:
    """Derive the 24-byte 3DES key used for a single message.

    The key is the first 24 bytes of SHA-256 computed over the encoded
    ``file_key`` immediately followed by ``salt``.

    Args:
        file_key: Shared secret text loaded from the key file.
        salt: Per-message salt bytes.

    Returns:
        A 24-byte key.
    """
    digest = hashlib.sha256()
    digest.update(file_key.encode())
    digest.update(salt)
    return digest.digest()[:24]
|
|
|
|
|
|
def encrypt_message(plaintext: str, file_key: str) -> tuple[bytes, bytes, bytes]:
    """Encrypt ``plaintext`` with 3DES in CBC mode under a fresh salt and IV.

    A random 16-byte salt and 8-byte IV are generated for every call. The
    encoded plaintext is PKCS7-padded to the 64-bit block size and
    encrypted with the key from ``derive_3des_key(file_key, salt)``.

    Args:
        plaintext: Message text to encrypt.
        file_key: Shared secret text loaded from the key file.

    Returns:
        A ``(salt, iv, ciphertext)`` tuple of bytes.
    """
    salt = os.urandom(16)
    message_key = derive_3des_key(file_key, salt)
    iv = os.urandom(8)

    pkcs7 = sym_padding.PKCS7(64).padder()
    block_aligned = pkcs7.update(plaintext.encode()) + pkcs7.finalize()

    cipher = Cipher(TripleDES(message_key), modes.CBC(iv))
    enc = cipher.encryptor()
    ciphertext = enc.update(block_aligned) + enc.finalize()
    return salt, iv, ciphertext
|
|
|
|
|
|
def decrypt_message(salt: bytes, iv: bytes, ct: bytes, file_key: str) -> str:
    """Decrypt a 3DES-CBC ciphertext and return the message text.

    The key is re-derived from ``file_key`` and ``salt``. After
    decryption the 64-bit PKCS7 padding is removed and the bytes are
    decoded to text. Errors from the cipher, the unpadder, or the decode
    step are not handled here and propagate to the caller.

    Args:
        salt: Salt that was used to derive the message key.
        iv: CBC initialisation vector.
        ct: Ciphertext bytes.
        file_key: Shared secret text loaded from the key file.

    Returns:
        The decrypted message text.
    """
    message_key = derive_3des_key(file_key, salt)
    cipher = Cipher(TripleDES(message_key), modes.CBC(iv))
    dec = cipher.decryptor()
    padded_plain = dec.update(ct) + dec.finalize()

    pkcs7 = sym_padding.PKCS7(64).unpadder()
    raw = pkcs7.update(padded_plain) + pkcs7.finalize()
    return raw.decode()
|
|
|
|
|
|
def compute_hash(text: str) -> str:
    """Return the SHA-256 digest of the encoded ``text`` as a hex string."""
    hasher = hashlib.sha256()
    hasher.update(text.encode())
    return hasher.hexdigest()
|
|
|
|
|
|
def compute_hmac(file_key: str, text: str, salt: bytes) -> str:
    """Return the HMAC-SHA256 of ``text`` followed by ``salt`` as a hex string.

    Args:
        file_key: Shared secret text; its encoded form is the MAC key.
        text: Message text; its encoded form is authenticated first.
        salt: Salt bytes appended to the encoded text before authentication.

    Returns:
        The MAC as a 64-character hex string.
    """
    mac = hmac.new(file_key.encode(), digestmod=hashlib.sha256)
    mac.update(text.encode())
    mac.update(salt)
    return mac.hexdigest()
|
|
|
|
|
|
def recv_exactly(sock: socket.socket, n: int) -> bytes | None:
    """Read exactly ``n`` bytes from ``sock``.

    ``recv`` may return fewer bytes than requested, so it is called
    repeatedly until the requested amount has been collected.

    Args:
        sock: Connected socket to read from.
        n: Number of bytes to read.

    Returns:
        The ``n`` bytes read, or ``None`` if ``recv`` returned no data
        before ``n`` bytes were collected.
    """
    received = bytearray()
    remaining = n
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            # An empty read means no more data will arrive on this socket.
            return None
        received.extend(piece)
        remaining -= len(piece)
    return bytes(received)
|
|
|
|
|
|
def send_packet(sock: socket.socket, data: bytes) -> None:
    """Send ``data`` as one length-prefixed frame.

    The frame is a 4-byte unsigned length in network byte order followed
    by the payload, written with a single ``sendall`` call.

    Args:
        sock: Connected socket to write to.
        data: Payload bytes.
    """
    length_prefix = struct.pack("!I", len(data))
    frame = length_prefix + data
    sock.sendall(frame)
|
|
|
|
|
|
def recv_packet(sock: socket.socket, max_len: int = 1 << 20) -> dict | None:
    """Read one length-prefixed JSON message from ``sock``.

    A frame is a 4-byte unsigned length in network byte order followed by
    that many bytes of JSON text.

    Args:
        sock: Connected socket to read from.
        max_len: Largest payload length, in bytes, accepted from the peer.

    Returns:
        The decoded JSON object, or ``None`` if the stream ended before a
        complete frame was read.

    Raises:
        ConnectionError: If the peer announces a payload longer than
            ``max_len``. This is an ``OSError`` subclass, so callers that
            already handle socket errors treat it as a broken connection.
        ValueError: If the payload is not valid JSON text or does not
            decode to a JSON object. The whole frame has been consumed at
            that point, so the stream remains aligned on the next frame.
    """
    header = recv_exactly(sock, 4)
    if header is None:
        return None
    (length,) = struct.unpack("!I", header)
    if length > max_len:
        # The length field is controlled by the peer; without a cap a
        # single header could make us try to buffer up to 4 GiB.
        raise ConnectionError(
            f"peer announced an oversized packet ({length} > {max_len} bytes)"
        )
    payload = recv_exactly(sock, length)
    if payload is None:
        return None
    msg = json.loads(payload.decode())
    if not isinstance(msg, dict):
        # Callers rely on the dict return type (e.g. they call .get on it).
        raise ValueError("packet payload is not a JSON object")
    return msg
|
|
|
|
|
|
def do_send(
    sock: socket.socket,
    text: str,
    file_key: str | None,
    use_encrypt: bool,
    use_integrity: bool,
    use_hmac: bool,
    test_integrity: bool,
) -> None:
    """Build the JSON message for ``text``, log its parts, and send it.

    Args:
        sock: Connected socket the framed message is written to.
        text: Message text to transmit.
        file_key: Shared secret text; must not be ``None`` when
            ``use_encrypt`` or ``use_hmac`` is set.
        use_encrypt: Send the 3DES-CBC ciphertext (hex) instead of the
            plain text, together with its salt and IV.
        use_integrity: Attach a SHA-256 hash of ``text``.
        use_hmac: Attach an HMAC-SHA256 of ``text`` and the salt it used.
        test_integrity: Replace the hash or HMAC with a deliberately
            wrong value so the receiver's check fails.
    """
    print(f" [TX] plaintext: {text}")

    enc_salt: bytes | None = None
    if not use_encrypt:
        msg: dict = {"encrypted": False, "data": text}
    else:
        assert file_key is not None
        salt, iv, ct = encrypt_message(text, file_key)
        enc_salt = salt
        # Derived again purely for display; encrypt_message keeps its own copy.
        derived_hex = derive_3des_key(file_key, salt).hex()
        msg = {
            "encrypted": True,
            "salt": salt.hex(),
            "iv": iv.hex(),
            "data": ct.hex(),
        }
        print(f" [TX] salt: {salt.hex()}")
        print(f" [TX] 3DES key: {derived_hex}")
        print(f" [TX] IV: {iv.hex()}")
        print(f" [TX] ciphertext: {ct.hex()}")

    if use_integrity:
        h = compute_hash(text)
        if not test_integrity:
            print(f" [TX] SHA-256: {h}")
        else:
            # Swap in the hash of random data so verification must fail.
            h = hashlib.sha256(b"CORRUPTED_" + os.urandom(4)).hexdigest()
            print(f" [TX] SHA-256: {h} (CORRUPTED!)")
        msg["hash"] = h

    if use_hmac:
        assert file_key is not None
        # Reuse the encryption salt when there is one; otherwise make a new one.
        hmac_salt = os.urandom(16) if enc_salt is None else enc_salt
        h = compute_hmac(file_key, text, hmac_salt)
        msg["hmac_salt"] = hmac_salt.hex()
        print(f" [TX] HMAC salt: {hmac_salt.hex()}")
        if test_integrity:
            # A MAC under a different key over random data cannot verify.
            h = hmac.new(b"WRONG_KEY", os.urandom(8), hashlib.sha256).hexdigest()
            print(f" [TX] HMAC-256: {h} (CORRUPTED!)")
        else:
            print(f" [TX] HMAC-256: {h}")
        msg["hmac"] = h

    payload = json.dumps(msg).encode()
    send_packet(sock, payload)
    print()
|
|
|
|
|
|
def do_recv(
    msg: dict,
    file_key: str | None,
) -> str | None:
    """Decode, log, and verify one message received from the peer.

    ``msg`` comes from the network and is treated as untrusted. A message
    that is not a JSON object, lacks a required field, or carries a field
    of the wrong type or with invalid hex is reported on stdout and
    dropped instead of raising.

    Args:
        msg: Decoded JSON message.
        file_key: Shared secret text, or ``None`` when no key is loaded.

    Returns:
        The message text, or ``None`` when the message is malformed, is
        encrypted while no key is loaded, or fails to decrypt. The text is
        still returned when the hash or HMAC check fails; that outcome is
        only reported on stdout.
    """
    if not isinstance(msg, dict):
        print(" [RX] ERROR: malformed message (not a JSON object)")
        return None

    encrypted = msg.get("encrypted", False)

    if encrypted:
        try:
            salt = bytes.fromhex(msg["salt"])
            iv = bytes.fromhex(msg["iv"])
            ct = bytes.fromhex(msg["data"])
        except (KeyError, TypeError, ValueError) as e:
            # Missing field, non-string value, or invalid hex digits.
            print(f" [RX] ERROR: malformed encrypted message: {e!r}")
            return None
        print(f" [RX] ciphertext: {ct.hex()}")
        print(f" [RX] salt: {salt.hex()}")
        print(f" [RX] IV: {iv.hex()}")
        if file_key is None:
            print(" [RX] ERROR: message is encrypted but no key loaded")
            return None
        derived_hex = derive_3des_key(file_key, salt).hex()
        print(f" [RX] 3DES key: {derived_hex}")
        try:
            text = decrypt_message(salt, iv, ct, file_key)
        except Exception as e:
            # Any cipher, padding, or decode failure means the message is
            # unusable; report it and drop the message.
            print(f" [RX] decryption failed: {e}")
            return None
        print(f" [RX] decrypted: {text}")
    else:
        text = msg.get("data")
        if not isinstance(text, str):
            print(" [RX] ERROR: malformed message (missing or non-text 'data')")
            return None
        print(f" [RX] plaintext: {text}")

    if "hash" in msg:
        received_hash = msg["hash"]
        computed_hash = compute_hash(text)
        ok = received_hash == computed_hash
        print(f" [RX] recv hash: {received_hash}")
        print(f" [RX] calc hash: {computed_hash}")
        if ok:
            print(" [RX] integrity: OK")
        else:
            print(" [RX] integrity: FAIL - message may have been tampered with!")

    if "hmac" in msg:
        received_hmac = msg["hmac"]
        try:
            hmac_salt = bytes.fromhex(msg["hmac_salt"])
        except (KeyError, TypeError, ValueError) as e:
            print(f" [RX] ERROR: malformed HMAC salt: {e!r}")
            return None
        if file_key is None:
            print(" [RX] ERROR: HMAC present but no key loaded")
        else:
            computed_h = compute_hmac(file_key, text, hmac_salt)
            # compare_digest raises TypeError for a non-str or non-ASCII
            # argument; such a value can never equal a hex digest, so it
            # is counted as a failed check instead of crashing.
            comparable = isinstance(received_hmac, str) and received_hmac.isascii()
            ok = comparable and hmac.compare_digest(received_hmac, computed_h)
            print(f" [RX] HMAC salt: {hmac_salt.hex()}")
            print(f" [RX] recv HMAC: {received_hmac}")
            print(f" [RX] calc HMAC: {computed_h}")
            if ok:
                print(" [RX] HMAC auth: OK - sender verified")
            else:
                print(" [RX] HMAC auth: FAIL - sender NOT verified!")

    print()
    return text
|
|
|
|
|
|
def receiver_thread(
    sock: socket.socket,
    file_key: str | None,
    stop_event: threading.Event,
) -> None:
    """Receive and process messages until the session stops.

    Runs until ``stop_event`` is set, the peer closes the connection, or
    a socket error occurs. A packet that cannot be decoded or processed
    is reported and skipped. ``stop_event`` is always set when this
    function returns, so the rest of the session never keeps running with
    a dead receiver.

    Args:
        sock: Connected socket to read from.
        file_key: Shared secret text, or ``None`` when no key is loaded.
        stop_event: Session-wide stop flag.
    """
    try:
        while not stop_event.is_set():
            try:
                msg = recv_packet(sock)
            except ValueError as e:
                # Invalid JSON or text encoding. recv_packet decodes only
                # after reading the whole frame, so the stream is still
                # aligned and the next packet can be read normally.
                print(f" [RX] ERROR: undecodable packet ignored: {e}")
                continue
            if msg is None:
                print("\n[*] Connection closed by remote side.")
                stop_event.set()
                break
            try:
                do_recv(msg, file_key)
            except (AttributeError, KeyError, TypeError, ValueError) as e:
                # The message content comes from the peer; a missing or
                # mistyped field must not take the receiver down.
                print(f" [RX] ERROR: malformed message ignored: {e!r}")
    except OSError:
        if not stop_event.is_set():
            print("\n[*] Connection lost.")
    finally:
        # Covers every exit path, including unexpected exceptions.
        stop_event.set()
|
|
|
|
|
|
def sender_thread(
    sock: socket.socket,
    file_key: str | None,
    use_encrypt: bool,
    use_integrity: bool,
    use_hmac: bool,
    test_integrity: bool,
    stop_event: threading.Event,
) -> None:
    """Read lines from standard input and send each one as a message.

    Runs until ``stop_event`` is set, standard input reaches end of file,
    or a socket error occurs. End of input and socket errors both set
    ``stop_event``. Lines longer than ``MAX_MSG_LEN`` characters are
    truncated after a warning.

    Args:
        sock: Connected socket to write to.
        file_key: Shared secret text, or ``None`` when no key is loaded.
        use_encrypt: Passed through to ``do_send``.
        use_integrity: Passed through to ``do_send``.
        use_hmac: Passed through to ``do_send``.
        test_integrity: Passed through to ``do_send``.
        stop_event: Session-wide stop flag.
    """
    try:
        while not stop_event.is_set():
            try:
                text = input()
            except EOFError:
                stop_event.set()
                return
            # The session may have ended while input() was blocking.
            if stop_event.is_set():
                return
            if len(text) > MAX_MSG_LEN:
                print(f" [!] Message too long ({len(text)} > {MAX_MSG_LEN}), truncated.")
                text = text[:MAX_MSG_LEN]
            do_send(sock, text, file_key, use_encrypt, use_integrity, use_hmac, test_integrity)
    except OSError:
        # set() is idempotent, so no need to test the flag first.
        stop_event.set()
|
|
|
|
|
|
def run_session(
    sock: socket.socket,
    addr: tuple,
    file_key: str | None,
    use_encrypt: bool,
    use_integrity: bool,
    use_hmac: bool,
    test_integrity: bool,
) -> None:
    """Run an interactive chat session over a connected socket.

    Prints a banner listing the enabled features, starts the receiver and
    sender as daemon threads, then waits until either of them sets the
    stop flag or the user interrupts with Ctrl+C. The socket is closed
    before returning.

    Args:
        sock: Connected socket shared by both worker threads.
        addr: ``(host, port)`` of the peer, used only for the banner.
        file_key: Shared secret text, or ``None`` when no key is loaded.
        use_encrypt: Whether outgoing messages are encrypted.
        use_integrity: Whether outgoing messages carry a SHA-256 hash.
        use_hmac: Whether outgoing messages carry an HMAC.
        test_integrity: Whether outgoing hashes/HMACs are corrupted on purpose.
    """
    feature_labels = (
        (use_encrypt, "3DES"),
        (use_integrity, "SHA-256"),
        (use_hmac, "HMAC-SHA256"),
        (test_integrity, "test-integrity"),
    )
    flags = [label for enabled, label in feature_labels if enabled]
    mode_str = ", ".join(flags) or "plaintext"
    print(f"[*] Session with {addr[0]}:{addr[1]} [{mode_str}]")
    print("[*] Type messages and press Enter to send. Ctrl+C / Ctrl+D to quit.\n")

    stop = threading.Event()
    send_args = (sock, file_key, use_encrypt, use_integrity, use_hmac, test_integrity, stop)
    workers = (
        threading.Thread(target=receiver_thread, args=(sock, file_key, stop), daemon=True),
        threading.Thread(target=sender_thread, args=send_args, daemon=True),
    )
    for worker in workers:
        worker.start()

    try:
        # wait() returns True as soon as the flag is set; the short timeout
        # keeps the main thread responsive to Ctrl+C.
        while not stop.wait(0.5):
            pass
    except KeyboardInterrupt:
        print("\n[*] Interrupted.")
        stop.set()

    sock.close()
|
|
|
|
|
|
def run_server(args: argparse.Namespace) -> None:
    """Accept a single client connection and run a chat session with it.

    The key file is loaded only when encryption or HMAC is enabled. The
    server listens on all IPv4 interfaces at ``args.port`` and stops
    listening once the first client has connected.

    Args:
        args: Parsed command-line options for the ``server`` role.
    """
    file_key = load_key(args.key) if (args.encrypt or args.hmac) else None

    # The context manager closes the listening socket on every exit path,
    # including a failed bind or Ctrl+C while waiting in accept().
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind(("0.0.0.0", args.port))
        srv.listen(1)
        print(f"[*] Listening on 0.0.0.0:{args.port} ...")

        conn, addr = srv.accept()

    print(f"[*] Client connected: {addr[0]}:{addr[1]}")
    run_session(conn, addr, file_key, args.encrypt, args.integrity, args.hmac, args.test_integrity)
|
|
|
|
|
|
def run_client(args: argparse.Namespace) -> None:
    """Connect to a server and run a chat session with it.

    The key file is loaded only when encryption or HMAC is enabled.

    Args:
        args: Parsed command-line options for the ``client`` role.
    """
    needs_key = args.encrypt or args.hmac
    file_key = load_key(args.key) if needs_key else None

    remote = (args.host, args.port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(remote)
    print(f"[*] Connected to {args.host}:{args.port}")
    run_session(
        sock,
        remote,
        file_key,
        args.encrypt,
        args.integrity,
        args.hmac,
        args.test_integrity,
    )
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with ``server`` and ``client`` roles.

    ``server`` takes a required ``--port`` option; ``client`` takes
    positional ``host`` and ``port``. Both roles share the encryption,
    integrity, HMAC, test, and key-file options.

    Returns:
        The configured parser; the chosen role is stored as ``role``.
    """
    parser = argparse.ArgumentParser(description="Encrypted messaging (3DES-CBC + SHA-256)")
    sub = parser.add_subparsers(dest="role", required=True)

    server = sub.add_parser("server", help="Start server")
    client = sub.add_parser("client", help="Start client")

    # Role-specific arguments come first so they keep their place in --help.
    server.add_argument("--port", type=int, required=True)
    client.add_argument("host")
    client.add_argument("port", type=int)

    shared_switches = (
        ("--encrypt", "Enable 3DES-CBC encryption"),
        ("--integrity", "SHA-256 integrity check"),
        ("--hmac", "HMAC-SHA256 integrity + authentication"),
        ("--test-integrity", "Send corrupted hash/HMAC"),
    )
    for sp in (server, client):
        for flag, description in shared_switches:
            sp.add_argument(flag, action="store_true", help=description)
        sp.add_argument("--key", default=DEFAULT_KEY_FILE, help="Path to key file (default: key.txt)")

    return parser
|
|
|
|
|
|
def main() -> None:
    """Parse the command line, validate option combinations, and start a role.

    ``--integrity`` and ``--hmac`` cannot be combined, and
    ``--test-integrity`` needs one of them. On a violation a message is
    printed and the process exits with status 1.
    """
    args = build_parser().parse_args()

    if args.integrity and args.hmac:
        print("[!] --integrity and --hmac are mutually exclusive")
        sys.exit(1)

    if args.test_integrity and not args.integrity and not args.hmac:
        print("[!] --test-integrity requires --integrity or --hmac")
        sys.exit(1)

    start_role = run_server if args.role == "server" else run_client
    start_role(args)
|
|
|
|
|
|
# Start the command-line interface only when this file is run as a script.
if __name__ == "__main__":
    main()
|