Source code for duck.utils.ssl

"""
SSL related tools and utilities.
"""
import os
import struct
import subprocess

from duck.exceptions.all import SettingsError
from duck.logging import logger
from duck.settings import SETTINGS


[docs] def is_ssl_data(data: bytes) -> bool: """ Checks if the given data is an SSL/TLS record. Args: data (bytes): Raw bytes received from a socket. Returns: bool: True if data appears to be SSL/TLS, False otherwise. """ if len(data) < 3: return False # Not enough data to determine SSL first_byte, version_major, version_minor = struct.unpack("!BBB", data[:3]) # SSL/TLS content types (Handshake, ChangeCipherSpec, Alert, Application Data) if first_byte in {0x14, 0x15, 0x16, 0x17}: # Check for valid SSL/TLS versions if (version_major == 0x03 and version_minor in {0x00, 0x01, 0x02, 0x03, 0x04}): return True return False
[docs] def generate_server_cert(): """ Generates a key pair (Key), csr (Certificate Signing Request ) and a self-signed certificate (CRT) for server-side use. This will generate 3 files using openssl: server.csr server.key server.crt This uses variables set in settings.py """ csr_path = SETTINGS["SSL_CSR_LOCATION"] certfile_path = SETTINGS["SSL_CERTFILE_LOCATION"] private_key_path = SETTINGS["SSL_PRIVATE_KEY_LOCATION"] logger.log( "Generating SSL certificate to use for HTTPS", level=logger.DEBUG, ) logger.log( "This will generate a self-signed certificate for development.", level=logger.DEBUG, ) logger.log( "For production, please submit the CSR (Certificate Signing Request) to trusted CA (Certificate Authority) for " "signing to ensure browser compatibility and trust.\n", level=logger.WARNING, ) # check if certfile and private key both exist, if not then continue if os.path.isfile(certfile_path) and os.path.isfile(private_key_path): # both are present flag a warning overwrite_existing = input( "SSL certfile and key pair already exist. Overwrite existing (y/N): " ) print() if not overwrite_existing.lower().startswith("y"): logger.log("Cancelled SSL Certificate generation", level=logger.DEBUG) return domain = SETTINGS["SERVER_DOMAIN"] country = SETTINGS["SERVER_COUNTRY"] state = SETTINGS["SERVER_STATE"] locality = SETTINGS["SERVER_LOCALITY"] organization = SETTINGS["SERVER_ORGANIZATION"] organization_unit = SETTINGS["SERVER_ORGANIZATION_UNIT"] if not domain: raise SettingsError("Please set SERVER_DOMAIN in settings.py") if len(country) != 2: raise SettingsError( "SERVER_COUNTRY should be a two-letter country code in settings.py" ) if not state: raise SettingsError("SERVER_STATE should be set in settings.py") if not locality: raise SettingsError("SERVER_LOCALITY should be set in settings.py") if not organization: raise SettingsError("SERVER_ORGANIZATION should be set in settings.py") if not organization_unit: raise SettingsError( "SERVER_ORGANIZATION_UNIT should be set in settings.py") # checking for openssl availability try: # Run the openssl command to check if it's available result = subprocess.run( ["openssl", "version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) # If openssl is available, result.returncode should be 0 if result.returncode == 0: logger.log( f"OpenSSL version: {result.stdout.decode('utf-8').strip()}", level=logger.INFO, ) else: logger.log( "Error: OpenSSL command failed unexpectedly", level=logger.ERROR, ) logger.log("Cancelled SSL Certificate generation", level=logger.ERROR) return except FileNotFoundError: # If OpenSSL is not found on the system logger.log( "Error: OpenSSL is not installed on your system", level=logger.ERROR, ) logger.log("Cancelled SSL Certificate generation", level=logger.ERROR) return except subprocess.CalledProcessError as e: # Catch any errors during the command execution logger.log( f"Error: OpenSSL command failed with message: {e.stderr.decode('utf-8')}", level=logger.ERROR, ) logger.log("Cancelled SSL Certificate generation", level=logger.ERROR) return # create commands private_key_cmd = ["openssl", "genrsa", "-out", "server.key", "2048"] csr_cmd = [ "openssl", "req", "-new", "-key", "server.key", "-out", csr_path, ] certfile_signing_cmd = [ "openssl", "req", "-new", "-key", private_key_path, "-out", csr_path, "-subj", f"/C={country}/ST={state}/L={locality}/O={organization}/OU={organization_unit}/CN={domain}", ] # generate private key process = subprocess.run(private_key_cmd, check=True) if process.returncode == 0: logger.log( f"Created private key in {private_key_path}", custom_color=logger.Fore.GREEN, ) else: logger.log("Failed to create a private key", level=logger.ERROR) logger.log_raw( f"{process.stderr.decode('utf-8')}", level=logger.ERROR, custom_color=logger.Style.RESET_ALL, ) logger.log("Cancelled SSL Certificate generation", level=logger.ERROR) return # generate csr (certificate signing request) process = subprocess.run(csr_cmd, check=True) if process.returncode == 0: logger.log( f"Created CSR (certificate signing request) in {csr_path}", custom_color=logger.Fore.GREEN, ) else: logger.log("Failed to create CSR", level=logger.ERROR) logger.log_raw( f"{process.stderr.decode('utf-8')}", level=logger.ERROR, custom_color=logger.Style.RESET_ALL, ) logger.log("Cancelled SSL Certificate generation", level=logger.ERROR) return # self-sign and create certificate process = subprocess.run(certfile_signing_cmd, check=True) if process.returncode == 0: logger.log( f"Created self-signed certificate in {certfile_path}", custom_color=logger.Fore.GREEN, ) logger.log( "SSL certificate generated successfully.", custom_color=logger.Fore.GREEN, ) else: logger.log("Failed to create ssl certificate", level=logger.ERROR) logger.log_raw( f"{process.stderr.decode('utf-8')}", level=logger.ERROR, custom_color=logger.Style.RESET_ALL, ) logger.log("Cancelled SSL Certificate generation", level=logger.ERROR) return