Implement threshold writing to Redis and manticore ingesting from Redis

This commit is contained in:
Mark Veidemanis 2022-09-07 07:20:30 +01:00
parent 137299fe9e
commit cdd12cd082
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
7 changed files with 184 additions and 14 deletions

71
db.py
View File

@ -1,5 +1,6 @@
from math import ceil
import aioredis
import manticoresearch
import ujson
from manticoresearch.rest import ApiException
@ -7,21 +8,51 @@ from numpy import array_split
from redis import StrictRedis
import util
from schemas.mc_s import schema
import aioredis
from schemas import mc_s
configuration = manticoresearch.Configuration(host="http://monolith-db-1:9308")
api_client = manticoresearch.ApiClient(configuration)
api_instance = manticoresearch.IndexApi(api_client)
log = util.get_logger("db")
# Redis (legacy)
r = StrictRedis(unix_socket_path="/var/run/redis/redis.sock", db=0)
ar = aioredis.from_url("unix:///var/run/redis/redis.sock")
# AIORedis
ar = aioredis.from_url("unix:///var/run/redis/redis.sock", db=0)
TYPES_MAIN = [
"msg",
"notice",
"action",
"part",
"join",
"kick",
"quit",
"nick",
"mode",
"topic",
]
TYPES_META = ["who"]
TYPES_INT = ["conn", "highlight", "znc", "query", "self"]
def store_message(msg):
"""
Store a message into Manticore
:param msg: dict
"""
# Duplicated to avoid extra function call
if msg["type"] in TYPES_MAIN:
index = "main"
schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
schema = mc_s.schema_int
# normalise fields
for key, value in list(msg.items()):
if value is None:
@ -31,7 +62,7 @@ def store_message(msg):
if schema[key].startswith("string"):
msg[key] = str(value)
body = [{"insert": {"index": "main", "doc": msg}}]
body = [{"insert": {"index": index, "doc": msg}}]
body_post = ""
for item in body:
body_post += ujson.dumps(item)
@ -44,6 +75,7 @@ def store_message(msg):
# print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
print("ATTEMPT", body_post)
def store_message_bulk(data):
@ -59,6 +91,16 @@ def store_message_bulk(data):
for messages in split_posts:
total = []
for msg in messages:
# Duplicated to avoid extra function call (see above)
if msg["type"] in TYPES_MAIN:
index = "main"
schema = mc_s.schema_main
elif msg["type"] in TYPES_META:
index = "meta"
schema = mc_s.schema_meta
elif msg["type"] in TYPES_INT:
index = "internal"
schema = mc_s.schema_int
# normalise fields
for key, value in list(msg.items()):
if value is None:
@ -68,7 +110,7 @@ def store_message_bulk(data):
if schema[key].startswith("string"):
msg[key] = str(value)
body = {"insert": {"index": "main", "doc": msg}}
body = {"insert": {"index": index, "doc": msg}}
total.append(body)
body_post = ""
@ -80,9 +122,10 @@ def store_message_bulk(data):
try:
# Bulk index operations
api_response = api_instance.bulk(body_post) # , async_req=True
# print(api_response)
print(api_response)
except ApiException as e:
print("Exception when calling IndexApi->bulk: %s\n" % e)
print("ATTEMPT", body_post)
def update_schema():
@ -91,11 +134,19 @@ def update_schema():
def create_index(api_client):
util_instance = manticoresearch.UtilsApi(api_client)
schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
schemas = {
"main": mc_s.schema_main,
"meta": mc_s.schema_meta,
"internal": mc_s.schema_int,
}
for name, schema in schemas.items():
schema_types = ", ".join([f"{k} {v}" for k, v in schema.items()])
create_query = f"create table if not exists main({schema_types}) engine='columnar'"
print("Schema types", create_query)
util_instance.sql(create_query)
create_query = (
f"create table if not exists {name}({schema_types}) engine='columnar'"
)
print("Schema types", create_query)
util_instance.sql(create_query)
create_index(api_client)

View File

@ -13,6 +13,23 @@ services:
depends_on:
- db
threshold:
image: pathogen/threshold:latest
build: ./legacy/docker
volumes:
- ${PORTAINER_GIT_DIR}:/code
- ${THRESHOLD_CONFIG_DIR}:/code/legacy/conf/live
#- ${THRESHOLD_TEMPLATE_DIR}:/code/conf/templates
- ${THRESHOLD_CERT_DIR}:/code/legacy/conf/cert
ports:
- "${THRESHOLD_LISTENER_PORT}:${THRESHOLD_LISTENER_PORT}"
- "${THRESHOLD_RELAY_PORT}:${THRESHOLD_RELAY_PORT}"
- "${THRESHOLD_API_PORT}:${THRESHOLD_API_PORT}"
env_file:
- ../stack.env
volumes_from:
- tmp
db:
image: manticoresearch/manticore
restart: always

