"""Validation helpers for attachment metadata and attachment source URLs.

Every validator raises ``ValueError`` with a short machine-readable code
(``"<reason>"`` or ``"<reason>:<detail>"``) when the input is rejected.
"""

import ipaddress
import os
import socket
from fnmatch import fnmatch
from urllib.parse import urlparse

from django.conf import settings

from core.util import logs

log = logs.get_logger("security.attachments")

# fnmatch-style patterns used when ATTACHMENT_ALLOWED_MIME_TYPES is not set.
_DEFAULT_ALLOWED_MIME_PATTERNS = (
    "image/*",
    "video/*",
    "audio/*",
    "text/plain",
    "application/pdf",
    "application/zip",
    "application/json",
    "application/msword",
    "application/vnd.*",
)

# Exact MIME types rejected when ATTACHMENT_BLOCKED_MIME_TYPES is not set.
_DEFAULT_BLOCKED_MIME_TYPES = {
    "text/html",
    "application/xhtml+xml",
    "application/javascript",
    "text/javascript",
    "application/x-sh",
    "application/x-msdownload",
    "application/x-dosexec",
    "application/x-executable",
}

# Lower-case filename extensions that are always rejected.
_BLOCKED_FILENAME_EXTENSIONS = {
    ".exe",
    ".dll",
    ".bat",
    ".cmd",
    ".msi",
    ".sh",
    ".ps1",
    ".jar",
    ".hta",
}

# Hostnames treated as local without any DNS lookup.
_LOCAL_HOSTNAMES = frozenset({"localhost", "localhost.localdomain"})


def _strip_content_type(value: str | None) -> str:
    """Return the bare, lower-cased media type of a Content-Type value.

    Parameters after the first ``;`` (for example ``charset=...``) are
    dropped. Returns ``""`` for ``None`` or blank input.
    """
    raw = str(value or "").strip().lower()
    if not raw:
        return ""
    return raw.split(";", 1)[0].strip()


def normalize_filename(value: str | None, fallback: str = "attachment.bin") -> str:
    """Return the final path component of *value*, or *fallback* if empty.

    NUL characters and surrounding whitespace are removed. Both ``/`` and
    ``\\`` are treated as path separators, so a Windows-style client path
    such as ``C:\\dir\\file.txt`` is reduced to ``file.txt`` even when this
    code runs on a POSIX host (where ``os.path.basename`` alone would keep
    the whole string).
    """
    raw = str(value or "").replace("\x00", "").strip()
    name = os.path.basename(raw.replace("\\", "/")).strip()
    return name or fallback


def normalized_content_type(
    value: str | None,
    *,
    fallback: str = "application/octet-stream",
) -> str:
    """Return the bare lower-cased media type of *value*, or *fallback*."""
    clean = _strip_content_type(value)
    return clean or fallback


def _clean_setting_values(configured) -> list[str]:
    """Return stripped, lower-cased, non-blank strings from a settings value.

    Only list, tuple, set and frozenset values are honoured; anything else
    (including ``None``) yields an empty list.
    """
    if not isinstance(configured, (list, tuple, set, frozenset)):
        return []
    cleaned = (str(item).strip().lower() for item in configured if item)
    return [item for item in cleaned if item]


def _allowed_mime_patterns() -> tuple[str, ...]:
    """Return the fnmatch patterns a content type must match to be accepted.

    Reads ``settings.ATTACHMENT_ALLOWED_MIME_TYPES``; falls back to the
    module defaults when the setting is missing, of an unsupported type, or
    contains no usable entries.
    """
    patterns = _clean_setting_values(
        getattr(settings, "ATTACHMENT_ALLOWED_MIME_TYPES", None)
    )
    if patterns:
        return tuple(patterns)
    return _DEFAULT_ALLOWED_MIME_PATTERNS


def _blocked_mime_types() -> set[str]:
    """Return the exact MIME types that are always rejected.

    Reads ``settings.ATTACHMENT_BLOCKED_MIME_TYPES``; a configured value
    replaces (does not extend) the module defaults. Falls back to a copy of
    the defaults when the setting is missing or has no usable entries.
    """
    values = _clean_setting_values(
        getattr(settings, "ATTACHMENT_BLOCKED_MIME_TYPES", None)
    )
    if values:
        return set(values)
    return set(_DEFAULT_BLOCKED_MIME_TYPES)


def _effective_extension(name: str) -> str:
    """Return the lower-cased extension of *name*, ignoring trailing dots/spaces.

    Trailing ``.`` and space characters are trimmed before splitting so that
    names like ``evil.exe.`` cannot slip past the extension blocklist.
    """
    trimmed = name.lower().rstrip(". ")
    return os.path.splitext(trimmed)[1]


