1
0
Fork 0

Refactor checks to support alternate suites

This commit is contained in:
Maurizio Porrato 2023-08-12 14:49:35 +01:00
parent 69d15c66d5
commit d524a042b8
7 changed files with 134 additions and 62 deletions

View File

@ -0,0 +1,67 @@
import importlib
from collections.abc import Callable, Iterable, Mapping
from inspect import getmembers, isfunction
from .. import Bundle, Operator
class CheckResult:
severity: int = 0
display: str = "UNKNOWN"
reason: str
def __init__(self, reason: str):
self.reason = reason
def __str__(self):
return f"{self.display}: {self.reason}"
def __repr__(self):
return f"{self.display}({self.reason})"
def __int__(self):
return self.severity
def __lt__(self, other):
return int(self) < int(other)
class Warn(CheckResult):
severity = 40
display = "WARNING"
class Fail(CheckResult):
severity = 90
display = "FAILURE"
SUPPORTED_TYPES = [("operator", Operator), ("bundle", Bundle)]
Check = Callable[[Operator | Bundle], Iterable[CheckResult]]
def get_checks(
suite_name: str = "operator_repo.checks",
) -> Mapping[str, Iterable[Check]]:
result = {}
for module_name, _ in SUPPORTED_TYPES:
result[module_name] = []
try:
module = importlib.import_module(f"{suite_name}.{module_name}")
for check_name, check in getmembers(module, isfunction):
if check_name.startswith("check_"):
result[module_name].append(check)
except ModuleNotFoundError:
pass
return result
def run_suite(
targets: Iterable[Operator | Bundle], suite_name: str = "operator_repo.checks"
) -> Iterable[CheckResult]:
checks = get_checks(suite_name)
for target in targets:
for target_type_name, target_type in SUPPORTED_TYPES:
if isinstance(target, target_type):
for check in checks.get(target_type_name, []):
yield from check(target)

View File

@ -1,50 +1,59 @@
from typing import Iterator, Tuple
from typing import Iterator
from semver import Version
from .. import Bundle
from ..utils import lookup_dict
from . import CheckResult, Fail, Warn
def check_operator_name(bundle: Bundle) -> Iterator[Tuple[str, str]]:
def check_operator_name(bundle: Bundle) -> Iterator[CheckResult]:
"""Check if the operator names used in CSV, metadata and filesystem are consistent"""
name = bundle.annotations.get("operators.operatorframework.io.bundle.package.v1")
if name is None:
yield "fail", "Bundle does not define the operator name in annotations.yaml"
yield Fail(f"Bundle does not define the operator name in annotations.yaml")
return
if name != bundle.csv_operator_name:
yield "fail", f"Operator name from annotations.yaml ({name}) does not match the name defined in the CSV ({bundle.csv_operator_name})"
yield Fail(
f"Operator name from annotations.yaml ({name}) does not match the name defined in the CSV ({bundle.csv_operator_name})"
)
if name != bundle.operator_name:
yield "warn", f"Operator name from annotations.yaml ({name}) does not match the operator's directory name ({bundle.operator_name})"
yield Fail(
f"Operator name from annotations.yaml ({name}) does not match the operator's directory name ({bundle.operator_name})"
)
def check_image(bundle: Bundle) -> Iterator[Tuple[str, str]]:
def check_image(bundle: Bundle) -> Iterator[CheckResult]:
"""Check if containerImage is properly defined and used in a deployment"""
try:
container_image = lookup_dict(bundle.csv, "metadata.annotations.containerImage")
if container_image is None:
yield "fail", "CSV doesn't define .metadata.annotations.containerImage"
yield Fail(f"CSV doesn't define .metadata.annotations.containerImage")
return
deployments = lookup_dict(bundle.csv, "spec.install.spec.deployments")
if deployments is None:
yield "fail", "CSV doesn't define .spec.install.spec.deployments"
yield Fail(f"CSV doesn't define .spec.install.spec.deployments")
return
for deployment in deployments:
containers = lookup_dict(deployment, "spec.template.spec.containers", [])
if any(container_image == x.get("image") for x in containers):
return
yield "fail", f"container image {container_image} not used by any deployment"
yield Fail(f"container image {container_image} not used by any deployment")
except Exception as e:
yield "fail", str(e)
yield Fail(str(e))
def check_semver(bundle: Bundle) -> Iterator[Tuple[str, str]]:
def check_semver(bundle: Bundle) -> Iterator[CheckResult]:
"""Check that the bundle version is semver compliant"""
try:
_ = Version.parse(bundle.operator_version)
except ValueError:
yield "warn", f"Version from filesystem ({bundle.operator_version}) is not valid semver"
yield Warn(
f"Version from filesystem ({bundle.operator_version}) is not valid semver"
)
try:
_ = Version.parse(bundle.csv_operator_version)
except ValueError:
yield "warn", f"Version from CSV ({bundle.csv_operator_version}) is not valid semver"
yield Warn(
f"Version from CSV ({bundle.csv_operator_version}) is not valid semver"
)

View File

