Move IRC, commands and notify classes into a UX class

This commit is contained in:
Mark Veidemanis 2022-03-04 22:47:06 +00:00
parent 2423e4a066
commit 50c71703fd
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
9 changed files with 815 additions and 13 deletions

View File

@ -685,4 +685,4 @@ class Agora(object):
rtrn2 = self.agora.wallet_send_xmr(**send_cast)
self.irc.sendmsg(f"Withdrawal: {rtrn1['success']} | {rtrn2['success']}")
self.notify.notify_withdrawal(half_rounded)
self.ux.notify.notify_withdrawal(half_rounded)

View File

@ -12,17 +12,16 @@ from signal import signal, SIGINT
# Project imports
from settings import settings
import util
# Old style classes
from agora import Agora
from transactions import Transactions
from irc import bot
from notify import Notify
from markets import Markets
from money import Money
# from sinks.nordigen import Nordigen
# from sinks.truelayer import TrueLayer
# from sinks.fidor import Fidor
# New style classes
import sinks
import ux
init_map = None
@ -95,8 +94,7 @@ class WebApp(object):
if __name__ == "__main__":
init_map = {
"notify": Notify(),
"irc": bot(),
"ux": ux.UX(),
"agora": Agora(),
"markets": Markets(),
"sinks": sinks.Sinks(),
@ -107,6 +105,11 @@ if __name__ == "__main__":
# Merge all classes into each other
util.xmerge_attrs(init_map)
# Let the classes know they have been merged
for class_name, class_instance in init_map.items():
if hasattr(class_instance, "__xmerged__"):
class_instance.__xmerged__()
# Set up the loops to put data in ES
init_map["tx"].setup_loops()

24
handler/sinks/__init__.py Normal file
View File

@ -0,0 +1,24 @@
# Twisted/Klein imports
from twisted.logger import Logger
# Other library imports
# import requests
# from json import dumps
# Project imports
# from settings import settings
import sinks.fidor
import sinks.nordigen
import sinks.truelayer
class Sinks(object):
"""
Class to manage calls to various sinks.
"""
def __init__(self):
self.log = Logger("sinks")
self.fidor = sinks.fidor.Fidor()
self.nordigen = sinks.nordigen.Nordigen()
self.truelayer = sinks.truelayer.TrueLayer()

View File

@ -45,8 +45,9 @@ class TestTransactions(TestCase):
self.transactions.irc = MagicMock()
self.transactions.irc.sendmsg = MagicMock()
self.transactions.release_funds = MagicMock()
self.transactions.notify = MagicMock()
self.transactions.notify.notify_complete_trade = MagicMock()
self.transactions.ux = MagicMock()
self.transactions.ux.notify = MagicMock()
self.transactions.ux.notify.notify_complete_trade = MagicMock()
# Mock the rates
self.transactions.money = MagicMock()

View File

@ -117,7 +117,7 @@ class Transactions(object):
r.hmset(f"tx.{txid}", stored_trade)
reference = self.tx_to_ref(stored_trade["trade_id"])
self.release_funds(stored_trade["trade_id"], reference)
self.notify.notify_complete_trade(stored_trade["amount"], stored_trade["currency"])
self.ux.notify.notify_complete_trade(stored_trade["amount"], stored_trade["currency"])
return
# If type not in inside and we haven't hit any more returns
return
@ -267,7 +267,7 @@ class Transactions(object):
r.hmset(f"tx.{txid}", to_store)
self.release_funds(stored_trade["id"], stored_trade["reference"])
self.notify.notify_complete_trade(amount, currency)
self.ux.notify.notify_complete_trade(amount, currency)
def release_funds(self, trade_id, reference):
self.log.info("All checks passed, releasing funds for {trade_id} {reference}", trade_id=trade_id, reference=reference)
@ -303,7 +303,7 @@ class Transactions(object):
self.log.info("Storing trade information: {info}", info=str(to_store))
r.hmset(f"trade.{reference}", to_store)
self.irc.sendmsg(f"Generated reference for {trade_id}: {reference}")
self.notify.notify_new_trade(amount, currency)
self.ux.notify.notify_new_trade(amount, currency)
if settings.Agora.Send == "1":
self.agora.agora.contact_message_post(trade_id, f"Hi! When sending the payment please use reference code: {reference}")
if existing_ref:

44
handler/ux/__init__.py Normal file
View File

@ -0,0 +1,44 @@
# Twisted/Klein imports
from twisted.logger import Logger
# Other library imports
# import requests
# from json import dumps
# Project imports
# from settings import settings
import util
import ux.irc
import ux.commands
import ux.notify
class UX(object):
"""
Class to manage calls to various user interfaces.
"""
def __init__(self):
self.log = Logger("ux")
self.irc = ux.irc.bot()
self.notify = ux.notify.Notify()
def __xmerged__(self):
"""
Called when xmerge has been completed in the webapp.
Merge all instances into child classes.
"""
init_map = {
"ux": self,
"agora": self.agora,
"markets": self.markets,
"sinks": self.sinks,
"tx": self.tx,
"webapp": self.webapp,
"money": self.money,
"irc": self.irc,
"notify": self.notify,
}
util.xmerge_attrs(init_map)

451
handler/ux/commands.py Normal file
View File

@ -0,0 +1,451 @@
# Other library imports
from json import dumps, loads
# Project imports
from settings import settings
class IRCCommands(object):
class trades(object):
name = "trades"
authed = True
helptext = "Get all open trades."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
"""
Get details of open trades and post on IRC.
"""
# Send IRC - we don't want to automatically send messages on IRC, even though
# this variable seems counter-intuitive here, we are doing something with the result
# then calling msg() ourselves, and we don't want extra spam in the channel.
trades = agora.get_dashboard()
if not trades:
msg("No open trades.")
return
for trade_id in trades:
msg(trade_id)
class create(object):
name = "create"
authed = True
helptext = "Create an ad. Usage: create <XMR/BTC> <country> <currency> [<provider>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
"""
Post an ad on AgoraDesk with the given country and currency code.
"""
if length == 4:
if spl[1] not in loads(settings.Agora.AssetList):
msg(f"Not a valid asset: {spl[1]}")
return
posted = agora.create_ad(spl[1], spl[2], spl[3], "REVOLUT")
if posted["success"]:
msg(f"{posted['response']['data']['message']}: {posted['response']['data']['ad_id']}")
else:
msg(dumps(posted["response"]))
elif length == 5:
if spl[1] not in loads(settings.Agora.AssetList):
msg(f"Not a valid asset: {spl[1]}")
return
if spl[4] not in loads(settings.Agora.ProviderList):
msg(f"Not a valid provider: {spl[4]}")
return
posted = agora.create_ad(spl[1], spl[2], spl[3], spl[4])
if posted["success"]:
msg(f"{posted['response']['data']['message']}: {posted['response']['data']['ad_id']}")
else:
msg(dumps(posted["response"]))
class messages(object):
name = "messages"
authed = True
helptext = "Get messages. Usage: messages [<reference>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
"""
Get all messages for all open trades or a given trade.
"""
if length == 1:
messages = agora.get_recent_messages()
if messages is False:
msg("Error getting messages.")
return
if not messages:
msg("No messages.")
return
for reference in messages:
for message in messages[reference]:
msg(f"{reference}: {message[0]} {message[1]}")
msg("---")
elif length == 2:
tx = tx.ref_to_tx(spl[1])
if not tx:
msg(f"No such reference: {spl[1]}")
return
messages = agora.get_messages(spl[1], send_irc=False)
if not messages:
msg("No messages.")
for message in messages:
msg(f"{spl[1]}: {message}")
class dist(object):
name = "dist"
authed = True
helptext = "Distribute all our chosen currency and country ad pairs. Usage: dist [<XMR/BTC>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
# Distribute out our ad to all countries in the config
if length == 2:
asset = spl[1]
if asset not in loads(settings.Agora.AssetList):
msg(f"Not a valid asset: {spl[1]}")
return
for x in agora.dist_countries(filter_asset=asset):
if x["success"]:
msg(f"{x['response']['data']['message']}: {x['response']['data']['ad_id']}")
else:
msg(dumps(x["response"]))
elif length == 1:
for x in agora.dist_countries():
if x["success"]:
msg(f"{x['response']['data']['message']}: {x['response']['data']['ad_id']}")
else:
msg(dumps(x["response"]))
class redist(object):
name = "redist"
authed = True
helptext = "Update all ads with details."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
for x in agora.redist_countries():
if x[0]["success"]:
msg(f"{x[0]['response']['data']['message']}: {x[1]}")
else:
msg(dumps(x[0]["response"]))
class stripdupes(object):
name = "stripdupes"
authed = True
helptext = "Remove all duplicate adverts."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
rtrn = agora.strip_duplicate_ads()
msg(dumps(rtrn))
class total(object):
name = "total"
authed = True
helptext = "Get total account balance from Sinks and Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
totals_all = tx.get_total()
totals = totals_all[0]
wallets = totals_all[1]
msg(f"Totals: SEK: {totals[0]} | USD: {totals[1]} | GBP: {totals[2]}")
msg(f"Wallets: XMR USD: {wallets[0]} | BTC USD: {wallets[1]}")
class ping(object):
name = "ping"
authed = False
helptext = "Pong!"
@staticmethod
def run(cmd, spl, length, authed, msg):
msg("Pong!")
class summon(object):
name = "summon"
authed = True
helptext = "Summon all operators."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
ux.notify.sendmsg("You have been summoned!")
class message(object):
name = "msg"
authed = True
helptext = "Send a message on a trade. Usage: msg <reference> <message...>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length > 2:
full_msg = " ".join(spl[2:])
reference = tx.ref_to_tx(spl[1])
if not reference:
msg(f"No such reference: {spl[1]}")
return
rtrn = agora.agora.contact_message_post(reference, full_msg)
msg(f"Sent {full_msg} to {reference}: {rtrn}")
class refs(object):
name = "refs"
authed = True
helptext = "List all references"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
msg(f"References: {', '.join(tx.get_refs())}")
class ref(object):
name = "ref"
authed = True
helptext = "Get more information about a reference. Usage: ref <reference>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
ref_data = tx.get_ref(spl[1])
if not ref_data:
msg(f"No such reference: {spl[1]}")
return
msg(f"{spl[1]}: {dumps(ref_data)}")
class delete(object):
name = "del"
authed = True
helptext = "Delete a reference. Usage: del <reference>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
ref_data = tx.get_ref(spl[1])
if not ref_data:
msg(f"No such reference: {spl[1]}")
return
tx.del_ref(spl[1])
msg(f"Deleted reference: {spl[1]}")
class release(object):
name = "release"
authed = True
helptext = "Release funds for a trade. Usage: release <reference>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
tx = tx.ref_to_tx(spl[1])
if not tx:
msg(f"No such reference: {spl[1]}")
return
rtrn = agora.release_funds(tx)
message = rtrn["message"]
message_long = rtrn["response"]["data"]["message"]
msg(f"{message} - {message_long}")
class nuke(object):
name = "nuke"
authed = True
helptext = "Delete all our adverts."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
rtrn = agora.nuke_ads()
msg(dumps(rtrn))
class wallet(object):
name = "wallet"
authed = True
helptext = "Get Agora wallet balances."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
rtrn_xmr = agora.agora.wallet_balance_xmr()
if not rtrn_xmr["success"]:
msg("Error getting XMR wallet details.")
return
rtrn_btc = agora.agora.wallet_balance()
if not rtrn_btc["success"]:
msg("Error getting BTC wallet details.")
return
balance_xmr = rtrn_xmr["response"]["data"]["total"]["balance"]
balance_btc = rtrn_btc["response"]["data"]["total"]["balance"]
msg(f"XMR wallet balance: {balance_xmr}")
msg(f"BTC wallet balance: {balance_btc}")
class pubads(object):
name = "pubads"
authed = True
helptext = "View public adverts. Usage: pubads <XMR/BTC> <currency> [<provider,...>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 3:
asset = spl[1]
if asset not in loads(settings.Agora.AssetList):
msg(f"Not a valid asset: {spl[1]}")
return
currency = spl[2]
rtrn = agora.get_all_public_ads(assets=[asset], currencies=[currency])
if not rtrn:
msg("No results.")
return
for ad in rtrn[currency]:
msg(f"({ad[0]}) {ad[1]} {ad[2]} {ad[3]} {ad[4]} {ad[5]} {ad[6]}")
elif length == 4:
asset = spl[1]
if asset not in loads(settings.Agora.AssetList):
msg(f"Not a valid asset: {spl[1]}")
return
providers = spl[3].split(",")
currency = spl[2]
rtrn = agora.get_all_public_ads(assets=[asset], currencies=[currency], providers=providers)
if not rtrn:
msg("No results.")
return
for ad in rtrn[currency]:
msg(f"({ad[0]}) {ad[1]} {ad[2]} {ad[3]} {ad[4]} {ad[5]} {ad[6]}")
class cheat(object):
name = "cheat"
authed = True
helptext = "Cheat the markets by manipulating our prices to exploit people. Usage: cheat [<XMR/BTC>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 1:
agora.run_cheat_in_thread()
msg("Running cheat in thread.")
elif length == 2:
asset = spl[1]
if asset not in loads(settings.Agora.AssetList):
msg(f"Not a valid asset: {spl[1]}")
return
agora.run_cheat_in_thread([asset])
msg(f"Running cheat in thread for {asset}.")
class cheatnext(object):
name = "cheatnext"
authed = True
helptext = "Run the next currency for cheat."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 1:
asset = agora.run_cheat_in_thread()
msg(f"Running next asset for cheat in thread: {asset}")
class ads(object):
name = "ads"
authed = True
helptext = "Get all our ad regions"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
ads = agora.enum_ads()
for ad in ads:
msg(f"({ad[0]}) {ad[1]} {ad[2]} {ad[3]} {ad[4]}")
class xmr(object):
name = "xmr"
authed = True
helptext = "Get current XMR price."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
xmr_prices = agora.cg.get_price(ids="monero", vs_currencies=["sek", "usd", "gbp"])
price_sek = xmr_prices["monero"]["sek"]
price_usd = xmr_prices["monero"]["usd"]
price_gbp = xmr_prices["monero"]["gbp"]
msg(f"SEK: {price_sek} | USD: {price_usd} | GBP: {price_gbp}")
class btc(object):
name = "btc"
authed = True
helptext = "Get current BTC price."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
xmr_prices = agora.cg.get_price(ids="bitcoin", vs_currencies=["sek", "usd", "gbp"])
price_sek = xmr_prices["bitcoin"]["sek"]
price_usd = xmr_prices["bitcoin"]["usd"]
price_gbp = xmr_prices["bitcoin"]["gbp"]
msg(f"SEK: {price_sek} | USD: {price_usd} | GBP: {price_gbp}")
class withdraw(object):
name = "withdraw"
authed = True
helptext = "Take profit."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
agora.withdraw_funds()
class remaining(object):
name = "r"
authed = True
helptext = "Show how much is left before we are able to withdraw funds."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
remaining = tx.get_remaining()
msg(f"Remaining: {remaining}USD")
class total_remaining(object):
name = "tr"
authed = True
helptext = "Show how much is left before we are able to withdraw funds (including open trades)."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
remaining = tx.get_total_remaining()
msg(f"Total remaining: {remaining}USD")
class tradetotal(object):
name = "tradetotal"
authed = True
helptext = "Get total value of all open trades in USD."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
total = tx.get_open_trades_usd()
msg(f"Total trades: {total}USD")
class dollar(object):
name = "$"
authed = True
helptext = "Get total value of everything, including open trades."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
total = tx.get_total_with_trades()
msg(f"${total}")
class profit(object):
name = "profit"
authed = True
helptext = "Get total profit."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
total = tx.money.get_profit()
msg(f"Profit: {total}USD")
class tprofit(object):
name = "tprofit"
authed = True
helptext = "Get total profit with open trades."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
total = tx.money.get_profit(True)
msg(f"Profit: {total}USD")
class signin(object):
name = "signin"
authed = True
helptext = "Generate a TrueLayer signin URL."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
auth_url = agora.truelayer.create_auth_url()
msg(f"Auth URL: {auth_url}")

234
handler/ux/irc.py Normal file
View File

@ -0,0 +1,234 @@
# Twisted/Klein imports
from twisted.logger import Logger
from twisted.words.protocols import irc
from twisted.internet import protocol, reactor, ssl
from twisted.internet.task import deferLater
# Project imports
from settings import settings
from ux.commands import IRCCommands
class IRCBot(irc.IRCClient):
def __init__(self, log):
"""
Initialise IRC bot.
:param log: logger instance
:type log: Logger
"""
self.log = log
self.cmd = IRCCommands()
# Parse the commands into "commandname": "commandclass"
self.cmdhash = {getattr(self.cmd, x).name: x for x in dir(self.cmd) if not x.startswith("_")}
self.nickname = settings.IRC.Nick
self.password = settings.IRC.Pass
self.realname = self.nickname
self.username = self.nickname
# Don't give away information about our client
self.userinfo = None
self.fingerReply = None
self.versionName = None
self.sourceURL = None
self.lineRate = None # Don't throttle messages, we may need to send a lot
self.prefix = settings.IRC.Prefix
self.admins = (settings.IRC.Admins).split("\n")
self.highlight = (settings.IRC.Highlight).split("\n")
self.channel = settings.IRC.Channel
def parse(self, user, host, channel, msg):
"""
Simple handler for IRC commands.
:param user: full user string with host
:param host: user's hostname
:param channel: channel the message was received on
:param msg: the message
:type user: string
:type host: string
:type channel: string
:type msg: string
"""
spl = msg.split()
# nick = user.split("!")[0]
cmd = spl[0]
length = len(spl)
# Check if user is authenticated
authed = host in self.admins
if cmd == "help" and length == 2 and authed:
if spl[1] in self.cmdhash:
cmdname = self.cmdhash[spl[1]]
obj = getattr(self.cmd, cmdname)
helptext = getattr(obj, "helptext")
self.msg(channel, helptext)
return
else:
self.msg(channel, f"No such command: {spl[1]}")
return
if cmd == "helpall" and authed:
for command in self.cmdhash:
cmdname = self.cmdhash[command]
obj = getattr(self.cmd, cmdname)
helptext = getattr(obj, "helptext")
self.msg(channel, f"{cmdname}: {helptext}")
return
if cmd in self.cmdhash:
# Get the class name of the referenced command
cmdname = self.cmdhash[cmd]
# Get the class name
obj = getattr(self.cmd, cmdname)
def msgl(x):
self.msg(channel, x)
# Check if the command required authentication
if obj.authed:
if host in self.admins:
obj.run(cmd, spl, length, authed, msgl, self.agora, self.tx, self.ux)
else:
# Handle authentication here instead of in the command module for security
self.msg(channel, "Access denied.")
else:
# Run an unauthenticated command, without passing through secure library calls
obj.run(cmd, spl, len(spl), authed, msgl)
return
self.msg(channel, "Command not found.")
if authed:
# Give user command hints if they are authenticated
self.msg(channel, f"Commands loaded: {', '.join(self.cmdhash.keys())}")
def signedOn(self):
"""
Called when we have signed on to IRC.
Join our channel.
"""
self.log.info("Signed on as %s" % (self.nickname))
deferLater(reactor, 2, self.join, self.channel)
def joined(self, channel):
"""
Called when we have joined a channel.
Setup the Agora LoopingCall to get trades.
This is here to ensure the IRC client is initialised enough to send the trades.
:param channel: channel we joined
:type channel: string
"""
self.agora.setup_loop()
self.log.info("Joined channel %s" % (channel))
def privmsg(self, user, channel, msg):
"""
Called on received PRIVMSGs.
Pass through identified commands to the parse function.
:param user: full user string with host
:param channel: channel the message was received on
:param msg: the message
:type user: string
:type channel: string
:type msg: string
"""
nick = user.split("!")[0]
if channel == self.nickname:
channel = nick
host = user.split("!")[1]
host = host.split("@")[1]
ident = user.split("!")[1]
ident = ident.split("@")[0]
self.log.info("(%s) %s: %s" % (channel, user, msg))
if msg[0] == self.prefix:
if len(msg) > 1:
if msg.split()[0] != "!":
self.parse(user, host, channel, msg[1:])
elif host in self.admins and channel == nick:
if len(msg) > 0:
spl = msg.split()
if len(spl) > 0:
if spl[0] != "!":
self.parse(user, host, channel, msg)
def noticed(self, user, channel, msg):
"""
Called on received NOTICEs.
:param user: full user string with host
:param channel: channel the notice was received on
:param msg: the message
:type user: string
:type channel: string
:type msg: string
"""
nick = user.split("!")[0]
if channel == self.nickname:
channel = nick
# self.log.info("[%s] %s: %s" % (channel, user, msg))
class IRCBotFactory(protocol.ClientFactory):
def __init__(self):
self.log = Logger("irc")
def sendmsg(self, msg):
"""
Passthrough function to send a message to the channel.
"""
if self.client:
self.client.msg(self.client.channel, msg)
else:
self.log.error("Trying to send a message without connected client: {msg}", msg=msg)
return
def buildProtocol(self, addr):
"""
Custom override for the Twisted buildProtocol so we can access the Protocol instance.
Passes through the Agora instance to IRC.
:return: IRCBot Protocol instance
"""
prcol = IRCBot(self.log)
self.client = prcol
setattr(self.client, "agora", self.agora)
setattr(self.client, "sinks", self.sinks)
setattr(self.client, "tx", self.tx)
setattr(self.client, "ux", self.ux)
return prcol
def clientConnectionLost(self, connector, reason):
"""
Called when connection to IRC server lost. Reconnect.
:param connector: connector object
:param reason: reason connection lost
:type connector: object
:type reason: string
"""
self.log.error("Lost connection: {reason}, reconnecting", reason=reason)
connector.connect()
def clientConnectionFailed(self, connector, reason):
"""
Called when connection to IRC server failed. Reconnect.
:param connector: connector object
:param reason: reason connection failed
:type connector: object
:type reason: string
"""
self.log.error("Could not connect: {reason}", reason=reason)
connector.connect()
def bot():
"""
Load the certificates, start the Bot Factory and connect it to the IRC server.
:return: Factory instance
:rtype: Factory
"""
# Load the certificates
context = ssl.DefaultOpenSSLContextFactory(settings.IRC.Cert, settings.IRC.Cert)
# Define the factory instance
factory = IRCBotFactory()
reactor.connectSSL(settings.IRC.Host, int(settings.IRC.Port), factory, context)
return factory

45
handler/ux/notify.py Normal file
View File

@ -0,0 +1,45 @@
# Twisted/Klein imports
from twisted.logger import Logger
# Other library imports
import requests
# Project imports
from settings import settings
class Notify(object):
"""
Class to handle more robust notifications.
"""
def __init__(self):
self.log = Logger("notify")
def sendmsg(self, msg, title=None, priority=None, tags=None):
headers = {"Title": "Bot"}
if title:
headers["Title"] = title
if priority:
headers["Priority"] = priority
if tags:
headers["Tags"] = tags
requests.post(
f"{settings.Notify.Host}/{settings.Notify.Topic}",
data=msg,
headers=headers,
)
def notify_new_trade(self, amount, currency):
amount_usd = self.money.to_usd(amount, currency)
self.sendmsg(f"Total: {amount_usd}", title="New trade", tags="trades", priority="2")
def notify_complete_trade(self, amount, currency):
amount_usd = self.money.to_usd(amount, currency)
self.sendmsg(f"Total: {amount_usd}", title="Trade complete", tags="trades,profit", priority="3")
def notify_withdrawal(self, amount_usd):
self.sendmsg(f"Total: {amount_usd}", title="Withdrawal", tags="profit", priority="4")
def notify_need_topup(self, amount_usd_xmr, amount_usd_btc):
self.sendmsg(f"XMR: {amount_usd_xmr} | BTC: {amount_usd_btc}", title="Topup needed", tags="admin", priority="5")