1
0
Fork 0
operator-repo/src/operator_repo/checks/operator.py

24 lines
862 B
Python
Raw Normal View History

from collections.abc import Iterator
2023-08-07 09:17:56 +00:00
from .. import Operator
from . import CheckResult, Fail
2023-08-07 09:17:56 +00:00
def check_upgrade(operator: Operator) -> Iterator[CheckResult]:
"""Validate upgrade graphs for all channels"""
2023-08-07 09:17:56 +00:00
all_channels = operator.channels | {operator.default_channel} - {None}
for channel in sorted(all_channels):
try:
channel_bundles = operator.channel_bundles(channel)
channel_head = operator.head(channel)
graph = operator.update_graph(channel)
dangling_bundles = {
x for x in channel_bundles if x not in graph and x != channel_head
}
if dangling_bundles:
yield Fail(
f"Channel {channel} has dangling bundles: {dangling_bundles}"
)
2023-08-12 14:50:57 +00:00
except Exception as exc:
yield Fail(str(exc))