Harden security
This commit is contained in:
169
core/security/attachments.py
Normal file
169
core/security/attachments.py
Normal file
@@ -0,0 +1,169 @@
|
||||
import ipaddress
|
||||
import os
|
||||
import socket
|
||||
from fnmatch import fnmatch
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from core.util import logs
|
||||
|
||||
log = logs.get_logger("security.attachments")
|
||||
|
||||
_DEFAULT_ALLOWED_MIME_PATTERNS = (
|
||||
"image/*",
|
||||
"video/*",
|
||||
"audio/*",
|
||||
"text/plain",
|
||||
"application/pdf",
|
||||
"application/zip",
|
||||
"application/json",
|
||||
"application/msword",
|
||||
"application/vnd.*",
|
||||
)
|
||||
|
||||
_DEFAULT_BLOCKED_MIME_TYPES = {
|
||||
"text/html",
|
||||
"application/xhtml+xml",
|
||||
"application/javascript",
|
||||
"text/javascript",
|
||||
"application/x-sh",
|
||||
"application/x-msdownload",
|
||||
"application/x-dosexec",
|
||||
"application/x-executable",
|
||||
}
|
||||
|
||||
_BLOCKED_FILENAME_EXTENSIONS = {
|
||||
".exe",
|
||||
".dll",
|
||||
".bat",
|
||||
".cmd",
|
||||
".msi",
|
||||
".sh",
|
||||
".ps1",
|
||||
".jar",
|
||||
".hta",
|
||||
}
|
||||
|
||||
|
||||
def _strip_content_type(value: str | None) -> str:
|
||||
raw = str(value or "").strip().lower()
|
||||
if not raw:
|
||||
return ""
|
||||
return raw.split(";", 1)[0].strip()
|
||||
|
||||
|
||||
def normalize_filename(value: str | None, fallback: str = "attachment.bin") -> str:
|
||||
name = os.path.basename(str(value or "").strip())
|
||||
name = name.replace("\x00", "").strip()
|
||||
return name or fallback
|
||||
|
||||
|
||||
def normalized_content_type(
|
||||
value: str | None,
|
||||
*,
|
||||
fallback: str = "application/octet-stream",
|
||||
) -> str:
|
||||
clean = _strip_content_type(value)
|
||||
return clean or fallback
|
||||
|
||||
|
||||
def _allowed_mime_patterns() -> tuple[str, ...]:
|
||||
configured = getattr(settings, "ATTACHMENT_ALLOWED_MIME_TYPES", None)
|
||||
if isinstance(configured, (list, tuple)):
|
||||
patterns = tuple(str(item or "").strip().lower() for item in configured if item)
|
||||
if patterns:
|
||||
return patterns
|
||||
return _DEFAULT_ALLOWED_MIME_PATTERNS
|
||||
|
||||
|
||||
def _blocked_mime_types() -> set[str]:
|
||||
configured = getattr(settings, "ATTACHMENT_BLOCKED_MIME_TYPES", None)
|
||||
if isinstance(configured, (list, tuple, set)):
|
||||
values = {str(item or "").strip().lower() for item in configured if item}
|
||||
if values:
|
||||
return values
|
||||
return set(_DEFAULT_BLOCKED_MIME_TYPES)
|
||||
|
||||
|
||||
def validate_attachment_metadata(
|
||||
*,
|
||||
filename: str | None,
|
||||
content_type: str | None,
|
||||
size: int | None = None,
|
||||
) -> tuple[str, str]:
|
||||
normalized_name = normalize_filename(filename)
|
||||
normalized_type = normalized_content_type(content_type)
|
||||
_, ext = os.path.splitext(normalized_name.lower())
|
||||
if ext in _BLOCKED_FILENAME_EXTENSIONS:
|
||||
raise ValueError(f"blocked_filename_extension:{ext}")
|
||||
if normalized_type in _blocked_mime_types():
|
||||
raise ValueError(f"blocked_mime_type:{normalized_type}")
|
||||
|
||||
allow_unmatched = bool(getattr(settings, "ATTACHMENT_ALLOW_UNKNOWN_MIME", False))
|
||||
if not any(fnmatch(normalized_type, pattern) for pattern in _allowed_mime_patterns()):
|
||||
if not allow_unmatched:
|
||||
raise ValueError(f"unsupported_mime_type:{normalized_type}")
|
||||
|
||||
max_bytes = int(getattr(settings, "ATTACHMENT_MAX_BYTES", 25 * 1024 * 1024) or 0)
|
||||
if size and max_bytes > 0 and int(size) > max_bytes:
|
||||
raise ValueError(f"attachment_too_large:{size}>{max_bytes}")
|
||||
return normalized_name, normalized_type
|
||||
|
||||
|
||||
def _host_is_private(hostname: str) -> bool:
|
||||
if not hostname:
|
||||
return True
|
||||
lower = hostname.strip().lower()
|
||||
if lower in {"localhost", "localhost.localdomain"} or lower.endswith(".local"):
|
||||
return True
|
||||
try:
|
||||
addr = ipaddress.ip_address(lower)
|
||||
return (
|
||||
addr.is_private
|
||||
or addr.is_loopback
|
||||
or addr.is_link_local
|
||||
or addr.is_multicast
|
||||
or addr.is_reserved
|
||||
or addr.is_unspecified
|
||||
)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
try:
|
||||
infos = socket.getaddrinfo(lower, None)
|
||||
except Exception:
|
||||
return True
|
||||
for info in infos:
|
||||
ip = info[4][0]
|
||||
try:
|
||||
addr = ipaddress.ip_address(ip)
|
||||
except ValueError:
|
||||
return True
|
||||
if (
|
||||
addr.is_private
|
||||
or addr.is_loopback
|
||||
or addr.is_link_local
|
||||
or addr.is_multicast
|
||||
or addr.is_reserved
|
||||
or addr.is_unspecified
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def validate_attachment_url(url_value: str | None) -> str:
|
||||
url_text = str(url_value or "").strip()
|
||||
parsed = urlparse(url_text)
|
||||
if parsed.scheme not in {"http", "https"}:
|
||||
raise ValueError("unsupported_url_scheme")
|
||||
if not parsed.netloc:
|
||||
raise ValueError("attachment_url_missing_host")
|
||||
if parsed.username or parsed.password:
|
||||
raise ValueError("attachment_url_embedded_credentials")
|
||||
|
||||
allow_private = bool(getattr(settings, "ATTACHMENT_ALLOW_PRIVATE_URLS", False))
|
||||
host = str(parsed.hostname or "").strip()
|
||||
if not allow_private and _host_is_private(host):
|
||||
raise ValueError(f"attachment_private_host_blocked:{host or '-'}")
|
||||
return url_text
|
||||
Reference in New Issue
Block a user