import logging from json import dumps from operator import itemgetter from typing import OrderedDict import requests from django.conf import settings from requests.exceptions import JSONDecodeError logger = logging.getLogger(__name__) def escape(obj): chars = ["[", "]", "^", "-", "*", "?"] if isinstance(obj, str): obj = obj.replace("\\", "\\\\") for i in chars: obj = obj.replace(i, "\\" + i) elif isinstance(obj, list): for i in obj: i = escape(i) elif isinstance(obj, dict): for key in obj: obj[key] = escape(obj[key]) return obj def sort_data(data): for item in data.keys(): if isinstance(data[item], list): cont = False for v in data[item]: if isinstance(v, list): cont = True if isinstance(v, dict): cont = True if cont: continue data[item].sort() elif isinstance(data[item], dict): # Don't sort nested stuff cont = False for k, v in data[item].items(): if isinstance(v, list): cont = True if isinstance(v, dict): cont = True if cont: continue sorted_item = sorted(data[item].items(), key=itemgetter(1), reverse=True) data[item] = OrderedDict({k: v for k, v in sorted_item}) def threshold_request(url, data, method="POST", esc=False): headers = { "ApiKey": settings.THRESHOLD_API_KEY, "Token": settings.THRESHOLD_API_TOKEN, } if esc: for key in data: data[key] = escape(data[key]) if method == "POST": method = requests.post elif method == "GET": method = requests.get elif method == "DELETE": method = requests.delete elif method == "PUT": method = requests.put else: logger.error("Invalid method specified") method = requests.get r = method( f"{settings.THRESHOLD_ENDPOINT}/{url}/", data=dumps(data), headers=headers ) if not r.headers.get("Counter") == settings.THRESHOLD_API_COUNTER: logger.error( ( f"Threshold API counter mismatch: " f"{r.headers.get('Counter')} != " f"{settings.THRESHOLD_API_COUNTER}" ) ) return False try: response = r.json() sort_data(response) except JSONDecodeError: logging.error(f"Invalid JSON response: {r.text}") return False return response def get_chans(net, query): url = "chans" payload = {"net": net, "query": query} channels = threshold_request(url, payload, esc=True) if not channels: return [] return channels["chans"] def get_users(net, query): url = "users" payload = {"net": net, "query": query} users = threshold_request(url, payload, esc=True) if not users: return [] return users["users"] def annotate_online(net, query): url = "online" payload = {"net": net, "query": query} online_info = threshold_request(url, payload, esc=True) if not online_info: return {} return online_info def annotate_num_users(net, query): url = "num_users" payload = {"net": net, "query": query} user_num_map = threshold_request(url, payload, esc=True) if not user_num_map: return {} return user_num_map def annotate_num_chans(net, query): url = "num_chans" payload = {"net": net, "query": query} chan_num_map = threshold_request(url, payload, esc=True) if not chan_num_map: return {} return chan_num_map