pluto/core/management/commands/scheduling.py

181 lines
6.4 KiB
Python

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from django.core.management.base import BaseCommand
from pyotp import TOTP
from core.clients.aggregators.nordigen import NordigenClient
from core.clients.platforms.agora import AgoraClient
from core.lib.money import Money
from core.lib.notify import sendmsg
from core.models import INTERVAL_CHOICES, Aggregator, LinkGroup, Platform, Requisition
from core.util import logs
log = logs.get_logger("scheduling")
INTERVAL_AGGREGATOR = 10
INTERVAL_WITHDRAWAL = 7200
INTERVALS_PLATFORM = [x[0] for x in INTERVAL_CHOICES]
async def withdrawal_job(group=None):
money = Money()
if group is not None:
groups = [group]
else:
groups = LinkGroup.objects.filter(enabled=True)
for group in groups:
checks = await money.check_all(
user=group.user, nordigen=NordigenClient, agora=AgoraClient
)
print("CHECKS", checks)
aggregators = Aggregator.objects.filter(
user=group.user,
link_group=group,
)
platforms = Platform.objects.filter(
user=group.user,
link_group=group,
)
requisitions = Requisition.objects.filter(
user=group.user,
aggregator__in=aggregators,
)
pay_list = money.get_pay_list(
group,
requisitions,
platforms,
group.user,
checks["total_profit_in_xmr"],
)
collapsed = money.collapse_pay_list(pay_list)
if any(collapsed.values()):
message = ""
print("COLLAPSED", collapsed)
for wallet, amount in collapsed.items():
print("ITER", wallet, amount)
message += f"{wallet}: {amount}\n"
print("MESSAGE", message)
await sendmsg(
group.user,
message,
title="Your withdrawal is ready!",
)
if not checks["total_profit_in_xmr"] >= 0:
return
total_withdrawal = sum(collapsed.values())
if checks["total_xmr_agora"] < total_withdrawal:
await sendmsg(
group.user,
(
f"Attempting to withdraw {total_withdrawal}, but you only have"
f" {checks['total_xmr_agora']} in your Agora wallet."
),
title="Withdrawal failed",
)
continue
if group.platforms.count() != 1:
raise Exception("You can only have one platform per group")
platform = group.platforms.first()
# run = await AgoraClient(platform)
otp_code = TOTP(platform.otp_token).now()
for wallet, amount in collapsed.items():
print("ITER SEND", wallet, amount)
cast = {
"address": wallet.address,
"amount": amount,
"password": platform.password,
"otp": otp_code,
}
print("CAST ADDRESS", cast["address"])
print("CAST AMOUNT", cast["amount"])
print("CAST OTP TRUNCATED BY 2", cast["otp"][-2])
# sent = await run.call("wallet_send_xmr", **cast)
# print("SENT", sent)
async def aggregator_job():
aggregators = Aggregator.objects.filter(enabled=True)
for aggregator in aggregators:
open_trade_currencies = aggregator.trades_currencies
if aggregator.service == "nordigen":
instance = None
if aggregator.fetch_accounts is True:
aggregator.account_info = {}
aggregator.save()
instance = await NordigenClient(aggregator)
await instance.get_all_account_info(store=True)
# fetch_tasks = []
for bank, accounts in aggregator.account_info.items():
for account in accounts:
account_id = account["account_id"]
requisition_id = account["requisition_id"]
if account["currency"] not in open_trade_currencies:
continue # Next account
# Avoid hammering the API with new access token requests
if instance is None:
instance = await NordigenClient(aggregator)
# task = instance.get_transactions(
# account_id, req=requisition_id, process=True
# )
await instance.get_transactions(
account_id, req=requisition_id, process=True
)
# fetch_tasks.append(task)
# await asyncio.gather(*fetch_tasks)
else:
raise NotImplementedError(f"No such client library: {aggregator.service}")
aggregator.fetch_accounts = False
aggregator.save()
async def platform_job(interval):
if interval == 0:
return
platforms = Platform.objects.filter(enabled=True, cheat_interval_seconds=interval)
for platform in platforms:
if platform.service == "agora":
if platform.cheat is True:
instance = await AgoraClient(platform)
await instance.cheat()
else:
raise NotImplementedError(f"No such client library: {platform.service}")
class Command(BaseCommand):
def handle(self, *args, **options):
"""
Start the scheduling process.
"""
scheduler = AsyncIOScheduler()
log.debug(f"Scheduling {INTERVAL_AGGREGATOR} second aggregator job")
scheduler.add_job(aggregator_job, "interval", seconds=INTERVAL_AGGREGATOR)
log.debug(f"Scheduling {INTERVAL_WITHDRAWAL} second withdrawal job")
scheduler.add_job(withdrawal_job, "interval", seconds=INTERVAL_WITHDRAWAL)
for interval in INTERVALS_PLATFORM:
if interval == 0:
continue
log.debug(f"Scheduling {interval} second platform job")
scheduler.add_job(
platform_job, "interval", seconds=interval, args=[interval]
)
scheduler.start()
loop = asyncio.get_event_loop()
try:
loop.run_forever()
except (KeyboardInterrupt, SystemExit):
log.info("Process terminating")
finally:
loop.close()