201 lines
6.2 KiB
Python
201 lines
6.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
CLI tool to handle operator repositories
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
from collections.abc import Iterator
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
from typing import Optional, Union
|
|
|
|
from .checks import get_checks, run_suite
|
|
from .core import Bundle, Operator, Repo
|
|
from .exceptions import OperatorRepoException
|
|
|
|
|
|
def parse_target(repo: Repo, target: str) -> Union[Operator, Bundle]:
|
|
if "/" in target:
|
|
operator_name, operator_version = target.split("/", 1)
|
|
return repo.operator(operator_name).bundle(operator_version)
|
|
return repo.operator(target)
|
|
|
|
|
|
def indent(depth: int) -> str:
|
|
return " " * depth
|
|
|
|
|
|
def show_repo(repo: Repo, recursive: bool = False, depth: int = 0) -> None:
|
|
print(indent(depth) + str(repo))
|
|
for operator in repo:
|
|
if recursive:
|
|
show_operator(operator, recursive, depth + 1)
|
|
else:
|
|
print(indent(depth + 1) + str(operator))
|
|
|
|
|
|
def show_operator(operator: Operator, recursive: bool = False, depth: int = 0) -> None:
|
|
print(indent(depth) + str(operator))
|
|
for bundle in operator:
|
|
if recursive:
|
|
show_bundle(bundle, depth + 1)
|
|
else:
|
|
print(indent(depth + 1) + str(bundle))
|
|
|
|
|
|
def show_bundle(bundle: Bundle, depth: int = 0) -> None:
|
|
print(indent(depth) + str(bundle))
|
|
csv_annotations = bundle.csv.get("metadata", {}).get("annotations", {})
|
|
info = [
|
|
("Description", csv_annotations.get("description", "")),
|
|
("Name", f"{bundle.csv_operator_name}.v{bundle.csv_operator_version}"),
|
|
("Channels", ", ".join(bundle.channels)),
|
|
("Default channel", bundle.default_channel),
|
|
("Container image", csv_annotations.get("containerImage", "")),
|
|
("Replaces", bundle.csv.get("spec", {}).get("replaces", "")),
|
|
("Skips", bundle.csv.get("spec", {}).get("skips", [])),
|
|
]
|
|
max_width = max(len(key) for key, _ in info)
|
|
for key, value in info:
|
|
message = f"{key.ljust(max_width + 1)}: {value}"
|
|
print(indent(depth + 1) + message)
|
|
|
|
|
|
def show(target: Union[Repo, Operator, Bundle], recursive: bool = False) -> None:
|
|
if isinstance(target, Repo):
|
|
show_repo(target, recursive, 0)
|
|
elif isinstance(target, Operator):
|
|
show_operator(target, recursive, 1 * recursive)
|
|
elif isinstance(target, Bundle):
|
|
show_bundle(target, 2 * recursive)
|
|
|
|
|
|
def action_list(repo: Repo, *what: str, recursive: bool = False) -> None:
|
|
if what:
|
|
targets: Iterator[Union[Repo, Operator, Bundle]] = (
|
|
parse_target(repo, x) for x in what
|
|
)
|
|
else:
|
|
targets = iter([repo])
|
|
for target in targets:
|
|
show(target, recursive)
|
|
|
|
|
|
def _walk(target: Union[Operator, Bundle]) -> Iterator[Union[Operator, Bundle]]:
|
|
yield target
|
|
if isinstance(target, Operator):
|
|
yield from target.all_bundles()
|
|
|
|
|
|
def action_check(repo: Repo, suite: str, *what: str, recursive: bool = False) -> None:
|
|
if what:
|
|
targets: Iterator[Union[Operator, Bundle]] = (
|
|
parse_target(repo, x) for x in what
|
|
)
|
|
else:
|
|
targets = repo.all_operators()
|
|
if recursive:
|
|
all_targets: Iterator[Union[Operator, Bundle]] = chain.from_iterable(
|
|
_walk(x) for x in targets
|
|
)
|
|
else:
|
|
all_targets = targets
|
|
for result in run_suite(all_targets, suite_name=suite):
|
|
print(result)
|
|
|
|
|
|
def action_check_list(suite: str) -> None:
|
|
for check_type_name, checks in get_checks(suite).items():
|
|
print(f"{check_type_name} checks:")
|
|
for check in checks:
|
|
display_name = check.__name__.removeprefix("check_")
|
|
print(f" - {display_name}: {check.__doc__}")
|
|
|
|
|
|
def _get_repo(path: Optional[Path] = None) -> Repo:
|
|
if not path:
|
|
path = Path.cwd()
|
|
try:
|
|
return Repo(path)
|
|
except OperatorRepoException:
|
|
print(f"{path} is not a valid operator repository")
|
|
sys.exit(1)
|
|
|
|
|
|
def main() -> None:
|
|
main_parser = argparse.ArgumentParser(
|
|
description="Operator repository manipulation tool",
|
|
)
|
|
main_parser.add_argument(
|
|
"-r", "--repo", help="path to the root of the operator repository", type=Path
|
|
)
|
|
main_parser.add_argument(
|
|
"-v", "--verbose", action="count", default=0, help="increase log verbosity"
|
|
)
|
|
main_subparsers = main_parser.add_subparsers(dest="action")
|
|
|
|
# list
|
|
list_parser = main_subparsers.add_parser(
|
|
"list", aliases=["ls"], help="list contents of repo, operators or bundles"
|
|
)
|
|
list_parser.add_argument(
|
|
"-R", "--recursive", action="store_true", help="descend the tree"
|
|
)
|
|
list_parser.add_argument(
|
|
"target",
|
|
nargs="*",
|
|
help="name of the repos or bundles to list; if omitted, list the contents of the repo",
|
|
)
|
|
|
|
# check_bundle
|
|
check_parser = main_subparsers.add_parser(
|
|
"check",
|
|
help="check validity of an operator or bundle",
|
|
)
|
|
check_parser.add_argument(
|
|
"-s", "--suite", default="operator_repo.checks", help="check suite to use"
|
|
)
|
|
check_parser.add_argument(
|
|
"-l", "--list", action="store_true", help="list available checks"
|
|
)
|
|
check_parser.add_argument(
|
|
"-R", "--recursive", action="store_true", help="descend the tree"
|
|
)
|
|
check_parser.add_argument(
|
|
"target",
|
|
nargs="*",
|
|
help="name of the operators or bundles to check",
|
|
)
|
|
|
|
args = main_parser.parse_args()
|
|
|
|
verbosity = {0: logging.ERROR, 1: logging.WARNING, 2: logging.INFO}
|
|
log = logging.getLogger(__package__)
|
|
log.setLevel(verbosity.get(args.verbose, logging.DEBUG))
|
|
handler = logging.StreamHandler()
|
|
handler.setFormatter(
|
|
logging.Formatter("%(asctime)s %(name)-12s %(levelname)-8s %(message)s")
|
|
)
|
|
log.addHandler(handler)
|
|
|
|
if args.action in ("list", "ls"):
|
|
action_list(_get_repo(args.repo), *args.target, recursive=args.recursive)
|
|
elif args.action == "check":
|
|
if args.list:
|
|
action_check_list(args.suite)
|
|
else:
|
|
action_check(
|
|
_get_repo(args.repo),
|
|
args.suite,
|
|
*args.target,
|
|
recursive=args.recursive,
|
|
)
|
|
else:
|
|
main_parser.print_help()
|
|
|
|
|
|
if __name__ == "__main__": # pragma: no cover
|
|
main()
|