Skip to content

Add 3 new rules to validate ECS configs #3546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions src/cfnlint/rules/resources/ecs/ServiceFargate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from collections import deque
from typing import Any, Iterator

from cfnlint.helpers import ensure_list, is_function
from cfnlint.jsonschema import ValidationError, ValidationResult
from cfnlint.jsonschema.protocols import Validator
from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword


class ServiceFargate(CfnLintKeyword):
id = "E3054"
shortdesc = (
"Validate ECS service using Fargate uses TaskDefinition that allows Fargate"
)
description = (
"When using an ECS service with 'LaunchType' of 'FARGATE' "
"the associated task definition must have 'RequiresCompatibilities' "
"specified with 'FARGATE' listed"
)
tags = ["resources", "ecs"]

def __init__(self) -> None:
super().__init__(
keywords=["Resources/AWS::ECS::Service/Properties"],
)

def _filter_resource_name(self, instance: Any) -> str | None:
fn_k, fn_v = is_function(instance)
if fn_k is None:
return None
if fn_k == "Ref":
if isinstance(fn_v, str):
return fn_v
elif fn_k == "Fn::GetAtt":
name = ensure_list(fn_v)[0].split(".")[0]
if isinstance(name, str):
return name
return None

def _get_service_properties(
self, validator: Validator, instance: Any
) -> Iterator[tuple[str, str, Validator]]:
for task_definition_id, task_definition_validator in get_value_from_path(
validator, instance, deque(["TaskDefinition"])
):
task_definition_resource_name = self._filter_resource_name(
task_definition_id
)
if task_definition_resource_name is None:
continue

for (
launch_type,
launch_type_validator,
) in get_value_from_path(
task_definition_validator, instance, deque(["LaunchType"])
):
yield (
task_definition_resource_name,
launch_type,
launch_type_validator,
)

def _get_task_definition_properties(
self, validator: Validator, resource_name: Any
) -> Iterator[tuple[list[Any] | None, Validator]]:
task_definition, task_definition_validator = get_resource_by_name(
validator, resource_name, ["AWS::ECS::TaskDefinition"]
)
if not task_definition:
return

for capabilities, capabilities_validator in get_value_from_path(
task_definition_validator,
task_definition,
path=deque(["Properties", "RequiresCompatibilities"]),
):
if capabilities is None:
yield capabilities, capabilities_validator
continue
if not isinstance(capabilities, list):
continue
for capibility, _ in get_value_from_path(
capabilities_validator,
capabilities,
path=deque(["*"]),
):
if isinstance(capibility, dict) or capibility == "FARGATE":
break
else:
yield capabilities, capabilities_validator

