# neptune/core/lib/notify.py
import requests
from core.util import logs
# Base URL of the ntfy server used when a caller does not supply its own "url".
NTFY_URL = "https://ntfy.sh"
# Module-level logger obtained from the project's logging helper.
log = logs.get_logger(__name__)
# Actual function to send a message to a topic
2023-01-15 23:02:13 +00:00
def ntfy_sendmsg(msg, **kwargs):
    """Publish ``msg`` to an ntfy topic with an HTTP POST.

    Delivery is best-effort: every failure is logged and swallowed so that
    a broken notification channel never breaks the caller.

    Args:
        msg: Notification body. ``str`` values are sent UTF-8 encoded;
            any other value is handed to ``requests`` unchanged.

    Keyword Args:
        title: Notification title; "Fisk" is used when missing or empty.
        priority: Value of the ``Priority`` header; omitted when falsy.
        tags: Value of the ``Tags`` header; omitted when falsy.
        url: Base URL of the ntfy server; ``NTFY_URL`` when missing or empty.
        topic: Topic to publish to. Nothing is sent when missing or empty.
        timeout: Seconds to wait for the server before giving up (default 10).
    """
    topic = kwargs.get("topic")
    if not topic:
        # Without this guard the request would be posted to "<url>/None".
        log.error("Error sending notification: no topic specified")
        return

    # Fall back to the default server for a missing *or* explicitly empty URL.
    base_url = kwargs.get("url") or NTFY_URL
    timeout = kwargs.get("timeout", 10)

    headers = {"Title": kwargs.get("title") or "Fisk"}
    for header_name, option in (("Priority", "priority"), ("Tags", "tags")):
        value = kwargs.get(option)
        if value:
            headers[header_name] = value

    # Encode text explicitly so non-ASCII messages are transmitted as UTF-8
    # instead of depending on the HTTP stack's default body encoding.
    body = msg.encode("utf-8") if isinstance(msg, str) else msg

    try:
        response = requests.post(
            # Strip a trailing slash so a configured "https://host/" does not
            # produce "https://host//topic".
            f"{str(base_url).rstrip('/')}/{topic}",
            data=body,
            headers=headers,
            # requests never times out by default; bound the wait.
            timeout=timeout,
        )
        # Surface HTTP-level failures (4xx/5xx) in the log as well.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # RequestException is the base class of ConnectionError, Timeout,
        # HTTPError, InvalidURL, ... so every transport failure is logged.
        log.error(f"Error sending notification: {e}")
2023-01-12 07:20:48 +00:00
2023-01-15 23:02:13 +00:00
def webhook_sendmsg(msg, url, timeout=10):
    """POST ``msg`` as the request body to the webhook at ``url``.

    Delivery is best-effort: failures are logged and swallowed instead of
    being raised to the caller.

    Args:
        msg: Payload to send. ``str`` values are sent UTF-8 encoded; any
            other value is handed to ``requests`` unchanged.
        url: Webhook endpoint. Nothing is sent when it is missing or empty.
        timeout: Seconds to wait for the endpoint before giving up.
    """
    if not url:
        # Formatting a missing URL would yield the literal string "None".
        log.error("Error sending webhook: no URL specified")
        return

    # Encode text explicitly so non-ASCII payloads are transmitted as UTF-8.
    body = msg.encode("utf-8") if isinstance(msg, str) else msg

    try:
        response = requests.post(
            str(url),
            data=body,
            # requests never times out by default; bound the wait.
            timeout=timeout,
        )
        # Surface HTTP-level failures (4xx/5xx) in the log as well.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # RequestException also covers Timeout, HTTPError, MissingSchema, ...
        # in addition to the ConnectionError that was handled before.
        log.error(f"Error sending webhook: {e}")
2023-01-12 07:20:48 +00:00
# Sendmsg helper to send a message to a user's notification settings
2023-01-15 23:02:13 +00:00
def sendmsg(user, msg, **kwargs):
    """Send ``msg`` to ``user`` according to their notification settings.

    Args:
        user: Object exposing ``get_notification_settings()``, whose result
            has ``topic`` and ``url`` attributes.
        msg: Notification body, passed on to the selected sender.

    Keyword Args:
        service: ``"ntfy"`` (default) or ``"webhook"``.
        topic: Overrides the topic from the user's settings.
        url: Overrides the URL from the user's settings.
        Remaining keyword arguments are forwarded to ``ntfy_sendmsg``.

    Nothing is sent when neither the caller nor the user's settings provide
    a topic. NOTE(review): this gate also applies to the webhook service,
    which does not use the topic itself; confirm that this is intended.
    """
    service = kwargs.get("service", "ntfy")
    notification_settings = user.get_notification_settings()

    # No custom topic specified: fall back to the user's own topic.
    if "topic" not in kwargs:
        if notification_settings.topic is None:
            # No topic set anywhere, so don't send.
            return
        kwargs["topic"] = notification_settings.topic

    # Same fallback for the URL; when the user has none, "url" stays unset
    # and the ntfy sender applies its own default.
    if "url" not in kwargs and notification_settings.url is not None:
        kwargs["url"] = notification_settings.url

    if service == "ntfy":
        ntfy_sendmsg(msg, **kwargs)
    elif service == "webhook":
        webhook_url = kwargs.get("url")
        if webhook_url is None:
            # Previously a KeyError; a webhook cannot be sent without a URL.
            log.error("Error sending webhook: no URL configured")
            return
        webhook_sendmsg(msg, webhook_url)
    else:
        # Make a misspelled/unsupported service visible instead of silently
        # dropping the notification.
        log.error(f"Unknown notification service: {service}")