monolith/modules/regproc.py

242 lines
8.2 KiB
Python
Raw Normal View History

2022-07-21 12:40:09 +00:00
from copy import deepcopy
from random import choice
import main
from modules import provision, helpers
2022-07-21 12:40:05 +00:00
from utils.logging.debug import debug
2022-07-21 12:40:09 +00:00
from utils.logging.log import error
2022-07-21 12:39:41 +00:00
2022-07-21 12:40:03 +00:00
def needToRegister(net):
    """
    Decide whether relays on a network have to register an account.

    Returns False when the network's auth method is "none". Otherwise
    returns the truthiness of the "register" flag of the network's IRC
    definition, or None when that definition has no "register" key.
    """
    # Networks that do not support authentication never need registration
    if main.network[net].auth == "none":
        return False
    # The IRC network definition may have registration disabled
    definition = selectInst(net)
    if "register" not in definition.keys():
        return None
    return bool(definition["register"])
2022-07-21 12:39:41 +00:00
def needToAuth(net):
    """
    Tell whether a network requires authentication.

    Returns False only when the network's auth method is "none".
    """
    auth_method = main.network[net].auth
    return False if auth_method == "none" else True
def selectInst(net):
    """
    Return the IRC definition that applies to a network.

    The network's own entry in main.irc is used when present, with any key
    it lacks filled in from the default entry main.irc["_"]. Networks
    without an entry get the defaults alone.

    The result is always an independent copy, so callers may modify it
    (substitute() rewrites its string values) without altering the shared
    configuration held in main.irc.
    """
    defaults = main.irc["_"]
    if net not in main.irc:
        # Copy here as well: handing out the defaults dict itself would let
        # callers overwrite the global defaults in place
        return deepcopy(defaults)
    inst = deepcopy(main.irc[net])
    for key, value in defaults.items():
        if key not in inst:
            inst[key] = deepcopy(value)
    return inst
2022-07-21 12:39:41 +00:00
def substitute(net, num, token=None):
    """
    Build the IRC definition for a relay with its placeholders filled in.

    Every string value of the definition has "{nickname}", "{curnick}",
    "{password}" and "{email}" replaced, plus "{token}" when a token is
    given. Non-string values are left untouched.

    The email is picked at random from the alias's "emails" list when that
    list is non-empty; otherwise it is built from the alias's nick and a
    random entry of the definition's "domains" list.

    Returns the substituted definition, or False (after logging an error)
    when no email could be determined.
    """
    # Work on a private copy: values are overwritten below and selectInst()
    # may hand back configuration shared with other callers
    inst = deepcopy(selectInst(net))
    alias = main.alias[num]

    email = None
    if alias.get("emails"):
        # First priority is explicit email lists
        email = choice(alias["emails"])
    elif inst.get("domains"):
        domain = choice(inst["domains"])
        # NOTE(review): the original read alias["nickname"] here while the
        # rest of this module reads alias["nick"]; prefer "nickname" when it
        # exists and fall back to "nick" instead of raising KeyError
        local_part = alias["nickname"] if "nickname" in alias else alias["nick"]
        email = f"{local_part}@{domain}"
    if email is None:
        error(f"Could not get email for {net} - {num}")
        return False

    nickname = alias["nick"]
    password = main.network[net].aliases[num]["password"]

    # Use the live nickname of the connected relay when there is one
    name = f"{net}{num}"
    if name in main.IRCPool:
        curnick = main.IRCPool[name].nickname
    else:
        curnick = nickname

    # Applied in this order to every string value
    replacements = [
        ("{nickname}", nickname),
        ("{curnick}", curnick),
        ("{password}", password),
        ("{email}", email),
    ]
    if token:
        replacements.append(("{token}", token))

    for key, value in inst.items():
        if not isinstance(value, str):
            continue
        for placeholder, replacement in replacements:
            value = value.replace(placeholder, replacement)
        inst[key] = value
    return inst
