Skip to content

Commit

Permalink
Linting updates. (unioslo#187)
Browse files Browse the repository at this point in the history
Linting:

  - Add "D": pydocstyle.
  - Add "F": Pyflakes
  - Remove isort and black from dependencies as all listing and formatting is done by ruff.
  - Adhere to advice from ruff developers for formatting rules.
  - Make `tox -e lint` validate/check-only.
  - Add `tox -e lint-fix` to format and fix linting where possible.

CI:

  - Add tox to CI.

Typing:

  - Fix all hard-failing type definitions.
  - Create types for IP_version (4 or 6) or IP_network (IPv4Network or IPv6network)
  • Loading branch information
terjekv authored Dec 13, 2023
1 parent f643c2a commit 939f428
Show file tree
Hide file tree
Showing 25 changed files with 1,200 additions and 549 deletions.
1 change: 1 addition & 0 deletions mreg_cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""The mreg_cli package."""
2 changes: 1 addition & 1 deletion mreg_cli/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .log import *
from .log import * # noqa
1 change: 1 addition & 0 deletions mreg_cli/__main__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Entry point for the mreg_cli application."""
from . import main

if __name__ == "__main__":
Expand Down
28 changes: 23 additions & 5 deletions mreg_cli/bacnet.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
"""Backnet commands for the mreg_cli."""

import argparse

from .cli import Flag
from .history import history
from .host import host
from .log import cli_error, cli_info
from .outputmanager import OutputManager
from .util import delete, get, get_list, host_info_by_name, post

BACNET_MAX_ID = 4194302


def bacnetid_add(args) -> None:
def bacnetid_add(args: argparse.Namespace) -> None:
"""Assign a BACnet ID to the host.
:param args: argparse.Namespace (name, id)
"""
info = host_info_by_name(args.name)
if "bacnetid" in info and info["bacnetid"] is not None:
cli_error("{} already has BACnet ID {}.".format(info["name"], info["bacnetid"]["id"]))
Expand Down Expand Up @@ -39,7 +49,11 @@ def bacnetid_add(args) -> None:
)


def bacnetid_remove(args) -> None:
def bacnetid_remove(args: argparse.Namespace) -> None:
"""Unassign the BACnet ID from the host.
:param args: argparse.Namespace (name)
"""
info = host_info_by_name(args.name)
if "bacnetid" not in info or info["bacnetid"] is None:
cli_error("{} does not have a BACnet ID assigned.".format(info["name"]))
Expand All @@ -63,7 +77,11 @@ def bacnetid_remove(args) -> None:
)


def bacnetid_list(args) -> None:
def bacnetid_list(args: argparse.Namespace) -> None:
"""Find/list BACnet IDs and hostnames by ID.
:param args: argparse.Namespace (min, max)
"""
minval = 0
if args.min is not None:
minval = args.min
Expand All @@ -72,8 +90,8 @@ def bacnetid_list(args) -> None:
maxval = 4194302
if args.max is not None:
maxval = args.max
if maxval > 4194302:
cli_error("The maximum ID value is 4194302.")
if maxval > BACNET_MAX_ID:
cli_error(f"The maximum ID value is {BACNET_MAX_ID}.")
r = get_list("/api/v1/bacnet/ids/", {"id__range": "{},{}".format(minval, maxval)})
OutputManager().add_formatted_table(("ID", "Hostname"), ("id", "hostname"), r)

Expand Down
93 changes: 64 additions & 29 deletions mreg_cli/cli.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,46 @@
"""Command line interface for mreg.
This file contains the main CLI class and the top level parser.
"""

import argparse
import html
import os
import shlex
import sys
from typing import Generator, List
from typing import Any, Callable, Generator, List, NoReturn, Union

from prompt_toolkit import HTML, print_formatted_text
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit import HTML, document, print_formatted_text
from prompt_toolkit.completion import CompleteEvent, Completer, Completion

from . import util
from .exceptions import CliError, CliWarning
from .outputmanager import OutputManager, remove_comments


class CliExit(Exception):
"""Exception used to exit the CLI."""

pass


class Flag:
"""Class for flag information available to commands in the CLI."""

