"""
Module containing CSRFMiddleware class which mitigates against Cross-Site-Request-Forgery (CSRF) attacks.
"""
import re
import os
import string
import secrets
import hashlib
import datetime
import urllib.parse
from duck.http.middlewares import BaseMiddleware
from duck.http.request import HttpRequest
from duck.http.response import HttpForbiddenRequestResponse
from duck.meta import Meta
from duck.settings import SETTINGS
from duck.utils.urlcrack import URL
from duck.utils.safe_compare import constant_time_compare
from duck.shortcuts import simple_response
from duck.etc.internals.template import internal_render
# When true, the CSRF secret is stored in the user's session and no CSRF cookie
# is sent back to the client.
CSRF_USE_SESSIONS = SETTINGS["CSRF_USE_SESSIONS"]
# Number of characters in a CSRF secret (and in the random mask used to hide it).
CSRF_SECRET_LENGTH = SETTINGS["CSRF_SECRET_LENGTH"]
# Exact length a submitted CSRF token must have. A token is built as
# mask + masked secret, so this is presumably 2 * CSRF_SECRET_LENGTH -- confirm
# against the settings defaults.
CSRF_TOKEN_LENGTH = SETTINGS["CSRF_TOKEN_LENGTH"]
# Session key under which the CSRF secret is stored when sessions are used.
CSRF_SESSION_KEY = SETTINGS["CSRF_SESSION_KEY"]
# Allowed characters: Letters and Digits
ALLOWED_CHARACTERS = string.ascii_letters + string.digits
[docs]
def generate_dynamic_secret_key() -> bytes:
    """
    Derive a 32-byte key from the host name mixed with fresh random bytes.

    The ``HOSTNAME`` environment variable (``"default_host"`` when unset) is
    combined with 16 bytes from ``os.urandom`` and hashed with SHA-256.
    Because new random bytes are drawn on every call, two calls never return
    the same key; keep the result around if a stable key is needed.

    Returns:
        bytes: The SHA-256 digest of the combined data.
    """
    host_name = os.environ.get("HOSTNAME", "default_host")
    hasher = hashlib.sha256()
    hasher.update(host_name.encode())
    # The random suffix is what makes every generated key unique.
    hasher.update(os.urandom(16))
    return hasher.digest()
[docs]
def generate_csrf_secret() -> str:
    """
    Create a new random CSRF secret.

    Returns:
        str: ``CSRF_SECRET_LENGTH`` characters, each drawn with
        ``secrets.choice`` from ``ALLOWED_CHARACTERS`` (letters and digits).
    """
    picked_characters = []
    for _ in range(CSRF_SECRET_LENGTH):
        picked_characters.append(secrets.choice(ALLOWED_CHARACTERS))
    return "".join(picked_characters)
[docs]
def mask_cipher_secret(secret: str) -> str:
    """
    Masks a CSRF secret to produce a CSRF token.

    A random mask of ``CSRF_SECRET_LENGTH`` characters is generated and each
    secret character is shifted by the position of the matching mask
    character within ``ALLOWED_CHARACTERS`` (addition modulo the alphabet
    size). The token is the mask followed by the shifted secret, so a new
    token is produced on every call even for the same secret.

    Args:
        secret (str): The CSRF secret; exactly ``CSRF_SECRET_LENGTH``
            characters taken from ``ALLOWED_CHARACTERS``.

    Returns:
        str: The CSRF token (mask + masked secret).

    Raises:
        ValueError: If the secret is not a string, has the wrong length or
            contains characters other than letters and digits.
    """
    if not isinstance(secret, str):
        raise ValueError("Secret must be a string.")

    # Look each character up once instead of scanning the alphabet repeatedly.
    positions = {char: index for index, char in enumerate(ALLOWED_CHARACTERS)}

    try:
        secret_positions = [positions[char] for char in secret]
    except KeyError:
        raise ValueError(
            "Secret contains invalid characters. Only letters and digits are allowed."
        ) from None

    # A shorter secret used to fail with an IndexError and a longer one was
    # silently truncated; reject both explicitly.
    if len(secret) != CSRF_SECRET_LENGTH:
        raise ValueError(
            f"Invalid secret length. Expected {CSRF_SECRET_LENGTH} characters.")

    alphabet_size = len(ALLOWED_CHARACTERS)

    # Generate a random mask of the same length as the secret
    mask = "".join(
        secrets.choice(ALLOWED_CHARACTERS) for _ in range(CSRF_SECRET_LENGTH))

    # XOR-like masking using modular arithmetic
    masked_secret = "".join(
        ALLOWED_CHARACTERS[(secret_position + positions[mask_char]) % alphabet_size]
        for secret_position, mask_char in zip(secret_positions, mask))

    # Return the full token (mask + masked secret)
    return mask + masked_secret
