import asyncio from apscheduler.schedulers.asyncio import AsyncIOScheduler from django.core.management.base import BaseCommand # from core.clients.aggregators.nordigen import NordigenClient from core.clients.platforms.agora import AgoraClient from core.models import Aggregator, Platform from core.util import logs log = logs.get_logger("polling") INTERVAL = 5 async def poll_aggregator(aggregator): print("Polling aggregator", aggregator) async def poll_platform(platform): print("Polling platform", platform) client = await AgoraClient(platform) await client.poll() async def job(): platforms = Platform.objects.filter(enabled=True) aggregators = Aggregator.objects.filter(enabled=True) tasks = [] for platform in platforms: tasks.append(poll_platform(platform)) for aggregator in aggregators: tasks.append(poll_aggregator(aggregator)) # Run it all at once await asyncio.gather(*tasks) class Command(BaseCommand): def handle(self, *args, **options): """ Start the polling process. """ scheduler = AsyncIOScheduler() log.debug(f"Scheduling polling process job every {INTERVAL} seconds") scheduler.add_job(job, "interval", seconds=INTERVAL) scheduler.start() loop = asyncio.get_event_loop() try: loop.run_forever() except (KeyboardInterrupt, SystemExit): log.info("Process terminating") finally: loop.close()