2022-07-21 12:39:41 +00:00
def registerAccount(net, num):
    """
    Send the registration message for a relay to the network's entity.

    Nothing is sent, and a falsy value is returned after logging an error,
    when the definition could not be substituted, when registration is
    disabled for the network, or when the relay is not in main.IRCPool.
    """
    debug("Attempting to register: %s - %i" % (net, num))
    sinst = substitute(net, num)
    if not sinst:
        error(f"Register account failed for {net} - {num}")
        return
    if not sinst["register"]:
        error("Cannot register for %s: function disabled" % (net))
        return False
    name = net + str(num)
    if name not in main.IRCPool:
        # Same guard the other functions in this module apply before
        # touching the pool; avoids a KeyError for a missing relay
        error(f"Cannot register for {net} - {num}: relay not in pool")
        return False
    main.IRCPool[name].msg(sinst["entity"], sinst["registermsg"])
2022-07-21 12:39:41 +00:00
def confirmAccount(net, num, token):
    """
    Confirm a relay's account with a token, then enable authentication.

    The definition's "confirm" message, with the token substituted in, is
    sent to the network's entity. Nothing happens when the definition could
    not be substituted or the relay is not in main.IRCPool.
    """
    sinst = substitute(net, num, token=token)
    if not sinst:
        # substitute() returns False when it cannot determine an email
        error(f"Confirm account failed for {net} - {num}")
        return
    name = net + str(num)
    if name not in main.IRCPool:
        error(f"Cannot confirm account for {net} - {num}: relay not in pool")
        return
    main.IRCPool[name].msg(sinst["entity"], sinst["confirm"])
    enableAuthentication(net, num)
2022-07-21 12:39:41 +00:00
def confirmRegistration(net, num, negativepass=None):
    """
    Record that a relay is authenticated and registered.

    When negativepass is given and the relay is in main.IRCPool, the value
    is only forwarded to the relay's regPing() and nothing else happens.

    Otherwise a pooled relay is flagged as authenticated and asked to
    recheck its list. If the relay is not yet marked as registered, any
    pending registration attempt is cancelled, the relay is marked as
    registered and the "network" configuration is saved.
    """
    obj = main.network[net]
    name = net + str(num)
    relay = main.IRCPool[name] if name in main.IRCPool else None

    if relay is not None:
        if negativepass is not None:
            relay.regPing(negativepass=negativepass)
            return
        debug("Relay authenticated: %s - %i" % (net, num))
        relay.authenticated = True
        relay.recheckList()

    if obj.relays[num]["registered"]:
        return

    if relay is not None and relay._regAttempt:
        try:
            relay._regAttempt.cancel()
        except Exception as exc:
            # Best effort: failing to cancel must not stop the relay from
            # being marked as registered, but leave a trace of it
            debug(f"Could not cancel registration attempt for {net} - {num}: {exc}")

    obj.relays[num]["registered"] = True
    main.saveConf("network")
2022-07-21 12:39:41 +00:00
2022-08-14 12:13:05 +00:00
def attemptManualAuthentication(net, num):
    """
    Send the network's identify message from a relay to the entity.

    Nothing is sent when the definition could not be substituted or when
    the relay is not in main.IRCPool.
    """
    sinst = substitute(net, num)
    if not sinst:
        # substitute() returns False when it cannot determine an email
        error(f"Manual authentication failed for {net} - {num}")
        return
    name = f"{net}{num}"
    if name not in main.IRCPool:
        return
    entity = sinst["entity"]
    # The identify message itself is kept out of the log because
    # substitute() may have placed the account password in it
    debug(f"Sending identify message to {entity} on {name}")
    main.IRCPool[name].msg(entity, sinst["identifymsg"])
def enableAuthentication(net, num, jump=True, run_now=False):
    """
    Provision authentication data for a relay and optionally apply it.

    Does nothing when the relay is not in main.IRCPool. Otherwise:
    - passes the alias nick, auth method and password to
      provision.provisionAuthenticationData();
    - with jump, sends "Jump" to the target made of the configured ZNC
      prefix followed by "status";
    - with run_now, attempts manual authentication;
    - when the network definition's "check" is False, confirms the
      registration right away.
    """
    network = main.network[net]
    nick = main.alias[num]["nick"]
    auth = network.auth
    relay_name = f"{net}{num}"
    if relay_name not in main.IRCPool:
        return
    password = main.network[net].aliases[num]["password"]
    # Set up for auth
    provision.provisionAuthenticationData(num, nick, net, auth, password)
    if jump:
        relay = main.IRCPool[relay_name]
        relay.msg(main.config["Tweaks"]["ZNC"]["Prefix"] + "status", "Jump")
    if run_now:
        attemptManualAuthentication(net, num)
    definition = selectInst(net)
    if definition["check"] is False:
        confirmRegistration(net, num)