[docs]
def unmask_cipher_token(token: str) -> str:
    """
    Recover the CSRF secret hidden inside a CSRF token.

    The first ``CSRF_SECRET_LENGTH`` characters of the token are the mask and
    the remainder is the masked secret. Each secret character is restored by
    subtracting the mask character's position from the masked character's
    position, modulo the size of ``ALLOWED_CHARACTERS``.

    Args:
        token (str): The CSRF token.

    Returns:
        str: The original CSRF secret.

    Raises:
        ValueError: If the token does not have ``CSRF_TOKEN_LENGTH``
            characters or contains a character outside ``ALLOWED_CHARACTERS``.
    """
    if len(token) != CSRF_TOKEN_LENGTH:
        raise ValueError("Invalid token length.")

    mask_part = token[:CSRF_SECRET_LENGTH]
    masked_part = token[CSRF_SECRET_LENGTH:]
    alphabet_size = len(ALLOWED_CHARACTERS)

    recovered = []
    for position in range(CSRF_SECRET_LENGTH):
        # str.index raises ValueError for characters outside the alphabet.
        masked_index = ALLOWED_CHARACTERS.index(masked_part[position])
        mask_index = ALLOWED_CHARACTERS.index(mask_part[position])
        recovered.append(
            ALLOWED_CHARACTERS[(masked_index - mask_index) % alphabet_size])
    return "".join(recovered)
[docs]
def add_new_csrf_cookie(request):
    """
    Create a fresh CSRF secret and record it on the request.

    The secret is written to ``request.META["CSRF_COOKIE"]`` and
    ``request.META["CSRF_COOKIE_NEEDS_UPDATE"]`` is set to ``True``.

    Args:
        request: The HTTP request object whose ``META`` mapping is updated.

    Returns:
        The newly generated CSRF secret.
    """
    fresh_secret = generate_csrf_secret()
    meta_updates = (
        ("CSRF_COOKIE", fresh_secret),
        ("CSRF_COOKIE_NEEDS_UPDATE", True),
    )
    for meta_key, meta_value in meta_updates:
        request.META[meta_key] = meta_value
    return fresh_secret
[docs]
def get_csrf_token(request):
    """
    Returns a freshly masked CSRF token for the request.

    The CSRF secret already present in ``request.META["CSRF_COOKIE"]`` is
    reused when there is one; otherwise a new secret is generated. If the
    stored secret cannot be masked (for example because it is malformed), it
    is replaced by a new one.

    Side effects:
        - ``request.META["CSRF_COOKIE"]`` holds the secret behind the token.
        - ``request.META["CSRF_COOKIE_NEEDS_UPDATE"]`` is set to ``True``.
        - When ``CSRF_USE_SESSIONS`` is enabled, the secret is also stored in
          ``request.SESSION`` under ``CSRF_SESSION_KEY``.

    Args:
        request: The http request.

    Returns:
        str: The CSRF token (mask + masked secret) to hand to the client.
    """
    if "CSRF_COOKIE" in request.META:
        csrf_secret = request.META["CSRF_COOKIE"]
        # Since the cookie is being used, flag to send the cookie to client
        # (even if the client already has it) in order to renew the expiry
        # timer.
        request.META["CSRF_COOKIE_NEEDS_UPDATE"] = True
    else:
        # add_new_csrf_cookie adds these to request.META:
        # 1) CSRF_COOKIE
        # 2) CSRF_COOKIE_NEEDS_UPDATE
        csrf_secret = add_new_csrf_cookie(request)

    try:
        csrf_token = mask_cipher_secret(csrf_secret)
    except (ValueError, TypeError, IndexError):
        # The csrf secret in request.META is invalid (wrong characters, wrong
        # length or not a string at all), replace it with a new one.
        csrf_secret = add_new_csrf_cookie(request)
        csrf_token = mask_cipher_secret(csrf_secret)

    if CSRF_USE_SESSIONS:
        # The rest of this module reads the session through request.SESSION,
        # so write through the same attribute.
        request.SESSION[CSRF_SESSION_KEY] = csrf_secret
    return csrf_token
