1
0
Fork 0

Check results carry check name; extract semver-mode graph generation

This commit is contained in:
Maurizio Porrato 2023-08-15 08:02:11 +01:00
parent e245dd758e
commit e99c181021
4 changed files with 81 additions and 71 deletions

View File

@ -1,8 +1,8 @@
import importlib
import logging
from collections.abc import Callable, Iterable, Mapping
from collections.abc import Callable, Iterable
from inspect import getmembers, isfunction
from typing import Union
from typing import Optional, Union
from .. import Bundle, Operator, Repo
@ -12,18 +12,20 @@ log = logging.getLogger(__name__)
class CheckResult:
severity: int = 0
kind: str = "unknown"
check: Optional[str]
origin: Union[Repo, Operator, Bundle, None]
reason: str
def __init__(self, origin, reason: str):
self.origin = origin
def __init__(self, reason: str):
self.origin = None
self.check = None
self.reason = reason
def __str__(self):
return f"{self.kind}: {self.origin}: {self.reason}"
return f"{self.kind}: {self.check}({self.origin}): {self.reason}"
def __repr__(self):
return f"{self.kind}({self.origin}, {self.reason})"
return f"{self.kind}({self.check}, {self.origin}, {self.reason})"
def __int__(self):
return self.severity
@ -33,22 +35,24 @@ class CheckResult:
class Warn(CheckResult):
# pylint: disable=too-few-public-methods
severity = 40
kind = "warning"
class Fail(CheckResult):
# pylint: disable=too-few-public-methods
severity = 90
kind = "failure"
SUPPORTED_TYPES = [("operator", Operator), ("bundle", Bundle)]
Check = Callable[[Operator | Bundle], Iterable[CheckResult]]
Check = Callable[[Union[Repo, Operator, Bundle]], Iterable[CheckResult]]
def get_checks(
suite_name: str = "operator_repo.checks",
) -> Mapping[str, Iterable[Check]]:
) -> dict[str, list[Check]]:
result = {}
for module_name, _ in SUPPORTED_TYPES:
result[module_name] = []
@ -68,8 +72,18 @@ def get_checks(
return result
def run_check(
check: Check, target: Union[Repo, Operator, Bundle]
) -> Iterable[CheckResult]:
log.debug("Running %s check on %s", check.__name__, target)
for result in check(target):
result.check = check.__name__
result.origin = target
yield result
def run_suite(
targets: Iterable[Repo | Operator | Bundle],
targets: Iterable[Union[Repo, Operator, Bundle]],
suite_name: str = "operator_repo.checks",
) -> Iterable[CheckResult]:
checks = get_checks(suite_name)
@ -77,5 +91,4 @@ def run_suite(
for target_type_name, target_type in SUPPORTED_TYPES:
if isinstance(target, target_type):
for check in checks.get(target_type_name, []):
log.debug("Running %s check on %s", check.__name__, target)
yield from check(target)
yield from run_check(check, target)

View File

@ -11,22 +11,20 @@ def check_operator_name(bundle: Bundle) -> Iterator[CheckResult]:
"""Check if the operator names used in CSV, metadata and filesystem are consistent"""
name = bundle.annotations.get("operators.operatorframework.io.bundle.package.v1")
if name is None:
yield Fail(
bundle, "Bundle does not define the operator name in annotations.yaml"
)
yield Fail("Bundle does not define the operator name in annotations.yaml")
return
if name != bundle.csv_operator_name:
msg = (
f"Operator name from annotations.yaml ({name})"
f" does not match the name defined in the CSV ({bundle.csv_operator_name})"
)
yield Fail(bundle, msg)
yield Fail(msg)
if name != bundle.operator_name:
msg = (
f"Operator name from annotations.yaml ({name})"
f" does not match the operator's directory name ({bundle.operator_name})"
)
yield Fail(bundle, msg)
yield Fail(msg)
def check_image(bundle: Bundle) -> Iterator[CheckResult]:
@ -34,23 +32,19 @@ def check_image(bundle: Bundle) -> Iterator[CheckResult]:
try:
container_image = lookup_dict(bundle.csv, "metadata.annotations.containerImage")
if container_image is None:
yield Fail(
bundle, "CSV doesn't define .metadata.annotations.containerImage"
)
yield Fail("CSV doesn't define .metadata.annotations.containerImage")
return
deployments = lookup_dict(bundle.csv, "spec.install.spec.deployments")
if deployments is None:
yield Fail(bundle, "CSV doesn't define .spec.install.spec.deployments")
yield Fail("CSV doesn't define .spec.install.spec.deployments")
return
for deployment in deployments:
containers = lookup_dict(deployment, "spec.template.spec.containers", [])
if any(container_image == x.get("image") for x in containers):
return
yield Fail(
bundle, f"container image {container_image} not used by any deployment"
)
yield Fail(f"container image {container_image} not used by any deployment")
except Exception as exc:
yield Fail(bundle, str(exc))
yield Fail(str(exc))
def check_semver(bundle: Bundle) -> Iterator[CheckResult]:
@ -59,13 +53,11 @@ def check_semver(bundle: Bundle) -> Iterator[CheckResult]:
_ = Version.parse(bundle.operator_version)
except ValueError:
yield Warn(
bundle,
f"Version from filesystem ({bundle.operator_version}) is not valid semver",
f"Version from filesystem ({bundle.operator_version}) is not valid semver"
)
try:
_ = Version.parse(bundle.csv_operator_version)
except ValueError:
yield Warn(
bundle,
f"Version from CSV ({bundle.csv_operator_version}) is not valid semver",
f"Version from CSV ({bundle.csv_operator_version}) is not valid semver"
)

View File

@ -17,8 +17,7 @@ def check_upgrade(operator: Operator) -> Iterator[CheckResult]:
}
if dangling_bundles:
yield Fail(
operator,
f"Channel {channel} has dangling bundles: {dangling_bundles}",
f"Channel {channel} has dangling bundles: {dangling_bundles}"
)
except Exception as exc:
yield Fail(operator, str(exc))
yield Fail(str(exc))

View File

@ -369,6 +369,46 @@ class Operator:
"""
return self.channel_bundles(channel)[-1]
@staticmethod
def _replaces_graph(
channel: str, bundles: list[Bundle]
) -> dict[Bundle, set[Bundle]]:
edges: dict[Bundle, set[Bundle]] = {}
all_bundles_set = set(bundles)
version_to_bundle = {x.csv_operator_version: x for x in all_bundles_set}
for bundle in all_bundles_set:
spec = bundle.csv.get("spec", {})
replaces = spec.get("replaces")
skips = spec.get("skips", [])
previous = set(skips) | {replaces}
for replaced_bundle_name in previous:
if replaced_bundle_name is None:
continue
if ".v" not in replaced_bundle_name:
raise ValueError(
f"{bundle} has invalid 'replaces' field: '{replaced_bundle_name}'"
)
(
replaced_bundle_operator,
replaced_bundle_version,
) = replaced_bundle_name.split(".", 1)
if replaced_bundle_operator != bundle.csv_operator_name:
raise ValueError(
f"{bundle} replaces a bundle from a different operator"
)
try:
replaced_bundle = version_to_bundle[
replaced_bundle_version.lstrip("v")
]
if (
channel in bundle.channels
and channel in replaced_bundle.channels
):
edges.setdefault(replaced_bundle, set()).add(bundle)
except KeyError:
pass
return edges
def update_graph(self, channel: str) -> dict[Bundle, set[Bundle]]:
"""
Return the update graph for the given channel
@ -378,52 +418,18 @@ class Operator:
"""
all_bundles = self.channel_bundles(channel)
update_strategy = self.config.get("updateGraph", "replaces-mode")
operator_names = {x.csv_operator_name for x in all_bundles}
if len(operator_names) > 1:
raise ValueError(
f"{self} has bundles with different operator names: {operator_names}"
)
if update_strategy == "semver-mode":
return {x: {y} for x, y in zip(all_bundles, all_bundles[1:])}
if update_strategy == "semver-skippatch":
# TODO: implement semver-skippatch
raise NotImplementedError("%s: semver-skippatch is not implemented yet")
if update_strategy == "replaces-mode":
edges: dict[Bundle, set[Bundle]] = {}
all_bundles_set = set(all_bundles)
operator_names = {x.csv_operator_name for x in all_bundles_set}
if len(operator_names) != 1:
raise ValueError(
f"{self} has bundles with different operator names: {operator_names}"
)
version_to_bundle = {x.csv_operator_version: x for x in all_bundles_set}
for bundle in all_bundles_set:
spec = bundle.csv.get("spec", {})
replaces = spec.get("replaces")
skips = spec.get("skips", [])
previous = set(skips) | {replaces}
for replaced_bundle_name in previous:
if replaced_bundle_name is None:
continue
if ".v" not in replaced_bundle_name:
raise ValueError(
f"{bundle} has invalid 'replaces' field: '{replaced_bundle_name}'"
)
(
replaced_bundle_operator,
replaced_bundle_version,
) = replaced_bundle_name.split(".", 1)
if replaced_bundle_operator != bundle.csv_operator_name:
raise ValueError(
f"{bundle} replaces a bundle from a different operator"
)
try:
replaced_bundle = version_to_bundle[
replaced_bundle_version.lstrip("v")
]
if (
channel in bundle.channels
and channel in replaced_bundle.channels
):
edges.setdefault(replaced_bundle, set()).add(bundle)
except KeyError:
pass
return edges
return self._replaces_graph(channel, all_bundles)
raise ValueError(f"{self}: unknown updateGraph value: {update_strategy}")
def __eq__(self, other: object) -> bool: