1
0
Fork 0

Compare commits

...

2 Commits

8 changed files with 150 additions and 74 deletions

View File

@ -0,0 +1,67 @@
import importlib
from collections.abc import Callable, Iterable, Mapping
from inspect import getmembers, isfunction
from .. import Bundle, Operator
class CheckResult:
severity: int = 0
display: str = "UNKNOWN"
reason: str
def __init__(self, reason: str):
self.reason = reason
def __str__(self):
return f"{self.display}: {self.reason}"
def __repr__(self):
return f"{self.display}({self.reason})"
def __int__(self):
return self.severity
def __lt__(self, other):
return int(self) < int(other)
class Warn(CheckResult):
severity = 40
display = "WARNING"
class Fail(CheckResult):
severity = 90
display = "FAILURE"
SUPPORTED_TYPES = [("operator", Operator), ("bundle", Bundle)]
Check = Callable[[Operator | Bundle], Iterable[CheckResult]]
def get_checks(
suite_name: str = "operator_repo.checks",
) -> Mapping[str, Iterable[Check]]:
result = {}
for module_name, _ in SUPPORTED_TYPES:
result[module_name] = []
try:
module = importlib.import_module(f"{suite_name}.{module_name}")
for check_name, check in getmembers(module, isfunction):
if check_name.startswith("check_"):
result[module_name].append(check)
except ModuleNotFoundError:
pass
return result
def run_suite(
targets: Iterable[Operator | Bundle], suite_name: str = "operator_repo.checks"
) -> Iterable[CheckResult]:
checks = get_checks(suite_name)
for target in targets:
for target_type_name, target_type in SUPPORTED_TYPES:
if isinstance(target, target_type):
for check in checks.get(target_type_name, []):
yield from check(target)

View File

@ -1,50 +1,59 @@
from typing import Iterator, Tuple
from collections.abc import Iterator
from semver import Version
from .. import Bundle
from ..utils import lookup_dict
from . import CheckResult, Fail, Warn
def check_operator_name(bundle: Bundle) -> Iterator[Tuple[str, str]]:
def check_operator_name(bundle: Bundle) -> Iterator[CheckResult]:
"""Check if the operator names used in CSV, metadata and filesystem are consistent"""
name = bundle.annotations.get("operators.operatorframework.io.bundle.package.v1")
if name is None:
yield "fail", "Bundle does not define the operator name in annotations.yaml"
yield Fail(f"Bundle does not define the operator name in annotations.yaml")
return
if name != bundle.csv_operator_name:
yield "fail", f"Operator name from annotations.yaml ({name}) does not match the name defined in the CSV ({bundle.csv_operator_name})"
yield Fail(
f"Operator name from annotations.yaml ({name}) does not match the name defined in the CSV ({bundle.csv_operator_name})"
)
if name != bundle.operator_name:
yield "warn", f"Operator name from annotations.yaml ({name}) does not match the operator's directory name ({bundle.operator_name})"
yield Fail(
f"Operator name from annotations.yaml ({name}) does not match the operator's directory name ({bundle.operator_name})"
)
def check_image(bundle: Bundle) -> Iterator[Tuple[str, str]]:
def check_image(bundle: Bundle) -> Iterator[CheckResult]:
"""Check if containerImage is properly defined and used in a deployment"""
try:
container_image = lookup_dict(bundle.csv, "metadata.annotations.containerImage")
if container_image is None:
yield "fail", "CSV doesn't define .metadata.annotations.containerImage"
yield Fail(f"CSV doesn't define .metadata.annotations.containerImage")
return
deployments = lookup_dict(bundle.csv, "spec.install.spec.deployments")
if deployments is None:
yield "fail", "CSV doesn't define .spec.install.spec.deployments"
yield Fail(f"CSV doesn't define .spec.install.spec.deployments")
return
for deployment in deployments:
containers = lookup_dict(deployment, "spec.template.spec.containers", [])
if any(container_image == x.get("image") for x in containers):
return
yield "fail", f"container image {container_image} not used by any deployment"
yield Fail(f"container image {container_image} not used by any deployment")
except Exception as e:
yield "fail", str(e)
yield Fail(str(e))
def check_semver(bundle: Bundle) -> Iterator[Tuple[str, str]]:
def check_semver(bundle: Bundle) -> Iterator[CheckResult]:
"""Check that the bundle version is semver compliant"""
try:
_ = Version.parse(bundle.operator_version)
except ValueError:
yield "warn", f"Version from filesystem ({bundle.operator_version}) is not valid semver"
yield Warn(
f"Version from filesystem ({bundle.operator_version}) is not valid semver"
)
try:
_ = Version.parse(bundle.csv_operator_version)
except ValueError:
yield "warn", f"Version from CSV ({bundle.csv_operator_version}) is not valid semver"
yield Warn(
f"Version from CSV ({bundle.csv_operator_version}) is not valid semver"
)

