Libraries refactor and add some sinks #4

Closed
m wants to merge 136 commits from library-refactor into master
1 changed files with 36 additions and 15 deletions
Showing only changes of commit 18011741c5 - Show all commits

View File

@ -31,13 +31,14 @@ class Transactions(object):
Set the logger. Set the logger.
""" """
self.log = Logger("transactions") self.log = Logger("transactions")
self.es = Elasticsearch( if settings.ES.Enabled == "1":
f"https://{settings.ES.Host}:9200", self.es = Elasticsearch(
verify_certs=False, f"https://{settings.ES.Host}:9200",
basic_auth=(settings.ES.Username, settings.ES.Pass), verify_certs=False,
# ssl_assert_fingerprint=("6b264fd2fd107d45652d8add1750a8a78f424542e13b056d0548173006260710"), basic_auth=(settings.ES.Username, settings.ES.Pass),
ca_certs="certs/ca.crt", # ssl_assert_fingerprint=("6b264fd2fd107d45652d8add1750a8a78f424542e13b056d0548173006260710"),
) ca_certs="certs/ca.crt",
)
def run_checks_in_thread(self): def run_checks_in_thread(self):
""" """
@ -55,9 +56,10 @@ class Transactions(object):
""" """
Set up the LoopingCalls to get the balance so we have data in ES. Set up the LoopingCalls to get the balance so we have data in ES.
""" """
self.lc_es_checks = LoopingCall(self.run_checks_in_thread) if settings.ES.Enabled == "1":
delay = int(settings.ES.RefreshSec) self.lc_es_checks = LoopingCall(self.run_checks_in_thread)
self.lc_es_checks.start(delay) delay = int(settings.ES.RefreshSec)
self.lc_es_checks.start(delay)
# TODO: write tests then refactor, this is terribly complicated! # TODO: write tests then refactor, this is terribly complicated!
def transaction(self, data): def transaction(self, data):
@ -536,10 +538,11 @@ class Transactions(object):
return cast return cast
def write_to_es(self, msgtype, cast): def write_to_es(self, msgtype, cast):
cast["type"] = msgtype if settings.ES.Enabled == "1":
cast["ts"] = str(datetime.now().isoformat()) cast["type"] = msgtype
cast["xtype"] = "tx" cast["ts"] = str(datetime.now().isoformat())
self.es.index(index=settings.ES.Index, document=cast) cast["xtype"] = "tx"
self.es.index(index=settings.ES.Index, document=cast)
def get_remaining(self): def get_remaining(self):
""" """
@ -599,7 +602,25 @@ class Transactions(object):
rates = self.agora.get_rates_all() rates = self.agora.get_rates_all()
cumul_usd = 0 cumul_usd = 0
for contact_id, contact in dash.items(): for contact_id, contact in dash.items():
amount = contact["data"]["amount"] # We need created at in order to look up the historical prices
created_at = contact["data"]["created_at"]
# Reformat the date how CoinGecko likes
date_parsed = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%S.%fZ")
date_formatted = date_parsed.strftime("%d-%m-%Y")
# Get the historical rates for the right asset, extract the price
asset = contact["data"]["advertisement"]["asset"]
if asset == "XMR":
amount_crypto = contact["data"]["amount_xmr"]
history = self.agora.cg.get_coin_history_by_id(id="monero", date=date_formatted)
crypto_usd = float(history["market_data"]["current_price"]["usd"])
elif asset == "BTC":
amount_crypto = contact["data"]["amount_btc"]
history = self.agora.cg.get_coin_history_by_id(id="bitcoin", date=date_formatted)
crypto_usd = float(history["market_data"]["current_price"]["usd"])
# Convert crypto to fiat
amount = float(amount_crypto) * crypto_usd
currency = contact["data"]["currency"] currency = contact["data"]["currency"]
if not contact["data"]["is_selling"]: if not contact["data"]["is_selling"]:
continue continue