from redis import asyncio as aioredis from core.util import logs log = logs.get_logger("scheduling") r = aioredis.from_url("redis://redis:6379", db=0) def convert(data): """ Recursively convert a dictionary. """ if isinstance(data, bytes): return data.decode("ascii") if isinstance(data, dict): return dict(map(convert, data.items())) if isinstance(data, tuple): return map(convert, data) if isinstance(data, list): return list(map(convert, data)) return data def get_refs(): """ Get all reference IDs for trades. :return: list of trade IDs :rtype: list """ references = [] ref_keys = r.keys("trade.*.reference") for key in ref_keys: references.append(r.get(key)) return convert(references) def tx_to_ref(tx): """ Convert a trade ID to a reference. :param tx: trade ID :type tx: string :return: reference :rtype: string """ refs = get_refs() for reference in refs: ref_data = convert(r.hgetall(f"trade.{reference}")) if not ref_data: continue if ref_data["id"] == tx: return reference def ref_to_tx(reference): """ Convert a reference to a trade ID. :param reference: trade reference :type reference: string :return: trade ID :rtype: string """ ref_data = convert(r.hgetall(f"trade.{reference}")) if not ref_data: return False return ref_data["id"] def get_ref_map(): """ Get all reference IDs for trades. :return: dict of references keyed by TXID :rtype: dict """ references = {} ref_keys = r.keys("trade.*.reference") for key in ref_keys: tx = convert(key).split(".")[1] references[tx] = r.get(key) return convert(references) def get_ref(reference): """ Get the trade information for a reference. :param reference: trade reference :type reference: string :return: dict of trade information :rtype: dict """ ref_data = r.hgetall(f"trade.{reference}") ref_data = convert(ref_data) if "subclass" not in ref_data: ref_data["subclass"] = "agora" if not ref_data: return False return ref_data def get_tx(tx): """ Get the transaction information for a transaction ID. :param reference: trade reference :type reference: string :return: dict of trade information :rtype: dict """ tx_data = r.hgetall(f"tx.{tx}") tx_data = convert(tx_data) if not tx_data: return False return tx_data def get_subclass(reference): obj = r.hget(f"trade.{reference}", "subclass") subclass = convert(obj) return subclass def del_ref(reference): """ Delete a given reference from the Redis database. :param reference: trade reference to delete :type reference: string """ tx = ref_to_tx(reference) r.delete(f"trade.{reference}") r.delete(f"trade.{tx}.reference") def cleanup(subclass, references): """ Reconcile the internal reference database with a given list of references. Delete all internal references not present in the list and clean up artifacts. :param references: list of references to reconcile against :type references: list """ messages = [] for tx, reference in get_ref_map().items(): if reference not in references: if get_subclass(reference) == subclass: logmessage = ( f"[{reference}] ({subclass}): Archiving trade reference. TX: {tx}" ) messages.append(logmessage) log.info(logmessage) r.rename(f"trade.{tx}.reference", f"archive.trade.{tx}.reference") r.rename(f"trade.{reference}", f"archive.trade.{reference}") return messages