import requests

from core.util import logs

NTFY_URL = "https://ntfy.sh"

# Seconds to wait for the remote server before giving up, so that a stalled
# notification endpoint cannot block the caller indefinitely.
REQUEST_TIMEOUT = 10

log = logs.get_logger(__name__)


# Actual function to send a message to a topic
def ntfy_sendmsg(**kwargs):
    """
    Send a message to a topic using NTFY.

    Failures are logged and swallowed: sending a notification is best-effort
    and never raises for connection problems, timeouts or a missing topic.

    kwargs:
        msg: Message to send, must be specified
        notification_settings: Notification settings (mapping), must be
            specified. Keys read from it:
            url: URL to NTFY server, falsy/missing to use NTFY_URL
            topic: Topic to send message to, must be specified
            priority: Priority of message, optional
        title: Title of message, optional (defaults to "Fisk")
        tags: Tags to add to message, optional; either a ready-made string
            or a list/tuple/set of tags, which is joined with commas
    """
    msg = kwargs.get("msg")
    notification_settings = kwargs.get("notification_settings") or {}
    title = kwargs.get("title")
    tags = kwargs.get("tags")

    priority = notification_settings.get("priority")
    url = notification_settings.get("url") or NTFY_URL
    topic = notification_settings.get("topic")

    if not topic:
        # Without this guard the message would be posted to "<url>/None",
        # which on a public server is a real topic that anyone can read.
        log.error("Error sending notification: no topic specified")
        return

    headers = {"Title": "Fisk"}
    if title:
        headers["Title"] = title
    if priority:
        # requests only accepts str/bytes header values; priority is often
        # stored as an integer.
        headers["Priority"] = str(priority)
    if tags:
        if not isinstance(tags, (str, bytes)):
            # Join a collection of tags into one comma-separated value.
            tags = ",".join(str(tag) for tag in tags)
        headers["Tags"] = tags

    # Strip trailing slashes so a configured "https://host/" does not
    # produce "https://host//topic".
    target = f"{str(url).rstrip('/')}/{topic}"

    try:
        requests.post(
            target,
            data=msg,
            headers=headers,
            timeout=REQUEST_TIMEOUT,
        )
    except (
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
    ) as e:
        log.error(f"Error sending notification: {e}")


def webhook_sendmsg(**kwargs):
    """
    Send a message to a webhook.

    Failures are logged and swallowed: connection problems, timeouts and a
    missing URL never raise.

    kwargs:
        msg: Message to send, must be specified
        notification_settings: Notification settings (mapping), must be
            specified. Keys read from it:
            url: URL to webhook, must be specified
    """
    msg = kwargs.get("msg")
    notification_settings = kwargs.get("notification_settings") or {}
    url = notification_settings.get("url")

    if not url:
        # Posting to the literal string "None" would raise MissingSchema,
        # which is not one of the exceptions handled below.
        log.error("Error sending webhook: no URL specified")
        return

    try:
        requests.post(
            f"{url}",
            data=msg,
            timeout=REQUEST_TIMEOUT,
        )
    except (
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
    ) as e:
        log.error(f"Error sending webhook: {e}")


# Sendmsg helper to send a message to a user's notification settings
def sendmsg(**kwargs):
    """
    Send a message to a user's notification settings.

    Uses the given notification settings, or fetches the user's own settings
    when none are passed, then dispatches to the sender matching the
    settings' "service" value. Does nothing when the settings are empty or
    name a service other than "ntfy" or "webhook".

    kwargs:
        user: User to send message to; required only when
            notification_settings is not passed. Must provide
            get_notification_settings().
        notification_settings: Notification settings (mapping), optional
            service: Notification service to use ("ntfy" or "webhook")

    kwargs for both services:
        msg: Message to send, must be specified
        notification_settings:
            url: Webhook URL, or URL to NTFY server (falsy to use default)

    extra kwargs for ntfy:
        title: Title of message, optional
        tags: Tags to add to message, optional
        notification_settings:
            topic: Topic to send message to, must be specified
            priority: Priority of message, optional
    """
    if "notification_settings" in kwargs:
        notification_settings = kwargs["notification_settings"]
    else:
        # Only touch the user when settings were not supplied, so callers
        # that pass settings explicitly do not need to pass a user.
        user = kwargs.get("user")
        if user is None:
            log.error(
                "Error sending notification: "
                "no user or notification settings specified"
            )
            return
        notification_settings = user.get_notification_settings().__dict__

    if not notification_settings:
        return

    # Forward the resolved settings: the senders read them from their own
    # kwargs, and settings fetched from the user are not in kwargs yet.
    send_kwargs = {**kwargs, "notification_settings": notification_settings}

    service = notification_settings.get("service")
    if service == "ntfy":
        ntfy_sendmsg(**send_kwargs)
    elif service == "webhook":
        webhook_sendmsg(**send_kwargs)