103 lines
3.2 KiB
Python
103 lines
3.2 KiB
Python
import requests
|
|
|
|
from core.util import logs
|
|
|
|
NTFY_URL = "https://ntfy.sh"
|
|
|
|
log = logs.get_logger(__name__)
|
|
|
|
|
|
# Actual function to send a message to a topic
|
|
def ntfy_sendmsg(**kwargs):
|
|
"""
|
|
Send a message to a topic using NTFY.
|
|
kwargs:
|
|
msg: Message to send, must be specified
|
|
notification_settings: Notification settings, must be specified
|
|
url: URL to NTFY server, can be None to use default
|
|
topic: Topic to send message to, must be specified
|
|
priority: Priority of message, optional
|
|
title: Title of message, optional
|
|
tags: Tags to add to message, optional
|
|
"""
|
|
msg = kwargs.get("msg", None)
|
|
notification_settings = kwargs.get("notification_settings")
|
|
|
|
title = kwargs.get("title", None)
|
|
priority = notification_settings.get("priority", None)
|
|
tags = kwargs.get("tags", None)
|
|
url = notification_settings.get("url") or NTFY_URL
|
|
topic = notification_settings.get("topic", None)
|
|
|
|
headers = {"Title": "Fisk"}
|
|
if title:
|
|
headers["Title"] = title
|
|
if priority:
|
|
headers["Priority"] = priority
|
|
if tags:
|
|
headers["Tags"] = tags
|
|
try:
|
|
requests.post(
|
|
f"{url}/{topic}",
|
|
data=msg,
|
|
headers=headers,
|
|
)
|
|
except requests.exceptions.ConnectionError as e:
|
|
log.error(f"Error sending notification: {e}")
|
|
|
|
|
|
def webhook_sendmsg(**kwargs):
|
|
"""
|
|
Send a message to a webhook.
|
|
kwargs:
|
|
msg: Message to send, must be specified
|
|
notification_settings: Notification settings, must be specified
|
|
url: URL to webhook, must be specified"""
|
|
msg = kwargs.get("msg", None)
|
|
notification_settings = kwargs.get("notification_settings")
|
|
url = notification_settings.get("url")
|
|
try:
|
|
requests.post(
|
|
f"{url}",
|
|
data=msg,
|
|
)
|
|
except requests.exceptions.ConnectionError as e:
|
|
log.error(f"Error sending webhook: {e}")
|
|
|
|
|
|
# Sendmsg helper to send a message to a user's notification settings
|
|
def sendmsg(**kwargs):
|
|
"""
|
|
Send a message to a user's notification settings.
|
|
Fetches the user's default notification settings if not specified.
|
|
kwargs:
|
|
user: User to send message to, must be specified
|
|
notification_settings: Notification settings, optional
|
|
service: Notification service to use
|
|
|
|
kwargs for both services:
|
|
msg: Message to send, must be specified
|
|
notification_settings: Notification settings, must be specified
|
|
url: URL to NTFY server, can be None to use default
|
|
|
|
extra kwargs for ntfy:
|
|
title: Title of message, optional
|
|
tags: Tags to add to message, optional
|
|
notification_settings: Notification settings, must be specified
|
|
topic: Topic to send message to, must be specified
|
|
priority: Priority of message, optional
|
|
"""
|
|
user = kwargs.get("user", None)
|
|
notification_settings = kwargs.get(
|
|
"notification_settings", user.get_notification_settings().__dict__
|
|
)
|
|
if not notification_settings:
|
|
return
|
|
|
|
service = notification_settings.get("service")
|
|
|
|
if service == "ntfy":
|
|
ntfy_sendmsg(**kwargs)
|
|
elif service == "webhook":
|
|
webhook_sendmsg(**kwargs)
|