from redis import asyncio as aioredis

from core.util import logs

log = logs.get_logger("db")

# Shared asynchronous Redis client used by every helper in this module.
r = aioredis.from_url("redis://redis:6379", db=0)


def convert(data):
    """
    Recursively decode the bytes contained in a structure into strings.

    Dictionaries, tuples and lists are rebuilt with their contents converted;
    any other value is returned unchanged.

    :param data: value to convert
    :return: the same structure with every bytes value decoded as ASCII
    """
    if isinstance(data, bytes):
        return data.decode("ascii")
    if isinstance(data, dict):
        return {convert(key): convert(value) for key, value in data.items()}
    if isinstance(data, tuple):
        # Materialise the result: a bare map() object is a one-shot iterator
        # and is not a usable replacement for the tuple that was passed in.
        return tuple(map(convert, data))
    if isinstance(data, list):
        return list(map(convert, data))
    return data


async def get_refs():
    """
    Get all reference IDs for trades.

    :return: list of trade references
    :rtype: list
    """
    references = []
    ref_keys = await r.keys("trade.*.reference")
    for key in ref_keys:
        key_data = await r.get(key)
        if key_data is None:
            # The key disappeared between KEYS and GET; there is no
            # reference to report for it.
            continue
        references.append(key_data)
    return convert(references)


async def tx_to_ref(tx):
    """
    Convert a trade ID to a reference.

    :param tx: trade ID
    :type tx: string
    :return: reference, or None if no stored trade has this ID
    :rtype: string or None
    """
    for reference in await get_refs():
        ref_data = convert(await r.hgetall(f"trade.{reference}"))
        # .get() so that a trade hash without an "id" field is skipped
        # instead of aborting the whole search with a KeyError.
        if ref_data and ref_data.get("id") == tx:
            return reference
    return None


async def ref_to_tx(reference):
    """
    Convert a reference to a trade ID.

    :param reference: trade reference
    :type reference: string
    :return: trade ID, or False if the trade or its ID is not stored
    :rtype: string or bool
    """
    ref_data = convert(await r.hgetall(f"trade.{reference}"))
    if not ref_data:
        return False
    return ref_data.get("id", False)


async def get_ref_map():
    """
    Get all reference IDs for trades.

    :return: dict of references keyed by TXID
    :rtype: dict
    """
    prefix = "trade."
    suffix = ".reference"
    references = {}
    ref_keys = await r.keys("trade.*.reference")
    for key in ref_keys:
        reference = await r.get(key)
        if reference is None:
            # The key disappeared between KEYS and GET.
            continue
        # Strip the fixed prefix/suffix rather than splitting on ".", so a
        # TXID that itself contains a dot is kept intact.
        tx = convert(key)[len(prefix):-len(suffix)]
        references[tx] = reference
    return convert(references)


async def get_ref(reference):
    """
    Get the trade information for a reference.

    The "subclass" field defaults to "agora" when the stored trade does not
    have one.

    :param reference: trade reference
    :type reference: string
    :return: dict of trade information, or False if the trade is not stored
    :rtype: dict or bool
    """
    ref_data = convert(await r.hgetall(f"trade.{reference}"))
    # Check for a missing trade BEFORE applying the default below, otherwise
    # the dict is never empty and a missing trade is never reported.
    if not ref_data:
        return False
    if "subclass" not in ref_data:
        ref_data["subclass"] = "agora"
    return ref_data


async def get_tx(tx):
    """
    Get the transaction information for a transaction ID.

    :param tx: transaction ID
    :type tx: string
    :return: dict of transaction information, or False if it is not stored
    :rtype: dict or bool
    """
    tx_data = convert(await r.hgetall(f"tx.{tx}"))
    if not tx_data:
        return False
    return tx_data


async def get_subclass(reference):
    """
    Get the "subclass" field stored for a trade reference.

    :param reference: trade reference
    :type reference: string
    :return: the stored subclass, or None if the field is not set
    :rtype: string or None
    """
    obj = await r.hget(f"trade.{reference}", "subclass")
    return convert(obj)


async def del_ref(reference):
    """
    Delete a given reference from the Redis database.

    :param reference: trade reference to delete
    :type reference: string
    """
    # Resolve the trade ID first: it lives in the hash that is about to go.
    tx = await ref_to_tx(reference)
    await r.delete(f"trade.{reference}")
    if tx:
        # Only remove the TXID -> reference key when a trade ID was found;
        # otherwise the key name would be built from the value False.
        await r.delete(f"trade.{tx}.reference")


async def cleanup(subclass, references):
    """
    Reconcile the internal reference database with a given list of references.
    Archive all internal references of the given subclass that are not present
    in the list by renaming their keys with an "archive." prefix.

    :param subclass: only trades stored with this subclass are archived
    :type subclass: string
    :param references: list of references to reconcile against
    :type references: list
    :return: list of the log messages emitted for the archived trades
    :rtype: list
    """
    messages = []
    ref_map = await get_ref_map()
    for tx, reference in ref_map.items():
        if reference in references:
            continue
        if await get_subclass(reference) != subclass:
            continue
        logmessage = (
            f"[{reference}] ({subclass}): Archiving trade reference. TX: {tx}"
        )
        messages.append(logmessage)
        log.info(logmessage)
        await r.rename(f"trade.{tx}.reference", f"archive.trade.{tx}.reference")
        await r.rename(f"trade.{reference}", f"archive.trade.{reference}")
    return messages


async def find_trade(self, txid, currency, amount):
    """
    Get a trade reference that matches the given currency and amount.
    Only works if there is one result.

    :param self: unused; kept so existing callers keep working
    :param txid: Sink transaction ID
    :param currency: currency
    :param amount: amount
    :type txid: string
    :type currency: string
    :type amount: int
    :return: matching trade object or False
    :rtype: dict or bool
    """
    refs = await get_refs()
    matching_refs = []
    # TODO: use get_ref_map in this function instead of calling get_ref multiple times
    for ref in refs:
        stored_trade = await get_ref(ref)
        # Skip trades that are missing or incomplete instead of crashing the
        # whole lookup on a single bad record.
        if not stored_trade:
            continue
        if "currency" not in stored_trade or "amount" not in stored_trade:
            continue
        if stored_trade["currency"] == currency and float(
            stored_trade["amount"]
        ) == float(amount):
            matching_refs.append(stored_trade)
    if not matching_refs:
        log.error(f"Find trade returned no results for TXID {txid}")
        return False
    if len(matching_refs) > 1:
        log.error(
            f"Find trade returned multiple results for TXID {txid}: {matching_refs}"
        )
        return False
    return matching_refs[0]