from copy import deepcopy
from random import choice

import main
from modules import provision
from utils.logging.debug import debug
from utils.logging.log import error


def needToRegister(net):
    """
    Tell whether relays on a network need to register an account.

    Returns False when the network's auth method is "none" or when the
    IRC network definition does not enable the "register" option,
    True otherwise.
    """
    # Check if the network does not support authentication
    if main.network[net].auth == "none":
        return False
    # Check if the IRC network definition has registration disabled.
    # A missing "register" key counts as disabled; always return a bool.
    return bool(selectInst(net).get("register"))


def needToAuth(net):
    """
    Tell whether relays on a network need to authenticate.

    Returns False only when the network's auth method is "none".
    """
    return main.network[net].auth != "none"


def selectInst(net):
    """
    Build the IRC network definition for a network.

    The entry for `net` in main.irc is used when present, with every key
    it lacks filled in from the default definition stored under "_".
    Otherwise the default definition itself is used.

    Returns an independent (deep) copy, so callers may modify the result
    without touching the stored definitions.
    """
    defaults = main.irc["_"]
    if net not in main.irc:
        return deepcopy(defaults)
    inst = deepcopy(main.irc[net])
    for key, value in defaults.items():
        if key not in inst:
            # Copy the default too, so nested lists/dicts are never
            # shared with the stored default definition.
            inst[key] = deepcopy(value)
    return inst


def substitute(net, num, token=None):
    """
    Return the network definition with placeholders filled in for a relay.

    Every string value of the definition has "{nickname}", "{curnick}"
    and "{password}" replaced, plus "{email}" when an email could be
    chosen and "{token}" when a token is given. When no email could be
    chosen, the "email" entry of the result is set to False.

    net: network name
    num: relay/alias number
    token: optional confirmation token
    """
    inst = selectInst(net)
    alias = main.alias[num]
    nickname = alias["nick"]

    email = None
    if alias.get("emails"):
        # First priority is explicit email lists
        email = choice(alias["emails"])
    elif inst.get("domains"):
        # Otherwise build an address on one of the configured domains.
        # The rest of this module reads the nick from alias["nick"];
        # fall back to it when the alias has no "nickname" key.
        local_part = alias.get("nickname", nickname)
        domain = choice(inst["domains"])
        email = f"{local_part}@{domain}"
    if email is None:
        inst["email"] = False

    password = main.network[net].aliases[num]["password"]
    name = f"{net}{num}"
    if name in main.IRCPool:
        curnick = main.IRCPool[name].nickname
    else:
        curnick = nickname

    # Applied in this order to every string value
    replacements = [
        ("{nickname}", nickname),
        ("{curnick}", curnick),
        ("{password}", password),
    ]
    if email is not None:
        replacements.append(("{email}", email))
    if token:
        replacements.append(("{token}", token))

    for key, value in inst.items():
        if not isinstance(value, str):
            continue
        for placeholder, replacement in replacements:
            value = value.replace(placeholder, replacement)
        inst[key] = value
    return inst


def registerAccount(net, num):
    """
    Send the registration message for a relay, if it is not authenticated.

    Logs an error and does nothing when no email is available, when
    registration is disabled for the network, or when the relay is not
    in main.IRCPool.

    Returns False when registration is disabled, None otherwise.
    """
    debug("Attempting to register: %s - %i" % (net, num))
    sinst = substitute(net, num)
    if not sinst:
        error(f"Register account failed for {net} - {num}")
        return
    if not sinst["email"]:
        error(f"Could not get email for {net} - {num}")
        return
    if not sinst.get("register"):
        error("Cannot register for %s: function disabled" % (net))
        return False
    name = net + str(num)
    if name not in main.IRCPool:
        # Same guard the other functions in this module use, instead of
        # failing with a KeyError on the lookup below.
        error(f"Cannot register for {net} - {num}: relay not connected")
        return
    relay = main.IRCPool[name]
    if not relay.authenticated:
        relay.msg(sinst["entity"], sinst["registermsg"])


def confirmAccount(net, num, token):
    """
    Send the confirmation message with a token, then enable authentication.

    The message is only sent when the relay is in main.IRCPool.
    """
    sinst = substitute(net, num, token=token)
    name = net + str(num)
    if name in main.IRCPool:
        main.IRCPool[name].msg(sinst["entity"], sinst["confirm"])
    # enableAuthentication() returns early by itself if the relay is absent
    enableAuthentication(net, num)


