import main
import argparse
import sys
from io import StringIO
from yaml import dump


class Mon:
    """The ``mon`` command: create, delete, list and modify monitor groups.

    A monitor group (a "cast") is a plain dict stored in ``main.monitor``
    under the group name.  Its values are lists of match strings, a boolean
    (``inside``), or -- for ``sources`` / ``send`` -- a dict mapping a network
    name to either ``True`` (network-wide) or a list of targets.
    """

    def __init__(self, register):
        """Register the ``mon`` handler through the supplied callback."""
        register("mon", self.mon)

    def setup_arguments(self, ArgumentParser):
        """Build ``self.parser`` from the given ArgumentParser class.

        The class is injected so that ``mon`` can pass in a subclass whose
        ``exit`` reports to the user instead of terminating the process.
        """
        self.parser = ArgumentParser(
            prog="mon",
            description="Manage monitors. Extremely flexible. All arguments are optional.",
        )

        # Exactly one action must be chosen.
        group1 = self.parser.add_mutually_exclusive_group(required=True)
        group1.add_argument("-a", "--add", metavar="entry", dest="addEntry", help="Add an entry")
        group1.add_argument("-d", "--del", metavar="entry", dest="delEntry", help="Delete an entry")
        group1.add_argument("-l", "--list", action="store_true", dest="listEntry", help="List all entries")
        group1.add_argument("-m", "--mod", metavar="entry", dest="modEntry", help="Modify an entry")

        # How --mod combines the given values with the stored ones.
        group2 = self.parser.add_mutually_exclusive_group()
        group2.add_argument("-p", "--append", action="store_true", dest="doAppend",
                            help="Append entries to lists instead of replacing")
        group2.add_argument("-r", "--remove", action="store_true", dest="doRemove",
                            help="Remove entries in lists instead of replacing")

        # default=None lets makeCast tell "not given" apart from an explicit
        # choice.  With the implicit store_true default (False) every --mod
        # silently rewrote or removed the stored "inside" flag.
        self.parser.add_argument("--inside", action="store_true", default=None, dest="inside",
                                 help="Use x in y logic for matching instead of comparing exact values")
        self.parser.add_argument("--outside", action="store_false", default=None, dest="inside",
                                 help="Don't use x in y logic for matching instead of comparing exact values")

        self.parser.add_argument("--type", nargs="*", metavar="type", dest="specType",
                                 help="Specify type of spec matching. Available types: join, part, quit, msg, topic, mode, nick, kick, notice, action, who")
        self.parser.add_argument("--free", nargs="*", metavar="query", dest="free",
                                 help="Use freeform matching")
        self.parser.add_argument("--exact", nargs="*", metavar="query", dest="exact",
                                 help="Use exact matching")
        self.parser.add_argument("--nick", nargs="*", metavar="nickname", dest="nick",
                                 help="Use nickname matching")
        self.parser.add_argument("--ident", nargs="*", metavar="ident", dest="ident",
                                 help="Use ident matching")
        self.parser.add_argument("--host", nargs="*", metavar="host", dest="host",
                                 help="Use host matching")
        # Collected per occurrence (like --message) so that the words of one
        # real name can be joined back together in makeCast.
        self.parser.add_argument("--real", nargs="*", action="append", metavar="realname", dest="real",
                                 help="Use real name (GECOS) matching. Works with types: who")
        self.parser.add_argument("--source", nargs="*", action="append", metavar=("network", "channel"), dest="source",
                                 help="Target network and channel. Works with types: join, part, msg, topic, mode, kick, notice, action (can be specified multiple times)")
        self.parser.add_argument("--message", nargs="*", action="append", metavar="message", dest="message",
                                 help="Message. Works with types: part, quit, msg, topic, kick, notice, action")
        self.parser.add_argument("--user", nargs="*", metavar="user", dest="user",
                                 help="User (new nickname or kickee). Works with types: kick, nick")
        self.parser.add_argument("--modes", nargs="*", metavar="modes", dest="modes",
                                 help="Modes. Works with types: mode")
        self.parser.add_argument("--modeargs", nargs="*", metavar="modeargs", dest="modeargs",
                                 help="Mode arguments. Works with types: mode")
        self.parser.add_argument("--server", nargs="*", metavar="server", dest="server",
                                 help="Server. Works with types: who")
        self.parser.add_argument("--status", nargs="*", metavar="status", dest="status",
                                 help="Status. Works with types: who")
        self.parser.add_argument("--send", nargs="*", action="append", metavar=("network", "target"), dest="send",
                                 help="Network and target to send notifications to (can be specified multiple times)")

    def mon(self, addr, authed, data, obj, spl, success, failure, info, incUsage, length):
        """Handle one ``mon`` command.

        ``spl`` is the tokenised command; everything after the first token is
        handed to argparse.  ``success``, ``failure`` and ``info`` are called
        with a message string, ``incUsage`` with ``None``.  ``length`` equal
        to 1 is treated as "no arguments given" (presumably ``len(spl)``).
        ``addr``, ``data`` and ``obj`` are not used here.
        """
        # NOTE(review): the collapsed source does not show whether the usage
        # fallback belonged to this check or to the action chain below; it is
        # treated as the unauthenticated path here -- confirm.
        if not authed:
            incUsage(None)
            return

        # argparse is built for CLI programs: on errors and on -h it prints
        # to stdout/stderr and exits the process.  Override exit() so it only
        # reports the message, and define the subclass here so it can close
        # over this call's `failure` callback.
        class ArgumentParser(argparse.ArgumentParser):
            def exit(self, status=0, message=None):
                if message:
                    failure(message)

        self.setup_arguments(ArgumentParser)
        if length == 1:
            info("Incorrect usage, use mon -h for help")
            return

        # Capture whatever argparse prints.  The finally clause guarantees the
        # real streams are restored even when parsing raises.
        old_stdout, old_stderr = sys.stdout, sys.stderr
        my_stdout = sys.stdout = StringIO()
        my_stderr = sys.stderr = StringIO()
        try:
            parsed = self.parser.parse_args(spl[1:])
        except Exception:
            # With exit() neutralised argparse may fail part-way through;
            # any message was already delivered through failure().
            return
        finally:
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            stdout = my_stdout.getvalue()
            stderr = my_stderr.getvalue()
            my_stdout.close()
            my_stderr.close()

        if stdout != "":
            info(stdout)
        elif stderr != "":
            # argparse rejected the command line; do not act on it.
            failure(stderr)
            return

        if parsed.addEntry:
            if parsed.addEntry in main.monitor:
                failure("Monitor group already exists: %s, specify --mod to change" % parsed.addEntry)
                return
            cast = self.makeCast(parsed, failure, info)
            if cast is False:
                return
            # New groups always carry an explicit "inside" flag.
            cast.setdefault("inside", False)
            main.monitor[parsed.addEntry] = cast
            main.saveConf("monitor")
            success("Successfully created monitor group %s" % parsed.addEntry)
            return

        if parsed.delEntry:
            if parsed.delEntry not in main.monitor:
                failure("No such monitor group: %s" % parsed.delEntry)
                return
            del main.monitor[parsed.delEntry]
            main.saveConf("monitor")
            success("Successfully removed monitor group: %s" % parsed.delEntry)
            return

        if parsed.modEntry:
            if not parsed.doAppend and not parsed.doRemove:
                failure("Specify --append or --remove with --mod")
                return
            if parsed.modEntry not in main.monitor:
                failure("No such monitor group: %s" % parsed.modEntry)
                return
            cast = self.makeCast(parsed, failure, info)
            if cast is False:
                return
            if parsed.doAppend:
                merged = self.addCast(main.monitor[parsed.modEntry], cast, info)
                main.monitor[parsed.modEntry] = merged
            elif parsed.doRemove:
                merged = self.subtractCast(main.monitor[parsed.modEntry], cast, info)
                if merged == {}:
                    del main.monitor[parsed.modEntry]
                    info("Group %s deleted due to having no attributes" % parsed.modEntry)
                    main.saveConf("monitor")
                    return
                main.monitor[parsed.modEntry] = merged
            main.saveConf("monitor")
            success("Successfully updated entry %s" % parsed.modEntry)
            return

        if parsed.listEntry:
            info(dump(main.monitor))
            return

    def dedup(self, data):
        """Return ``data`` without duplicates; booleans pass through unchanged.

        First-seen order is kept so the stored configuration is stable.
        """
        if isinstance(data, bool):
            return data
        return list(dict.fromkeys(data))

    def parseNetworkFormat(self, lst, failure, info):
        """Turn ``[[name, target, ...], ...]`` into ``{name: True | [targets]}``.

        Returns the string ``"nil"`` when ``lst`` is ``None`` and ``False``
        (after calling ``failure``) when the input is invalid.  A name given
        without targets maps to ``True``.
        """
        if lst is None:
            return "nil"
        if len(lst) == 0:
            failure("List has no entries")
            return False
        # Every occurrence needs at least a name, not just the first one.
        if any(len(entry) == 0 for entry in lst):
            failure("Nested list has no entries")
            return False

        cast = {}
        for entry in lst:
            name, targets = entry[0], entry[1:]
            if name not in main.pool:
                failure("Name does not exist: %s" % name)
                return False
            if name in main.IRCPool:
                for target in targets:
                    if target not in main.IRCPool[name].channels:
                        info("%s: Bot not on channel: %s" % (name, target))
            if not targets:
                cast[name] = True
            elif isinstance(cast.get(name), list):
                # Same name given again: accumulate its targets.
                cast[name].extend(targets)
            else:
                cast[name] = list(targets)
        return {name: self.dedup(value) for name, value in cast.items()}

    # Create or modify a monitor group magically
    def makeCast(self, obj, failure, info):
        """Build a cast dict from the parsed arguments in ``obj``.

        Only options that were actually given end up in the result.  Returns
        ``False`` (after reporting through ``failure``) on invalid input.
        """
        validTypes = ["join", "part", "quit", "msg", "topic", "mode", "nick", "kick", "notice", "action", "who"]
        cast = {}

        if obj.specType is not None:
            types = self.dedup(obj.specType)
            for specType in types:
                if specType not in validTypes:
                    failure("Invalid type: %s" % specType)
                    info("Available types: %s" % ", ".join(validTypes))
                    return False
            cast["type"] = types

        if obj.source is not None:
            sourceParse = self.parseNetworkFormat(obj.source, failure, info)
            if not sourceParse:
                return False
            if sourceParse != "nil":
                cast["sources"] = sourceParse

        if obj.send is not None:
            sendParse = self.parseNetworkFormat(obj.send, failure, info)
            if not sendParse:
                return False
            if sendParse != "nil":
                cast["send"] = sendParse

        if obj.message is not None:
            # One entry per --message occurrence, words re-joined.
            cast["message"] = [" ".join(words) for words in obj.message]

        for name in ("user", "modes", "modeargs", "server", "status",
                     "free", "exact", "nick", "ident", "host"):
            value = getattr(obj, name)
            if value is not None:
                cast[name] = value

        if obj.real is not None:
            # A plain string is kept whole; joining it would put a space
            # between every character.
            cast["real"] = [
                words if isinstance(words, str) else " ".join(words)
                for words in obj.real
            ]

        if obj.inside is not None:
            cast["inside"] = obj.inside
        return cast

    def subtractCast(self, source, patch, info):
        """Remove everything named in ``patch`` from ``source``, in place.

        Nested dicts are handled recursively; keys whose value ends up empty
        are dropped.  Returns ``source``.
        """
        for key in patch:
            if key not in source:
                info("Non-matched key: %s" % key)
                continue

            current = source[key]
            if isinstance(current, dict):
                result = self.subtractCast(current, patch[key], info)
                if result == {}:
                    info("Removing upper element: %s" % key)
                    del source[key]
                else:
                    source[key] = result
                continue

            if isinstance(patch[key], bool):
                del source[key]
                info("Removing entire element: %s" % key)
                continue

            if isinstance(current, bool):
                for element in patch[key]:
                    info("Attempt to remove %s from network-wide definition" % element)
                continue

            removed = False
            for element in patch[key]:
                if element in current:
                    current.remove(element)
                    removed = True
                else:
                    info("Element %s not in source %s" % (element, key))
            # Drop the key only after the loop: deleting it mid-iteration
            # made the next lookup of source[key] raise KeyError.
            if removed and current == []:
                del source[key]
        return source

    def addCast(self, source, patch, info):
        """Merge ``patch`` into ``source`` in place and return ``source``.

        Nested dicts are merged recursively, booleans replace the stored
        value, and list elements are appended unless already present.
        """
        for key in patch:
            if key not in source:
                source[key] = patch[key]
                continue

            if isinstance(source[key], dict):
                source[key] = self.addCast(source[key], patch[key], info)
                continue

            if isinstance(patch[key], bool):
                source[key] = patch[key]
                info("Overriding local element %s with network-wide definition" % key)
                continue

            for element in patch[key]:
                if isinstance(source[key], bool):
                    # A network-wide flag is replaced by an explicit list.
                    source[key] = []
                    info("Overriding element %s, previously set network-wide" % key)
                if element in source[key]:
                    info("Element %s already in source %s" % (element, key))
                else:
                    source[key].append(element)
        return source