You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

143 lines
5.0 KiB
Python

2 years ago
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from django.core.management.base import BaseCommand
from core.clients.aggregators.nordigen import NordigenClient
from core.clients.platforms.agora import AgoraClient
from core.lib.money import Money
from core.lib.notify import sendmsg
from core.models import INTERVAL_CHOICES, Aggregator, LinkGroup, Platform, Requisition
2 years ago
from core.util import logs
# Module logger, registered under the name "scheduling".
log = logs.get_logger("scheduling")
# Period, in seconds, at which aggregator_job is scheduled.
INTERVAL_AGGREGATOR = 10
# Period, in seconds, at which withdrawal_job is scheduled (7200 s = 2 hours).
INTERVAL_WITHDRAWAL = 7200
2 years ago
# Platform job periods in seconds: the first element of each INTERVAL_CHOICES entry.
INTERVALS_PLATFORM = [choice[0] for choice in INTERVAL_CHOICES]
2 years ago
async def withdrawal_job(group=None):
    """
    Build the withdrawal pay list for link groups and notify their users.

    For each link group processed, Money.check_all is awaited for the group's
    user, a pay list is built from the group's requisitions and platforms
    together with checks["total_profit_in_xmr"], and the list is collapsed.
    When any collapsed value is truthy, a message with one "wallet: amount"
    line per collapsed entry is sent to the group's user.

    :param group: a single link group to process; when None, every LinkGroup
        with enabled=True is processed
    """
    money = Money()
    # NOTE(review): the querysets below are evaluated synchronously inside a
    # coroutine. Django normally rejects that with SynchronousOnlyOperation
    # unless async-unsafe access is allowed -- confirm how this command is run.
    if group is not None:
        link_groups = [group]
    else:
        link_groups = LinkGroup.objects.filter(enabled=True)

    # A distinct loop name keeps the ``group`` argument from being shadowed.
    for link_group in link_groups:
        user = link_group.user
        checks = await money.check_all(
            user=user, nordigen=NordigenClient, agora=AgoraClient
        )
        log.debug(f"Withdrawal checks: {checks}")

        aggregators = Aggregator.objects.filter(
            user=user,
            link_group=link_group,
        )
        platforms = Platform.objects.filter(
            user=user,
            link_group=link_group,
        )
        requisitions = Requisition.objects.filter(
            user=user,
            aggregator__in=aggregators,
        )

        pay_list = money.get_pay_list(
            link_group,
            requisitions,
            platforms,
            user,
            checks["total_profit_in_xmr"],
        )
        collapsed = money.collapse_pay_list(pay_list)
        log.debug(f"Collapsed pay list: {collapsed}")

        # Only notify when at least one collapsed amount is truthy.
        if not any(collapsed.values()):
            continue

        message = "".join(
            f"{wallet}: {amount}\n" for wallet, amount in collapsed.items()
        )
        log.debug(f"Withdrawal message: {message}")
        await sendmsg(
            user,
            message,
            title="Your withdrawal is ready!",
        )
async def aggregator_job():
    """
    Fetch transactions for every enabled aggregator.

    For a "nordigen" aggregator: when fetch_accounts is True, account_info is
    reset to an empty dict, the aggregator is saved and
    get_all_account_info(store=True) is awaited (presumably repopulating
    account_info -- confirm in NordigenClient). Then, for each stored account
    whose currency is in the aggregator's trades_currencies,
    get_transactions(..., process=True) is awaited. Afterwards fetch_accounts
    is set to False and the aggregator is saved again.

    Raises NotImplementedError for any other service.
    """
    for aggregator in Aggregator.objects.filter(enabled=True):
        wanted_currencies = aggregator.trades_currencies
        if aggregator.service != "nordigen":
            raise NotImplementedError(f"No such client library: {aggregator.service}")

        client = None
        if aggregator.fetch_accounts is True:
            aggregator.account_info = {}
            aggregator.save()
            client = await NordigenClient(aggregator)
            await client.get_all_account_info(store=True)

        # NOTE: requests are awaited one at a time; the original block only
        # carried a concurrent asyncio.gather variant as commented-out code.
        for accounts in aggregator.account_info.values():
            for account in accounts:
                account_id = account["account_id"]
                requisition_id = account["requisition_id"]
                if account["currency"] in wanted_currencies:
                    # Reuse one client per aggregator to avoid hammering the
                    # API with new access token requests.
                    if client is None:
                        client = await NordigenClient(aggregator)
                    await client.get_transactions(
                        account_id, req=requisition_id, process=True
                    )

        aggregator.fetch_accounts = False
        aggregator.save()
2 years ago
async def platform_job(interval):
    """
    Run the cheat routine for enabled platforms configured with this interval.

    Platforms are selected by enabled=True and cheat_interval_seconds equal to
    ``interval``. For an "agora" platform whose cheat flag is True, an
    AgoraClient is created and its cheat() coroutine awaited.

    :param interval: the cheat interval to match; 0 makes the job a no-op
    :raises NotImplementedError: for a platform with any other service
    """
    if interval == 0:
        return

    due = Platform.objects.filter(enabled=True, cheat_interval_seconds=interval)
    for platform in due:
        if platform.service != "agora":
            raise NotImplementedError(f"No such client library: {platform.service}")
        if platform.cheat is not True:
            continue
        client = await AgoraClient(platform)
        await client.cheat()
2 years ago
class Command(BaseCommand):
    """
    Management command that schedules the aggregator, withdrawal and platform
    jobs and runs them on an asyncio event loop until interrupted.
    """

    def handle(self, *args, **options):
        """
        Start the scheduling process.

        Registers aggregator_job and withdrawal_job at their fixed intervals,
        plus one platform_job per non-zero entry of INTERVALS_PLATFORM, then
        runs the event loop until KeyboardInterrupt or SystemExit.
        """
        # Create and install the loop explicitly: relying on
        # asyncio.get_event_loop() to create one implicitly is deprecated and
        # raises RuntimeError on newer Python versions when no loop is set.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Give the scheduler the very loop that run_forever() drives below,
        # instead of depending on both sides finding the same global loop.
        scheduler = AsyncIOScheduler(event_loop=loop)

        log.debug(f"Scheduling {INTERVAL_AGGREGATOR} second aggregator job")
        scheduler.add_job(aggregator_job, "interval", seconds=INTERVAL_AGGREGATOR)

        log.debug(f"Scheduling {INTERVAL_WITHDRAWAL} second withdrawal job")
        scheduler.add_job(withdrawal_job, "interval", seconds=INTERVAL_WITHDRAWAL)

        for interval in INTERVALS_PLATFORM:
            # An interval of 0 is not scheduled (platform_job ignores it too).
            if interval == 0:
                continue
            log.debug(f"Scheduling {interval} second platform job")
            scheduler.add_job(
                platform_job, "interval", seconds=interval, args=[interval]
            )

        scheduler.start()
        try:
            loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            log.info("Process terminating")
        finally:
            loop.close()