neptune/core/lib/threshold.py

111 lines
2.9 KiB
Python

import logging
from json import dumps
from operator import itemgetter
from typing import OrderedDict
import requests
from django.conf import settings
from requests.exceptions import JSONDecodeError
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Maps every character that needs escaping to its backslash-prefixed form.
# The backslash itself is part of the table, so existing backslashes are
# doubled in the same single pass.
_ESCAPE_TABLE = str.maketrans({char: "\\" + char for char in "\\[]^-*?"})


def escape(obj):
    r"""Backslash-escape special characters in every string inside *obj*.

    The characters ``\``, ``[``, ``]``, ``^``, ``-``, ``*`` and ``?`` are each
    prefixed with a backslash.

    * ``str``: a new, escaped string is returned.
    * ``list``: every item is escaped recursively and written back into the
      same list object, which is then returned.
    * ``dict``: every value is escaped recursively and stored back under its
      key in the same dict object, which is then returned. Keys are left
      untouched.
    * anything else is returned unchanged.
    """
    if isinstance(obj, str):
        return obj.translate(_ESCAPE_TABLE)
    if isinstance(obj, list):
        # Slice assignment stores the escaped items back into the caller's
        # list; rebinding a loop variable would silently discard them.
        obj[:] = [escape(item) for item in obj]
    elif isinstance(obj, dict):
        for key in obj:
            obj[key] = escape(obj[key])
    return obj
def sort_data(data):
    """Sort the list and dict values held in *data*, modifying it in place.

    List values are sorted ascending (the list object itself is reordered).
    Dict values are replaced by an ``OrderedDict`` whose entries are ordered
    by value, largest first; entries with equal values keep their original
    relative order. Values of any other type are left alone.

    Returns ``None``.
    """
    for key in data:
        value = data[key]
        if isinstance(value, list):
            value.sort()
            continue
        if isinstance(value, dict):
            # NOTE: the dict's values are compared with each other directly,
            # so they must be mutually orderable (e.g. all numbers).
            ranked = sorted(value.items(), key=itemgetter(1), reverse=True)
            data[key] = OrderedDict(ranked)
def threshold_request(url, data):
    """POST *data* to a Threshold API endpoint and return the decoded reply.

    Every value in *data* is passed through ``escape`` and stored back into
    *data* (the caller's dict is updated in place). The dict is then
    JSON-encoded and posted to ``{settings.THRESHOLD_ENDPOINT}/{url}/`` with
    the ``ApiKey`` and ``Token`` headers taken from the Django settings.

    Args:
        url: Endpoint name appended to ``settings.THRESHOLD_ENDPOINT``.
        data: Dict of request parameters; serialised with ``json.dumps``.

    Returns:
        The decoded JSON body after ``sort_data`` has been applied to it, or
        ``False`` when the ``Counter`` response header does not equal
        ``settings.THRESHOLD_API_COUNTER`` or the body is not valid JSON.

    Exceptions raised by ``requests.post`` itself are not caught here.
    """
    headers = {
        "ApiKey": settings.THRESHOLD_API_KEY,
        "Token": settings.THRESHOLD_API_TOKEN,
    }
    for key in data:
        data[key] = escape(data[key])

    # NOTE(review): no ``timeout`` is passed, so this call can block
    # indefinitely if the endpoint stops responding - consider adding one.
    r = requests.post(
        f"{settings.THRESHOLD_ENDPOINT}/{url}/", data=dumps(data), headers=headers
    )

    # The reply is only accepted when its "Counter" header matches the
    # configured value.
    counter = r.headers.get("Counter")
    if counter != settings.THRESHOLD_API_COUNTER:
        logger.error(
            "Threshold API counter mismatch: %s != %s",
            counter,
            settings.THRESHOLD_API_COUNTER,
        )
        return False

    try:
        response = r.json()
    except JSONDecodeError:
        # Use the module logger (not the root ``logging`` module) so this
        # message is routed like the mismatch error above.
        logger.error("Invalid JSON response: %s", r.text)
        return False

    sort_data(response)
    return response
def get_chans(net, query):
    """Query the Threshold ``chans`` endpoint for *net* and *query*.

    Returns the ``"chans"`` entry of the response, or an empty list when
    ``threshold_request`` yields a falsy result (failure or empty reply).
    """
    response = threshold_request("chans", {"net": net, "query": query})
    return response["chans"] if response else []
def get_users(net, query):
    """Query the Threshold ``users`` endpoint for *net* and *query*.

    Returns the ``"users"`` entry of the response, or an empty list when
    ``threshold_request`` yields a falsy result (failure or empty reply).
    """
    response = threshold_request("users", {"net": net, "query": query})
    if response:
        return response["users"]
    return []
def annotate_online(net, query):
    """Query the Threshold ``online`` endpoint for *net* and *query*.

    Returns the response as given by ``threshold_request``, or an empty dict
    when that result is falsy (failure or empty reply).
    """
    request_body = {"net": net, "query": query}
    result = threshold_request("online", request_body)
    return result if result else {}
def annotate_num_users(net, query):
    """Query the Threshold ``num_users`` endpoint for *net* and *query*.

    Returns the response as given by ``threshold_request``, or an empty dict
    when that result is falsy (failure or empty reply).
    """
    counts = threshold_request("num_users", {"net": net, "query": query})
    if counts:
        return counts
    return {}
def annotate_num_chans(net, query):
    """Query the Threshold ``num_chans`` endpoint for *net* and *query*.

    Returns the response as given by ``threshold_request``, or an empty dict
    when that result is falsy (failure or empty reply).
    """
    endpoint = "num_chans"
    counts = threshold_request(endpoint, {"net": net, "query": query})
    return counts or {}