def validate_attachment_metadata(
    *,
    filename: str | None,
    content_type: str | None,
    size: int | None = None,
) -> tuple[str, str]:
    """Validate attachment metadata and return ``(filename, content_type)``.

    Args:
        filename: Client-supplied file name; normalised with
            ``normalize_filename``.
        content_type: Client-supplied Content-Type; normalised with
            ``normalized_content_type``.
        size: Attachment size in bytes. A falsy value skips the size check.

    Returns:
        The normalised filename and content type.

    Raises:
        ValueError: ``blocked_filename_extension:<ext>``,
            ``blocked_mime_type:<type>``, ``unsupported_mime_type:<type>``
            (unless ``settings.ATTACHMENT_ALLOW_UNKNOWN_MIME`` is truthy), or
            ``attachment_too_large:<size>><max>`` when *size* exceeds
            ``settings.ATTACHMENT_MAX_BYTES`` (default 25 MiB; a value of 0
            or less disables the limit).
    """
    normalized_name = normalize_filename(filename)
    normalized_type = normalized_content_type(content_type)

    ext = _effective_extension(normalized_name)
    if ext in _BLOCKED_FILENAME_EXTENSIONS:
        raise ValueError(f"blocked_filename_extension:{ext}")

    if normalized_type in _blocked_mime_types():
        raise ValueError(f"blocked_mime_type:{normalized_type}")

    allow_unmatched = bool(getattr(settings, "ATTACHMENT_ALLOW_UNKNOWN_MIME", False))
    matches_allowed = any(
        fnmatch(normalized_type, pattern) for pattern in _allowed_mime_patterns()
    )
    if not matches_allowed and not allow_unmatched:
        raise ValueError(f"unsupported_mime_type:{normalized_type}")

    max_bytes = int(getattr(settings, "ATTACHMENT_MAX_BYTES", 25 * 1024 * 1024) or 0)
    if size and max_bytes > 0 and int(size) > max_bytes:
        raise ValueError(f"attachment_too_large:{size}>{max_bytes}")

    return normalized_name, normalized_type


def _address_is_non_public(addr) -> bool:
    """Return True if *addr* (an ``ipaddress`` address) must not be fetched.

    ``not addr.is_global`` is checked in addition to ``is_private`` because
    the shared address space 100.64.0.0/10 is reported as neither private
    nor global by the ``ipaddress`` module.
    """
    return (
        addr.is_private
        or not addr.is_global
        or addr.is_loopback
        or addr.is_link_local
        or addr.is_multicast
        or addr.is_reserved
        or addr.is_unspecified
    )


def _host_is_private(hostname: str) -> bool:
    """Return True if *hostname* is, or resolves to, a non-public address.

    The check fails closed: an empty host, a resolution failure, or an
    unparsable resolved address all count as private.

    NOTE(review): the name is resolved here, at validation time. If the
    caller resolves it again when fetching, the answers may differ (DNS
    rebinding) -- confirm how the fetch side connects.
    """
    if not hostname:
        return True

    # Drop the optional trailing root dot ("host.local.") so the name checks
    # below cannot be bypassed with a fully qualified form.
    lower = hostname.strip().lower().rstrip(".")
    if not lower:
        return True
    if lower in _LOCAL_HOSTNAMES or lower.endswith(".local"):
        return True

    try:
        literal = ipaddress.ip_address(lower)
    except ValueError:
        literal = None
    if literal is not None:
        return _address_is_non_public(literal)

    try:
        infos = socket.getaddrinfo(lower, None)
    except Exception:
        # Deliberately broad: any resolution problem means "do not fetch".
        return True

    for info in infos:
        try:
            resolved = ipaddress.ip_address(info[4][0])
        except ValueError:
            return True
        # One non-public answer is enough to reject the host.
        if _address_is_non_public(resolved):
            return True
    return False


def validate_attachment_url(url_value: str | None) -> str:
    """Validate a remote attachment URL and return it stripped of whitespace.

    Raises:
        ValueError: ``unsupported_url_scheme`` (only http/https are allowed),
            ``attachment_url_missing_host``,
            ``attachment_url_embedded_credentials``, or
            ``attachment_private_host_blocked:<host>`` when the host is
            private and ``settings.ATTACHMENT_ALLOW_PRIVATE_URLS`` is falsy.
    """
    url_text = str(url_value or "").strip()
    parsed = urlparse(url_text)

    if parsed.scheme not in {"http", "https"}:
        raise ValueError("unsupported_url_scheme")
    if not parsed.netloc:
        raise ValueError("attachment_url_missing_host")
    if parsed.username or parsed.password:
        raise ValueError("attachment_url_embedded_credentials")

    allow_private = bool(getattr(settings, "ATTACHMENT_ALLOW_PRIVATE_URLS", False))
    host = str(parsed.hostname or "").strip()
    if not allow_private and _host_is_private(host):
        raise ValueError(f"attachment_private_host_blocked:{host or '-'}")

    return url_text