Implement ingesting to Logstash

This commit is contained in:
Mark Veidemanis 2022-06-19 18:24:22 +01:00
parent f9d227831b
commit 5b596ae4b5
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
5 changed files with 64 additions and 24 deletions

View File

@ -21,6 +21,7 @@ import lib.antifraud
import lib.transactions import lib.transactions
import lib.markets import lib.markets
import lib.money import lib.money
import lib.logstash
init_map = None init_map = None
Factory.noisy = False Factory.noisy = False
@ -135,6 +136,6 @@ if __name__ == "__main__":
# Set up the loops to put data in ES # Set up the loops to put data in ES
# init_map["tx"].setup_loops() # init_map["tx"].setup_loops()
lib.logstash.init_logstash()
# Run the WebApp # Run the WebApp
init_map["webapp"].app.run(settings.App.BindHost, 8080) init_map["webapp"].app.run(settings.App.BindHost, 8080)

30
handler/lib/logstash.py Normal file
View File

@ -0,0 +1,30 @@
# Other library imports
from json import dumps
import logstash
import logging
# Project imports
from settings import settings
logger = None
def init_logstash():
global logger
logger = logging.getLogger("ingest")
logger.setLevel(logging.INFO)
logger.addHandler(
logstash.TCPLogstashHandler(
settings.Logstash.Host,
int(settings.Logstash.Port),
version=1,
)
)
def send_logstash(text):
global logger
if logger is not None:
logger.info(dumps(text))
return True
return False

View File

@ -13,6 +13,7 @@ from datetime import datetime
# Project imports # Project imports
from settings import settings from settings import settings
import util import util
from lib.logstash import send_logstash
# TODO: secure ES traffic properly # TODO: secure ES traffic properly
urllib3.disable_warnings() urllib3.disable_warnings()
@ -80,19 +81,22 @@ class Money(util.Base):
""" """
Set up the LoopingCalls to get the balance so we have data in ES. Set up the LoopingCalls to get the balance so we have data in ES.
""" """
if settings.ES.Enabled == "1": if settings.ES.Enabled == "1" or settings.Logstash.Enabled == "1":
self.lc_es_checks = LoopingCall(self.run_checks_in_thread) self.lc_es_checks = LoopingCall(self.run_checks_in_thread)
delay = int(settings.ES.RefreshSec) delay = int(settings.ES.RefreshSec)
self.lc_es_checks.start(delay) self.lc_es_checks.start(delay)
self.agora.es = self.es if settings.ES.Enabled == "1":
self.lbtc.es = self.es self.agora.es = self.es
self.lbtc.es = self.es
def write_to_es(self, msgtype, cast): def write_to_es(self, msgtype, cast):
cast["type"] = "money"
cast["ts"] = str(datetime.now().isoformat())
cast["xtype"] = msgtype
if settings.ES.Enabled == "1": if settings.ES.Enabled == "1":
cast["type"] = msgtype
cast["ts"] = str(datetime.now().isoformat())
cast["xtype"] = "tx"
self.es.index(index=settings.ES.Index, body=cast) self.es.index(index=settings.ES.Index, body=cast)
elif settings.Logstash.Enabled == "1":
send_logstash(cast)
def lookup_rates(self, platform, ads, rates=None): def lookup_rates(self, platform, ads, rates=None):
""" """

View File

@ -264,7 +264,9 @@ class Transactions(util.Base):
# self.log.error(f"Cannot release trade {reference}.") # self.log.error(f"Cannot release trade {reference}.")
# return # return
yield self.release_funds(stored_trade["id"], stored_trade["reference"]) rtrn = yield self.release_funds(stored_trade["id"], stored_trade["reference"])
if rtrn:
self.ux.notify.notify_complete_trade(amount, currency)
@inlineCallbacks @inlineCallbacks
def release_funds(self, trade_id, reference): def release_funds(self, trade_id, reference):
@ -283,6 +285,7 @@ class Transactions(util.Base):
rtrn = yield release(trade_id) rtrn = yield release(trade_id)
if rtrn["message"] == "OK": if rtrn["message"] == "OK":
post_message(trade_id, "Thanks! Releasing now :)") post_message(trade_id, "Thanks! Releasing now :)")
return True
else: else:
logmessage = f"Release funds unsuccessful: {rtrn['message']}" logmessage = f"Release funds unsuccessful: {rtrn['message']}"
self.log.error(logmessage) self.log.error(logmessage)

View File

@ -14,6 +14,7 @@ import util
from lib.agoradesk_py import AgoraDesk from lib.agoradesk_py import AgoraDesk
from lib.localbitcoins_py import LocalBitcoins from lib.localbitcoins_py import LocalBitcoins
import db import db
from lib.logstash import send_logstash
class Local(util.Base): class Local(util.Base):
@ -319,7 +320,6 @@ class Local(util.Base):
ads = yield self.api.buy_monero_online(currency_code=currency, page=page) ads = yield self.api.buy_monero_online(currency_code=currency, page=page)
elif asset == "BTC": elif asset == "BTC":
ads = yield self.api.buy_bitcoins_online(currency_code=currency, page=page) ads = yield self.api.buy_bitcoins_online(currency_code=currency, page=page)
# with open("pub.json", "a") as f: # with open("pub.json", "a") as f:
# import json # import json
# f.write(json.dumps([page, currency, asset, ads])+"\n") # f.write(json.dumps([page, currency, asset, ads])+"\n")
@ -451,22 +451,24 @@ class Local(util.Base):
return public_ads return public_ads
def write_to_es_ads(self, msgtype, ads): def write_to_es_ads(self, msgtype, ads):
if settings.ES.Enabled == "1": for ad in ads:
for ad in ads: cast = {
cast = { "id": ad[0],
"id": ad[0], "username": ad[1],
"username": ad[1], "price": ad[2],
"price": ad[2], "provider": ad[3],
"provider": ad[3], "asset": ad[4],
"asset": ad[4], "currency": ad[5],
"currency": ad[5], "margin": ad[6],
"margin": ad[6], "ts": str(datetime.now().isoformat()),
"type": msgtype, "xtype": msgtype,
"ts": str(datetime.now().isoformat()), "market": self.platform,
"xtype": "platform", "type": "platform",
"market": self.platform, }
} if settings.ES.Enabled == "1":
self.es.index(index=settings.ES.MetaIndex, body=cast) self.es.index(index=settings.ES.MetaIndex, body=cast)
elif settings.Logstash.Enabled == "1":
send_logstash(cast)
@inlineCallbacks @inlineCallbacks
def slow_ad_update(self, ads): def slow_ad_update(self, ads):