neptune/core/lib/notify.py

108 lines
3.4 KiB
Python
Raw Permalink Normal View History

2023-01-12 07:20:48 +00:00
import requests
from core.util import logs
# Fallback NTFY server URL, used by ntfy_sendmsg when the notification
# settings do not provide a URL of their own.
NTFY_URL = "https://ntfy.sh"
# Module-level logger used to report delivery failures.
log = logs.get_logger(__name__)
# Actual function to send a message to a topic
def ntfy_sendmsg(**kwargs):
    """
    Send a message to a topic using NTFY.

    Delivery is best-effort: every failure (missing settings, network
    error, timeout, HTTP error status) is logged and never raised.

    kwargs:
        msg: Message to send, must be specified
        notification_settings: Notification settings, must be specified
            url: URL to NTFY server, can be None to use default
            topic: Topic to send message to, must be specified
            priority: Priority of message, optional
        title: Title of message, optional
        tags: Tags to add to message, optional. Either a ready-made
            string or a list/tuple, which is joined with commas.
        timeout: Seconds to wait for the server, optional (default 10)
    """
    notification_settings = kwargs.get("notification_settings")
    if not notification_settings:
        log.error("Error sending notification: no notification settings")
        return

    topic = notification_settings.get("topic")
    if not topic:
        # Without this guard the request would be sent to "<url>/None".
        log.error("Error sending notification: no topic specified")
        return

    # Strip a trailing slash so the joined URL never contains "//<topic>".
    url = (notification_settings.get("url") or NTFY_URL).rstrip("/")

    headers = {"Title": kwargs.get("title") or "Fisk"}

    priority = notification_settings.get("priority")
    if priority:
        # requests only accepts str/bytes header values; settings may
        # store the priority as an integer.
        headers["Priority"] = str(priority)

    tags = kwargs.get("tags")
    if tags:
        if isinstance(tags, (list, tuple)):
            tags = ",".join(str(tag) for tag in tags)
        headers["Tags"] = tags

    msg = kwargs.get("msg")
    if isinstance(msg, str):
        # Encode explicitly so non-ASCII text is always sent as UTF-8
        # instead of depending on the HTTP stack's default encoding.
        msg = msg.encode("utf-8")

    try:
        response = requests.post(
            f"{url}/{topic}",
            data=msg,
            headers=headers,
            # Never let a stalled server block the caller forever.
            timeout=kwargs.get("timeout", 10),
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # RequestException covers connection errors, timeouts, invalid
        # URLs/headers and the HTTPError raised by raise_for_status().
        log.error(f"Error sending notification: {e}")
2023-01-12 07:20:48 +00:00
def webhook_sendmsg(**kwargs):
    """
    Send a message to a webhook.

    The message is POSTed as-is with a JSON content type. Delivery is
    best-effort: every failure (missing settings, network error,
    timeout, HTTP error status) is logged and never raised.

    kwargs:
        msg: Message to send, must be specified
        notification_settings: Notification settings, must be specified
            url: URL to webhook, must be specified
        timeout: Seconds to wait for the server, optional (default 10)
    """
    notification_settings = kwargs.get("notification_settings")
    if not notification_settings:
        log.error("Error sending webhook: no notification settings")
        return

    url = notification_settings.get("url")
    if not url:
        # Without this guard the request would be sent to the literal
        # string "None" and fail with an uncaught MissingSchema error.
        log.error("Error sending webhook: no URL specified")
        return

    msg = kwargs.get("msg")
    if isinstance(msg, str):
        # Encode explicitly so non-ASCII text is always sent as UTF-8
        # instead of depending on the HTTP stack's default encoding.
        msg = msg.encode("utf-8")

    headers = {"Content-type": "application/json"}

    try:
        response = requests.post(
            f"{url}",
            headers=headers,
            data=msg,
            # Never let a stalled endpoint block the caller forever.
            timeout=kwargs.get("timeout", 10),
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # RequestException covers connection errors, timeouts, invalid
        # URLs and the HTTPError raised by raise_for_status().
        log.error(f"Error sending webhook: {e}")
2023-01-12 07:20:48 +00:00
# Sendmsg helper to send a message to a user's notification settings
def sendmsg(**kwargs):
    """
    Send a message to a user's notification settings.

    Fetches the user's default notification settings if none are passed
    in, then dispatches to the backend named by the settings' "service"
    entry. Nothing is sent when the settings are empty, when the service
    is "none", or when the service is not recognised.

    kwargs:
        user: User to send message to. Only required when
            notification_settings is not given.
        notification_settings: Notification settings, optional
            service: Notification service to use

    kwargs for both services:
        msg: Message to send, must be specified
        notification_settings: Notification settings, must be specified
            url: URL to NTFY server, can be None to use default

    extra kwargs for ntfy:
        title: Title of message, optional
        tags: Tags to add to message, optional
        notification_settings: Notification settings, must be specified
            topic: Topic to send message to, must be specified
            priority: Priority of message, optional
    """
    if "notification_settings" in kwargs:
        notification_settings = kwargs["notification_settings"]
    else:
        # Only consult the user when no settings were supplied; this
        # avoids a needless lookup and a crash when there is no user.
        user = kwargs.get("user")
        if user is None:
            return
        notification_settings = user.get_notification_settings().__dict__

    if not notification_settings:
        return

    service = notification_settings.get("service")
    if service == "none":
        # Don't send anything
        return

    # The backends read the settings from their own kwargs, so the
    # resolved settings (possibly fetched from the user above) must be
    # forwarded explicitly.
    kwargs["notification_settings"] = notification_settings

    if service == "ntfy":
        ntfy_sendmsg(**kwargs)
    elif service == "webhook":
        webhook_sendmsg(**kwargs)