def confirmRegistration(net, num, negativepass=None):
    """
    Record that a relay is registered/authenticated.

    When negativepass is not None and the relay is in main.IRCPool, the
    value is only forwarded to the relay's regPing() and nothing else
    happens. Otherwise the relay (if present) is marked authenticated,
    and, unless the relay was already flagged as registered, any pending
    registration attempt is cancelled, the flag is set and the "network"
    configuration is saved.
    """
    obj = main.network[net]
    name = net + str(num)
    if name in main.IRCPool:
        relay = main.IRCPool[name]
        if negativepass is not None:
            relay.regPing(negativepass=negativepass)
            return
        debug("Relay authenticated: %s - %i" % (net, num))
        relay.authenticated = True
        relay.recheckList()
    if obj.relays[num]["registered"]:
        return
    if name in main.IRCPool:
        attempt = main.IRCPool[name]._regAttempt
        if attempt:
            try:
                attempt.cancel()
            except Exception:
                # Best effort: presumably the attempt has already run or
                # been cancelled. Unlike a bare except, this still lets
                # KeyboardInterrupt/SystemExit propagate.
                pass
    obj.relays[num]["registered"] = True
    main.saveConf("network")


def attemptManualAuthentication(net, num):
    """
    Send the identify message for a relay to the network's auth entity.

    Does nothing when the relay is not in main.IRCPool.
    """
    sinst = substitute(net, num)
    identifymsg = sinst["identifymsg"]
    entity = sinst["entity"]
    name = f"{net}{num}"
    if name not in main.IRCPool:
        return
    main.IRCPool[name].sendmsg(entity, identifymsg, in_query=True)


def enableAuthentication(net, num, jump=True, run_now=False):
    """
    Provision authentication data for a relay.

    Does nothing when the relay is not in main.IRCPool.

    jump: send "Jump" to the ZNC status target after provisioning
    run_now: also send the identify message immediately
    If the network definition has "check" set to False, the relay is
    confirmed as registered straight away.
    """
    obj = main.network[net]
    nick = main.alias[num]["nick"]
    auth = obj.auth
    name = f"{net}{num}"
    if name not in main.IRCPool:
        return
    password = obj.aliases[num]["password"]
    # Set up for auth
    provision.provisionAuthenticationData(num, nick, net, auth, password)
    if jump:
        status_target = main.config["Tweaks"]["ZNC"]["Prefix"] + "status"
        main.IRCPool[name].msg(status_target, "Jump")
    if run_now:
        attemptManualAuthentication(net, num)
    if selectInst(net)["check"] is False:
        # Nothing will be checked for this network, so confirm now
        confirmRegistration(net, num)


def get_unregistered_relays(net=None):
    """
    Get a dict of unregistered relays, either globally or for a network.
    Networks without unregistered relays are omitted.
    Returns:
    {"net": [["nick1", 1], ["nick2", 2], ...]}
    """
    unreg = {}
    nets = [net] if net else main.network.keys()
    for netname in nets:
        relays = main.network[netname].relays
        for num in relays.keys():
            if relays[num]["registered"]:
                continue
            nick = main.alias[num]["nick"]
            unreg.setdefault(netname, []).append([nick, num])
    return unreg


def registerTest(c):
    """
    Inspect an event and confirm the relay's registration if it matches.

    c is an event dict. This function reads "net" and "num" always, and
    "msg", "type", "nick", "mtype", "mode" and "status" depending on the
    checks enabled in the network definition ("check", "negative",
    "ping", "checktype").
    """
    net = c["net"]
    num = c["num"]
    sinst = substitute(net, num)
    name = net + str(num)
    if sinst["check"] is False:
        return

    msg = c.get("msg")
    if msg is not None:
        if sinst["negative"] and name in main.IRCPool:
            if main.IRCPool[name]._negativePass is not True:
                if c["type"] == "query" and c["nick"] == sinst["entity"]:
                    if sinst["checknegativemsg"] in msg:
                        # Not passed negative check, report back
                        confirmRegistration(net, num, negativepass=False)
                        return
                    if sinst["checkendnegative"] in msg:
                        # Passed the negative check, report back
                        confirmRegistration(net, num, negativepass=True)
                        return
        if sinst["ping"]:
            if sinst["checkmsg2"] in msg and c["nick"] == sinst["entity"]:
                confirmRegistration(net, num)
                debug(
                    (
                        f"registerTest() {net} - {num} passed ping:checkmsg2 "
                        f"check, {sinst['checkmsg2']} present in message"
                    )
                )
                return

    if sinst["checktype"] == "msg":
        # Events without a message (e.g. mode changes) cannot match
        if msg is not None and sinst["checkmsg"] in msg:
            confirmRegistration(net, num)
            debug(
                (
                    f"registerTest() {net} - {num} passed checktype:msg:checkmsg check, "
                    f"{sinst['checkmsg']} present in message"
                )
            )
            return
    elif sinst["checktype"] == "mode":
        if c["type"] == "self" and c["mtype"] == "mode":
            if sinst["checkmode"] in c["mode"] and c["status"] is True:
                confirmRegistration(net, num)
                debug(
                    (
                        f"registerTest() {net} - {num} passed checktype:mode:checkmode check, "
                        f"{sinst['checkmode']} present in mode"
                    )
                )
                return