Files
GIA/core/security/attachments.py
2026-03-05 05:42:19 +00:00

170 lines
4.8 KiB
Python

import ipaddress
import os
import socket
from fnmatch import fnmatch
from urllib.parse import urlparse
from django.conf import settings
from core.util import logs
# Module-level logger for the attachment security helpers in this file.
log = logs.get_logger("security.attachments")
# Default MIME allow-list, expressed as glob patterns. _allowed_mime_patterns()
# returns this tuple when settings.ATTACHMENT_ALLOWED_MIME_TYPES does not
# provide a usable list/tuple of its own.
_DEFAULT_ALLOWED_MIME_PATTERNS = (
    "image/*",
    "video/*",
    "audio/*",
    "text/plain",
    "application/pdf",
    "application/zip",
    "application/json",
    "application/msword",
    "application/vnd.*",
)
# Default MIME block-list (exact, lower-case media types). _blocked_mime_types()
# returns a copy of this set when settings.ATTACHMENT_BLOCKED_MIME_TYPES does
# not provide a usable collection of its own.
_DEFAULT_BLOCKED_MIME_TYPES = {
    "text/html",
    "application/xhtml+xml",
    "application/javascript",
    "text/javascript",
    "application/x-sh",
    "application/x-msdownload",
    "application/x-dosexec",
    "application/x-executable",
}
# Lower-case filename extensions (including the leading dot) that
# validate_attachment_metadata() rejects before it looks at the MIME type.
_BLOCKED_FILENAME_EXTENSIONS = {
    ".exe",
    ".dll",
    ".bat",
    ".cmd",
    ".msi",
    ".sh",
    ".ps1",
    ".jar",
    ".hta",
}
def _strip_content_type(value: str | None) -> str:
raw = str(value or "").strip().lower()
if not raw:
return ""
return raw.split(";", 1)[0].strip()
def normalize_filename(value: str | None, fallback: str = "attachment.bin") -> str:
name = os.path.basename(str(value or "").strip())
name = name.replace("\x00", "").strip()
return name or fallback
def normalized_content_type(
    value: str | None,
    *,
    fallback: str = "application/octet-stream",
) -> str:
    """Return the cleaned media type for ``value``, or ``fallback`` if empty.

    Cleaning is delegated to ``_strip_content_type``; ``fallback`` is used
    whenever that helper returns an empty string.
    """
    media_type = _strip_content_type(value)
    if media_type:
        return media_type
    return fallback
def _allowed_mime_patterns() -> tuple[str, ...]:
    """Return the MIME glob patterns that attachments are allowed to match.

    Reads ``settings.ATTACHMENT_ALLOWED_MIME_TYPES``. When it is a list,
    tuple, set or frozenset with at least one non-blank entry, the entries are
    stripped and lower-cased and returned as a tuple. In every other case the
    built-in ``_DEFAULT_ALLOWED_MIME_PATTERNS`` is returned.
    """
    configured = getattr(settings, "ATTACHMENT_ALLOWED_MIME_TYPES", None)
    if isinstance(configured, (list, tuple, set, frozenset)):
        cleaned = (str(item or "").strip().lower() for item in configured)
        # Filter after normalising: a whitespace-only entry would otherwise
        # become an empty pattern that matches nothing yet still suppresses
        # the defaults.
        patterns = tuple(pattern for pattern in cleaned if pattern)
        if patterns:
            return patterns
    return _DEFAULT_ALLOWED_MIME_PATTERNS
def _blocked_mime_types() -> set[str]:
    """Return the set of exact MIME types that are always rejected.

    Reads ``settings.ATTACHMENT_BLOCKED_MIME_TYPES``. When it is a list,
    tuple, set or frozenset with at least one non-blank entry, the entries are
    stripped and lower-cased and returned as a new set. In every other case a
    copy of ``_DEFAULT_BLOCKED_MIME_TYPES`` is returned.
    """
    configured = getattr(settings, "ATTACHMENT_BLOCKED_MIME_TYPES", None)
    if isinstance(configured, (list, tuple, set, frozenset)):
        cleaned = (str(item or "").strip().lower() for item in configured)
        # Filter after normalising: a whitespace-only entry must not replace
        # the default block-list with a set that blocks nothing.
        values = {entry for entry in cleaned if entry}
        if values:
            return values
    return set(_DEFAULT_BLOCKED_MIME_TYPES)
