From ab5575fcbf26cd9dedc10f82e528f00ffa7444ef Mon Sep 17 00:00:00 2001 From: Edvard Rejthar Date: Thu, 11 Sep 2025 19:10:10 +0200 Subject: [PATCH] feat: intelmqctl interface --- intelmq/bin/intelmqctl.py | 187 +++--------------------- intelmq/bin/intelmqsetup.py | 29 +--- intelmq/lib/cli.py | 278 ++++++++++++++++++++++++++++++++++++ intelmq/lib/setup_cli.py | 29 ++++ setup.py | 1 + 5 files changed, 332 insertions(+), 192 deletions(-) create mode 100644 intelmq/lib/cli.py create mode 100644 intelmq/lib/setup_cli.py diff --git a/intelmq/bin/intelmqctl.py b/intelmq/bin/intelmqctl.py index 75cbafc720..ac622f7d75 100644 --- a/intelmq/bin/intelmqctl.py +++ b/intelmq/bin/intelmqctl.py @@ -29,8 +29,11 @@ from intelmq.lib.datatypes import ReturnType, MESSAGES, LogLevel from intelmq.lib.processmanager import * from intelmq.lib.pipeline import PipelineFactory +from intelmq.lib.cli import get_parser, run_handler import intelmq.lib.upgrades as upgrades +from mininterface import run as mrun + yaml = YAML(typ="safe", pure=True) try: @@ -213,167 +216,9 @@ def __init__(self, interactive: bool = False, returntype: ReturnType = ReturnTyp self.abort('Invalid process manager given: %r, should be one of %r.' '' % (self._processmanagertype, list(process_managers().keys()))) if self._interactive: - parser = argparse.ArgumentParser( - prog=APPNAME, - description=DESCRIPTION, - epilog=EPILOG, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - - parser.add_argument('--version', '-v', action='version', version=VERSION) - parser.add_argument('--type', '-t', choices=[i.name.lower() for i in ReturnType], default='text', help='choose if it should return regular text or other machine-readable') - parser.add_argument('--quiet', '-q', action='store_true', help='Quiet mode, useful for reloads initiated scripts like logrotate') - - subparsers = parser.add_subparsers(title='subcommands') - - parser_list = subparsers.add_parser('list', help='Listing bots or queues') - parser_list.add_argument('kind', choices=['bots', 'queues', 'queues-and-status']) - parser_list.add_argument('--non-zero', '--quiet', '-q', action='store_true', - help='Only list non-empty queues ' - 'or the IDs of enabled bots.') - parser_list.add_argument('--count', '--sum', '-s', action='store_true', - help='Only show the total ' - 'number of messages in queues. ' - 'Only valid for listing queues.') - parser_list.add_argument('--configured', '-c', action='store_true', - help='Only show configured bots') - parser_list.set_defaults(func=self.list) - - parser_clear = subparsers.add_parser('clear', help='Clear a queue') - parser_clear.add_argument('queue', help='queue name') - parser_clear.set_defaults(func=self.clear_queue) - - parser_log = subparsers.add_parser('log', help='Get last log lines of a bot') - parser_log.add_argument('bot_id', help='bot id', choices=self._configured_bots_list()) - parser_log.add_argument('number_of_lines', help='number of lines', - default=10, type=int, nargs='?') - parser_log.add_argument('log_level', help='logging level', choices=[i.name for i in LogLevel], default='INFO', nargs='?') - parser_log.set_defaults(func=self.read_bot_log) - - parser_run = subparsers.add_parser('run', help='Run a bot interactively') - parser_run.add_argument('bot_id', choices=self._configured_bots_list()) - parser_run.add_argument('--loglevel', '-l', nargs='?', default=None, choices=[i.name for i in LogLevel]) - parser_run_subparsers = parser_run.add_subparsers(title='run-subcommands') - - parser_run_console = parser_run_subparsers.add_parser('console', help='Get a ipdb live console.') - parser_run_console.add_argument('console_type', nargs='?', - help='You may specify which console should be run. Default is ipdb (if installed)' - ' or pudb (if installed) or pdb but you may want to use another one.') - parser_run_console.set_defaults(run_subcommand="console") - - parser_run_message = parser_run_subparsers.add_parser('message', - help='Debug bot\'s pipelines. Get the message in the' - ' input pipeline, pop it (cut it) and display it, or' - ' send the message directly to bot\'s output pipeline(s).') - parser_run_message.add_argument('message_action_kind', - choices=["get", "pop", "send"], - help='get: show the next message in the source pipeline. ' - 'pop: show and delete the next message in the source pipeline ' - 'send: Send the given message to the destination pipeline(s).') - parser_run_message.add_argument('msg', nargs='?', help='If send was chosen, put here the message in JSON.') - parser_run_message.set_defaults(run_subcommand="message") - - parser_run_process = parser_run_subparsers.add_parser('process', help='Single run of bot\'s process() method.') - parser_run_process.add_argument('--show-sent', '-s', action='store_true', - help='If message is sent through, displays it.') - parser_run_process.add_argument('--dryrun', '-d', action='store_true', - help='Never really pop the message from the input pipeline ' - 'nor send to output pipeline.') - parser_run_process.add_argument('--msg', '-m', - help='Trick the bot to process this JSON ' - 'instead of the Message in its pipeline.') - parser_run_process.set_defaults(run_subcommand="process") - parser_run.set_defaults(func=self.bot_run) - - parser_check = subparsers.add_parser('check', - help='Check installation and configuration') - parser_check.add_argument('--quiet', '-q', action='store_true', - help='Only print warnings and errors.') - parser_check.add_argument('--no-connections', '-C', action='store_true', - help='Do not test the connections to services like redis.') - parser_check.set_defaults(func=self.check) - - parser_help = subparsers.add_parser('help', - help='Show the help') - parser_help.set_defaults(func=parser.print_help) - - parser_start = subparsers.add_parser('start', help='Start a bot or botnet') - parser_start.add_argument('bot_id', nargs='?', - choices=self._configured_bots_list()) - parser_start.add_argument('--group', help='Start a group of bots', - choices=BOT_GROUP.keys()) - parser_start.set_defaults(func=self.bot_start) - - parser_stop = subparsers.add_parser('stop', help='Stop a bot or botnet') - parser_stop.add_argument('bot_id', nargs='?', - choices=self._configured_bots_list()) - parser_stop.add_argument('--group', help='Stop a group of bots', - choices=BOT_GROUP.keys()) - parser_stop.set_defaults(func=self.bot_stop) - - parser_restart = subparsers.add_parser('restart', help='Restart a bot or botnet') - parser_restart.add_argument('bot_id', nargs='?', - choices=self._configured_bots_list()) - parser_restart.add_argument('--group', help='Restart a group of bots', - choices=BOT_GROUP.keys()) - parser_restart.set_defaults(func=self.bot_restart) - - parser_reload = subparsers.add_parser('reload', help='Reload a bot or botnet') - parser_reload.add_argument('bot_id', nargs='?', - choices=self._configured_bots_list()) - parser_reload.add_argument('--group', help='Reload a group of bots', - choices=BOT_GROUP.keys()) - parser_reload.set_defaults(func=self.bot_reload) - - parser_status = subparsers.add_parser('status', help='Status of a bot or botnet') - parser_status.add_argument('bot_id', nargs='?', - choices=self._configured_bots_list()) - parser_status.add_argument('--group', help='Get status of a group of bots', - choices=BOT_GROUP.keys()) - parser_status.set_defaults(func=self.bot_status) - - parser_status = subparsers.add_parser('enable', help='Enable a bot') - parser_status.add_argument('bot_id', - choices=self._configured_bots_list()) - parser_status.set_defaults(func=self.bot_enable) - - parser_status = subparsers.add_parser('disable', help='Disable a bot') - parser_status.add_argument('bot_id', - choices=self._configured_bots_list()) - parser_status.set_defaults(func=self.bot_disable) - - parser_upgrade_conf = subparsers.add_parser('upgrade-config', - help='Upgrade IntelMQ configuration to a newer version.') - parser_upgrade_conf.add_argument('-p', '--previous', - help='Use this version as the previous one.') - parser_upgrade_conf.add_argument('-d', '--dry-run', - action='store_true', default=False, - help='Do not write any files.') - parser_upgrade_conf.add_argument('-u', '--function', - help='Run this upgrade function.', - choices=upgrades.__all__) - parser_upgrade_conf.add_argument('-f', '--force', - action='store_true', - help='Force running the upgrade procedure.') - parser_upgrade_conf.add_argument('--state-file', - help='The state file location to use.', - default=STATE_FILE_PATH) - parser_upgrade_conf.add_argument('--no-backup', - help='Do not create backups of state and configuration files.', - action='store_true') - parser_upgrade_conf.set_defaults(func=self.upgrade_conf) - - parser_debug = subparsers.add_parser('debug', help='Get debugging output.') - parser_debug.add_argument('--get-paths', help='Give all paths', - action='append_const', dest='sections', - const='paths') - parser_debug.add_argument('--get-environment-variables', - help='Give environment variables', - action='append_const', dest='sections', - const='environment_variables') - parser_debug.set_defaults(func=self.debug) - - self.parser = parser + self.parser = mrun(get_parser(self, self._configured_bots_list(), BOT_GROUP.keys(), upgrades.__all__), + prog=APPNAME, description=DESCRIPTION, epilog=EPILOG, + add_version=VERSION) else: self._processmanager = process_managers()[self._processmanagertype]( self._interactive, @@ -404,15 +249,11 @@ def load_defaults_configuration(self, silent=False): setattr(self._parameters, option, value) def run(self): - results = None - args = self.parser.parse_args() - if 'func' not in args: - sys.exit(self.parser.print_help()) - args_dict = vars(args).copy() + m =self.parser + args = m.env self._quiet = args.quiet - self._returntype = ReturnType[args.type.upper()] - del args_dict['type'], args_dict['quiet'], args_dict['func'] + self._returntype = args.type self._logging_level = 'WARNING' if self._quiet else 'INFO' self._logger.setLevel(self._logging_level) @@ -427,7 +268,7 @@ def run(self): self._quiet ) - retval, results = args.func(**args_dict) + retval, results = run_handler(args.command) if self._returntype is ReturnType.JSON: print(json.dumps(results, indent=4)) @@ -438,6 +279,14 @@ def bot_run(self, **kwargs): # the bot_run method is special in that it mixes plain text # and json in its output, therefore it is printed here # and not in the calling `run` method. + + # Adapt to newer interface. NOTE we should rather refactor the _processmanager.bot_run to accept these parameters. + subc = kwargs["subcommand"] + del kwargs["subcommand"] + if kwargs["loglevel"]: + kwargs["loglevel"] = kwargs["loglevel"].value + kwargs = {**kwargs, **subc} + retval, results = self._processmanager.bot_run(**kwargs) print(results) return retval, None diff --git a/intelmq/bin/intelmqsetup.py b/intelmq/bin/intelmqsetup.py index 5d4f65d7c0..daf938671e 100755 --- a/intelmq/bin/intelmqsetup.py +++ b/intelmq/bin/intelmqsetup.py @@ -17,7 +17,6 @@ Pip does not (and cannot) create `/opt/intelmq`/user-given ROOT_DIR, as described in https://github.com/certtools/intelmq/issues/819 """ -import argparse import os import shutil import stat @@ -31,6 +30,8 @@ from tempfile import NamedTemporaryFile from typing import Optional +from mininterface import run as mrun + try: import intelmq_api import intelmq_api.version @@ -53,6 +54,7 @@ from intelmq import (CONFIG_DIR, DEFAULT_LOGGING_PATH, ROOT_DIR, VAR_RUN_PATH, VAR_STATE_PATH, STATE_FILE_PATH) from intelmq.bin.intelmqctl import IntelMQController +from intelmq.lib.setup_cli import SetupConfig FILE_OUTPUT_PATH = Path(VAR_STATE_PATH) / 'file-output/' @@ -298,31 +300,12 @@ def intelmqsetup_manager_generate(): def main(): - parser = argparse.ArgumentParser("Set's up directories and example " - "configurations for IntelMQ.") - parser.add_argument('--skip-ownership', action='store_true', - help='Skip setting file ownership') - parser.add_argument('--state-file', - help='The state file location to use.', - default=STATE_FILE_PATH) - parser.add_argument('--webserver-user', - help='The webserver to use instead of auto-detection.') - parser.add_argument('--webserver-configuration-directory', - help='The webserver configuration directory to use instead of auto-detection.') - parser.add_argument('--skip-api', - help='Skip set-up of intelmq-api.', - action='store_true') - parser.add_argument('--skip-webserver', - help='Skip all operations on the webserver configuration, affects the API and Manager.', - action='store_true') - parser.add_argument('--skip-manager', - help='Skip set-up of intelmq-manager.', - action='store_true') - args = parser.parse_args() + m = mrun(SetupConfig, ask_on_empty_cli=True) + args = m.env basic_checks(skip_ownership=args.skip_ownership) intelmqsetup_core(ownership=not args.skip_ownership, - state_file=args.state_file) + state_file=str(args.state_file)) if intelmq_api and not args.skip_api: print(f'Running setup for intelmq-api (version {intelmq_api.version.__version__}).') intelmqsetup_api(ownership=not args.skip_ownership, diff --git a/intelmq/lib/cli.py b/intelmq/lib/cli.py new file mode 100644 index 0000000000..1b9b571f55 --- /dev/null +++ b/intelmq/lib/cli.py @@ -0,0 +1,278 @@ +from dataclasses import asdict, dataclass +from typing import TYPE_CHECKING, Annotated, Literal, Optional, TypeVar + +from mininterface.cli import Positional +from tyro.conf import (DisallowNone, FlagCreatePairsOff, + OmitSubcommandPrefixes, arg) + +from intelmq import STATE_FILE_PATH +from intelmq.lib.datatypes import LogLevel + +if TYPE_CHECKING: + from ..bin.intelmqctl import IntelMQController + +from typing import Any, Callable, Type + +from .datatypes import ReturnType + +if TYPE_CHECKING: + BotId = str + GroupType = str + UpgradeType = str + + +# Instead of refactoring IntelMQController code (for backwards compatibility for the case someone uses it as a library), +# we need this handler to control IntelMQController methods. + +T = TypeVar("T", bound=Type) +_registry: dict[Type, Callable] = {} + + +def with_handler(handler: Callable): + """Decorator to register a callback to a dataclass.""" + + def wrapper(cls: T) -> T: + _registry[cls] = handler + return cls + + return wrapper + + +def run_handler(obj: Any): + """Run callback registered with `with_handler`.""" + cls = type(obj) + if cls not in _registry: + raise ValueError(f"No handler registered for {cls}") + return _registry[cls](**asdict(obj)) + + +# Args arguments + +def get_parser(ic: "IntelMQController", bot_ids, group_keys, upgrades): + + if not TYPE_CHECKING: + BotId = Literal[tuple(bot_ids)] + GroupType = Literal[tuple(group_keys)] + UpgradeType = Literal[tuple(upgrades)] + + # Helper dataclasses + + @dataclass + class Botted: + bot_id: BotId + + @dataclass + class Grouped(Botted): + bot_id: Positional[Optional[BotId]] = None + group: Optional[GroupType] = None + "group of bots" + + # Subcommands + + @dataclass + @with_handler(ic.list) + class List: + "Listing bots or queues" + + kind: Literal["bots", "queues", "queues-and-status"] + non_zero: Annotated[bool, arg(aliases=["-q", "--quiet"])] = False + "Only list non-empty queues or the IDs of enabled bots." + count: Annotated[bool, arg(aliases=["-s", "--sum"])] = False + "Only show the total number of messages in queues. Only valid for listing queues." + configured: Annotated[bool, arg(aliases=["-c"])] = False + "Only show configured bots" + + @dataclass + @with_handler(ic.clear_queue) + class Clear: + "Clear a queue" + + queue: str + "queue name" + + @dataclass + @with_handler(ic.read_bot_log) + class Log: + "Get last log lines of a bot" + + bot_id: BotId + number_of_lines: int = 10 + "number of lines" + log_level: Annotated[LogLevel, arg(aliases=["-l"])] = LogLevel.INFO + + @dataclass + class Console: + "Get a ipdb live console." + + console_type: Optional[str] = None + """You may specify which console should be run. Default is ipdb (if installed) + or pudb (if installed) or pdb but you may want to use another one.""" + + @dataclass + class Message: + """Debug bot's pipelines. Get the message in the + input pipeline, pop it (cut it) and display it, or + send the message directly to bot's output pipeline(s).""" + + message_action_kind: Literal["get", "pop", "send"] + """get: show the next message in the source pipeline. + pop: show and delete the next message in the source pipeline + send: Send the given message to the destination pipeline(s).""" + + msg: Optional[str] = None + "If send was chosen, put here the message in JSON." + + @dataclass + class Process: + """Single run of bot's process() method.""" + + show_sent: Annotated[bool, arg(aliases=["-s"])] = False + "If message is sent through, displays it." + dryrun: Annotated[bool, arg(aliases=["-d"])] = False + """Never really pop the message from the input pipeline + nor send to output pipeline.""" + msg: Annotated[Optional[str], arg(aliases=["-m"])] = None + """Trick the bot to process this JSON + instead of the Message in its pipeline.""" + + @dataclass + @with_handler(ic.bot_run) + class Run: + """Run a bot interactively""" + + subcommand: Console | Message | Process + + bot_id: BotId + loglevel: LogLevel | None = None + + @dataclass + @with_handler(ic.check) + class Check: + "Check installation and configuration" + + quiet: Annotated[bool, arg(aliases=["-q"])] = False + "Only print warnings and errors." + no_connections: Annotated[bool, arg(aliases=["-C"])] = False + "Do not test the connections to services like redis." + + @dataclass + @with_handler(ic.bot_start) + class Start(Grouped): + "Start a bot or a botnet" + + pass + + @dataclass + @with_handler(ic.bot_stop) + class Stop(Grouped): + "Stop a bot or a botnet" + + pass + + @dataclass + @with_handler(ic.bot_restart) + class Restart(Grouped): + "Restart a bot or a botnet" + + pass + + @dataclass + @with_handler(ic.bot_reload) + class Reload(Grouped): + "Reload a bot or a botnet" + + pass + + @dataclass + @with_handler(ic.bot_status) + class Status(Grouped): + "Get status of a bot or a botnet" + + pass + + @dataclass + @with_handler(ic.bot_enable) + class Enable(Botted): + "Enable a bot" + + pass + + @dataclass + @with_handler(ic.bot_disable) + class Disable(Botted): + "Disable a bot" + + pass + + @dataclass + @with_handler(ic.upgrade_conf) + class UpgradeConfig: + "Upgrade IntelMQ configuration to a newer version." + + previous: Annotated[Optional[str], arg(aliases=["-p"])] = None + "Use this version as the previous one." + dry_run: Annotated[bool, arg(aliases=["-d"])] = False + "Do not write any files." + function: Annotated[Optional[UpgradeType], arg(aliases=["-u"])] = None + "Run this upgrade function." + force: Annotated[bool, arg(aliases=["-f", "--force"])] = False + "Force running the upgrade procedure." + state_file: str = STATE_FILE_PATH + "The state file location to use." + no_backup: bool = False + "Do not create backups of state and configuration files." + + # Just a helper function to adapt to `ic.debug`. I recommend refactor `ic.debug` to get rid of this. + def debug_adapt(get_paths, get_environment_variables): + sections = [ + name + for cond, name in ( + (get_paths, "paths"), + (get_environment_variables, "environment_variables"), + ) + if cond + ] + if not sections: + sections = None + return ic.debug(sections) + + @dataclass + @with_handler(debug_adapt) + class Debug: + "Get debugging output." + + get_paths: bool = False + "Give all paths." + + get_environment_variables: bool = False + "Give environment variables." + + # Top-level subcommands + @dataclass + class Env: + command: Annotated[ + List + | Clear + | Log + | Run + | Check + | Start + | Stop + | Restart + | Reload + | Status + | Enable + | Disable + | UpgradeConfig + | Debug, + FlagCreatePairsOff, + OmitSubcommandPrefixes, + DisallowNone, + ] + + type: ReturnType = ReturnType.TEXT + """choose if it should return regular text or other machine-readable""" + quiet: Annotated[bool, arg(aliases=["-q"])] = False + """Quiet mode, useful for reloads initiated scripts like logrotate""" + + return Env diff --git a/intelmq/lib/setup_cli.py b/intelmq/lib/setup_cli.py new file mode 100644 index 0000000000..ecf93cd41f --- /dev/null +++ b/intelmq/lib/setup_cli.py @@ -0,0 +1,29 @@ +from dataclasses import dataclass +from pathlib import Path +from typing import Optional +from intelmq import STATE_FILE_PATH + +@dataclass +class SetupConfig: + "Set's up directories and example configurations for IntelMQ." + + skip_ownership: bool = False + "Skip setting file ownership." + + state_file: Path = Path(STATE_FILE_PATH) + "The state file location to use." + + webserver_user: Optional[str] = None + "The webserver to use instead of auto-detection." + + webserver_configuration_directory: Optional[str] = None + "The webserver configuration directory to use instead of auto-detection." + + skip_api: bool = False + "Skip set-up of intelmq-api." + + skip_webserver: bool = False + "Skip all operations on the webserver configuration, affects the API and Manager." + + skip_manager: bool = False + "Skip set-up of intelmq-manager." diff --git a/setup.py b/setup.py index 888ec8efcb..6c6c1c4f9a 100644 --- a/setup.py +++ b/setup.py @@ -11,6 +11,7 @@ REQUIRES = [ 'dnspython>=2.0.0', + 'mininterface[basic]<2', 'psutil>=1.2.1', 'python-dateutil>=2.5', 'python-termstyle>=0.1.10',