Implement mapping and transaction fetching

This commit is contained in:
Mark Veidemanis 2022-03-07 19:59:30 +00:00
parent 878a261295
commit cbb7294f23
Signed by: m
GPG Key ID: 5ACFCEED46C0904F
3 changed files with 122 additions and 29 deletions

View File

@ -19,4 +19,8 @@ class Sinks(util.Base):
super().__init__() super().__init__()
self.fidor = sinks.fidor.Fidor() self.fidor = sinks.fidor.Fidor()
self.nordigen = sinks.nordigen.Nordigen() self.nordigen = sinks.nordigen.Nordigen()
self.truelayer = sinks.truelayer.TrueLayer() self.truelayer = sinks.truelayer.TrueLayer(self)
# setattr(self.truelayer, "sinks", self)
def got_transactions(self, bank, account_id, transactions):
print("GOT transactions", bank, account_id, transactions)

View File

@ -18,14 +18,32 @@ class TrueLayer(util.Base):
Class to manage calls to Open Banking APIs through TrueLayer. Class to manage calls to Open Banking APIs through TrueLayer.
""" """
def __init__(self): def __init__(self, sinks):
super().__init__() super().__init__()
self.sinks = sinks
self.tokens = {} self.tokens = {}
self.banks = {}
# Get the banks from the config and cache them
self.get_mapped_accounts()
# account we are authenticating - where to store the refresh keys # account we are authenticating - where to store the refresh keys
self.current_authcode_account = None self.current_authcode_bank = None
self.lc = LoopingCall(self.get_new_tokens_all) self.lc = LoopingCall(self.get_new_tokens_all)
self.lc.start(int(settings.TrueLayer.RefreshSec)) # self.get_new_tokens_all()
# self.get_new_token(bank)
# -> set self.tokens[bank] = access_token
self.lc.start(int(settings.TrueLayer.TokenRefreshSec))
self.lc_tx = LoopingCall(self.transaction_loop)
self.lc_tx.start(int(settings.TrueLayer.RefreshSec))
def transaction_loop(self):
for bank in self.banks:
for account_id in self.banks[bank]:
# account_data = self.get_account(bank, account_id)
transactions = self.get_transactions(bank, account_id)
self.sinks.got_transactions(bank, account_id, transactions)
def add_refresh_token(self, refresh_token): def add_refresh_token(self, refresh_token):
""" """
@ -33,25 +51,27 @@ class TrueLayer(util.Base):
Data type: {"monzo": refresh_token, Data type: {"monzo": refresh_token,
"revolut": refresh_token} "revolut": refresh_token}
""" """
account = self.current_authcode_account account = self.current_authcode_bank
if not account: if not account:
return False return False
existing_entry = loads(settings.TrueLayer.RefreshKeys) existing_entry = loads(settings.TrueLayer.RefreshKeys)
existing_entry[account] = refresh_token existing_entry[account] = refresh_token
settings.TrueLayer.RefreshKeys = dumps(existing_entry) settings.TrueLayer.RefreshKeys = dumps(existing_entry)
# Set the cached entry
self.refresh_tokens = existing_entry
settings.write() settings.write()
def get_refresh_tokens(self): # def get_refresh_tokens(self):
existing_entry = loads(settings.TrueLayer.RefreshKeys) # existing_entry = loads(settings.TrueLayer.RefreshKeys)
return existing_entry # return existing_entry
def get_key(self, account): def get_key(self, bank):
if account in self.tokens: if bank in self.tokens:
return self.tokens[account] return self.tokens[bank]
else: else:
return False return False
def create_auth_url(self, account): def create_auth_url(self, bank):
query = urllib.parse.urlencode( query = urllib.parse.urlencode(
{ {
"response_type": "code", "response_type": "code",
@ -64,7 +84,7 @@ class TrueLayer(util.Base):
} }
) )
auth_uri = f"{settings.TrueLayer.AuthBase}/?{query}&redirect_uri={settings.TrueLayer.CallbackURL}" auth_uri = f"{settings.TrueLayer.AuthBase}/?{query}&redirect_uri={settings.TrueLayer.CallbackURL}"
self.current_authcode_account = account self.current_authcode_bank = bank
return auth_uri return auth_uri
def handle_authcode_received(self, authcode): def handle_authcode_received(self, authcode):
@ -92,32 +112,33 @@ class TrueLayer(util.Base):
self.add_refresh_token(refresh_token) self.add_refresh_token(refresh_token)
# Add the access # Add the access
if self.current_authcode_account: if self.current_authcode_bank:
self.tokens[self.current_authcode_account] = access_token self.tokens[self.current_authcode_bank] = access_token
else: else:
self.log.error(f"Received an authcode we didn't ask for") self.log.error(f"Received an authcode we didn't ask for")
return return
self.log.info(f"Retrieved access/refresh tokens for {self.current_authcode_account}") self.log.info(f"Retrieved access/refresh tokens for {self.current_authcode_bank}")
def get_new_tokens_all(self): def get_new_tokens_all(self):
refresh_list = loads(settings.TrueLayer.RefreshKeys) refresh_tokens = loads(settings.TrueLayer.RefreshKeys)
for account in refresh_list: # Set the cached entry
self.get_new_token(account) self.refresh_tokens = refresh_tokens
for bank in refresh_tokens:
self.get_new_token(bank)
def get_new_token(self, account): def get_new_token(self, bank):
""" """
Exchange our refresh token for an access token. Exchange our refresh token for an access token.
:param account: account to refresh the token for :param account: account to refresh the token for
:type account: :type account:
""" """
refresh_tokens = self.get_refresh_tokens() if bank not in self.refresh_tokens:
if account not in refresh_tokens:
return return
headers = {"Content-Type": "application/x-www-form-urlencoded"} headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = { data = {
"grant_type": "refresh_token", "grant_type": "refresh_token",
"refresh_token": refresh_tokens[account], "refresh_token": self.refresh_tokens[bank],
"client_id": settings.TrueLayer.ID, "client_id": settings.TrueLayer.ID,
"client_secret": settings.TrueLayer.Key, "client_secret": settings.TrueLayer.Key,
} }
@ -128,8 +149,8 @@ class TrueLayer(util.Base):
return False return False
if r.status_code == 200: if r.status_code == 200:
if "access_token" in parsed.keys(): if "access_token" in parsed.keys():
self.tokens[account] = parsed["access_token"] self.tokens[bank] = parsed["access_token"]
self.log.info(f"Refreshed access token for {account}") self.log.info(f"Refreshed access token for {bank}")
return True return True
else: else:
self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed) self.log.error(f"Token refresh didn't contain access token: {parsed}", parsed=parsed)
@ -138,11 +159,11 @@ class TrueLayer(util.Base):
self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed) self.log.error(f"Cannot refresh token: {parsed}", parsed=parsed)
return False return False
def get_accounts(self, account): def get_accounts(self, bank):
""" """
Get a list of accounts. Get a list of accounts.
""" """
token = self.get_key(account) token = self.get_key(bank)
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
path = f"{settings.TrueLayer.DataBase}/accounts" path = f"{settings.TrueLayer.DataBase}/accounts"
r = requests.get(path, headers=headers) r = requests.get(path, headers=headers)
@ -154,14 +175,66 @@ class TrueLayer(util.Base):
return parsed return parsed
def get_transactions(self, account, account_id): def _get_account(self, bank, account_id):
token = self.get_key(bank)
headers = {"Authorization": f"Bearer {token}"}
path = f"{settings.TrueLayer.DataBase}/accounts/{account_id}"
r = requests.get(path, headers=headers)
try:
parsed = r.json()
except JSONDecodeError:
self.log.error("Error parsing accounts response: {content}", content=r.content)
return False
return parsed
def get_mapped_accounts(self):
existing_entry = loads(settings.TrueLayer.Maps)
self.banks = existing_entry
def get_account(self, bank, account_id):
account_data = self._get_account(bank, account_id)
if "results" not in account_data:
return False
if not len(account_data["results"]) == 1:
return False
if not len(account_data) == 2:
return False
if not account_data["status"] == "Succeeded":
return False
base = account_data["results"][0]
return base
def map_account(self, bank, account_id):
"""
Map an account_id at a bank to an account_name.
This enables the account for fetching.
Data type: {"monzo": [account, ids, here],
"revolut": [account, ids, here]}
"""
account_data = self.get_account(bank, account_id)
currency = account_data["currency"]
existing_entry = loads(settings.TrueLayer.Maps)
if bank in existing_entry:
if account_id not in existing_entry[bank]:
existing_entry[bank].append(account_id)
else:
existing_entry[bank] = [account_id]
settings.TrueLayer.Maps = dumps(existing_entry)
self.banks = existing_entry
settings.write()
return currency
def get_transactions(self, bank, account_id):
""" """
Get a list of transactions from an account. Get a list of transactions from an account.
:param account_id: account to fetch transactions for :param account_id: account to fetch transactions for
:return: list of transactions :return: list of transactions
:rtype: dict :rtype: dict
""" """
token = self.get_key(account) token = self.get_key(bank)
headers = {"Authorization": f"Bearer {token}"} headers = {"Authorization": f"Bearer {token}"}
path = f"{settings.TrueLayer.DataBase}/accounts/{account_id}/transactions" path = f"{settings.TrueLayer.DataBase}/accounts/{account_id}/transactions"
r = requests.get(path, headers=headers) r = requests.get(path, headers=headers)

View File

@ -485,3 +485,19 @@ class IRCCommands(object):
currency = transaction["currency"] currency = transaction["currency"]
description = transaction["description"] description = transaction["description"]
msg(f"{timestamp} {txid} {ptxid} {txtype} {amount}{currency} {description}") msg(f"{timestamp} {txid} {ptxid} {txtype} {amount}{currency} {description}")
class mapaccount(object):
name = "mapaccount"
authed = True
helptext = "Enable an account_id at a bank for use. Usage: mapaccount <bank> <account_id>"
@staticmethod
def run(cmd, spl, length, authed, msg, agora, tx, ux):
if length == 3:
bank = spl[1]
account_id = spl[2]
account_name = tx.sinks.truelayer.map_account(bank, account_id)
if not account_name:
msg(f"Failed to map the account")
return
msg(f"Mapped account ID {account_id} at bank {bank} to {account_name}")