Begin adding platform support

This commit is contained in:
2023-03-09 23:27:16 +00:00
parent ac483711c4
commit 1e7d8f6c8d
17 changed files with 3724 additions and 47 deletions

View File

@@ -2,7 +2,7 @@ from redis import asyncio as aioredis
from core.util import logs
log = logs.get_logger("scheduling")
log = logs.get_logger("db")
r = aioredis.from_url("redis://redis:6379", db=0)
@@ -22,20 +22,20 @@ def convert(data):
return data
def get_refs():
async def get_refs():
"""
Get all reference IDs for trades.
:return: list of trade IDs
:rtype: list
"""
references = []
ref_keys = r.keys("trade.*.reference")
ref_keys = await r.keys("trade.*.reference")
for key in ref_keys:
references.append(r.get(key))
return convert(references)
def tx_to_ref(tx):
async def tx_to_ref(tx):
"""
Convert a trade ID to a reference.
:param tx: trade ID
@@ -43,16 +43,16 @@ def tx_to_ref(tx):
:return: reference
:rtype: string
"""
refs = get_refs()
refs = await get_refs()
for reference in refs:
ref_data = convert(r.hgetall(f"trade.{reference}"))
ref_data = convert(await r.hgetall(f"trade.{reference}"))
if not ref_data:
continue
if ref_data["id"] == tx:
return reference
def ref_to_tx(reference):
async def ref_to_tx(reference):
"""
Convert a reference to a trade ID.
:param reference: trade reference
@@ -60,27 +60,27 @@ def ref_to_tx(reference):
:return: trade ID
:rtype: string
"""
ref_data = convert(r.hgetall(f"trade.{reference}"))
ref_data = convert(await r.hgetall(f"trade.{reference}"))
if not ref_data:
return False
return ref_data["id"]
def get_ref_map():
async def get_ref_map():
"""
Get all reference IDs for trades.
:return: dict of references keyed by TXID
:rtype: dict
"""
references = {}
ref_keys = r.keys("trade.*.reference")
ref_keys = await r.keys("trade.*.reference")
for key in ref_keys:
tx = convert(key).split(".")[1]
references[tx] = r.get(key)
references[tx] = await r.get(key)
return convert(references)
def get_ref(reference):
async def get_ref(reference):
"""
Get the trade information for a reference.
:param reference: trade reference
@@ -88,7 +88,7 @@ def get_ref(reference):
:return: dict of trade information
:rtype: dict
"""
ref_data = r.hgetall(f"trade.{reference}")
ref_data = await r.hgetall(f"trade.{reference}")
ref_data = convert(ref_data)
if "subclass" not in ref_data:
ref_data["subclass"] = "agora"
@@ -97,7 +97,7 @@ def get_ref(reference):
return ref_data
def get_tx(tx):
async def get_tx(tx):
"""
Get the transaction information for a transaction ID.
:param reference: trade reference
@@ -105,31 +105,31 @@ def get_tx(tx):
:return: dict of trade information
:rtype: dict
"""
tx_data = r.hgetall(f"tx.{tx}")
tx_data = await r.hgetall(f"tx.{tx}")
tx_data = convert(tx_data)
if not tx_data:
return False
return tx_data
def get_subclass(reference):
obj = r.hget(f"trade.{reference}", "subclass")
async def get_subclass(reference):
obj = await r.hget(f"trade.{reference}", "subclass")
subclass = convert(obj)
return subclass
def del_ref(reference):
async def del_ref(reference):
"""
Delete a given reference from the Redis database.
:param reference: trade reference to delete
:type reference: string
"""
tx = ref_to_tx(reference)
r.delete(f"trade.{reference}")
r.delete(f"trade.{tx}.reference")
tx = await ref_to_tx(reference)
await r.delete(f"trade.{reference}")
await r.delete(f"trade.{tx}.reference")
def cleanup(subclass, references):
async def cleanup(subclass, references):
"""
Reconcile the internal reference database with a given list of references.
Delete all internal references not present in the list and clean up artifacts.
@@ -137,14 +137,44 @@ def cleanup(subclass, references):
:type references: list
"""
messages = []
for tx, reference in get_ref_map().items():
for tx, reference in await get_ref_map().items():
if reference not in references:
if get_subclass(reference) == subclass:
if await get_subclass(reference) == subclass:
logmessage = (
f"[{reference}] ({subclass}): Archiving trade reference. TX: {tx}"
)
messages.append(logmessage)
log.info(logmessage)
r.rename(f"trade.{tx}.reference", f"archive.trade.{tx}.reference")
r.rename(f"trade.{reference}", f"archive.trade.{reference}")
await r.rename(f"trade.{tx}.reference", f"archive.trade.{tx}.reference")
await r.rename(f"trade.{reference}", f"archive.trade.{reference}")
return messages
async def find_trade(self, txid, currency, amount):
"""
Get a trade reference that matches the given currency and amount.
Only works if there is one result.
:param txid: Sink transaction ID
:param currency: currency
:param amount: amount
:type txid: string
:type currency: string
:type amount: int
:return: matching trade object or False
:rtype: dict or bool
"""
refs = await get_refs()
matching_refs = []
# TODO: use get_ref_map in this function instead of calling get_ref multiple times
for ref in refs:
stored_trade = await get_ref(ref)
if stored_trade["currency"] == currency and float(
stored_trade["amount"]
) == float(amount):
matching_refs.append(stored_trade)
if len(matching_refs) != 1:
log.error(
f"Find trade returned multiple results for TXID {txid}: {matching_refs}"
)
return False
return matching_refs[0]