pluto/handler/ux/commands.py

903 lines
33 KiB
Python

# Other library imports
from json import dumps, loads
# Project imports
from settings import settings
class GenericCommands(object):
class trades(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
"""
Get details of open trades and post on IRC.
"""
trades = caller.get_dashboard()
if not trades:
msg("No open trades.")
return
for trade_id in trades:
msg(trade_id)
class create(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, asset_list, provider_list, caller):
"""
Post an ad on AgoraDesk/LBTC with the given country and currency code.
"""
if length == 4:
asset = spl[1]
country = spl[2]
currency = spl[3]
if asset not in loads(asset_list):
msg(f"Not a valid asset: {asset}")
return
_, account_info = tx.markets.get_valid_account_details()
posted = caller.create_ad(
asset,
country,
currency,
"NATIONAL_BANK",
payment_details=account_info[currency],
)
if posted["success"]:
msg(f"{posted['response']['data']['message']}: {posted['response']['data']['ad_id']}")
else:
msg(dumps(posted["response"]))
elif length == 5:
asset = spl[1]
country = spl[2]
currency = spl[3]
provider = spl[4]
if asset not in loads(asset_list):
msg(f"Not a valid asset: {asset}")
return
if provider not in loads(provider_list):
msg(f"Not a valid provider: {provider}")
return
_, account_info = tx.markets.get_valid_account_details()
posted = caller.create_ad(
asset,
country,
currency,
provider,
payment_details=account_info[currency],
)
if posted["success"]:
msg(f"{posted['response']['data']['message']}: {posted['response']['data']['ad_id']}")
else:
msg(dumps(posted["response"]))
class messages(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
"""
Get all messages for all open trades or a given trade.
"""
if length == 1:
messages = caller.get_recent_messages()
if messages is False:
msg("Error getting messages.")
return
if not messages:
msg("No messages.")
return
for reference in messages:
for message in messages[reference]:
msg(f"{reference}: {message[0]} {message[1]}")
msg("---")
elif length == 2:
tx = tx.ref_to_tx(spl[1])
if not tx:
msg(f"No such reference: {spl[1]}")
return
messages = caller.get_messages(spl[1], send_irc=False)
if not messages:
msg("No messages.")
for message in messages:
msg(f"{spl[1]}: {message}")
class dist(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, asset_list, caller):
# Distribute out our ad to all countries in the config
if length == 2:
asset = spl[1]
if asset not in loads(asset_list):
msg(f"Not a valid asset: {spl[1]}")
return
for x in caller.dist_countries(filter_asset=asset):
if x["success"]:
msg(f"{x['response']['data']['message']}: {x['response']['data']['ad_id']}")
else:
msg(dumps(x["response"]))
elif length == 1:
for x in caller.dist_countries():
if x["success"]:
msg(f"{x['response']['data']['message']}: {x['response']['data']['ad_id']}")
else:
msg(dumps(x["response"]))
class redist(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
for x in caller.redist_countries():
if x[0]["success"]:
msg(f"{x[0]['response']['data']['message']}: {x[1]}")
else:
msg(dumps(x[0]["response"]))
class stripdupes(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
rtrn = caller.strip_duplicate_ads()
msg(dumps(rtrn))
class message(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
if length > 2:
full_msg = " ".join(spl[2:])
reference = tx.ref_to_tx(spl[1])
if not reference:
msg(f"No such reference: {spl[1]}")
return
rtrn = caller.contact_message_post(reference, full_msg)
msg(f"Sent {full_msg} to {reference}: {rtrn}")
class release(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
if length == 2:
tx = tx.ref_to_tx(spl[1])
if not tx:
msg(f"No such reference: {spl[1]}")
return
rtrn = caller.release_funds(tx)
message = rtrn["message"]
message_long = rtrn["response"]["data"]["message"]
msg(f"{message} - {message_long}")
class pubads(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, asset_list, caller):
if length == 3:
asset = spl[1]
if asset not in loads(asset_list):
msg(f"Not a valid asset: {spl[1]}")
return
currency = spl[2]
rtrn = caller.get_all_public_ads(assets=[asset], currencies=[currency])
if not rtrn:
msg("No results.")
return
for ad in rtrn[currency]:
msg(f"({ad[0]}) {ad[1]} {ad[2]} {ad[3]} {ad[4]} {ad[5]} {ad[6]}")
elif length == 4:
asset = spl[1]
if asset not in loads(asset_list):
msg(f"Not a valid asset: {spl[1]}")
return
providers = spl[3].split(",")
currency = spl[2]
rtrn = caller.get_all_public_ads(assets=[asset], currencies=[currency], providers=providers)
if not rtrn:
msg("No results.")
return
for ad in rtrn[currency]:
msg(f"({ad[0]}) {ad[1]} {ad[2]} {ad[3]} {ad[4]} {ad[5]} {ad[6]}")
class cheat(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, asset_list, caller):
if length == 1:
caller.run_cheat_in_thread()
msg("Running cheat in thread.")
elif length == 2:
asset = spl[1]
if asset not in loads(asset_list):
msg(f"Not a valid asset: {spl[1]}")
return
caller.run_cheat_in_thread([asset])
msg(f"Running cheat in thread for {asset}.")
class cheatnext(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
if length == 1:
asset = caller.run_cheat_in_thread()
msg(f"Running next asset for cheat in thread: {asset}")
class ads(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
ads = caller.enum_ads()
for ad in ads:
msg(f"({ad[0]}) {ad[1]} {ad[2]} {ad[3]} {ad[4]}")
class withdraw(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
caller.withdraw_funds()
class nuke(object):
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux, caller):
rtrn = caller.nuke_ads()
msg(dumps(rtrn))
class IRCCommands(object):
class atrades(object):
authed = True
name = "atrades"
helptext = "Get all open trades for Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.trades.run(cmd, spl, length, authed, msg, agora, tx, ux, agora)
class ltrades(object):
authed = True
name = "ltrades"
helptext = "Get all open trades for LBTC."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.trades.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc)
class acreate(object):
name = "acreate"
authed = True
helptext = "Create an ad on Agora. Usage: acreate <XMR/BTC> <country> <currency> [<provider>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.create.run(
cmd,
spl,
length,
authed,
msg,
agora,
tx,
ux,
settings.Agora.AssetList,
settings.Agora.ProviderList,
agora,
)
class lcreate(object):
name = "lcreate"
authed = True
helptext = "Create an ad on LBTC. Usage: lcreate <XMR/BTC> <country> <currency> [<provider>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.create.run(
cmd,
spl,
length,
authed,
msg,
agora,
tx,
ux,
settings.LocalBitcoins.AssetList,
settings.LocalBitcoins.ProviderList,
tx.lbtc,
)
class amessages(object):
authed = True
name = "amessages"
helptext = "Get messages for Agora. Usage: amessages [<reference>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.messages.run(cmd, spl, length, authed, msg, agora, tx, ux, agora)
class lmessages(object):
authed = True
name = "lmessages"
helptext = "Get messages for LBTC. Usage: lmessages [<reference>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.messages.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc)
class adist(object):
authed = True
name = "adist"
helptext = "Distribute all our chosen currency and country ad pairs for Agora. Usage: adist [<XMR/BTC>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.dist.run(cmd, spl, length, authed, msg, agora, tx, ux, settings.Agora.AssetList, agora)
class ldist(object):
authed = True
name = "ldist"
helptext = "Distribute all our chosen currency and country ad pairs for LBTC. Usage: ldist [<XMR/BTC>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.dist.run(
cmd, spl, length, authed, msg, agora, tx, ux, settings.LocalBitcoins.AssetList, tx.lbtc
)
class aredist(object):
authed = True
name = "aredist"
helptext = "Update all ads with details for Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.redist.run(cmd, spl, length, authed, msg, agora, tx, ux, agora)
class lredist(object):
authed = True
name = "lredist"
helptext = "Update all ads with details for LBTC."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.redist.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc)
class astripdupes(object):
authed = True
name = "astripdupes"
helptext = "Remove all duplicate adverts for Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.stripdupes.run(cmd, spl, length, authed, msg, agora, tx, ux, agora)
class lstripdupes(object):
authed = True
name = "lstripdupes"
helptext = "Remove all duplicate adverts for LBTC."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.stripdupes.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc)
class total(object):
name = "total"
authed = True
helptext = "Get total account balance from Sinks and Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
totals_all = tx.get_total()
totals = totals_all[0]
wallets = totals_all[1]
msg(f"Totals: SEK: {totals[0]} | USD: {totals[1]} | GBP: {totals[2]}")
msg(f"Wallets: XMR USD: {wallets[0]} | BTC USD: {wallets[1]}")
class ping(object):
name = "ping"
authed = False
helptext = "Pong!"
@staticmethod
def run(cmd, spl, length, authed, msg):
msg("Pong!")
class summon(object):
name = "summon"
authed = True
helptext = "Summon all operators."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
ux.notify.sendmsg("You have been summoned!")
class amessage(object):
authed = True
name = "amsg"
helptext = "Send a message on a trade. Usage: amsg <reference> <message...>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.message.run(cmd, spl, length, authed, msg, agora, tx, ux, agora.agora)
class lmessage(object):
authed = True
name = "lmsg"
helptext = "Send a message on a trade. Usage: lmsg <reference> <message...>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.message.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc.lbtc)
class refs(object):
name = "refs"
authed = True
helptext = "List all references"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
msg(f"References: {', '.join(tx.get_refs())}")
class ref(object):
name = "ref"
authed = True
helptext = "Get more information about a reference. Usage: ref <reference>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
ref_data = tx.get_ref(spl[1])
if not ref_data:
msg(f"No such reference: {spl[1]}")
return
msg(f"{spl[1]}: {dumps(ref_data)}")
class delete(object):
name = "del"
authed = True
helptext = "Delete a reference. Usage: del <reference>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
ref_data = tx.get_ref(spl[1])
if not ref_data:
msg(f"No such reference: {spl[1]}")
return
tx.del_ref(spl[1])
msg(f"Deleted reference: {spl[1]}")
class arelease(object):
authed = True
name = "arelease"
helptext = "Release funds for a trade. Usage: arelease <reference>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.release.run(cmd, spl, length, authed, msg, agora, tx, ux, agora)
class lrelease(object):
authed = True
name = "lrelease"
helptext = "Release funds for a trade. Usage: lrelease <reference>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.release.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc)
class map(object):
name = "map"
authed = True
helptext = "Release funds for a trade. Usage: map <reference> <txid>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
reference = spl[1]
txid = spl[2]
is_released = tx.release_map_trade(reference, txid)
if is_released is None:
msg("Trade or TX invalid")
return
elif is_released is True:
msg(f"Trade released")
return
elif is_released is False:
msg(f"Could not release trade")
return
class anuke(object):
authed = True
name = "anuke"
helptext = "Delete all our adverts for Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.nuke.run(cmd, spl, length, authed, msg, agora, tx, ux, agora)
class lnuke(object):
authed = True
name = "lnuke"
helptext = "Delete all our adverts for Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.nuke.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc)
class wallet(object):
name = "wallet"
authed = True
helptext = "Get Agora wallet balances."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
rtrn_xmr = agora.agora.wallet_balance_xmr()
if not rtrn_xmr["success"]:
msg("Error getting XMR wallet details.")
return
rtrn_btc = agora.agora.wallet_balance()
if not rtrn_btc["success"]:
msg("Error getting BTC wallet details.")
return
balance_xmr = rtrn_xmr["response"]["data"]["total"]["balance"]
balance_btc = rtrn_btc["response"]["data"]["total"]["balance"]
msg(f"XMR wallet balance: {balance_xmr}")
msg(f"BTC wallet balance: {balance_btc}")
class apubads(object):
authed = True
name = "apubads"
helptext = "View public adverts for Agora. Usage: apubads <XMR/BTC> <currency> [<provider,...>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.pubads.run(cmd, spl, length, authed, msg, agora, tx, ux, settings.Agora.AssetList, agora)
class lpubads(object):
authed = True
name = "lpubads"
helptext = "View public adverts for LBTC. Usage: lpubads <XMR/BTC> <currency> [<provider,...>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.pubads.run(
cmd, spl, length, authed, msg, agora, tx, ux, settings.LocalBitcoins.AssetList, tx.lbtc
)
class acheat(object):
authed = True
name = "acheat"
helptext = "Cheat the markets by manipulating our prices to exploit people on Agora. Usage: acheat [<XMR/BTC>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.cheat.run(cmd, spl, length, authed, msg, agora, tx, ux, settings.Agora.AssetList, agora)
class lcheat(object):
authed = True
name = "lcheat"
helptext = "Cheat the markets by manipulating our prices to exploit people on LBTC. Usage: lcheat [<XMR/BTC>]"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.cheat.run(
cmd, spl, length, authed, msg, agora, tx, ux, settings.LocalBitcoins.AssetList, tx.lbtc
)
class acheatnext(object):
authed = True
name = "acheatnext"
helptext = "Run the next currency for cheat on Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.cheatnext.run(cmd, spl, length, authed, msg, agora, tx, ux, agora)
class lcheatnext(object):
authed = True
name = "lcheatnext"
helptext = "Run the next currency for cheat on LBTC."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.cheatnext.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc)
class aads(object):
authed = True
name = "aads"
helptext = "Get all our ad regions for Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.ads.run(cmd, spl, length, authed, msg, agora, tx, ux, agora)
class lads(object):
authed = True
name = "lads"
helptext = "Get all our ad regions for LBTC."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.ads.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc)
class xmr(object):
name = "xmr"
authed = True
helptext = "Get current XMR price."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
xmr_prices = agora.money.cg.get_price(ids="monero", vs_currencies=["sek", "usd", "gbp"])
price_sek = xmr_prices["monero"]["sek"]
price_usd = xmr_prices["monero"]["usd"]
price_gbp = xmr_prices["monero"]["gbp"]
msg(f"SEK: {price_sek} | USD: {price_usd} | GBP: {price_gbp}")
class btc(object):
name = "btc"
authed = True
helptext = "Get current BTC price."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
xmr_prices = agora.money.cg.get_price(ids="bitcoin", vs_currencies=["sek", "usd", "gbp"])
price_sek = xmr_prices["bitcoin"]["sek"]
price_usd = xmr_prices["bitcoin"]["usd"]
price_gbp = xmr_prices["bitcoin"]["gbp"]
msg(f"SEK: {price_sek} | USD: {price_usd} | GBP: {price_gbp}")
class awithdraw(object):
authed = True
name = "awithdraw"
helptext = "Take profit for Agora."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.withdraw.run(cmd, spl, length, authed, msg, agora, tx, ux, agora)
class lwithdraw(object):
authed = True
name = "lwithdraw"
helptext = "Take profit for LBTC."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
GenericCommands.withdraw.run(cmd, spl, length, authed, msg, agora, tx, ux, tx.lbtc)
class remaining(object):
name = "r"
authed = True
helptext = "Show how much is left before we are able to withdraw funds."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
remaining = tx.get_remaining()
msg(f"Remaining: {remaining}USD")
class total_remaining(object):
name = "tr"
authed = True
helptext = "Show how much is left before we are able to withdraw funds (including open trades)."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
remaining = tx.get_total_remaining()
msg(f"Total remaining: {remaining}USD")
class tradetotal(object):
name = "tradetotal"
authed = True
helptext = "Get total value of all open trades in USD."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
total = tx.get_open_trades_usd()
msg(f"Total trades: {total}USD")
class dollar(object):
name = "$"
authed = True
helptext = "Get total value of everything, including open trades."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
total = tx.get_total_with_trades()
msg(f"${total}")
class profit(object):
name = "profit"
authed = True
helptext = "Get total profit."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
total = tx.money.get_profit()
msg(f"Profit: {total}USD")
class tprofit(object):
name = "tprofit"
authed = True
helptext = "Get total profit with open trades."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
total = tx.money.get_profit(True)
msg(f"Profit: {total}USD")
class signin(object):
name = "signin"
authed = True
helptext = "Generate a TrueLayer signin URL. Usage: signin <account>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
account = spl[1]
auth_url = tx.sinks.truelayer.create_auth_url(account)
msg(f"Auth URL for {account}: {auth_url}")
class nsignin(object):
name = "nsignin"
authed = True
helptext = "Generate a Nordigen signin URL. Usage: nsignin <country> <bank>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length >= 3:
country = spl[1]
bank_name = " ".join(spl[2:])
auth_url = tx.sinks.nordigen.create_auth_url(country, bank_name)
if not auth_url:
msg("Could not find bank.")
return
msg(f"Auth URL for {bank_name}: {auth_url}")
class accounts(object):
name = "accounts"
authed = True
helptext = "Get a list of acccounts from TrueLayer. Usage: accounts <account>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
account = spl[1]
accounts = tx.sinks.truelayer.get_accounts(account)
for account in accounts["results"]:
msg(f"{account['account_id']} {account['display_name']} {account['currency']}")
class naccounts(object):
name = "naccounts"
authed = True
helptext = "Get a list of acccounts from Nordigen. Usage: naccounts"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 1:
accounts = tx.sinks.nordigen.get_all_account_info()
for name, accounts in accounts.items():
for account in accounts:
msg(f"{name} {account['account_id']} {account['details']} {account['currency']}")
class transactions(object):
name = "transactions"
authed = True
helptext = "Get a list of transactions from TrueLayer. Usage: transactions <account> <account_id>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 3:
account = spl[1]
account_id = spl[2]
transactions = tx.sinks.truelayer.get_transactions(account, account_id)
for transaction in transactions:
txid = transaction["transaction_id"]
# ptxid = transaction["meta"]["provider_transaction_id"]
txtype = transaction["transaction_type"]
timestamp = transaction["timestamp"]
amount = transaction["amount"]
currency = transaction["currency"]
description = transaction["description"]
msg(f"{timestamp} {txid} {txtype} {amount}{currency} {description}")
class ntransactions(object):
name = "ntransactions"
authed = True
helptext = "Get a list of transactions from Nordigen. Usage: ntransactions <account_id>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
account_id = spl[1]
transactions = tx.sinks.nordigen.get_transactions(account_id)
for transaction in transactions:
txid = transaction["transaction_id"]
timestamp = transaction["timestamp"]
amount = transaction["amount"]
currency = transaction["currency"]
reference = transaction["reference"]
msg(f"{timestamp} {txid} {amount}{currency} {reference}")
class nreqs(object):
name = "nreqs"
authed = True
helptext = "Get a list of requisitions from Nordigen."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
reqs = tx.sinks.nordigen.get_requisitions()
for req in reqs:
id = req["id"]
institution_id = req["institution_id"]
redirect = req["link"]
msg(f"{id} {institution_id} {redirect}")
class ndelreq(object):
name = "ndelreq"
authed = True
helptext = "Delete a requisition from Nordigen."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
requisition_id = spl[1]
rtrn = tx.sinks.nordigen.delete_requisition(requisition_id)
msg(f"{rtrn['summary']}")
class mapaccount(object):
name = "mapaccount"
authed = True
helptext = "Enable an account_id at a bank for use in TrueLayer. Usage: mapaccount <bank> <account_id>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 3:
bank = spl[1]
account_id = spl[2]
account_name = tx.sinks.truelayer.map_account(bank, account_id)
if not account_name:
msg(f"Failed to map the account")
return
msg(f"Mapped account ID {account_id} at bank {bank} to {account_name}")
class nmapaccount(object):
name = "nmapaccount"
authed = True
helptext = "Enable an account_id at a bank for use in Nordigen. Usage: nmapaccount <account_id>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
account_id = spl[1]
account_name = tx.sinks.nordigen.map_account(account_id)
if not account_name:
msg(f"Failed to map the account")
return
msg(f"Mapped account ID {account_id} to {account_name}")
class nunmapaccount(object):
name = "nunmapaccount"
authed = True
helptext = "Disable an account_id at a bank for use in Nordigen. Usage: nunmapaccount <account_id>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
account_id = spl[1]
tx.sinks.nordigen.unmap_account(account_id)
msg(f"Unmapped account ID {account_id}")
class unmapped(object):
name = "unmapped"
authed = True
helptext = "Get unmapped accounts for a bank. Usage: unmapped <bank>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 2:
bank = spl[1]
accounts_active = []
for bank, accounts in tx.sinks.truelayer.banks.items():
for account in accounts:
accounts_active.append(account)
accounts_all = tx.sinks.truelayer.get_accounts(bank)
accounts_unmapped = [
x["account_id"] for x in accounts_all["results"] if x["account_id"] not in accounts_active
]
msg(f"Unmapped accounts: {', '.join(accounts_unmapped)}")
class distdetails(object):
name = "distdetails"
authed = True
helptext = "Distribute account details among all ads."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
currencies = tx.sinks.currencies
tx.markets.distribute_account_details()
msg(f"Distributing account details for currencies: {', '.join(currencies)}")