def validate_attachment_metadata(
    *,
    filename: str | None,
    content_type: str | None,
    size: int | None = None,
) -> tuple[str, str]:
    """Validate an attachment's declared filename, MIME type and size.

    Only the supplied metadata is examined; the attachment body is not read.

    Args:
        filename: Sender-supplied filename; normalised via
            ``normalize_filename``.
        content_type: Sender-supplied content type; normalised via
            ``normalized_content_type``.
        size: Declared size in bytes. ``None`` or ``0`` skips the size check.

    Returns:
        ``(normalized_name, normalized_type)``.

    Raises:
        ValueError: With a message starting with one of
            ``blocked_filename_extension:``, ``blocked_mime_type:``,
            ``unsupported_mime_type:`` or ``attachment_too_large:``.
    """
    normalized_name = normalize_filename(filename)
    normalized_type = normalized_content_type(content_type)

    # Windows drops trailing dots and spaces when a file is created, so
    # "payload.exe." must be judged by ".exe" rather than by ".".
    _, ext = os.path.splitext(normalized_name.lower().rstrip(". "))
    if ext in _BLOCKED_FILENAME_EXTENSIONS:
        raise ValueError(f"blocked_filename_extension:{ext}")

    if normalized_type in _blocked_mime_types():
        raise ValueError(f"blocked_mime_type:{normalized_type}")

    allow_unmatched = bool(getattr(settings, "ATTACHMENT_ALLOW_UNKNOWN_MIME", False))
    type_is_allowed = any(
        fnmatch(normalized_type, pattern) for pattern in _allowed_mime_patterns()
    )
    if not type_is_allowed and not allow_unmatched:
        raise ValueError(f"unsupported_mime_type:{normalized_type}")

    # A missing, zero or negative ATTACHMENT_MAX_BYTES disables the limit.
    max_bytes = int(getattr(settings, "ATTACHMENT_MAX_BYTES", 25 * 1024 * 1024) or 0)
    if size and max_bytes > 0 and int(size) > max_bytes:
        raise ValueError(f"attachment_too_large:{size}>{max_bytes}")

    return normalized_name, normalized_type
def _host_is_private(hostname: str) -> bool:
if not hostname:
return True
lower = hostname.strip().lower()
if lower in {"localhost", "localhost.localdomain"} or lower.endswith(".local"):
return True
try:
addr = ipaddress.ip_address(lower)
return (
addr.is_private
or addr.is_loopback
or addr.is_link_local
or addr.is_multicast
or addr.is_reserved
or addr.is_unspecified
)
except ValueError:
pass
try:
infos = socket.getaddrinfo(lower, None)
except Exception:
return True
for info in infos:
ip = info[4][0]
try:
addr = ipaddress.ip_address(ip)
except ValueError:
return True
if (
addr.is_private
or addr.is_loopback
or addr.is_link_local
or addr.is_multicast
or addr.is_reserved
or addr.is_unspecified
):
return True
return False
def validate_attachment_url(url_value: str | None) -> str:
    """Check that ``url_value`` is an acceptable attachment URL and return it.

    Args:
        url_value: Candidate URL; ``None`` is treated as an empty string.

    Returns:
        The whitespace-trimmed URL text.

    Raises:
        ValueError: ``unsupported_url_scheme`` when the scheme is not http or
            https, ``attachment_url_missing_host`` when there is no network
            location, ``attachment_url_embedded_credentials`` when a username
            or password is present, and ``attachment_private_host_blocked:...``
            when ``settings.ATTACHMENT_ALLOW_PRIVATE_URLS`` is falsy and
            ``_host_is_private`` reports the host as private.
    """
    url_text = str(url_value or "").strip()
    parts = urlparse(url_text)

    if parts.scheme not in {"http", "https"}:
        raise ValueError("unsupported_url_scheme")
    if not parts.netloc:
        raise ValueError("attachment_url_missing_host")
    if parts.username or parts.password:
        raise ValueError("attachment_url_embedded_credentials")

    private_urls_allowed = bool(
        getattr(settings, "ATTACHMENT_ALLOW_PRIVATE_URLS", False)
    )
    host = str(parts.hostname or "").strip()
    if private_urls_allowed:
        # Host vetting (and its DNS lookup) is skipped entirely in this mode.
        return url_text
    if _host_is_private(host):
        raise ValueError(f"attachment_private_host_blocked:{host or '-'}")
    return url_text