From 5b596ae4b507d06cf541676da8fadaa56452d689 Mon Sep 17 00:00:00 2001 From: Mark Veidemanis Date: Sun, 19 Jun 2022 18:24:22 +0100 Subject: [PATCH] Implement ingesting to Logstash --- handler/app.py | 3 ++- handler/lib/logstash.py | 30 ++++++++++++++++++++++++++++++ handler/lib/money.py | 16 ++++++++++------ handler/lib/transactions.py | 5 ++++- handler/sources/local.py | 34 ++++++++++++++++++---------------- 5 files changed, 64 insertions(+), 24 deletions(-) create mode 100644 handler/lib/logstash.py diff --git a/handler/app.py b/handler/app.py index 9e5f05e..be0446e 100755 --- a/handler/app.py +++ b/handler/app.py @@ -21,6 +21,7 @@ import lib.antifraud import lib.transactions import lib.markets import lib.money +import lib.logstash init_map = None Factory.noisy = False @@ -135,6 +136,6 @@ if __name__ == "__main__": # Set up the loops to put data in ES # init_map["tx"].setup_loops() - + lib.logstash.init_logstash() # Run the WebApp init_map["webapp"].app.run(settings.App.BindHost, 8080) diff --git a/handler/lib/logstash.py b/handler/lib/logstash.py new file mode 100644 index 0000000..7a75b8d --- /dev/null +++ b/handler/lib/logstash.py @@ -0,0 +1,30 @@ +# Other library imports +from json import dumps +import logstash +import logging + +# Project imports +from settings import settings + +logger = None + + +def init_logstash(): + global logger + logger = logging.getLogger("ingest") + logger.setLevel(logging.INFO) + logger.addHandler( + logstash.TCPLogstashHandler( + settings.Logstash.Host, + int(settings.Logstash.Port), + version=1, + ) + ) + + +def send_logstash(text): + global logger + if logger is not None: + logger.info(dumps(text)) + return True + return False diff --git a/handler/lib/money.py b/handler/lib/money.py index 1724251..e58d4c5 100644 --- a/handler/lib/money.py +++ b/handler/lib/money.py @@ -13,6 +13,7 @@ from datetime import datetime # Project imports from settings import settings import util +from lib.logstash import send_logstash # TODO: secure ES traffic properly urllib3.disable_warnings() @@ -80,19 +81,22 @@ class Money(util.Base): """ Set up the LoopingCalls to get the balance so we have data in ES. """ - if settings.ES.Enabled == "1": + if settings.ES.Enabled == "1" or settings.Logstash.Enabled == "1": self.lc_es_checks = LoopingCall(self.run_checks_in_thread) delay = int(settings.ES.RefreshSec) self.lc_es_checks.start(delay) - self.agora.es = self.es - self.lbtc.es = self.es + if settings.ES.Enabled == "1": + self.agora.es = self.es + self.lbtc.es = self.es def write_to_es(self, msgtype, cast): + cast["type"] = "money" + cast["ts"] = str(datetime.now().isoformat()) + cast["xtype"] = msgtype if settings.ES.Enabled == "1": - cast["type"] = msgtype - cast["ts"] = str(datetime.now().isoformat()) - cast["xtype"] = "tx" self.es.index(index=settings.ES.Index, body=cast) + elif settings.Logstash.Enabled == "1": + send_logstash(cast) def lookup_rates(self, platform, ads, rates=None): """ diff --git a/handler/lib/transactions.py b/handler/lib/transactions.py index c14c540..eaa374d 100644 --- a/handler/lib/transactions.py +++ b/handler/lib/transactions.py @@ -264,7 +264,9 @@ class Transactions(util.Base): # self.log.error(f"Cannot release trade {reference}.") # return - yield self.release_funds(stored_trade["id"], stored_trade["reference"]) + rtrn = yield self.release_funds(stored_trade["id"], stored_trade["reference"]) + if rtrn: + self.ux.notify.notify_complete_trade(amount, currency) @inlineCallbacks def release_funds(self, trade_id, reference): @@ -283,6 +285,7 @@ class Transactions(util.Base): rtrn = yield release(trade_id) if rtrn["message"] == "OK": post_message(trade_id, "Thanks! Releasing now :)") + return True else: logmessage = f"Release funds unsuccessful: {rtrn['message']}" self.log.error(logmessage) diff --git a/handler/sources/local.py b/handler/sources/local.py index 077ca5d..cef8395 100644 --- a/handler/sources/local.py +++ b/handler/sources/local.py @@ -14,6 +14,7 @@ import util from lib.agoradesk_py import AgoraDesk from lib.localbitcoins_py import LocalBitcoins import db +from lib.logstash import send_logstash class Local(util.Base): @@ -319,7 +320,6 @@ class Local(util.Base): ads = yield self.api.buy_monero_online(currency_code=currency, page=page) elif asset == "BTC": ads = yield self.api.buy_bitcoins_online(currency_code=currency, page=page) - # with open("pub.json", "a") as f: # import json # f.write(json.dumps([page, currency, asset, ads])+"\n") @@ -451,22 +451,24 @@ class Local(util.Base): return public_ads def write_to_es_ads(self, msgtype, ads): - if settings.ES.Enabled == "1": - for ad in ads: - cast = { - "id": ad[0], - "username": ad[1], - "price": ad[2], - "provider": ad[3], - "asset": ad[4], - "currency": ad[5], - "margin": ad[6], - "type": msgtype, - "ts": str(datetime.now().isoformat()), - "xtype": "platform", - "market": self.platform, - } + for ad in ads: + cast = { + "id": ad[0], + "username": ad[1], + "price": ad[2], + "provider": ad[3], + "asset": ad[4], + "currency": ad[5], + "margin": ad[6], + "ts": str(datetime.now().isoformat()), + "xtype": msgtype, + "market": self.platform, + "type": "platform", + } + if settings.ES.Enabled == "1": self.es.index(index=settings.ES.MetaIndex, body=cast) + elif settings.Logstash.Enabled == "1": + send_logstash(cast) @inlineCallbacks def slow_ad_update(self, ads):