pluto/core/management/commands/scheduling.py

228 lines
8.0 KiB
Python

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from django.conf import settings
from django.core.management.base import BaseCommand
from pyotp import TOTP
from core.clients.aggregators.nordigen import NordigenClient
from core.clients.platforms.agora import AgoraClient
from core.lib.money import Money
from core.lib.notify import sendmsg
from core.models import (
INTERVAL_CHOICES,
Aggregator,
LinkGroup,
Payout,
Platform,
Requisition,
)
from core.util import logs
from core.util.validation import Validation
# Module-level logger shared by the jobs and the management command below.
log = logs.get_logger("scheduling")
# Seconds between aggregator_job runs (passed as `seconds=` to the scheduler).
INTERVAL_AGGREGATOR = 10
# Seconds between withdrawal_job runs (passed as `seconds=` to the scheduler).
INTERVAL_WITHDRAWAL = 7200
# First element of every INTERVAL_CHOICES entry. The command schedules one
# platform_job per value and skips 0.
INTERVALS_PLATFORM = [x[0] for x in INTERVAL_CHOICES]
async def withdrawal_job(group=None):
    """
    Run the withdrawal check for one link group, or for every enabled one.

    Each group is handled independently: a group that is skipped (balance
    still outstanding, negative profit, or not enough XMR on Agora) does not
    prevent the remaining groups from being processed.

    :param group: a single LinkGroup to process. When None, every LinkGroup
        with ``enabled=True`` is processed.
    :raises Exception: if a group that reaches the payout stage does not have
        exactly one platform. This propagates and ends the whole run.
    """
    money = Money()
    if group is not None:
        groups = [group]
    else:
        groups = LinkGroup.objects.filter(enabled=True)
    for link_group in groups:
        await _withdraw_for_group(money, link_group)


async def _withdraw_for_group(money, group):
    """
    Check a single link group and pay out its wallets if a withdrawal is due.

    Returning early from this helper skips only this group.

    :param money: the Money instance used for the checks and the pay list.
    :param group: the LinkGroup to process.
    :raises Exception: if the group does not have exactly one platform.
    """
    checks = await money.check_all(
        link_group=group, nordigen=NordigenClient, agora=AgoraClient
    )
    if checks["total_remaining"] > 0:
        # More than 0 remaining, so we can't withdraw
        await sendmsg(
            group.user,
            f"{checks['total_remaining']} left until you can withdraw.",
            title="Balance update",
        )
        return
    log.debug(f"Withdrawal checks: {checks}")

    aggregators = Aggregator.objects.filter(
        user=group.user,
        link_group=group,
    )
    platforms = Platform.objects.filter(
        user=group.user,
        link_group=group,
    )
    requisitions = Requisition.objects.filter(
        user=group.user,
        aggregator__in=aggregators,
    )
    # pay_list maps each wallet to an iterable of (amount, reason) pairs;
    # collapsed maps each wallet to a single amount.
    pay_list = money.get_pay_list(
        group,
        requisitions,
        platforms,
        group.user,
        checks["total_profit_in_xmr"],
    )
    collapsed = money.collapse_pay_list(pay_list)
    log.debug(f"Collapsed pay list: {collapsed}")

    if any(collapsed.values()):
        message = "".join(
            f"{wallet}: {amount}\n" for wallet, amount in collapsed.items()
        )
        await sendmsg(
            group.user,
            message,
            title="Your withdrawal is ready!",
        )

    # Never pay out of a negative profit. Skip this group only; the other
    # groups must still be processed.
    if not checks["total_profit_in_xmr"] >= 0:
        return

    total_withdrawal = sum(collapsed.values())
    if checks["total_xmr_agora"] < total_withdrawal:
        await sendmsg(
            group.user,
            (
                f"Attempting to withdraw {total_withdrawal}, but you only have"
                f" {checks['total_xmr_agora']} in your Agora wallet."
            ),
            title="Withdrawal failed",
        )
        return

    if group.platforms.count() != 1:
        raise Exception("You can only have one platform per group")
    platform = group.platforms.first()
    run = await AgoraClient(platform)

    for wallet, payments in pay_list.items():
        if not Validation.is_address("xmr", wallet.address):
            log.debug(f"Invalid XMR address, skipping wallet: {wallet.address}")
            await sendmsg(
                group.user,
                f"Invalid XMR address: {wallet.address}, ignored",
                title="Invalid XMR address",
            )
            continue
        for amount, reason in payments:
            amount_rounded = round(amount, 8)
            cast = {
                "address": wallet.address,
                "amount": amount_rounded,
                "password": platform.password,
                "otp": TOTP(platform.otp_token).now(),
            }
            # The password and the OTP are deliberately never logged.
            log.debug(
                f"Sending {cast['amount']} XMR to {cast['address']} ({reason})"
            )
            # With settings.DUMMY set, nothing is sent but the payout is
            # still recorded.
            live = not settings.DUMMY
            if live:
                sent = await run.call("wallet_send_xmr", **cast)
                log.debug(f"Send response: {sent}")
            payout = Payout.objects.create(
                user=group.user,
                wallet=wallet,
                amount=amount_rounded,
                description=reason,
            )
            if live:
                payout.response = sent
            payout.save()