@ -1,9 +1,10 @@
from typing import Iterator, Tuple
from typing import Iterator
from .. import Operator
from . import CheckResult, Fail
def check_upgrade(operator: Operator) -> Iterator[Tuple[str, str]]:
def check_upgrade(operator: Operator) -> Iterator[CheckResult]:
"""Validate upgrade graphs for all channels"""
all_channels = operator.channels | {operator.default_channel} - {None}
for channel in sorted(all_channels):
@ -15,6 +16,8 @@ def check_upgrade(operator: Operator) -> Iterator[Tuple[str, str]]:
x for x in channel_bundles if x not in graph and x != channel_head
}
if dangling_bundles:
yield "fail", f"Channel {channel} has dangling bundles: {dangling_bundles}."
yield Fail(
f"Channel {channel} has dangling bundles: {dangling_bundles}."
)
except Exception as e:
yield "fail", str(e)
yield Fail(str(e))

View File

@ -409,7 +409,7 @@ class Operator:
) = replaced_bundle_name.split(".", 1)
if replaced_bundle_operator != bundle.csv_operator_name:
raise ValueError(
f"{self}: {bundle} replaces a bundle from a different operator"
f"{bundle} replaces a bundle from a different operator"
)
try:
replaced_bundle = version_to_bundle[

View File

@ -5,12 +5,11 @@
import argparse
import logging
from inspect import getmembers, isfunction
from itertools import chain
from pathlib import Path
from typing import Union
from typing import Iterator, Union
from .checks import bundle as bundle_checks
from .checks import operator as operator_checks
from .checks import get_checks, run_suite
from .classes import Bundle, Operator, Repo
@ -69,44 +68,36 @@ def action_list(repo_path, *what: str, recursive: bool = False) -> None:
_list(parse_target(repo, target), recursive)
def action_check_bundle(bundle: Bundle) -> None:
print(f"Checking {bundle}")
for check_name, check in getmembers(bundle_checks, isfunction):
if check_name.startswith("check_"):
for result, message in check(bundle):
print(f"{result.upper()}: {bundle}: {message}")
def _walk(target: Repo | Operator | Bundle) -> Iterator[Repo | Operator | Bundle]:
yield target
if isinstance(target, Repo):
for operator in target:
yield from _walk(operator)
elif isinstance(target, Operator):
yield from target.all_bundles()
def action_check_operator(operator: Operator) -> None:
print(f"Checking {operator}")
for check_name, check in getmembers(operator_checks, isfunction):
if check_name.startswith("check_"):
for result, message in check(operator):
print(f"{result.upper()}: {operator}: {message}")
def action_check(repo_path: Path, *what: str, recursive: bool = False) -> None:
def action_check(
repo_path: Path, suite: str, *what: str, recursive: bool = False
) -> None:
repo = Repo(repo_path)
for target in [parse_target(repo, x) for x in what] or sorted(repo):
if isinstance(target, Operator):
action_check_operator(target)
if recursive:
for bundle in sorted(target):
action_check_bundle(bundle)
elif isinstance(target, Bundle):
action_check_bundle(target)
if recursive:
if what:
targets = chain(_walk(parse_target(repo, x)) for x in what)
else:
targets = chain(_walk(x) for x in repo)
else:
targets = [parse_target(repo, x) for x in what] or repo.all_operators()
for result in run_suite(targets, suite_name=suite):
print(result)
def action_check_list() -> None:
for check_type_name, check_type in (
("Operator", operator_checks),
("Bundle", bundle_checks),
):
def action_check_list(suite: str) -> None:
for check_type_name, checks in get_checks(suite).items():
print(f"{check_type_name} checks:")
for check_name, check in getmembers(check_type, isfunction):
if check_name.startswith("check_"):
display_name = check_name.removeprefix("check_")
print(f" - {display_name}: {check.__doc__}")
for check in checks:
display_name = check.__name__.removeprefix("check_")
print(f" - {display_name}: {check.__doc__}")
def main() -> None:
@ -140,7 +131,10 @@ def main() -> None:
help="check validity of an operator or bundle",
)
check_parser.add_argument(
"--list", action="store_true", help="list available checks"
"-s", "--suite", default="operator_repo.checks", help="check suite to use"
)
check_parser.add_argument(
"-l", "--list", action="store_true", help="list available checks"
)
check_parser.add_argument(
"-R", "--recursive", action="store_true", help="descend the tree"
@ -167,10 +161,13 @@ def main() -> None:
action_list(args.repo or Path.cwd(), *args.target, recursive=args.recursive)
elif args.action == "check":
if args.list:
action_check_list()
action_check_list(args.suite)
else:
action_check(
args.repo or Path.cwd(), *args.target, recursive=args.recursive
args.repo or Path.cwd(),
args.suite,
*args.target,
recursive=args.recursive,
)
else:
main_parser.print_help()

View File

@ -1,7 +1,6 @@
from pathlib import Path
from operator_repo import Operator, Repo
from tests import bundle_files, create_files

View File

@ -2,11 +2,8 @@ from pathlib import Path
import pytest
from operator_repo.utils import load_yaml
from operator_repo.utils import lookup_dict
from operator_repo.exceptions import OperatorRepoException
from operator_repo.utils import load_yaml, lookup_dict
from tests import create_files