Libraries refactor and add some sinks #4

Closed
m wants to merge 136 commits from library-refactor into master
2 changed files with 58 additions and 50 deletions
Showing only changes of commit b0d4f5098d - Show all commits

View File

@ -74,6 +74,22 @@ class WebApp(object):
# self.tx.transaction(parsed) # self.tx.transaction(parsed)
return dumps(True) return dumps(True)
@app.route("/signin", methods=["GET"])
def signin(self, request):
auth_url = self.truelayer.create_auth_url()
return f'Please sign in <a href="{auth_url}" target="_blank">here.</a>'
@app.route("/callback-truelayer", methods=["POST"])
def signin_callback(self, request):
code = request.args[b"code"]
self.truelayer.handle_authcode_received(code)
return dumps(True)
@app.route("/accounts", methods=["GET"])
def balance(self, request):
accounts = self.truelayer.get_accounts()
return dumps(accounts)
if __name__ == "__main__": if __name__ == "__main__":
init_map = { init_map = {

View File

@ -3,8 +3,9 @@ from twisted.logger import Logger
# Other library imports # Other library imports
import requests import requests
from json import dumps
from simplejson.errors import JSONDecodeError from simplejson.errors import JSONDecodeError
from time import time
import urllib
# Project imports # Project imports
from settings import settings from settings import settings
@ -18,57 +19,61 @@ class TrueLayer(object):
def __init__(self): def __init__(self):
self.log = Logger("truelayer") self.log = Logger("truelayer")
self.token = None self.token = None
# self.auth = HTTPBasicAuth(settings.Yapily.ID, settings.Yapily.Key)
# print(self.get_institutions())
# authorisation = self.get_authorisation("monzo_ob", settings.Yapily.CallbackURL)
# print("authirisation", authorisation)
def setup_auth(self): def setup_auth(self):
""" pass
Exchange an authentication code for an access and refresh token.
""" def create_auth_url(self):
headers = {"Content-Type": "application/x-www-form-urlencoded"} query = urllib.parse.urlencode(
{
"response_type": "code",
"response_mode": "form_post",
"client_id": settings.TrueLayer.ID,
"scope": "info accounts balance transactions offline_access",
"nonce": int(time()),
"redirect_uri": settings.TrueLayer.CallbackURL,
"enable_mock": "true",
}
)
auth_uri = f"{settings.TrueLayer.AuthBase}/?{query}&redirect_uri={settings.TrueLayer.CallbackURL}"
return auth_uri
def handle_authcode_received(self, authcode):
data = { data = {
"grant_type": "authorization_code",
"code": settings.TrueLayer.Monzo_AuthCode,
"client_id": settings.TrueLayer.ID, "client_id": settings.TrueLayer.ID,
"client_secret": settings.TrueLayer.Key, "client_secret": settings.TrueLayer.Key,
"redirect_uri": "https://console.truelayer.com/redirect-page", "code": authcode,
"grant_type": "authorization_code",
"redirect_uri": settings.TrueLayer.CallbackURL,
} }
r = requests.post(f"{settings.TrueLayer.Base}/connect/token", data=data, headers=headers) r = requests.post(f"{settings.TrueLayer.AuthBase}/connect/token", data=data)
try: try:
parsed = r.json() parsed = r.json()
except JSONDecodeError: except JSONDecodeError:
self.log.error("Error parsing access token response: {content}", content=r.content)
return False return False
print("PARSED", parsed) if "error" in parsed:
if r.status_code == 200: self.log.error("Error requesting refresh token: {error}", error=parsed["error"])
try:
settings.TrueLayer.Monzo_RefreshToken = parsed["refresh_token"]
settings.TrueLayer.Monzo_SetupToken = "0"
settings.write()
self.log.info("Refreshed refresh token - TrueLayer")
self.token = parsed["access_token"]
self.log.info("Refreshed access token - TrueLayer")
except KeyError:
self.log.error(f"Token authorization didn't contain refresh or access token: {parsed}", parsed=parsed)
return False
else:
self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
return False return False
settings.TrueLayer.RefreshToken = parsed["refresh_token"]
settings.TrueLayer.AuthCode = authcode
settings.write()
self.token = parsed["access_token"]
self.log.info("Retrieved access/refresh tokens - TrueLayer")
def get_new_token(self, fail=False): def get_new_token(self, fail=False):
""" """
Exchange our refresh token for an access token. Exchange our refresh token for an access token.
""" """
if not settings.TrueLayer.RefreshToken:
return
headers = {"Content-Type": "application/x-www-form-urlencoded"} headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = { data = {
"grant_type": "refresh_token", "grant_type": "refresh_token",
"refresh_token": settings.TrueLayer.Monzo_RefreshToken, "refresh_token": settings.TrueLayer.RefreshToken,
"client_id": settings.TrueLayer.ID, "client_id": settings.TrueLayer.ID,
"client_secret": settings.TrueLayer.Key, "client_secret": settings.TrueLayer.Key,
} }
r = requests.post(f"{settings.TrueLayer.Base}/connect/token", data=data, headers=headers) r = requests.post(f"{settings.TrueLayer.AuthBase}/connect/token", data=data, headers=headers)
try: try:
parsed = r.json() parsed = r.json()
except JSONDecodeError: except JSONDecodeError:
@ -91,31 +96,18 @@ class TrueLayer(object):
exit() exit()
return False return False
def get_institutions(self, filter_name=None): def get_accounts(self):
""" """
Get a list of supported institutions. Get a list of accounts.
""" """
headers = {"Authorization": f"Bearer {self.token}"}
path = f"{settings.Yapily.Base}/institutions" # path = f"{settings.TrueLayer.DataBase}/accounts"
r = requests.get(path, auth=self.auth) path = "https://api.truelayer-sandbox.com/data/v1/accounts"
try: r = requests.get(path, headers=headers)
parsed = r.json()
except JSONDecodeError:
self.log.error("Error parsing institutions response: {content}", content=r.content)
return False
return parsed
def get_authorisation(self, institution_id, callback_url):
"""
Get an authorisation URL for linking a bank account of an institution.
"""
headers = {"Content-Type": "application/json"}
data = {"applicationUserId": "account-data-and-transactions-tutorial", "institutionId": institution_id, "callback": callback_url}
path = f"{settings.Yapily.Base}/account-auth-requests"
r = requests.post(path, headers=headers, data=dumps(data), auth=self.auth)
try: try:
parsed = r.json() parsed = r.json()
except JSONDecodeError: except JSONDecodeError:
self.log.error("Error parsing institutions response: {content}", content=r.content) self.log.error("Error parsing institutions response: {content}", content=r.content)
return False return False
print("GET ACCOUNTS", parsed)
return parsed return parsed