You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

79 lines
2.7 KiB
Python

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from django.core.management.base import BaseCommand
from core.clients.aggregators.nordigen import NordigenClient
from core.clients.platforms.agora import AgoraClient
from core.models import INTERVAL_CHOICES, Aggregator, Platform
from core.util import logs
# Logger shared by the scheduled jobs and the management command in this module.
log = logs.get_logger("scheduling")

# Period, in seconds, of the aggregator job.
INTERVAL_AGGREGATOR = 10

# First element of every INTERVAL_CHOICES entry. Each value other than 0 is
# used as the period (in seconds) of one platform job.
INTERVALS_PLATFORM = [choice[0] for choice in INTERVAL_CHOICES]
async def aggregator_job():
    """
    Fetch transactions for every enabled aggregator.

    For each enabled Aggregator a NordigenClient is awaited. When the
    aggregator has ``fetch_accounts`` set, ``get_all_account_info(store=True)``
    is awaited first. One ``get_transactions`` call is then created per account
    found in ``aggregator.account_info`` and all of them are awaited together
    with ``asyncio.gather``. Finally ``fetch_accounts`` is reset to False and
    the aggregator is saved.

    Raises:
        NotImplementedError: if an aggregator's ``service`` is anything other
            than "nordigen". The exception ends this run, so aggregators later
            in the queryset are not processed.
    """
    # NOTE(review): iterating the queryset and calling save() are synchronous
    # ORM operations inside a coroutine -- confirm the deployment allows this
    # (Django normally guards sync DB access while an event loop is running).
    for aggregator in Aggregator.objects.filter(enabled=True):
        # Guard clause: only the Nordigen client library is supported here.
        if aggregator.service != "nordigen":
            raise NotImplementedError(f"No such client library: {aggregator.service}")

        client = await NordigenClient(aggregator)
        if aggregator.fetch_accounts is True:
            await client.get_all_account_info(store=True)

        # account_info maps a bank key to a list of account dicts; the key
        # itself is not needed, only the accounts underneath it.
        pending = [
            client.get_transactions(
                account["account_id"], req=account["requisition_id"], process=True
            )
            for _bank, accounts in aggregator.account_info.items()
            for account in accounts
        ]
        await asyncio.gather(*pending)

        # The account refresh is a one-shot request: clear the flag and persist.
        aggregator.fetch_accounts = False
        aggregator.save()
async def platform_job(interval):
    """
    Run the cheat routine for enabled platforms configured with this interval.

    Args:
        interval: value matched against ``Platform.cheat_interval_seconds``.
            An interval of 0 makes the job return without doing anything.

    Raises:
        NotImplementedError: if a matching platform's ``service`` is anything
            other than "agora". The exception ends this run, so platforms
            later in the queryset are not processed.
    """
    if interval == 0:
        return

    matching = Platform.objects.filter(
        enabled=True, cheat_interval_seconds=interval
    )
    for platform in matching:
        # Guard clause: only the Agora client library is supported here.
        if platform.service != "agora":
            raise NotImplementedError(f"No such client library: {platform.service}")

        # Platforms that have not opted in to cheating are skipped silently.
        if platform.cheat is not True:
            continue

        client = await AgoraClient(platform)
        await client.cheat()
class Command(BaseCommand):
    """Management command that runs the periodic aggregator and platform jobs."""

    def handle(self, *args, **options):
        """
        Start the scheduling process.

        Registers the aggregator job every INTERVAL_AGGREGATOR seconds and one
        platform job for each non-zero entry of INTERVALS_PLATFORM, then runs
        the event loop until the process is interrupted.
        """
        # Create and register the loop explicitly instead of relying on
        # asyncio.get_event_loop() to create one implicitly: that implicit
        # creation is deprecated and raises RuntimeError on newer Python
        # versions when no loop has been set.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        scheduler = AsyncIOScheduler(event_loop=loop)

        log.debug(f"Scheduling {INTERVAL_AGGREGATOR} second aggregator job")
        scheduler.add_job(aggregator_job, "interval", seconds=INTERVAL_AGGREGATOR)

        for interval in INTERVALS_PLATFORM:
            # An interval of 0 is never scheduled (platform_job ignores it too).
            if interval == 0:
                continue
            log.debug(f"Scheduling {interval} second platform job")
            scheduler.add_job(
                platform_job, "interval", seconds=interval, args=[interval]
            )

        try:
            # Started inside the try block so the loop is closed even if
            # start-up fails or is interrupted.
            scheduler.start()
            loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            log.info("Process terminating")
        finally:
            loop.close()