Implement signin command for TrueLayer

master
Mark Veidemanis 3 years ago
parent 9b96bb8031
commit 8ab915fa65
Signed by: m
GPG Key ID: 5ACFCEED46C0904F

@ -20,14 +20,14 @@ from notify import Notify
from markets import Markets
from money import Money
# from sinks.nordigen import Nordigen
# from sinks.yapily import Yapily
from sinks.nordigen import Nordigen
from sinks.truelayer import TrueLayer
from sinks.fidor import Fidor
init_map = None
# TODO: extend this with more
def cleanup(sig, frame):
if init_map:
try:
@ -74,11 +74,13 @@ class WebApp(object):
# self.tx.transaction(parsed)
return dumps(True)
# set up another connection to a bank
@app.route("/signin", methods=["GET"])
def signin(self, request):
auth_url = self.truelayer.create_auth_url()
return f'Please sign in <a href="{auth_url}" target="_blank">here.</a>'
# endpoint called after we finish setting up a connection above
@app.route("/callback-truelayer", methods=["POST"])
def signin_callback(self, request):
code = request.args[b"code"]
@ -88,7 +90,7 @@ class WebApp(object):
@app.route("/accounts", methods=["GET"])
def balance(self, request):
accounts = self.truelayer.get_accounts()
return dumps(accounts)
return dumps(accounts, indent=2)
if __name__ == "__main__":
@ -98,8 +100,7 @@ if __name__ == "__main__":
"agora": Agora(),
"markets": Markets(),
"revolut": Revolut(),
# "nordigen": Nordigen(),
# "yapily": Yapily(),
"nordigen": Nordigen(),
"truelayer": TrueLayer(),
"fidor": Fidor(),
"tx": Transactions(),
@ -110,19 +111,19 @@ if __name__ == "__main__":
util.xmerge_attrs(init_map)
# Setup the authcode -> refresh token and refresh_token -> auth_token stuff
util.setup_call_loops(
token_setting=settings.Revolut.SetupToken,
function_init=init_map["revolut"].setup_auth,
function_continuous=init_map["revolut"].get_new_token,
delay=int(settings.Revolut.RefreshSec),
function_post_start=init_map["revolut"].setup_webhook,
)
util.setup_call_loops(
token_setting=settings.TrueLayer.SetupToken,
function_init=init_map["truelayer"].setup_auth,
function_continuous=init_map["truelayer"].get_new_token,
delay=int(settings.TrueLayer.RefreshSec),
)
# util.setup_call_loops(
# token_setting=settings.Revolut.SetupToken,
# function_init=init_map["revolut"].setup_auth,
# function_continuous=init_map["revolut"].get_new_token,
# delay=int(settings.Revolut.RefreshSec),
# function_post_start=init_map["revolut"].setup_webhook,
# )
# util.setup_call_loops(
# token_setting=settings.TrueLayer.SetupToken,
# function_init=init_map["truelayer"].setup_auth,
# function_continuous=init_map["truelayer"].get_new_token,
# delay=int(settings.TrueLayer.RefreshSec),
# )
# Set up the loops to put data in ES
init_map["tx"].setup_loops()

@ -485,3 +485,13 @@ class IRCCommands(object):
def run(cmd, spl, length, authed, msg, agora, revolut, tx, notify):
total = tx.money.get_profit(True)
msg(f"Profit: {total}USD")
class signin(object):
name = "signin"
authed = True
helptext = "Generate a TrueLayer signin URL."
@staticmethod
def run(cmd, spl, length, authed, msg, agora, revolut, tx, notify):
auth_url = agora.truelayer.create_auth_url()
msg(f"Auth URL: {auth_url}")

@ -1,53 +0,0 @@
# Twisted/Klein imports
from twisted.logger import Logger
# Other library imports
import requests
from requests.auth import HTTPBasicAuth
from json import dumps
from simplejson.errors import JSONDecodeError
# Project imports
from settings import settings
class Yapily(object):
"""
Class to manage calls to Open Banking APIs through Yapily.
"""
def __init__(self):
self.log = Logger("yapily")
self.auth = HTTPBasicAuth(settings.Yapily.ID, settings.Yapily.Key)
print(self.get_institutions())
authorisation = self.get_authorisation("monzo_ob", settings.Yapily.CallbackURL)
print("authirisation", authorisation)
def get_institutions(self, filter_name=None):
"""
Get a list of supported institutions.
"""
path = f"{settings.Yapily.Base}/institutions"
r = requests.get(path, auth=self.auth)
try:
parsed = r.json()
except JSONDecodeError:
self.log.error("Error parsing institutions response: {content}", content=r.content)
return False
return parsed
def get_authorisation(self, institution_id, callback_url):
"""
Get an authorisation URL for linking a bank account of an institution.
"""
headers = {"Content-Type": "application/json"}
data = {"applicationUserId": "account-data-and-transactions-tutorial", "institutionId": institution_id, "callback": callback_url}
path = f"{settings.Yapily.Base}/account-auth-requests"
r = requests.post(path, headers=headers, data=dumps(data), auth=self.auth)
try:
parsed = r.json()
except JSONDecodeError:
self.log.error("Error parsing institutions response: {content}", content=r.content)
return False
return parsed
Loading…
Cancel
Save