neptune/core/lib/rules.py

60 lines
2.0 KiB
Python

from core.db.storage import db
from yaml import load, dump
from yaml.scanner import ScannerError
from yaml.parser import ParserError
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
class NotificationRuleData(object):
def __init__(self, user, data):
self.user = user
self.data = data
self.parsed = None
self.parse_data()
self.validate_permissions()
def validate_permissions(self):
"""
Validate permissions for the source and index variables.
"""
if "index" in self.parsed:
index = self.parsed["index"]
if type(index) == list:
for i in index:
db.parse_index(self.user, {"index": i}, raise_error=True)
else:
db.parse_index(self.user, {"index": index}, raise_error=True)
else:
# Get the default value for the user if not present
index = db.parse_index(self.user, {}, raise_error=True)
self.parsed["index"] = index
if "source" in self.parsed:
source = self.parsed["source"]
if type(source) == list:
for i in source:
db.parse_source(self.user, {"source": i}, raise_error=True)
else:
db.parse_source(self.user, {"source": source}, raise_error=True)
else:
# Get the default value for the user if not present
source = db.parse_source(self.user, {}, raise_error=True)
self.parsed["source"] = source
def parse_data(self):
"""
Parse the data in the text field to YAML.
"""
try:
self.parsed = load(self.data, Loader=Loader)
except (ScannerError, ParserError) as e:
raise ValueError(f"Invalid YAML: {e}")
def __str__(self):
return dump(self.parsed, Dumper=Dumper)
def get_data(self):
return self.parsed