from datetime import datetime from django.conf import settings from django.core.exceptions import ValidationError from core.models import NotificationRule class QueryError(Exception): pass def parse_rule(user, query_params): """ Parse a rule query. """ if "rule" in query_params: try: rule_object = NotificationRule.objects.filter(id=query_params["rule"]) except ValidationError: message = "Rule is not a valid UUID" message_class = "danger" return {"message": message, "class": message_class} if not rule_object.exists(): message = "Rule does not exist" message_class = "danger" return {"message": message, "class": message_class} rule_object = rule_object.first() if not rule_object.user == user: message = "Rule does not belong to you" message_class = "danger" return {"message": message, "class": message_class} return rule_object else: return None def parse_size(query_params, sizes): if "size" in query_params: size = query_params["size"] if size not in sizes: message = "Size is not permitted" message_class = "danger" return {"message": message, "class": message_class} size = int(size) else: size = 15 return size def parse_index(user, query_params, raise_error=False): if "index" in query_params: index = query_params["index"] if index == "main": index = settings.INDEX_MAIN else: if not user.has_perm(f"core.index_{index}"): message = f"Not permitted to search by this index: {index}" if raise_error: raise QueryError(message) message_class = "danger" return { "message": message, "class": message_class, } if index == "meta": index = settings.INDEX_META elif index == "internal": index = settings.INDEX_INT elif index == "restricted": if not user.has_perm("core.restricted_sources"): message = f"Not permitted to search by this index: {index}" if raise_error: raise QueryError(message) message_class = "danger" return { "message": message, "class": message_class, } index = settings.INDEX_RESTRICTED else: message = f"Index is not valid: {index}" if raise_error: raise QueryError(message) message_class = "danger" return { "message": message, "class": message_class, } else: index = settings.INDEX_MAIN print("GOT INDEX", index) return index def parse_source(user, query_params, raise_error=False): source = None if "source" in query_params: source = query_params["source"] # Validate permissions for restricted sources if source in settings.SOURCES_RESTRICTED: if not user.has_perm("core.restricted_sources"): message = f"Access denied: {source}" if raise_error: raise QueryError(message) message_class = "danger" return {"message": message, "class": message_class} # Check validity of source elif source not in settings.MAIN_SOURCES: message = f"Invalid source: {source}" if raise_error: raise QueryError(message) message_class = "danger" return {"message": message, "class": message_class} if source == "all": source = None # the next block will populate it if source: sources = [source] else: # Here we need to populate what "all" means for the user. # They may only have access to a subset of the sources. # We build a custom source list with ones they have access # to, and then remove "all" from the list. sources = list(settings.MAIN_SOURCES) if user.has_perm("core.restricted_sources"): # If the user can use restricted sources, add them in. for source_iter in settings.SOURCES_RESTRICTED: sources.append(source_iter) # Get rid of "all", it's just a meta-source if "all" in sources: sources.remove("all") return sources def parse_sort(query_params): sort = None if "sorting" in query_params: sorting = query_params["sorting"] if sorting not in ("asc", "desc", "none"): message = "Invalid sort" message_class = "danger" return {"message": message, "class": message_class} if sorting == "asc": sort = "ascending" elif sorting == "desc": sort = "descending" return sort def parse_date_time(query_params): if set({"from_date", "to_date", "from_time", "to_time"}).issubset( query_params.keys() ): from_ts = f"{query_params['from_date']}T{query_params['from_time']}Z" to_ts = f"{query_params['to_date']}T{query_params['to_time']}Z" from_ts = datetime.strptime(from_ts, "%Y-%m-%dT%H:%MZ") to_ts = datetime.strptime(to_ts, "%Y-%m-%dT%H:%MZ") return (from_ts, to_ts) return (None, None) def parse_sentiment(query_params): sentiment = None if "check_sentiment" in query_params: if "sentiment_method" not in query_params: message = "No sentiment method" message_class = "danger" return {"message": message, "class": message_class} if "sentiment" in query_params: sentiment = query_params["sentiment"] try: sentiment = float(sentiment) except ValueError: message = "Sentiment is not a float" message_class = "danger" return {"message": message, "class": message_class} sentiment_method = query_params["sentiment_method"] return (sentiment_method, sentiment)