import asyncio from apscheduler.schedulers.asyncio import AsyncIOScheduler from django.core.management.base import BaseCommand from core.clients.aggregators.nordigen import NordigenClient from core.clients.platforms.agora import AgoraClient from core.models import INTERVAL_CHOICES, Aggregator, Platform from core.util import logs log = logs.get_logger("scheduling") INTERVAL_AGGREGATOR = 10 INTERVALS_PLATFORM = [x[0] for x in INTERVAL_CHOICES] async def aggregator_job(): aggregators = Aggregator.objects.filter(enabled=True) for aggregator in aggregators: open_trade_currencies = aggregator.trades_currencies log.debug(f"Currencies of open trades: {open_trade_currencies}") if aggregator.service == "nordigen": instance = None if aggregator.fetch_accounts is True: aggregator.account_info = {} aggregator.save() instance = await NordigenClient(aggregator) await instance.get_all_account_info(store=True) # fetch_tasks = [] for bank, accounts in aggregator.account_info.items(): for account in accounts: account_id = account["account_id"] requisition_id = account["requisition_id"] if account["currency"] not in open_trade_currencies: continue # Next account # Avoid hammering the API with new access token requests if instance is None: instance = await NordigenClient(aggregator) # task = instance.get_transactions( # account_id, req=requisition_id, process=True # ) await instance.get_transactions( account_id, req=requisition_id, process=True ) # fetch_tasks.append(task) # await asyncio.gather(*fetch_tasks) else: raise NotImplementedError(f"No such client library: {aggregator.service}") aggregator.fetch_accounts = False aggregator.save() async def platform_job(interval): if interval == 0: return platforms = Platform.objects.filter(enabled=True, cheat_interval_seconds=interval) for platform in platforms: if platform.service == "agora": if platform.cheat is True: instance = await AgoraClient(platform) await instance.cheat() else: raise NotImplementedError(f"No such client library: {platform.service}") class Command(BaseCommand): def handle(self, *args, **options): """ Start the scheduling process. """ scheduler = AsyncIOScheduler() log.debug(f"Scheduling {INTERVAL_AGGREGATOR} second aggregator job") scheduler.add_job(aggregator_job, "interval", seconds=INTERVAL_AGGREGATOR) for interval in INTERVALS_PLATFORM: if interval == 0: continue log.debug(f"Scheduling {interval} second platform job") scheduler.add_job( platform_job, "interval", seconds=interval, args=[interval] ) scheduler.start() loop = asyncio.get_event_loop() try: loop.run_forever() except (KeyboardInterrupt, SystemExit): log.info("Process terminating") finally: loop.close()