2022-07-21 12:39:41 +00:00
def get_unregistered_relays(net=None):
    """
    Get a dict of unregistered relays, either globally or
    for a network.
    Returns:
    {"net": [["nick1", 1], ["nick2", 2], ...]}
    Networks without unregistered relays are left out.
    """
    unreg = {}
    nets = [net] if net else main.network.keys()
    for netname in nets:
        for num, relay in main.network[netname].relays.items():
            if relay["registered"]:
                continue
            nick = main.alias[num]["nick"]
            # Group by the network being iterated, not by the `net`
            # argument, which is None when listing globally
            unreg.setdefault(netname, []).append([nick, num])
    return unreg
def registerTest(c):
    """
    Inspect an incoming event and confirm registration when it matches.

    c is the event dict; "net" and "num" identify the relay, and "msg",
    "type", "nick", "mtype", "mode" and "status" are read as needed.

    Checks, in order, each ending the function on a match:
    - negative check: for a query from the entity, while the relay's
      _negativePass is not True, report a failed ("checknegativemsg") or
      passed ("checkendnegative") negative check;
    - ping check: "checkmsg2" in a message from the entity;
    - "checktype" of "msg": "checkmsg" in the message;
    - "checktype" of "mode": "checkmode" set on the relay itself.

    Nothing is checked when the definition could not be substituted or its
    "check" is False.
    """
    net = c["net"]
    num = c["num"]
    sinst = substitute(net, num)
    if not sinst:
        # substitute() returns False (and logs) when it cannot build the
        # definition; there is nothing to test against
        return
    if sinst["check"] is False:
        return

    name = net + str(num)
    msg = c.get("msg")

    if msg is not None:
        negative_applies = (
            sinst["negative"]
            and name in main.IRCPool
            and main.IRCPool[name]._negativePass is not True
            and c["type"] == "query"
            and c["nick"] == sinst["entity"]
        )
        if negative_applies:
            if sinst["checknegativemsg"] in msg:
                # Not passed negative check, report back
                confirmRegistration(net, num, negativepass=False)
                debug(
                    (
                        f"registerTest() {net} - {num} not passed negative:checknegativemsg "
                        f"check, {sinst['checknegativemsg']} present in message"
                    )
                )
                return
            if sinst["checkendnegative"] in msg:
                # Passed the negative check, report back
                confirmRegistration(net, num, negativepass=True)
                debug(
                    (
                        f"registerTest() {net} - {num} passed negative:checkendnegative "
                        f"check, {sinst['checkendnegative']} present in message"
                    )
                )
                return
        if sinst["ping"]:
            if sinst["checkmsg2"] in msg and c["nick"] == sinst["entity"]:
                confirmRegistration(net, num)
                debug(
                    (
                        f"registerTest() {net} - {num} passed ping:checkmsg2 "
                        f"check, {sinst['checkmsg2']} present in message"
                    )
                )
                return

    checktype = sinst["checktype"]
    if checktype == "msg":
        # Events without message text (e.g. mode changes) cannot match
        if msg is not None and sinst["checkmsg"] in msg:
            confirmRegistration(net, num)
            debug(
                (
                    f"registerTest() {net} - {num} passed checktype:msg:checkmsg check, "
                    f"{sinst['checkmsg']} present in message"
                )
            )
            return
    elif checktype == "mode":
        if c["type"] == "self" and c["mtype"] == "mode":
            if sinst["checkmode"] in c["mode"] and c["status"] is True:
                confirmRegistration(net, num)
                debug(
                    (
                        f"registerTest() {net} - {num} passed checktype:mode:checkmode check, "
                        f"{sinst['checkmode']} present in mode"
                    )
                )
                return