View File

@ -154,7 +154,7 @@ class IRCBot(IRCClient):
def event(self, **cast):
if "ts" not in cast.keys():
cast["ts"] = str(datetime.now().isoformat())
cast["ts"] = int(datetime.now().timestamp())
# remove odd stuff
for i in list(
@ -832,7 +832,7 @@ class IRCBot(IRCClient):
self.event(type="kick", muser=kicker, channel=channel, msg=message, user=kickee)
def chanlessEvent(self, cast):
cast["ts"] = str(datetime.now().isoformat())
cast["ts"] = int(datetime.now().timestamp())
cast["nick"], cast["ident"], cast["host"] = parsen(cast["muser"])
if dedup(self.name, cast): # Needs to be kept self.name until the dedup
# function is converted to the new net, num

View File

@ -4,6 +4,7 @@ from os import getenv
import util
from sources.ch4 import Chan4
from sources.dis import DiscordClient
from sources.ingest import Ingest
# For development
# if not getenv("DISCORD_TOKEN", None):
@ -29,7 +30,11 @@ async def main(loop):
log.info("Starting 4chan handler.")
chan = Chan4()
await chan.run()
loop.create_task(chan.run())
log.info("Starting ingest handler.")
ingest = Ingest()
loop.create_task(ingest.run())
loop = asyncio.get_event_loop()

0
schemas/__init__.py Normal file
View File

View File

@ -1,4 +1,4 @@
schema = {
schema_main = {
"id": "bigint",
# 1
"archived": "int",
@ -130,3 +130,67 @@ schema = {
# 1, 2
"version_tokens": "int",
}
schema_meta = {
"id": "bigint",
# 393598265, #main, Rust Programmer's Club
"channel": "text",
# 9f7b2e6a0e9b
"host": "text",
# "522, trans rights shill", myname
"ident": "text",
# The quick brown fox jumped over the lazy dog
"msg": "text",
# pol
"net": "text",
# André de Santa Cruz, santa
"nick": "text",
# 1, 2, 3, 4, 5, 6, ...
"num": "int",
# Greens
"realname": "text",
# irc.freenode.net
"server": "text",
# 4ch, irc, dis
"src": "string indexed attribute",
# true, false
"status": "bool",
# 2022-09-02T16:10:36
"ts": "timestamp",
# msg, notice, update, who
"type": "string indexed attribute",
}
schema_int = {
"id": "bigint",
# 393598265, #main, Rust Programmer's Club
"channel": "text",
# 9f7b2e6a0e9b
"host": "text",
# "522, trans rights shill", myname
"ident": "text",
# 0
"mode": "string indexed attribute",
# b0n3
"modearg": "string indexed attribute",
# The quick brown fox jumped over the lazy dog
"msg": "text",
# pol
"net": "text",
# André de Santa Cruz, santa
"nick": "text",
# 1, 2, 3, 4, 5, 6, ...
"num": "int",
# 4ch, irc, dis
"src": "string indexed attribute",
# true, false
"status": "bool",
# 2022-09-02T16:10:36
"ts": "timestamp",
# Anonymous
"user": "text",
# msg, notice, update, who
"type": "string indexed attribute",
# msg, notice, update, who
"mtype": "string indexed attribute",
}

33
sources/ingest.py Normal file
View File

@ -0,0 +1,33 @@
import db
import util
import ujson
import asyncio
SOURCES = ["irc"]
KEYPREFIX = "queue."
CHUNK_SIZE = 1000
ITER_DELAY = 0.5
class Ingest(object):
def __init__(self):
name = self.__class__.__name__
self.log = util.get_logger(name)
async def run(self):
while True:
await self.process_chunk()
await asyncio.sleep(ITER_DELAY)
async def process_chunk(self):
items = []
for source in SOURCES:
key = f"{KEYPREFIX}{source}"
chunk = await db.ar.spop(key, CHUNK_SIZE)
if not chunk:
continue
self.log.info(f"Got chunk: {chunk}")
for item in chunk:
item = ujson.loads(item)
self.log.info(f"Got item: {item}")
items.append(item)
db.store_message_bulk(items)