import functools from json import JSONDecodeError, dumps, loads from klein import Klein from twisted.web.server import Request import main from modules import chankeep, provision, userinfo from utils.logging.log import warn def login_required(func): @functools.wraps(func) def wrapper(self, *args, **kwargs): if isinstance(args[0], Request): request = args[0] apikey = request.getHeader("ApiKey") token = request.getHeader("Token") if not apikey: return "No API key provided" if not token: return "No token provided" if apikey not in main.tokens: return "No such API key" config_token = main.tokens[apikey] if not token == config_token["hello"]: return "Invalid token" counter = config_token["counter"] request.setHeader("Counter", counter) return func(self, *args, **kwargs) return wrapper class API(object): """ Our API webapp. """ app = Klein() @app.route("/", methods=["GET"]) @login_required def hello(self, request): return "Hello" @app.route("/who//", methods=["POST"]) @login_required def who(self, request, query): try: data = loads(request.content.read()) except JSONDecodeError: return "Invalid JSON" if "query" not in data: return "No query provided" result = userinfo.getWho(data["query"]) # Expand the generator return dumps({k: [x for x in v] for k, v in result.items()}) @app.route("/chans/", methods=["POST"]) @login_required def chans(self, request): try: data = loads(request.content.read()) except JSONDecodeError: return "Invalid JSON" if "net" not in data: return "No net provided" if "query" not in data: return "No query provided" if not data["query"]: warn(f"No query provided: for chans {data}") return dumps({}) result = userinfo.getChansSingle(data["net"], data["query"]) if not result: return dumps({}) return dumps({"chans": [x for x in result]}) @app.route("/users/", methods=["POST"]) @login_required def users(self, request): try: data = loads(request.content.read()) except JSONDecodeError: return "Invalid JSON" if "net" not in data: return "No net provided" if "query" not in data: return "No query provided" if not data["query"]: warn(f"No query provided for users: {data}") return dumps({}) result = userinfo.getUsersSingle(data["net"], data["query"]) if not result: return dumps({}) return dumps({"users": [x for x in result]}) @app.route("/online/", methods=["POST"]) @login_required def online(self, request): try: data = loads(request.content.read()) except JSONDecodeError: return "Invalid JSON" if "net" not in data: return "No net provided" if "query" not in data: return "No users provided" if not data["query"]: warn(f"No users provided: for online {data}") return dumps({}) net = data["net"] usermap = {} for user in data["query"]: channels = userinfo.getChansSingle(net, [user]) if channels: usermap[user] = True else: usermap[user] = False return dumps(usermap) @app.route("/num_users/", methods=["POST"]) @login_required def num_users(self, request): try: data = loads(request.content.read()) except JSONDecodeError: return "Invalid JSON" if "net" not in data: return "No net provided" if "query" not in data: return "No users provided" if not data["query"]: warn(f"No chans provided: for num_users {data}") return dumps({}) net = data["net"] results = userinfo.getUserNum(net, data["query"]) return dumps(results) @app.route("/num_chans/", methods=["POST"]) @login_required def num_chans(self, request): try: data = loads(request.content.read()) except JSONDecodeError: return "Invalid JSON" if "net" not in data: return "No net provided" if "query" not in data: return "No users provided" if not data["query"]: warn(f"No users provided: for num_chans {data}") return dumps({}) net = data["net"] results = userinfo.getChanNum(net, data["query"]) return dumps(results) @app.route("/irc/stats/", methods=["POST"]) @login_required def irc_stats(self, request): stats = {} numChannels = 0 numWhoEntries = 0 for i in main.IRCPool.keys(): numChannels += len(main.IRCPool[i].channels) numWhoEntries += userinfo.getNumTotalWhoEntries() numRelays = 0 for net in main.network.keys(): numRelays += len(main.network[net].relays) stats["servers_total_total"] = numRelays stats["servers_total_unique"] = len(main.network.keys()) stats["servers_online_total"] = len(main.IRCPool.keys()) stats["servers_online_unique"] = len(main.liveNets()) stats["channels"] = numChannels stats["records"] = numWhoEntries stats["eventrate"] = main.lastMinuteSample return dumps(stats) @app.route("/irc/networks/", methods=["POST"]) @login_required def irc_networks(self, request): networks = {} for net in main.network.keys(): networks[net] = { "relays": len(main.network[net].relays), "channels": userinfo.getTotalChanNum(net), "records": userinfo.getNumWhoEntries(net), } return dumps(networks) @app.route("/irc/network//", methods=["POST"]) @login_required def irc_network(self, request, net): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) inst = main.network[net] network = {} network["net"] = inst.net network["auth"] = inst.auth network["host"] = inst.host network["last"] = inst.last network["port"] = inst.port network["security"] = inst.security network["relays"] = len(inst.relays) network["channels"] = userinfo.getTotalChanNum(net) network["records"] = userinfo.getNumWhoEntries(net) return dumps(network) @app.route("/irc/network//edit/", methods=["POST"]) @login_required def irc_network_edit(self, request, net): try: data = loads(request.content.read()) except JSONDecodeError: return "Invalid JSON" if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) inst = main.network[net] for item in data: if item == "auth": auth = data[item][0] if auth not in ["sasl", "ns", "none"]: return dumps({"success": False, "reason": "invalid auth."}) elif item == "host": host = data[item][0] elif item == "last": last = data[item][0] if not last.isdigit(): return dumps({"success": False, "reason": "invalid last: not a number."}) elif item == "port": port = data[item][0] if not port.isdigit(): return dumps({"success": False, "reason": "invalid port: not a number."}) port = int(port) elif item == "security": security = data[item][0] if security not in ["ssl", "plain"]: return dumps({"success": False, "reason": "invalid security."}) inst.auth = auth inst.host = host inst.last = last inst.port = port inst.security = security main.saveConf("network") return dumps({"success": True}) @app.route("/irc/network//relays/", methods=["POST"]) @login_required def irc_network_relays(self, request, net): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) relays_inst = main.network[net].relays relays = [] for num in relays_inst.keys(): to_append = relays_inst[num] name = f"{net}{num}" if name in main.IRCPool.keys(): to_append["chans"] = len(main.IRCPool[name].channels) to_append["nick"] = main.IRCPool[name].nickname else: to_append["chans"] = 0 to_append["nick"] = None relays.append(to_append) return dumps({"relays": relays}) @app.route("/irc/network///", methods=["POST"]) @login_required def irc_network_relay(self, request, net, num): try: data = loads(request.content.read()) except JSONDecodeError: return "Invalid JSON" if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) if not num.isdigit(): return dumps({"success": False, "reason": "invalid num: not a number."}) num = int(num) net_inst = main.network[net] if num not in net_inst.relays: return dumps({"success": False, "reason": "network has no such relay."}) if "status" in data: if not type(data["status"]) == int: return dumps({"success": False, "reason": "invalid type for enabled."}) enabled = data["status"] if enabled: net_inst.enable_relay(num) else: net_inst.disable_relay(num) main.saveConf("network") return dumps({"success": True}) @app.route("/irc/network///", methods=["PUT"]) @login_required def irc_network_relay_add(self, request, net, num): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) if not num.isdigit(): return dumps({"success": False, "reason": "invalid num: not a number."}) num = int(num) net_inst = main.network[net] if num in net_inst.relays: return dumps({"success": False, "reason": "network already has this relay."}) id, alias = net_inst.add_relay(num) main.saveConf("network") return dumps({"success": True, "id": id, "alias": alias}) @app.route("/irc/network//", methods=["PUT"]) @login_required def irc_network_relay_add_next(self, request, net): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) net_inst = main.network[net] id, alias = net_inst.add_relay() main.saveConf("network") return dumps({"success": True, "id": id, "alias": alias}) @app.route("/irc/network///", methods=["DELETE"]) @login_required def irc_network_relay_del(self, request, net, num): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) if not num.isdigit(): return dumps({"success": False, "reason": "invalid num: not a number."}) num = int(num) net_inst = main.network[net] if num not in net_inst.relays: return dumps({"success": False, "reason": "network does not have this relay."}) net_inst.delete_relay(num) main.saveConf("network") return dumps({"success": True}) @app.route("/irc/network//channels/", methods=["POST"]) @login_required def irc_network_channels(self, request, net): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) relays_inst = main.network[net].relays channels = {} for num in relays_inst.keys(): name = f"{net}{num}" if name in main.IRCPool.keys(): net_chans = main.IRCPool[name].channels channels_annotated = userinfo.getUserNum(net, net_chans) for channel in net_chans: channels[channel] = channels_annotated[channel] return dumps({"channels": channels}) @app.route("/irc/network//channel//", methods=["DELETE"]) @login_required def irc_network_channel_part(self, request, net, channel): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) parted = chankeep.partSingle(net, channel) if not parted: dumps({"success": False, "reason": "no channels matched."}) return dumps({"success": True, "relays": parted}) @app.route("/irc/network//channel//", methods=["PUT"]) @login_required def irc_network_channel_join(self, request, net, channel): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) joined = chankeep.joinSingle(net, channel) if not joined: return dumps({"success": False, "reason": "could not allocate channel to relay."}) return dumps({"success": True, "relays": joined}) @app.route("/aliases/", methods=["GET"]) @login_required def aliases(self, request): alias_list = [] for num, alias in main.alias.items(): alias_dup = dict(alias) alias_dup["num"] = num alias_list.append(alias_dup) return dumps({"aliases": alias_list}) @app.route("/aliases/", methods=["POST"]) @login_required def aliases_update(self, request): try: data = loads(request.content.read()) except JSONDecodeError: return "Invalid JSON" for alias, fields in data.items(): if not alias.isdigit(): return dumps({"success": False, "reason": "alias not a number."}) alias = int(alias) if alias not in main.alias.keys(): return dumps({"success": False, "reason": "alias does not exist."}) if fields: for field in fields: if field in main.alias[alias]: main.alias[alias][field] = fields[field] if "emails" in fields: if not fields["emails"]: main.alias[alias]["emails"] = [] main.saveConf("alias") return dumps({"success": True}) @app.route("/irc/auto//", methods=["POST"]) @login_required def irc_auto_network(self, request, net): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) if 1 in main.network[net].relays.keys(): return dumps({"success": False, "reason": f"first relay exists on {net}"}) num, alias = main.network[net].add_relay(1) provision.provisionRelay(num, net) main.saveConf("network") return dumps({"success": True, f"message": "created relay {num} with alias {alias} on {net}"}) @app.route("/irc/list//", methods=["POST"]) @login_required def irc_list_network(self, request, net): if net not in main.network.keys(): return dumps({"success": False, "reason": "no such net."}) if 1 not in main.network[net].relays.keys(): return dumps({"success": False, "reason": f"no first relay on {net}"}) num, alias = main.network[net].add_relay(1) provision.provisionRelay(num, net) main.saveConf("network") return dumps({"success": True, f"message": "created relay {num} with alias {alias} on {net}"})