async def aggregator_job():
    """
    Fetch transactions for every enabled aggregator.

    For each aggregator whose service is "nordigen":
    - if ``fetch_accounts`` is set, the stored account info is cleared and
      fetched again before anything else;
    - transactions are then requested, one account at a time, for every
      account whose currency is in the aggregator's ``trades_currencies``;
    - ``fetch_accounts`` is reset to False and the aggregator is saved.

    :raises NotImplementedError: if an aggregator uses any other service.
    """
    for aggregator in Aggregator.objects.filter(enabled=True):
        wanted_currencies = aggregator.trades_currencies
        if aggregator.service != "nordigen":
            raise NotImplementedError(f"No such client library: {aggregator.service}")

        client = None
        if aggregator.fetch_accounts is True:
            aggregator.account_info = {}
            aggregator.save()
            client = await NordigenClient(aggregator)
            await client.get_all_account_info(store=True)

        # Requests are awaited one after another, not gathered concurrently.
        for accounts in aggregator.account_info.values():
            for account in accounts:
                account_id = account["account_id"]
                requisition_id = account["requisition_id"]
                if account["currency"] not in wanted_currencies:
                    continue  # Next account
                # Avoid hammering the API with new access token requests:
                # create the client once and reuse it for every account.
                if client is None:
                    client = await NordigenClient(aggregator)
                await client.get_transactions(
                    account_id, req=requisition_id, process=True
                )

        aggregator.fetch_accounts = False
        aggregator.save()
async def platform_job(interval):
    """
    Run the cheat routine on every enabled platform scheduled at `interval`.

    :param interval: value matched against each platform's
        ``cheat_interval_seconds``; 0 makes the job a no-op.
    :raises NotImplementedError: if a matching platform's service is not
        "agora".
    """
    if interval == 0:
        return
    matching = Platform.objects.filter(
        enabled=True, cheat_interval_seconds=interval
    )
    for platform in matching:
        if platform.service != "agora":
            raise NotImplementedError(f"No such client library: {platform.service}")
        if platform.cheat is not True:
            continue
        client = await AgoraClient(platform)
        await client.cheat()
class Command(BaseCommand):
    """Management command that runs the periodic jobs on an asyncio scheduler."""

    def handle(self, *args, **options):
        """
        Start the scheduling process.

        Registers the aggregator job, the withdrawal job and one platform job
        per non-zero entry of INTERVALS_PLATFORM, then runs the event loop
        until the process is interrupted.
        """
        # Create and install the loop explicitly. Relying on
        # asyncio.get_event_loop() to create one implicitly is deprecated and
        # raises RuntimeError on recent Python versions.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Hand the scheduler the same loop that is run below, so it does not
        # have to look one up itself.
        scheduler = AsyncIOScheduler(event_loop=loop)

        log.debug(f"Scheduling {INTERVAL_AGGREGATOR} second aggregator job")
        scheduler.add_job(aggregator_job, "interval", seconds=INTERVAL_AGGREGATOR)

        log.debug(f"Scheduling {INTERVAL_WITHDRAWAL} second withdrawal job")
        scheduler.add_job(withdrawal_job, "interval", seconds=INTERVAL_WITHDRAWAL)

        for interval in INTERVALS_PLATFORM:
            if interval == 0:
                continue
            log.debug(f"Scheduling {interval} second platform job")
            scheduler.add_job(
                platform_job, "interval", seconds=interval, args=[interval]
            )

        scheduler.start()
        try:
            loop.run_forever()
        except (KeyboardInterrupt, SystemExit):
            log.info("Process terminating")
        finally:
            loop.close()