View File

@ -1,9 +1,10 @@
from typing import Iterator, Tuple
from collections.abc import Iterator
from .. import Operator
from . import CheckResult, Fail
def check_upgrade(operator: Operator) -> Iterator[Tuple[str, str]]:
def check_upgrade(operator: Operator) -> Iterator[CheckResult]:
"""Validate upgrade graphs for all channels"""
all_channels = operator.channels | {operator.default_channel} - {None}
for channel in sorted(all_channels):
@ -15,6 +16,8 @@ def check_upgrade(operator: Operator) -> Iterator[Tuple[str, str]]:
x for x in channel_bundles if x not in graph and x != channel_head
}
if dangling_bundles:
yield "fail", f"Channel {channel} has dangling bundles: {dangling_bundles}."
yield Fail(
f"Channel {channel} has dangling bundles: {dangling_bundles}."
)
except Exception as e:
yield "fail", str(e)
yield Fail(str(e))

View File

@ -3,9 +3,10 @@
"""
import logging
from collections.abc import Iterator
from functools import cached_property, total_ordering
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union
from typing import Any, Optional, Union
from semver import Version
@ -43,21 +44,21 @@ class Bundle:
self._parent = operator
@cached_property
def annotations(self) -> Dict[str, Any]:
def annotations(self) -> dict[str, Any]:
"""
:return: The content of the "annotations" field in metadata/annotations.yaml
"""
return self.load_metadata("annotations.yaml").get("annotations", {})
@cached_property
def dependencies(self) -> List[Any]:
def dependencies(self) -> list[Any]:
"""
:return: The content of the "dependencies" field in metadata/dependencies.yaml
"""
return self.load_metadata("dependencies.yaml").get("dependencies", [])
@cached_property
def csv(self) -> Dict[str, Any]:
def csv(self) -> dict[str, Any]:
"""
:return: The content of the CSV file for the bundle
"""
@ -67,7 +68,7 @@ class Bundle:
return csv
@cached_property
def csv_full_name(self) -> Tuple[str, str]:
def csv_full_name(self) -> tuple[str, str]:
try:
csv_full_name = self.csv["metadata"]["name"]
name, version = csv_full_name.split(".", 1)
@ -114,7 +115,7 @@ class Bundle:
self._parent = Operator(self._bundle_path.parent)
return self._parent
def load_metadata(self, filename: str) -> Dict[str, Any]:
def load_metadata(self, filename: str) -> dict[str, Any]:
"""
Load and parse a yaml file from the metadata directory of the bundle
:param filename: Name of the file
@ -147,7 +148,7 @@ class Bundle:
)
@property
def channels(self) -> Set[str]:
def channels(self) -> set[str]:
"""
:return: Set of channels the bundle belongs to
"""
@ -316,7 +317,7 @@ class Operator:
)
@cached_property
def channels(self) -> Set[str]:
def channels(self) -> set[str]:
"""
:return: All channels defined by the operator bundles
"""
@ -354,7 +355,7 @@ class Operator:
except IndexError:
return None
def channel_bundles(self, channel: str) -> List[Bundle]:
def channel_bundles(self, channel: str) -> list[Bundle]:
"""
:param channel: Name of the channel
:return: List of bundles in the given channel
@ -368,7 +369,7 @@ class Operator:
"""
return self.channel_bundles(channel)[-1]
def update_graph(self, channel: str) -> Dict[Bundle, Set[Bundle]]:
def update_graph(self, channel: str) -> dict[Bundle, set[Bundle]]:
"""
Return the update graph for the given channel
:param channel: Name of the channel
@ -383,7 +384,7 @@ class Operator:
# TODO: implement semver-skippatch
raise NotImplementedError("%s: semver-skippatch is not implemented yet")
if update_strategy == "replaces-mode":
edges: Dict[Bundle, set[Bundle]] = {}
edges: dict[Bundle, set[Bundle]] = {}
all_bundles_set = set(all_bundles)
operator_names = {x.csv_operator_name for x in all_bundles_set}
if len(operator_names) != 1:
@ -409,7 +410,7 @@ class Operator:
) = replaced_bundle_name.split(".", 1)
if replaced_bundle_operator != bundle.csv_operator_name:
raise ValueError(
f"{self}: {bundle} replaces a bundle from a different operator"
f"{bundle} replaces a bundle from a different operator"
)
try:
replaced_bundle = version_to_bundle[

View File

@ -5,12 +5,12 @@
import argparse
import logging
from inspect import getmembers, isfunction
from collections.abc import Iterator
from itertools import chain
from pathlib import Path
from typing import Union
from .checks import bundle as bundle_checks
from .checks import operator as operator_checks
from .checks import get_checks, run_suite
from .classes import Bundle, Operator, Repo
@ -69,44 +69,38 @@ def action_list(repo_path, *what: str, recursive: bool = False) -> None:
_list(parse_target(repo, target), recursive)
def action_check_bundle(bundle: Bundle) -> None:
print(f"Checking {bundle}")
for check_name, check in getmembers(bundle_checks, isfunction):
if check_name.startswith("check_"):
for result, message in check(bundle):
print(f"{result.upper()}: {bundle}: {message}")
def _walk(
target: Union[Repo, Operator, Bundle]
) -> Iterator[Union[Repo, Operator, Bundle]]:
yield target
if isinstance(target, Repo):
for operator in target:
yield from _walk(operator)
elif isinstance(target, Operator):
yield from target.all_bundles()
def action_check_operator(operator: Operator) -> None:
print(f"Checking {operator}")
for check_name, check in getmembers(operator_checks, isfunction):
if check_name.startswith("check_"):
for result, message in check(operator):
print(f"{result.upper()}: {operator}: {message}")
def action_check(repo_path: Path, *what: str, recursive: bool = False) -> None:
def action_check(
repo_path: Path, suite: str, *what: str, recursive: bool = False
) -> None:
repo = Repo(repo_path)
for target in [parse_target(repo, x) for x in what] or sorted(repo):
if isinstance(target, Operator):
action_check_operator(target)
if recursive:
for bundle in sorted(target):
action_check_bundle(bundle)
elif isinstance(target, Bundle):
action_check_bundle(target)
if recursive:
if what:
targets = chain(_walk(parse_target(repo, x)) for x in what)
else:
targets = chain(_walk(x) for x in repo)
else:
targets = [parse_target(repo, x) for x in what] or repo.all_operators()
for result in run_suite(targets, suite_name=suite):
print(result)
def action_check_list() -> None:
for check_type_name, check_type in (
("Operator", operator_checks),
("Bundle", bundle_checks),
):
def action_check_list(suite: str) -> None:
for check_type_name, checks in get_checks(suite).items():
print(f"{check_type_name} checks:")
for check_name, check in getmembers(check_type, isfunction):
if check_name.startswith("check_"):
display_name = check_name.removeprefix("check_")
print(f" - {display_name}: {check.__doc__}")
for check in checks:
display_name = check.__name__.removeprefix("check_")
print(f" - {display_name}: {check.__doc__}")
def main() -> None:
@ -140,7 +134,10 @@ def main() -> None:
help="check validity of an operator or bundle",
)
check_parser.add_argument(
"--list", action="store_true", help="list available checks"
"-s", "--suite", default="operator_repo.checks", help="check suite to use"
)
check_parser.add_argument(
"-l", "--list", action="store_true", help="list available checks"
)
check_parser.add_argument(
"-R", "--recursive", action="store_true", help="descend the tree"
@ -167,10 +164,13 @@ def main() -> None:
action_list(args.repo or Path.cwd(), *args.target, recursive=args.recursive)
elif args.action == "check":
if args.list:
action_check_list()
action_check_list(args.suite)
else:
action_check(
args.repo or Path.cwd(), *args.target, recursive=args.recursive
args.repo or Path.cwd(),
args.suite,
*args.target,
recursive=args.recursive,
)
else:
main_parser.print_help()

View File

@ -4,7 +4,7 @@
import logging
from pathlib import Path
from typing import Any, Dict
from typing import Any
import yaml
from yaml.composer import ComposerError
@ -51,7 +51,7 @@ def load_yaml(path: Path) -> Any:
def lookup_dict(
data: Dict[str, Any], path: str, default: Any = None, separator: str = "."
data: dict[str, Any], path: str, default: Any = None, separator: str = "."
) -> Any:
keys = path.split(separator)
subtree = data

View File

@ -1,7 +1,6 @@
from pathlib import Path
from operator_repo import Operator, Repo
from tests import bundle_files, create_files

View File

@ -2,11 +2,8 @@ from pathlib import Path
import pytest
from operator_repo.utils import load_yaml
from operator_repo.utils import lookup_dict
from operator_repo.exceptions import OperatorRepoException
from operator_repo.utils import load_yaml, lookup_dict
from tests import create_files