[docs]
class OriginError(Exception):
    """
    Raised when a request's HTTP Origin header is missing or not acceptable.
    """
[docs]
class RefererError(Exception):
    """
    Raised when a request's HTTP Referer header is not acceptable.
    """
[docs]
class CSRFCookieError(Exception):
    """
    Raised when the CSRF cookie sent by the client is missing or invalid.
    """
[docs]
class CSRFMiddleware(BaseMiddleware):
    """
    Middleware for mitigating Cross-Site Request Forgery (CSRF) attacks.

    For every request whose method is not GET, HEAD, OPTIONS or TRACE the
    middleware:

    1. validates the CSRF secret received from the client
       (see ``check_csrf_cookie``),
    2. validates the ``Origin`` and ``Referer`` headers against the host and
       scheme of the request itself, and
    3. unmasks the submitted CSRF token and compares the result with the
       expected CSRF secret (taken from the session when
       ``CSRF_USE_SESSIONS`` is enabled, otherwise from the client's
       cookie/header).

    The middleware operates with the following behavior:

    - **Conditional Activation:** When ``USE_DJANGO=True``, this middleware is
      skipped unless the request path corresponds to a Duck explicit URL;
      Django is expected to perform its own CSRF checks for the other
      requests.
    - **Prevention of CSRF Attacks:** CSRF attacks exploit a user's
      authenticated session to perform unauthorized actions on their behalf.
      Requiring a token that matches the stored secret mitigates the risk of
      such attacks.

    Attributes:
        debug_message (str): Description of the most recent CSRF failure.
            It is a class attribute and is overwritten whenever a check
            fails (presumably read by the framework when logging a rejected
            request -- confirm against BaseMiddleware).
    """

    debug_message: str = "CSRFMiddleware: CSRF token missing or invalid"

    # A plain staticmethod: stacking @classmethod on top of @staticmethod
    # only works on interpreter versions where classmethod chains to other
    # descriptors (3.9 - 3.12) and raises TypeError elsewhere.
    @staticmethod
    def rotate_csrf_token(request: HttpRequest):
        """
        Resets the request csrf secret and returns the rotated csrf secret.

        Intended to be used after login; the user may need to reload the
        page to get a csrf token that matches the new secret.

        Args:
            request (HttpRequest): The request whose CSRF secret is replaced.

        Returns:
            str: The newly generated CSRF secret.
        """
        return add_new_csrf_cookie(request)

    @classmethod
    def _check_origin_ok(cls, request):
        """
        Checks if request Origin is good origin.

        The expected origin is built from ``request.host`` and
        ``request.scheme``.

        Returns:
            True if request origin is ok

        Raises:
            OriginError: If origin provided is invalid in any way
        """
        request_origin = request.origin

        if not request_origin:
            raise OriginError("No Origin header found in request")

        parsed_good_origin = URL(request.host)
        parsed_good_origin.scheme = request.scheme
        parsed_request_origin = URL(request_origin)

        if parsed_request_origin.port:
            # There is port in origin header
            if parsed_request_origin.port != parsed_good_origin.port:
                raise OriginError("Port specified in Origin header is not allowed")

        # Either a foreign host or a different scheme makes the origin bad;
        # this mirrors the Referer check.
        if (parsed_good_origin.host != parsed_request_origin.host
                or parsed_good_origin.scheme != parsed_request_origin.scheme):
            raise OriginError(
                f"Bad Origin header. Good origin '{parsed_good_origin.to_str()}' but got '{parsed_request_origin.to_str()}'.")
        return True

    @classmethod
    def _check_referer_ok(cls, request):
        """
        Checks if request Referer is good referer.

        The expected referer is built from ``request.host`` and
        ``request.scheme``.

        Returns:
            True if request referer is ok

        Raises:
            RefererError: If referer provided is invalid in any way
        """
        request_referer = request.referer

        if not request_referer:
            raise RefererError("No Referer header found in request")

        parsed_good_referer = URL(request.host)
        parsed_good_referer.scheme = request.scheme
        parsed_request_referer = URL(request_referer)

        if parsed_request_referer.port:
            # There is port in referer header
            if parsed_request_referer.port != parsed_good_referer.port:
                raise RefererError("Port specified in Referer header is not allowed")

        if (parsed_good_referer.host != parsed_request_referer.host
                or parsed_good_referer.scheme != parsed_request_referer.scheme):
            raise RefererError(
                f"Bad Referer header. Good referer '{parsed_good_referer.to_str()}' but got '{parsed_request_referer.to_str()}'.")
        return True

    @classmethod
    def check_csrf_cookie(cls, request):
        """
        Checks for the csrf cookie sent in request.

        The secret is read from ``request.META["CSRF_COOKIE"]``. It must be
        present, have ``CSRF_SECRET_LENGTH`` characters and, when
        ``CSRF_USE_SESSIONS`` is enabled and the session holds a secret,
        match that secret. ``debug_message`` is updated on every failure.

        Raises:
            CSRFCookieError: This is raised if there is any issue with the CSRF cookie sent by the client.
        """
        csrf_secret_from_cookie = request.META.get("CSRF_COOKIE")

        if not csrf_secret_from_cookie:
            cls.debug_message = "CSRFMiddleware: CSRF cookie not set"
            raise CSRFCookieError(
                "CSRF Cookie not set. This is required to ensure that your browser is not being hijacked by third parties"
            )

        if len(csrf_secret_from_cookie) != CSRF_SECRET_LENGTH:
            cls.debug_message = "CSRFMiddleware: CSRF cookie length invalid"
            raise CSRFCookieError("CSRF Cookie length invalid")

        if SETTINGS["CSRF_USE_SESSIONS"]:
            csrf_secret_from_session = request.SESSION.get(CSRF_SESSION_KEY)
            # Compare in constant time so response timing does not reveal how
            # much of the session secret a guessed value matches.
            if (csrf_secret_from_session
                    and not constant_time_compare(csrf_secret_from_cookie, csrf_secret_from_session)):
                cls.debug_message = "CSRFMiddleware: CSRF cookie mismatch"
                raise CSRFCookieError(
                    "CSRF Cookie mismatch. CSRF Cookie not matching csrf cookie from session"
                )

    @classmethod
    def get_error_response(cls, request):
        """
        Builds the 403 response returned for a rejected request.

        With ``DEBUG`` enabled the ``csrf_error.html`` template is rendered
        with the failure reason (``request.csrf_error_reason`` when set);
        otherwise a forbidden response without a custom body is returned.

        Args:
            request: The rejected request.

        Returns:
            The response to send to the client.
        """
        if not SETTINGS["DEBUG"]:
            return simple_response(HttpForbiddenRequestResponse, body=None)

        csrf_error_context = {"reason": "CSRFMiddleware error"}
        if hasattr(request, "csrf_error_reason"):
            csrf_error_context["reason"] = request.csrf_error_reason
            cls.debug_message = f"CSRFMiddleware: {request.csrf_error_reason}"

        return internal_render(
            request,
            "csrf_error.html",
            context=csrf_error_context,
            content_type="text/html",
            status_code=403,
        )

    @classmethod
    def process_response(cls, response, request):
        """
        Sends the CSRF cookie to the client when it has been flagged for update.

        Nothing is done unless ``request.META["CSRF_COOKIE_NEEDS_UPDATE"]`` is
        truthy. The cookie is also skipped when the response already carries
        a cookie with the CSRF cookie name, or when ``CSRF_USE_SESSIONS`` is
        enabled (the secret lives in the session instead).

        Args:
            response: The outgoing response; modified in place.
            request: The request the response belongs to.
        """
        if not request.META.get("CSRF_COOKIE_NEEDS_UPDATE"):
            return

        csrf_cookie_name = SETTINGS["CSRF_COOKIE_NAME"]

        if csrf_cookie_name in response.cookies:
            # Csrf cookie has been modified somehow, no need to set it.
            return

        if CSRF_USE_SESSIONS:
            # No need to send cookie, the csrf secret is available in user session
            return

        max_age = SETTINGS["CSRF_COOKIE_AGE"]
        expires = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=max_age)

        # Set csrf cookie
        response.set_cookie(
            csrf_cookie_name,
            value=request.META.get("CSRF_COOKIE"),
            domain=SETTINGS["CSRF_COOKIE_DOMAIN"] or Meta.get_metadata("DUCK_SERVER_DOMAIN"),
            path=SETTINGS["CSRF_COOKIE_PATH"],
            expires=expires,
            secure=SETTINGS["CSRF_COOKIE_SECURE"],
            httponly=SETTINGS["CSRF_COOKIE_HTTPONLY"],
            samesite=SETTINGS["CSRF_COOKIE_SAMESITE"],
        )

    @classmethod
    def process_request(cls, request: HttpRequest):
        """
        Verifies the CSRF token of an incoming request.

        On failure the reason is stored in ``request.csrf_error_reason``.

        Args:
            request (HttpRequest): The incoming request.

        Returns:
            ``cls.request_ok`` when the request may proceed, otherwise
            ``cls.request_bad``.
        """
        # Kept as a function-level import, as in the original code
        # (presumably to avoid a circular import -- confirm).
        from duck.http.core.processor import (
            is_django_side_url,
            is_duck_explicit_url,
        )

        if SETTINGS["USE_DJANGO"] and (is_django_side_url(request.path) or not is_duck_explicit_url(request.path)):
            # This request is meant for Django to handle, no need to do Csrf middleware checks (Django will do it).
            return cls.request_ok

        csrf_token_name = "csrfmiddlewaretoken"
        csrf_cookie_name = SETTINGS["CSRF_COOKIE_NAME"]
        csrf_header_name = SETTINGS["CSRF_HEADER_NAME"]

        # NOTE(review): the same header is consulted further down as a source
        # of the masked token; confirm which value clients are expected to
        # send in it.
        csrf_secret_from_cookie = request.get_header(csrf_header_name) or request.COOKIES.get(csrf_cookie_name)

        if csrf_secret_from_cookie:
            request.META["CSRF_COOKIE"] = csrf_secret_from_cookie

        # Assume that anything not defined as 'safe' by RFC 9110 needs protection
        if request.method in ("GET", "HEAD", "OPTIONS", "TRACE"):
            return cls.request_ok

        if SETTINGS["CSRF_USE_SESSIONS"]:
            correct_csrf_secret = request.SESSION.get(CSRF_SESSION_KEY)
        else:
            correct_csrf_secret = csrf_secret_from_cookie

        try:
            cls.check_csrf_cookie(request)
        except CSRFCookieError as e:
            request.csrf_error_reason = str(e)
            return cls.request_bad

        # perform origin and referer checks
        try:
            cls._check_origin_ok(request)
            cls._check_referer_ok(request)
        except (OriginError, RefererError) as e:
            request.csrf_error_reason = str(e)
            return cls.request_bad
        except Exception:
            # Anything unexpected (e.g. an unparsable header) is reported
            # with a generic reason instead of leaking internals.
            request.csrf_error_reason = "Error in performing Origin and Referer header checks"
            return cls.request_bad

        if not correct_csrf_secret:
            request.csrf_error_reason = (
                "Request might have expired, try reloading page")
            return cls.request_bad

        sent_token = request.QUERY["CONTENT_QUERY"].get(
            csrf_token_name, "") or request.get_header(csrf_header_name, "")

        try:
            sent_csrf_token_secret = (
                unmask_cipher_token(sent_token)
                or "<no-token>")
        except Exception:
            # Deliberately broad: whatever goes wrong while unmasking, the
            # token is invalid and the comparison below must fail.
            sent_csrf_token_secret = "<invalid-token>"

        token_matches = (
            constant_time_compare(sent_csrf_token_secret, correct_csrf_secret)
            and len(correct_csrf_secret) == CSRF_SECRET_LENGTH
            and len(sent_csrf_token_secret) == CSRF_SECRET_LENGTH
        )
        if not token_matches:
            request.csrf_error_reason = (
                "CSRF token missing or invalid, try reloading page")
            return cls.request_bad

        # Warning: don't rotate csrf_token yet as this might mean user needs to reload web form every time.
        return cls.request_ok