def __init__(
self,
name,
description="",
short_desc="",
nargs=None,
default=None,
flag_type=None,
choices=None,
required=False,
metavar=None,
action=None,
name: str,
description: str = "",
short_desc: str = "",
nargs: int = None,
default: Any = None,
flag_type: Any = None,
choices: List[str] = None,
required: bool = False,
metavar: str = None,
action: str = None,
):
"""Initialize a Flag object."""
self.name = name
self.short_desc = short_desc
self.description = description
Expand All @@ -43,7 +53,8 @@ def __init__(
self.action = action


def _create_command_group(parent):
def _create_command_group(parent: argparse.ArgumentParser):
"""Create a sub parser for a command."""
parent_name = parent.prog.strip()

if parent_name:
Expand All @@ -64,14 +75,16 @@ def _create_command_group(parent):


class Command(Completer):
"""Command is a class which acts as a wrapper around argparse and
prompt_toolkit.
"""Command class for the CLI.
Wrapper around argparse.ArgumentParser and prompt_toolkit.
"""

# Used to detect an error when running commands from a source file.
last_errno = 0

def __init__(self, parser, flags, short_desc):
def __init__(self, parser: argparse.ArgumentParser, flags: List[Flag], short_desc: str):
"""Initialize a Command object."""
self.parser = parser
# sub is an object used for creating sub parser for this command. A
# command/ArgParser can only have one of this object.
Expand All @@ -85,9 +98,17 @@ def __init__(self, parser, flags, short_desc):
self.flags[flag.name.lstrip("-")] = flag

def add_command(
self, prog, description, short_desc="", epilog=None, callback=None, flags=None
self,
prog: str,
description: str,
short_desc: str = "",
epilog: str = None,
callback: Callable[[argparse.ArgumentParser], None] = None,
flags: Union[List[Flag], None] = None,
):
""":param flags: a list of Flag objects. NB: must be handled as read-only,
"""Add a command to the current parser.
:param flags: a list of Flag objects. NB: must be handled as read-only,
since the default value is [].
:return: the Command object of the new command.
"""
Expand Down Expand Up @@ -127,7 +148,6 @@ def add_command(

def parse(self, command: str) -> None:
"""Parse and execute a command."""

args = shlex.split(command, comments=True)

try:
Expand Down Expand Up @@ -155,12 +175,26 @@ def parse(self, command: str) -> None:
# code.
self.last_errno = 0

def get_completions(self, document, complete_event):
def get_completions(
self, document: document.Document, complete_event: CompleteEvent
) -> Generator[Union[Completion, Any], Any, None]:
"""Prepare completions for the current command.
:param document: The current document.
:param complete_event: The current complete event.
:yields: Completions for the current command.
"""
cur = document.get_word_before_cursor()
words = document.text.strip().split(" ")
yield from self.complete(cur, words)

def complete(self, cur, words):
def complete(self, cur: str, words: str) -> Generator[Union[Completion, Any], Any, None]:
"""Generate completions during typing.
:param cur: The current word.
:param words: The current line split into words.
"""
# if line is empty suggest all sub commands
if not words:
for name in self.children:
Expand Down Expand Up @@ -231,19 +265,19 @@ def process_command_line(self, line: str) -> None:
cli = Command(_top_parser, list(), "")


def _quit(args):
def _quit(args: argparse.Namespace) -> NoReturn:
raise CliExit


def _start_recording(args) -> None:
def _start_recording(args: argparse.Namespace) -> None:
"""Start recording commands and output to the given file."""
if not args.filename:
raise CliError("No filename given.")

OutputManager().start_recording(args.filename)


def _stop_recording(args):
def _stop_recording(args: argparse.Namespace):
"""Stop recording commands and output to the given file."""
OutputManager().save_recording()

Expand All @@ -264,7 +298,8 @@ def _stop_recording(args):
)


def logout(args):
def logout(args: argparse.Namespace):
"""Log out from mreg and exit. Will delete token."""
util.logout()
raise CliExit

Expand Down Expand Up @@ -348,10 +383,10 @@ def source(files: List[str], ignore_errors: bool, verbose: bool) -> Generator[st
print_formatted_text(f"Permission denied: '{filename}'")


def _source(args):
"""Wrapper for the source function to integrate with the CLI.
def _source(args: argparse.Namespace):
"""Source command for the CLI.
:param args: Arguments from the CLI.
:param args: The arguments passed to the command.
"""
for command in source(args.files, args.ignore_errors, args.verbose):
# Process each command here as needed, similar to the main loop
Expand Down
24 changes: 14 additions & 10 deletions mreg_cli/config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Logging
-------
"""Logging.
This module can be used to configure basic logging to stderr, with an optional
filter level.
Expand All @@ -13,9 +13,11 @@
2. :py:const:`logging.INFO`
3. :py:const:`logging.DEBUG`
"""

import logging
import os
import sys
from typing import Tuple, Union

logger = logging.getLogger(__name__)

Expand All @@ -39,15 +41,15 @@
LOGGING_FORMAT = "%(levelname)s - %(name)s - %(message)s"

# Verbosity count to logging level
LOGGING_VERBOSITY = (
LOGGING_VERBOSITY: Tuple[int, int, int, int] = (
logging.ERROR,
logging.WARNING,
logging.INFO,
logging.DEBUG,
)


def get_verbosity(verbosity):
def get_verbosity(verbosity: int) -> int:
"""Translate verbosity to logging level.
Levels are traslated according to :py:const:`LOGGING_VERBOSITY`.
Expand All @@ -60,18 +62,18 @@ def get_verbosity(verbosity):
return level


def configure_logging(level):
def configure_logging(level: int = logging.INFO) -> None:
"""Enable and configure logging.
:param int level: logging level
:param int level: logging level, defaults to :py:const:`logging.INFO`
"""
logging.basicConfig(level=level, format=LOGGING_FORMAT)


def get_config_file():
""":return:
returns the best (first) match from DEFAULT_CONFIG_PATH, or
None if no file was found.
def get_config_file() -> Union[str, None]:
"""Get the first config file found in DEFAULT_CONFIG_PATH.
:returns: path to config file, or None if no config file was found
"""
for path in DEFAULT_CONFIG_PATH:
logger.debug("looking for config in %r", os.path.abspath(path))
Expand All @@ -83,10 +85,12 @@ def get_config_file():


def get_default_domain():
"""Get the default domain from the application."""
return DEFAULT_DOMAIN


def get_default_url():
"""Get the default url from the application."""
for url in (os.environ.get("MREGCLI_DEFAULT_URL"), DEFAULT_URL):
if url is not None:
return url
Expand Down
Loading

0 comments on commit 939f428

Please sign in to comment.