neptune/core/lib/manage/threshold.py

196 lines
4.4 KiB
Python

import urllib.parse
from django.conf import settings
from core.lib.opensearch import client, run_main_query
from core.lib.threshold import threshold_request
def get_irc_stats():
    """Request ``irc/stats`` through ``threshold_request``.

    An empty payload is sent with the request.

    Returns:
        Whatever ``threshold_request`` returns, or an empty dict when that
        value is falsy.
    """
    response = threshold_request("irc/stats", {})
    return response or {}
def get_irc_networks():
    """Request ``irc/networks`` through ``threshold_request``.

    An empty payload is sent with the request.

    Returns:
        Whatever ``threshold_request`` returns, or an empty dict when that
        value is falsy.
    """
    response = threshold_request("irc/networks", {})
    return response or {}
def get_irc_network(net):
    """Request ``irc/network/{net}`` through ``threshold_request``.

    Args:
        net: Interpolated verbatim into the request path.

    Returns:
        Whatever ``threshold_request`` returns, or an empty dict when that
        value is falsy.
    """
    response = threshold_request(f"irc/network/{net}", {})
    return response or {}
def edit_irc_network(net, data):
    """Send ``data`` to ``irc/network/{net}/edit`` via ``threshold_request``.

    Args:
        net: Interpolated verbatim into the request path.
        data: Anything accepted by ``dict()``; a shallow dict copy of it is
            used as the payload, so the caller's object is not passed on.

    Returns:
        The unmodified return value of ``threshold_request``.
    """
    return threshold_request(f"irc/network/{net}/edit", dict(data))
def change_network_status(net, num, status):
    """Send a status value to ``irc/network/{net}/{num}``.

    Args:
        net: Interpolated verbatim into the request path.
        num: Interpolated into the request path after ``net``.
        status: Sent as the ``status`` field of the payload.

    Returns:
        The unmodified return value of ``threshold_request``.
    """
    return threshold_request(f"irc/network/{net}/{num}", {"status": status})
def get_irc_relays(net):
    """Request ``irc/network/{net}/relays`` through ``threshold_request``.

    Args:
        net: Interpolated verbatim into the request path.

    Returns:
        Whatever ``threshold_request`` returns, or an empty dict when that
        value is falsy.
    """
    response = threshold_request(f"irc/network/{net}/relays", {})
    return response or {}
def get_irc_channels(net):
    """Request ``irc/network/{net}/channels`` through ``threshold_request``.

    Args:
        net: Interpolated verbatim into the request path.

    Returns:
        Whatever ``threshold_request`` returns, or an empty dict when that
        value is falsy.
    """
    response = threshold_request(f"irc/network/{net}/channels", {})
    return response or {}
def part_channel(net, channel):
    """Issue a ``DELETE`` for a channel path on the given network.

    Args:
        net: Interpolated verbatim into the request path.
        channel: Channel name; percent-encoded with no characters treated
            as safe before it is placed in the path.

    Returns:
        Whatever ``threshold_request`` returns, or an empty dict when that
        value is falsy.
    """
    encoded = urllib.parse.quote(channel, safe="")
    response = threshold_request(
        f"irc/network/{net}/channel/{encoded}", {}, method="DELETE"
    )
    return response or {}
def join_channel(net, channel):
    """Issue a ``PUT`` for a channel path on the given network.

    Args:
        net: Interpolated verbatim into the request path.
        channel: Channel name; percent-encoded with no characters treated
            as safe before it is placed in the path.

    Returns:
        Whatever ``threshold_request`` returns, or an empty dict when that
        value is falsy.
    """
    encoded = urllib.parse.quote(channel, safe="")
    response = threshold_request(
        f"irc/network/{net}/channel/{encoded}", {}, method="PUT"
    )
    return response or {}
def get_aliases():
    """Issue a ``GET`` for ``aliases`` through ``threshold_request``.

    An empty payload is sent with the request.

    Returns:
        The unmodified return value of ``threshold_request`` (no fallback
        is applied when it is falsy).
    """
    return threshold_request("aliases", {}, method="GET")