def validate(
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:

for (
task_definition_resource_name,
launch_type,
service_validator,
) in self._get_service_properties(
validator,
instance,
):
if launch_type != "FARGATE":
continue
for (
capabilities,
capabilities_validator,
) in self._get_task_definition_properties(
service_validator,
task_definition_resource_name,
):
if capabilities is None:
yield ValidationError(
"'RequiresCompatibilities' is a required property",
validator="required",
rule=self,
path_override=deque(
list(capabilities_validator.context.path.path)[:-1]
),
)
continue

yield ValidationError(
f"{capabilities!r} does not contain items matching 'FARGATE'",
validator="contains",
rule=self,
path_override=capabilities_validator.context.path.path,
)
107 changes: 107 additions & 0 deletions src/cfnlint/rules/resources/ecs/ServiceNetworkConfiguration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from collections import deque
from typing import Any, Iterator

from cfnlint.helpers import ensure_list, is_function
from cfnlint.jsonschema import ValidationError, ValidationResult
from cfnlint.jsonschema.protocols import Validator
from cfnlint.rules.helpers import get_resource_by_name, get_value_from_path
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword


class ServiceNetworkConfiguration(CfnLintKeyword):
id = "E3052"
shortdesc = "Validate ECS service requires NetworkConfiguration"
description = (
"When using an ECS task definition has NetworkMode set to "
"'awsvpc' then 'NetworkConfiguration' is required"
)
tags = ["resources", "ecs"]

def __init__(self) -> None:
super().__init__(
keywords=["Resources/AWS::ECS::Service/Properties"],
)

def _filter_resource_name(self, instance: Any) -> str | None:
fn_k, fn_v = is_function(instance)
if fn_k is None:
return None
if fn_k == "Ref":
if isinstance(fn_v, str):
return fn_v
elif fn_k == "Fn::GetAtt":
name = ensure_list(fn_v)[0].split(".")[0]
if isinstance(name, str):
return name
return None

def _get_service_properties(
self, validator: Validator, instance: Any
) -> Iterator[tuple[str, str, Validator]]:
for task_definition_id, task_definition_validator in get_value_from_path(
validator, instance, deque(["TaskDefinition"])
):
task_definition_resource_name = self._filter_resource_name(
task_definition_id
)
if task_definition_resource_name is None:
continue

for (
network_configuration,
network_configuration_validator,
) in get_value_from_path(
task_definition_validator, instance, deque(["NetworkConfiguration"])
):
yield (
task_definition_resource_name,
network_configuration,
network_configuration_validator,
)

def _get_task_definition_properties(
self, validator: Validator, resource_name: Any
) -> Iterator[tuple[str | int | None, Validator]]:
target_group, target_group_validator = get_resource_by_name(
validator, resource_name, ["AWS::ECS::TaskDefinition"]
)
if not target_group:
return

for network_mode, network_mode_validator in get_value_from_path(
target_group_validator,
target_group,
path=deque(["Properties", "NetworkMode"]),
):
if network_mode == "awsvpc":
yield network_mode, network_mode_validator

def validate(
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:

for (
task_definition_resource_name,
network_configuration,
task_definition_validator,
) in self._get_service_properties(
validator,
instance,
):
for _, _ in self._get_task_definition_properties(
task_definition_validator,
task_definition_resource_name,
):
if network_configuration is None:
yield ValidationError(
"'NetworkConfiguration' is a required property",
validator="required",
rule=self,
)
81 changes: 81 additions & 0 deletions src/cfnlint/rules/resources/ecs/TaskDefinitionAwsVpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""

from __future__ import annotations

from collections import deque
from typing import Any, Iterator

from cfnlint.jsonschema import ValidationError, ValidationResult
from cfnlint.jsonschema.protocols import Validator
from cfnlint.rules.helpers import get_value_from_path
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword


class TaskDefinitionAwsVpc(CfnLintKeyword):
id = "E3053"
shortdesc = "Validate ECS task definition is has correct values for 'HostPort'"
description = (
"The 'HostPort' must either be undefined or equal to "
"the 'ContainerPort' value"
)
tags = ["resources", "ecs"]

def __init__(self) -> None:
super().__init__(
keywords=["Resources/AWS::ECS::TaskDefinition/Properties"],
)

def _get_port_mappings(
self, validator: Validator, instance: Any
) -> Iterator[tuple[str | int | None, str | int | None, Validator]]:

for container_definition, container_definition_validator in get_value_from_path(
validator,
instance,
path=deque(["ContainerDefinitions", "*", "PortMappings", "*"]),
):
for host_port, host_port_validator in get_value_from_path(
container_definition_validator,
container_definition,
path=deque(["HostPort"]),
):
if not isinstance(host_port, (str, int)):
continue
for container_port, _ in get_value_from_path(
host_port_validator,
container_definition,
path=deque(["ContainerPort"]),
):
if not isinstance(container_port, (str, int)):
continue
if str(host_port) != str(container_port):
yield host_port, container_port, host_port_validator

def validate(
self, validator: Validator, _: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:

for network_mode, _ in get_value_from_path(
validator,
instance,
path=deque(["NetworkMode"]),
):
if network_mode != "awsvpc":
continue
for (
host_port,
container_port,
port_mapping_validator,
) in self._get_port_mappings(
validator,
instance,
):
yield ValidationError(
f"{host_port!r} does not equal {container_port!r}",
validator="const",
rule=self,
path_override=port_mapping_validator.context.path.path,
)
Loading
Loading