Compare commits
2 Commits
69d15c66d5
...
8f841bc364
Author | SHA1 | Date |
---|---|---|
Maurizio Porrato | 8f841bc364 | |
Maurizio Porrato | d524a042b8 |
|
@ -0,0 +1,67 @@
|
|||
import importlib
|
||||
from collections.abc import Callable, Iterable, Mapping
|
||||
from inspect import getmembers, isfunction
|
||||
|
||||
from .. import Bundle, Operator
|
||||
|
||||
|
||||
class CheckResult:
|
||||
severity: int = 0
|
||||
display: str = "UNKNOWN"
|
||||
reason: str
|
||||
|
||||
def __init__(self, reason: str):
|
||||
self.reason = reason
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.display}: {self.reason}"
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.display}({self.reason})"
|
||||
|
||||
def __int__(self):
|
||||
return self.severity
|
||||
|
||||
def __lt__(self, other):
|
||||
return int(self) < int(other)
|
||||
|
||||
|
||||
class Warn(CheckResult):
|
||||
severity = 40
|
||||
display = "WARNING"
|
||||
|
||||
|
||||
class Fail(CheckResult):
|
||||
severity = 90
|
||||
display = "FAILURE"
|
||||
|
||||
|
||||
SUPPORTED_TYPES = [("operator", Operator), ("bundle", Bundle)]
|
||||
Check = Callable[[Operator | Bundle], Iterable[CheckResult]]
|
||||
|
||||
|
||||
def get_checks(
|
||||
suite_name: str = "operator_repo.checks",
|
||||
) -> Mapping[str, Iterable[Check]]:
|
||||
result = {}
|
||||
for module_name, _ in SUPPORTED_TYPES:
|
||||
result[module_name] = []
|
||||
try:
|
||||
module = importlib.import_module(f"{suite_name}.{module_name}")
|
||||
for check_name, check in getmembers(module, isfunction):
|
||||
if check_name.startswith("check_"):
|
||||
result[module_name].append(check)
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
return result
|
||||
|
||||
|
||||
def run_suite(
|
||||
targets: Iterable[Operator | Bundle], suite_name: str = "operator_repo.checks"
|
||||
) -> Iterable[CheckResult]:
|
||||
checks = get_checks(suite_name)
|
||||
for target in targets:
|
||||
for target_type_name, target_type in SUPPORTED_TYPES:
|
||||
if isinstance(target, target_type):
|
||||
for check in checks.get(target_type_name, []):
|
||||
yield from check(target)
|
|
@ -1,50 +1,59 @@
|
|||
from typing import Iterator, Tuple
|
||||
from collections.abc import Iterator
|
||||
|
||||
from semver import Version
|
||||
|
||||
from .. import Bundle
|
||||
from ..utils import lookup_dict
|
||||
from . import CheckResult, Fail, Warn
|
||||
|
||||
|
||||
def check_operator_name(bundle: Bundle) -> Iterator[Tuple[str, str]]:
|
||||
def check_operator_name(bundle: Bundle) -> Iterator[CheckResult]:
|
||||
"""Check if the operator names used in CSV, metadata and filesystem are consistent"""
|
||||
name = bundle.annotations.get("operators.operatorframework.io.bundle.package.v1")
|
||||
if name is None:
|
||||
yield "fail", "Bundle does not define the operator name in annotations.yaml"
|
||||
yield Fail(f"Bundle does not define the operator name in annotations.yaml")
|
||||
return
|
||||
if name != bundle.csv_operator_name:
|
||||
yield "fail", f"Operator name from annotations.yaml ({name}) does not match the name defined in the CSV ({bundle.csv_operator_name})"
|
||||
yield Fail(
|
||||
f"Operator name from annotations.yaml ({name}) does not match the name defined in the CSV ({bundle.csv_operator_name})"
|
||||
)
|
||||
if name != bundle.operator_name:
|
||||
yield "warn", f"Operator name from annotations.yaml ({name}) does not match the operator's directory name ({bundle.operator_name})"
|
||||
yield Fail(
|
||||
f"Operator name from annotations.yaml ({name}) does not match the operator's directory name ({bundle.operator_name})"
|
||||
)
|
||||
|
||||
|
||||
def check_image(bundle: Bundle) -> Iterator[Tuple[str, str]]:
|
||||
def check_image(bundle: Bundle) -> Iterator[CheckResult]:
|
||||
"""Check if containerImage is properly defined and used in a deployment"""
|
||||
try:
|
||||
container_image = lookup_dict(bundle.csv, "metadata.annotations.containerImage")
|
||||
if container_image is None:
|
||||
yield "fail", "CSV doesn't define .metadata.annotations.containerImage"
|
||||
yield Fail(f"CSV doesn't define .metadata.annotations.containerImage")
|
||||
return
|
||||
deployments = lookup_dict(bundle.csv, "spec.install.spec.deployments")
|
||||
if deployments is None:
|
||||
yield "fail", "CSV doesn't define .spec.install.spec.deployments"
|
||||
yield Fail(f"CSV doesn't define .spec.install.spec.deployments")
|
||||
return
|
||||
for deployment in deployments:
|
||||
containers = lookup_dict(deployment, "spec.template.spec.containers", [])
|
||||
if any(container_image == x.get("image") for x in containers):
|
||||
return
|
||||
yield "fail", f"container image {container_image} not used by any deployment"
|
||||
yield Fail(f"container image {container_image} not used by any deployment")
|
||||
except Exception as e:
|
||||
yield "fail", str(e)
|
||||
yield Fail(str(e))
|
||||
|
||||
|
||||
def check_semver(bundle: Bundle) -> Iterator[Tuple[str, str]]:
|
||||
def check_semver(bundle: Bundle) -> Iterator[CheckResult]:
|
||||
"""Check that the bundle version is semver compliant"""
|
||||
try:
|
||||
_ = Version.parse(bundle.operator_version)
|
||||
except ValueError:
|
||||
yield "warn", f"Version from filesystem ({bundle.operator_version}) is not valid semver"
|
||||
yield Warn(
|
||||
f"Version from filesystem ({bundle.operator_version}) is not valid semver"
|
||||
)
|
||||
try:
|
||||
_ = Version.parse(bundle.csv_operator_version)
|
||||
except ValueError:
|
||||
yield "warn", f"Version from CSV ({bundle.csv_operator_version}) is not valid semver"
|
||||
yield Warn(
|
||||
f"Version from CSV ({bundle.csv_operator_version}) is not valid semver"
|
||||
)
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
from typing import Iterator, Tuple
|
||||
from collections.abc import Iterator
|
||||
|
||||
from .. import Operator
|
||||
from . import CheckResult, Fail
|
||||
|
||||
|
||||
def check_upgrade(operator: Operator) -> Iterator[Tuple[str, str]]:
|
||||
def check_upgrade(operator: Operator) -> Iterator[CheckResult]:
|
||||
"""Validate upgrade graphs for all channels"""
|
||||
all_channels = operator.channels | {operator.default_channel} - {None}
|
||||
for channel in sorted(all_channels):
|
||||
|
@ -15,6 +16,8 @@ def check_upgrade(operator: Operator) -> Iterator[Tuple[str, str]]:
|
|||
x for x in channel_bundles if x not in graph and x != channel_head
|
||||
}
|
||||
if dangling_bundles:
|
||||
yield "fail", f"Channel {channel} has dangling bundles: {dangling_bundles}."
|
||||
yield Fail(
|
||||
f"Channel {channel} has dangling bundles: {dangling_bundles}."
|
||||
)
|
||||
except Exception as e:
|
||||
yield "fail", str(e)
|
||||
yield Fail(str(e))
|
||||
|
|
|
@ -3,9 +3,10 @@
|
|||
"""
|
||||
|
||||
import logging
|
||||
from collections.abc import Iterator
|
||||
from functools import cached_property, total_ordering
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple, Union
|
||||
from typing import Any, Optional, Union
|
||||
|
||||
from semver import Version
|
||||
|
||||
|
@ -43,21 +44,21 @@ class Bundle:
|
|||
self._parent = operator
|
||||
|
||||
@cached_property
|
||||
def annotations(self) -> Dict[str, Any]:
|
||||
def annotations(self) -> dict[str, Any]:
|
||||
"""
|
||||
:return: The content of the "annotations" field in metadata/annotations.yaml
|
||||
"""
|
||||
return self.load_metadata("annotations.yaml").get("annotations", {})
|
||||
|
||||
@cached_property
|
||||
def dependencies(self) -> List[Any]:
|
||||
def dependencies(self) -> list[Any]:
|
||||
"""
|
||||
:return: The content of the "dependencies" field in metadata/dependencies.yaml
|
||||
"""
|
||||
return self.load_metadata("dependencies.yaml").get("dependencies", [])
|
||||
|
||||
@cached_property
|
||||
def csv(self) -> Dict[str, Any]:
|
||||
def csv(self) -> dict[str, Any]:
|
||||
"""
|
||||
:return: The content of the CSV file for the bundle
|
||||
"""
|
||||
|
@ -67,7 +68,7 @@ class Bundle:
|
|||
return csv
|
||||
|
||||
@cached_property
|
||||
def csv_full_name(self) -> Tuple[str, str]:
|
||||
def csv_full_name(self) -> tuple[str, str]:
|
||||
try:
|
||||
csv_full_name = self.csv["metadata"]["name"]
|
||||
name, version = csv_full_name.split(".", 1)
|
||||
|
@ -114,7 +115,7 @@ class Bundle:
|
|||
self._parent = Operator(self._bundle_path.parent)
|
||||
return self._parent
|
||||
|
||||
def load_metadata(self, filename: str) -> Dict[str, Any]:
|
||||
def load_metadata(self, filename: str) -> dict[str, Any]:
|
||||
"""
|
||||
Load and parse a yaml file from the metadata directory of the bundle
|
||||
:param filename: Name of the file
|
||||
|
@ -147,7 +148,7 @@ class Bundle:
|
|||
)
|
||||
|
||||
@property
|
||||
def channels(self) -> Set[str]:
|
||||
def channels(self) -> set[str]:
|
||||
"""
|
||||
:return: Set of channels the bundle belongs to
|
||||
"""
|
||||
|
@ -316,7 +317,7 @@ class Operator:
|
|||
)
|
||||
|
||||
@cached_property
|
||||
def channels(self) -> Set[str]:
|
||||
def channels(self) -> set[str]:
|
||||
"""
|
||||
:return: All channels defined by the operator bundles
|
||||
"""
|
||||
|
@ -354,7 +355,7 @@ class Operator:
|
|||
except IndexError:
|
||||
return None
|
||||
|
||||
def channel_bundles(self, channel: str) -> List[Bundle]:
|
||||
def channel_bundles(self, channel: str) -> list[Bundle]:
|
||||
"""
|
||||
:param channel: Name of the channel
|
||||
:return: List of bundles in the given channel
|
||||
|
@ -368,7 +369,7 @@ class Operator:
|
|||
"""
|
||||
return self.channel_bundles(channel)[-1]
|
||||
|
||||
def update_graph(self, channel: str) -> Dict[Bundle, Set[Bundle]]:
|
||||
def update_graph(self, channel: str) -> dict[Bundle, set[Bundle]]:
|
||||
"""
|
||||
Return the update graph for the given channel
|
||||
:param channel: Name of the channel
|
||||
|
@ -383,7 +384,7 @@ class Operator:
|
|||
# TODO: implement semver-skippatch
|
||||
raise NotImplementedError("%s: semver-skippatch is not implemented yet")
|
||||
if update_strategy == "replaces-mode":
|
||||
edges: Dict[Bundle, set[Bundle]] = {}
|
||||
edges: dict[Bundle, set[Bundle]] = {}
|
||||
all_bundles_set = set(all_bundles)
|
||||
operator_names = {x.csv_operator_name for x in all_bundles_set}
|
||||
if len(operator_names) != 1:
|
||||
|
@ -409,7 +410,7 @@ class Operator:
|
|||
) = replaced_bundle_name.split(".", 1)
|
||||
if replaced_bundle_operator != bundle.csv_operator_name:
|
||||
raise ValueError(
|
||||
f"{self}: {bundle} replaces a bundle from a different operator"
|
||||
f"{bundle} replaces a bundle from a different operator"
|
||||
)
|
||||
try:
|
||||
replaced_bundle = version_to_bundle[
|
||||
|
|
|
@ -5,12 +5,12 @@
|
|||
|
||||
import argparse
|
||||
import logging
|
||||
from inspect import getmembers, isfunction
|
||||
from collections.abc import Iterator
|
||||
from itertools import chain
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
from .checks import bundle as bundle_checks
|
||||
from .checks import operator as operator_checks
|
||||
from .checks import get_checks, run_suite
|
||||
from .classes import Bundle, Operator, Repo
|
||||
|
||||
|
||||
|
@ -69,44 +69,38 @@ def action_list(repo_path, *what: str, recursive: bool = False) -> None:
|
|||
_list(parse_target(repo, target), recursive)
|
||||
|
||||
|
||||
def action_check_bundle(bundle: Bundle) -> None:
|
||||
print(f"Checking {bundle}")
|
||||
for check_name, check in getmembers(bundle_checks, isfunction):
|
||||
if check_name.startswith("check_"):
|
||||
for result, message in check(bundle):
|
||||
print(f"{result.upper()}: {bundle}: {message}")
|
||||
def _walk(
|
||||
target: Union[Repo, Operator, Bundle]
|
||||
) -> Iterator[Union[Repo, Operator, Bundle]]:
|
||||
yield target
|
||||
if isinstance(target, Repo):
|
||||
for operator in target:
|
||||
yield from _walk(operator)
|
||||
elif isinstance(target, Operator):
|
||||
yield from target.all_bundles()
|
||||
|
||||
|
||||
def action_check_operator(operator: Operator) -> None:
|
||||
print(f"Checking {operator}")
|
||||
for check_name, check in getmembers(operator_checks, isfunction):
|
||||
if check_name.startswith("check_"):
|
||||
for result, message in check(operator):
|
||||
print(f"{result.upper()}: {operator}: {message}")
|
||||
|
||||
|
||||
def action_check(repo_path: Path, *what: str, recursive: bool = False) -> None:
|
||||
def action_check(
|
||||
repo_path: Path, suite: str, *what: str, recursive: bool = False
|
||||
) -> None:
|
||||
repo = Repo(repo_path)
|
||||
for target in [parse_target(repo, x) for x in what] or sorted(repo):
|
||||
if isinstance(target, Operator):
|
||||
action_check_operator(target)
|
||||
if recursive:
|
||||
for bundle in sorted(target):
|
||||
action_check_bundle(bundle)
|
||||
elif isinstance(target, Bundle):
|
||||
action_check_bundle(target)
|
||||
if recursive:
|
||||
if what:
|
||||
targets = chain(_walk(parse_target(repo, x)) for x in what)
|
||||
else:
|
||||
targets = chain(_walk(x) for x in repo)
|
||||
else:
|
||||
targets = [parse_target(repo, x) for x in what] or repo.all_operators()
|
||||
for result in run_suite(targets, suite_name=suite):
|
||||
print(result)
|
||||
|
||||
|
||||
def action_check_list() -> None:
|
||||
for check_type_name, check_type in (
|
||||
("Operator", operator_checks),
|
||||
("Bundle", bundle_checks),
|
||||
):
|
||||
def action_check_list(suite: str) -> None:
|
||||
for check_type_name, checks in get_checks(suite).items():
|
||||
print(f"{check_type_name} checks:")
|
||||
for check_name, check in getmembers(check_type, isfunction):
|
||||
if check_name.startswith("check_"):
|
||||
display_name = check_name.removeprefix("check_")
|
||||
print(f" - {display_name}: {check.__doc__}")
|
||||
for check in checks:
|
||||
display_name = check.__name__.removeprefix("check_")
|
||||
print(f" - {display_name}: {check.__doc__}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
|
@ -140,7 +134,10 @@ def main() -> None:
|
|||
help="check validity of an operator or bundle",
|
||||
)
|
||||
check_parser.add_argument(
|
||||
"--list", action="store_true", help="list available checks"
|
||||
"-s", "--suite", default="operator_repo.checks", help="check suite to use"
|
||||
)
|
||||
check_parser.add_argument(
|
||||
"-l", "--list", action="store_true", help="list available checks"
|
||||
)
|
||||
check_parser.add_argument(
|
||||
"-R", "--recursive", action="store_true", help="descend the tree"
|
||||
|
@ -167,10 +164,13 @@ def main() -> None:
|
|||
action_list(args.repo or Path.cwd(), *args.target, recursive=args.recursive)
|
||||
elif args.action == "check":
|
||||
if args.list:
|
||||
action_check_list()
|
||||
action_check_list(args.suite)
|
||||
else:
|
||||
action_check(
|
||||
args.repo or Path.cwd(), *args.target, recursive=args.recursive
|
||||
args.repo or Path.cwd(),
|
||||
args.suite,
|
||||
*args.target,
|
||||
recursive=args.recursive,
|
||||
)
|
||||
else:
|
||||
main_parser.print_help()
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
from yaml.composer import ComposerError
|
||||
|
@ -51,7 +51,7 @@ def load_yaml(path: Path) -> Any:
|
|||
|
||||
|
||||
def lookup_dict(
|
||||
data: Dict[str, Any], path: str, default: Any = None, separator: str = "."
|
||||
data: dict[str, Any], path: str, default: Any = None, separator: str = "."
|
||||
) -> Any:
|
||||
keys = path.split(separator)
|
||||
subtree = data
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
from pathlib import Path
|
||||
|
||||
from operator_repo import Operator, Repo
|
||||
|
||||
from tests import bundle_files, create_files
|
||||
|
||||
|
||||
|
|
|
@ -2,11 +2,8 @@ from pathlib import Path
|
|||
|
||||
import pytest
|
||||
|
||||
from operator_repo.utils import load_yaml
|
||||
|
||||
from operator_repo.utils import lookup_dict
|
||||
|
||||
from operator_repo.exceptions import OperatorRepoException
|
||||
from operator_repo.utils import load_yaml, lookup_dict
|
||||
from tests import create_files
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue