neptune/core/management/commands/processing.py

25 lines
783 B
Python
Raw Normal View History

2023-01-12 07:20:48 +00:00
import msgpack
2023-01-12 07:20:48 +00:00
from django.core.management.base import BaseCommand, CommandError
from redis import StrictRedis
2023-01-12 07:20:48 +00:00
2023-01-12 07:20:48 +00:00
from core.lib.rules import process_rules
2023-01-12 07:20:48 +00:00
from core.util import logs
2023-01-12 07:20:48 +00:00
# Module-level logger obtained from the project's ``logs`` helper under the
# name "processing".
log = logs.get_logger("processing")
2023-01-12 07:20:48 +00:00
2023-01-12 07:20:48 +00:00
class Command(BaseCommand):
    """Management command that feeds Redis pub/sub messages to the rule processor."""

    def handle(self, *args, **options):
        """Subscribe to the ``messages`` pattern and process every decodable payload.

        Connects to Redis over the local unix socket, pattern-subscribes to
        ``messages`` and iterates the pub/sub listener. Each payload received
        on the ``messages`` channel is decoded with msgpack and handed to
        ``process_rules``. Payloads that cannot be decoded are skipped so a
        single bad message does not terminate the listener loop.

        Args:
            *args: Positional arguments passed by Django; unused.
            **options: Parsed command options passed by Django; unused.
        """
        r = StrictRedis(unix_socket_path="/var/run/socks/redis.sock", db=0)
        p = r.pubsub()
        p.psubscribe("messages")
        for message in p.listen():
            if not message or message["channel"] != b"messages":
                continue
            try:
                unpacked = msgpack.unpackb(message["data"], raw=False)
            except (TypeError, ValueError):
                # TypeError: the payload is not bytes-like (presumably the
                # subscription confirmation, whose data is an integer count).
                # ValueError: the bytes are not a valid msgpack document
                # (truncated input, trailing data, bad format, invalid UTF-8).
                continue
            process_rules(unpacked)