"""
SSL related tools and utilities.
"""
import os
import struct
import subprocess
from duck.exceptions.all import SettingsError
from duck.logging import logger
from duck.settings import SETTINGS
[docs]
def is_ssl_data(data: bytes) -> bool:
"""
Checks if the given data is an SSL/TLS record.
Args:
data (bytes): Raw bytes received from a socket.
Returns:
bool: True if data appears to be SSL/TLS, False otherwise.
"""
if len(data) < 3:
return False # Not enough data to determine SSL
first_byte, version_major, version_minor = struct.unpack("!BBB", data[:3])
# SSL/TLS content types (Handshake, ChangeCipherSpec, Alert, Application Data)
if first_byte in {0x14, 0x15, 0x16, 0x17}:
# Check for valid SSL/TLS versions
if (version_major == 0x03 and version_minor in {0x00, 0x01, 0x02, 0x03, 0x04}):
return True
return False
[docs]
def generate_server_cert():
"""
Generates a key pair (Key), csr (Certificate Signing Request ) and a self-signed certificate (CRT) for server-side use.
This will generate 3 files using openssl:
server.csr
server.key
server.crt
This uses variables set in settings.py
"""
base_dir = SETTINGS.get("BASE_DIR")
ssl_dir = os.path.join(str(base_dir), "etc", "ssl") if base_dir else os.getcwd()
csr_path = SETTINGS.get("SSL_CSR_LOCATION") or os.path.join(ssl_dir, "server.csr")
certfile_path = SETTINGS.get("SSL_CERTFILE_LOCATION") or os.path.join(ssl_dir, "server.crt")
private_key_path = SETTINGS.get("SSL_PRIVATE_KEY_LOCATION") or os.path.join(ssl_dir, "server.key")
os.makedirs(os.path.dirname(str(certfile_path)), exist_ok=True)
logger.log(
"Generating SSL certificate to use for HTTPS",
level=logger.DEBUG,
)
logger.log(
"This will generate a self-signed certificate for development.",
level=logger.DEBUG,
)
logger.log(
"For production, please submit the CSR (Certificate Signing Request) to trusted CA (Certificate Authority) for "
"signing to ensure browser compatibility and trust.\n",
level=logger.WARNING,
)
# check if certfile and private key both exist, if not then continue
if os.path.isfile(certfile_path) and os.path.isfile(private_key_path):
# both are present flag a warning
overwrite_existing = input(
"SSL certfile and key pair already exist. Overwrite existing (y/N): "
)
print()
if not overwrite_existing.lower().startswith("y"):
logger.log("Cancelled SSL Certificate generation",
level=logger.DEBUG)
return
domain = SETTINGS.get("SERVER_DOMAIN") or "localhost"
country = SETTINGS.get("SERVER_COUNTRY") or "US"
state = SETTINGS.get("SERVER_STATE") or "State"
locality = SETTINGS.get("SERVER_LOCALITY") or "Locality"
organization = SETTINGS.get("SERVER_ORGANIZATION") or "Duck"
organization_unit = SETTINGS.get("SERVER_ORGANIZATION_UNIT") or "Development"
if len(country) != 2:
country = "US"
# checking for openssl availability
try:
# Run the openssl command to check if it's available
result = subprocess.run(
["openssl", "version"],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# If openssl is available, result.returncode should be 0
if result.returncode == 0:
logger.log(
f"OpenSSL version: {result.stdout.decode('utf-8').strip()}",
level=logger.INFO,
)
else:
logger.log(
"Error: OpenSSL command failed unexpectedly",
level=logger.ERROR,
)
logger.log("Cancelled SSL Certificate generation",
level=logger.ERROR)
return
except FileNotFoundError:
# If OpenSSL is not found on the system
logger.log(
"Error: OpenSSL is not installed on your system",
level=logger.ERROR,
)
logger.log("Cancelled SSL Certificate generation", level=logger.ERROR)
return
except subprocess.CalledProcessError as e:
# Catch any errors during the command execution
logger.log(
f"Error: OpenSSL command failed with message: {e.stderr.decode('utf-8')}",
level=logger.ERROR,
)
logger.log("Cancelled SSL Certificate generation", level=logger.ERROR)
return
# create commands
private_key_cmd = ["openssl", "genrsa", "-out", str(private_key_path), "2048"]
csr_cmd = [
"openssl",
"req",
"-new",
"-key",
str(private_key_path),
"-out",
str(csr_path),
"-subj",
f"/C={country}/ST={state}/L={locality}/O={organization}/OU={organization_unit}/CN={domain}",
]
certfile_signing_cmd = [
"openssl",
"x509",
"-req",
"-in",
str(csr_path),
"-signkey",
str(private_key_path),
"-out",
str(certfile_path),
"-days",
"365",
]
# generate private key
process = subprocess.run(private_key_cmd, check=True)
if process.returncode == 0:
logger.log(
f"Created private key in {private_key_path}",
custom_color=logger.Fore.GREEN,
)
else:
logger.log("Failed to create a private key", level=logger.ERROR)
logger.log_raw(
f"{process.stderr.decode('utf-8')}",
level=logger.ERROR,
custom_color=logger.Style.RESET_ALL,
)
logger.log("Cancelled SSL Certificate generation", level=logger.ERROR)
return
# generate csr (certificate signing request)
process = subprocess.run(csr_cmd, check=True)
if process.returncode == 0:
logger.log(
f"Created CSR (certificate signing request) in {csr_path}",
custom_color=logger.Fore.GREEN,
)
else:
logger.log("Failed to create CSR", level=logger.ERROR)
logger.log_raw(
f"{process.stderr.decode('utf-8')}",
level=logger.ERROR,
custom_color=logger.Style.RESET_ALL,
)
logger.log("Cancelled SSL Certificate generation", level=logger.ERROR)
return
# self-sign and create certificate
process = subprocess.run(certfile_signing_cmd, check=True)
if process.returncode == 0:
logger.log(
f"Created self-signed certificate in {certfile_path}",
custom_color=logger.Fore.GREEN,
)
logger.log(
"SSL certificate generated successfully.",
custom_color=logger.Fore.GREEN,
)
else:
logger.log("Failed to create ssl certificate", level=logger.ERROR)
logger.log_raw(
f"{process.stderr.decode('utf-8')}",
level=logger.ERROR,
custom_color=logger.Style.RESET_ALL,
)
logger.log("Cancelled SSL Certificate generation", level=logger.ERROR)
return