def add_relay(net, num):
    """Issue a ``PUT`` to a network path, optionally targeting one relay.

    Args:
        net: Interpolated verbatim into the request path.
        num: Appended to the path as an extra segment when truthy; when
            falsy (e.g. ``None`` or ``0``) the bare network path is used.

    Returns:
        The unmodified return value of ``threshold_request``.
    """
    path = f"irc/network/{net}"
    if num:
        path = f"{path}/{num}"
    return threshold_request(path, {}, method="PUT")
def del_relay(net, num):
    """Issue a ``DELETE`` to ``irc/network/{net}/{num}``.

    Args:
        net: Interpolated verbatim into the request path.
        num: Interpolated into the request path after ``net``.

    Returns:
        The unmodified return value of ``threshold_request``.
    """
    return threshold_request(f"irc/network/{net}/{num}", {}, method="DELETE")
def update_aliases(aliases):
    """Issue a ``POST`` to ``aliases`` with the given payload.

    Args:
        aliases: Passed to ``threshold_request`` as the payload, uncopied.

    Returns:
        The unmodified return value of ``threshold_request``.
    """
    return threshold_request("aliases", aliases, method="POST")
def run_auto(net):
    """Issue a ``POST`` to ``irc/auto/{net}`` with an empty payload.

    Args:
        net: Interpolated verbatim into the request path.

    Returns:
        The unmodified return value of ``threshold_request``.
    """
    return threshold_request(f"irc/auto/{net}", {}, method="POST")
def run_list(net):
    """Issue a ``POST`` to ``irc/list/{net}`` with an empty payload.

    Args:
        net: Interpolated verbatim into the request path.

    Returns:
        The unmodified return value of ``threshold_request``.
    """
    return threshold_request(f"irc/list/{net}", {}, method="POST")
def create_network(data):
    """Issue a ``PUT`` to ``irc/network/create`` with the given payload.

    Args:
        data: Passed to ``threshold_request`` as the payload, uncopied.

    Returns:
        The unmodified return value of ``threshold_request``.
    """
    return threshold_request("irc/network/create", data, method="PUT")
def del_network(net):
    """Issue a ``DELETE`` to ``irc/network/{net}`` with an empty payload.

    Args:
        net: Interpolated verbatim into the request path.

    Returns:
        The unmodified return value of ``threshold_request``.
    """
    return threshold_request(f"irc/network/{net}", {}, method="DELETE")
def construct_alert_query():
    """Build the search query body used to fetch IRC alerts.

    Returns:
        A freshly built dict asking for at most 25 documents whose ``src``
        field matches ``"irc"``, sorted by ``ts`` in descending order.
    """
    # Only documents originating from IRC.
    irc_only = {"match": {"src": "irc"}}
    # Newest timestamps first.
    newest_first = {"ts": {"order": "desc"}}
    return {
        "size": 25,
        "query": {"bool": {"must": [irc_only]}},
        "sort": [newest_first],
    }
def get_irc_alerts(user):
    """Fetch recent IRC alerts and flatten each hit for display.

    Runs the query from ``construct_alert_query`` against
    ``settings.OPENSEARCH_INDEX_INT`` via ``run_main_query``.

    Args:
        user: Forwarded to ``run_main_query``.

    Returns:
        A list of the hits' ``_source`` dicts (mutated in place), each
        extended with ``id`` (the hit's ``_id``) and with ``date`` and
        ``time``: the parts of ``ts`` before and after the first ``"T"``.
        When ``ts`` is missing or not a string both are ``""``; when it has
        no ``"T"``, ``time`` is ``""``. An empty list is returned when the
        query result is falsy or lacks ``hits.hits``.
    """
    query = construct_alert_query()
    results = run_main_query(
        client,
        user,  # passed through run_main_query to filter_blacklisted
        query,
        custom_query=True,
        index=settings.OPENSEARCH_INDEX_INT,
    )
    if not results:
        return []
    if "hits" not in results or "hits" not in results["hits"]:
        return []

    alerts = []
    for hit in results["hits"]["hits"]:
        alert = hit["_source"]
        alert["id"] = hit["_id"]
        # Split the timestamp into date and time. One document with a
        # missing or oddly formatted ``ts`` must not abort the whole
        # listing, so fall back to empty strings instead of raising.
        ts = alert.get("ts")
        if isinstance(ts, str):
            date_part, _, time_part = ts.partition("T")
        else:
            date_part, time_part = "", ""
        alert["date"] = date_part
        alert["time"] = time_part
        alerts.append(alert)
    return alerts