-
Notifications
You must be signed in to change notification settings - Fork 606
/
Copy pathVpcSubnetCidr.py
197 lines (175 loc) · 7.43 KB
/
VpcSubnetCidr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from __future__ import annotations
import logging
from collections import deque
from ipaddress import IPv4Network, IPv6Network, ip_network
from typing import Any, Iterator
from cfnlint.context import Path
from cfnlint.jsonschema import ValidationError, ValidationResult, Validator
from cfnlint.rules.helpers import get_value_from_path
from cfnlint.rules.jsonschema.CfnLintKeyword import CfnLintKeyword
LOGGER = logging.getLogger(__name__)
class VpcSubnetCidr(CfnLintKeyword):
id = "E3059"
shortdesc = "Validate subnet CIDRs are within the CIDRs of the VPC"
description = (
"When specifying subnet CIDRs for a VPC the subnet CIDRs "
"most be within the VPC CIDRs"
)
source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ec2-subnet.html"
tags = ["resources", "ec2", "vpc", "subnet"]
def __init__(self) -> None:
super().__init__(
keywords=[
"Resources/AWS::EC2::VPC/Properties",
],
)
def _validate_subnets(
self,
source: IPv4Network | IPv6Network,
destination: IPv4Network | IPv6Network,
) -> bool:
if isinstance(source, IPv4Network) and isinstance(destination, IPv4Network):
if source.subnet_of(destination):
return True
return False
elif isinstance(source, IPv6Network) and isinstance(destination, IPv6Network):
if source.subnet_of(destination):
return True
return False
return False
def _create_network(self, cidr: Any) -> IPv4Network | IPv6Network | None:
if not isinstance(cidr, str):
return None
try:
return ip_network(cidr)
except Exception as e:
LOGGER.debug(f"Unable to create network from {cidr}", e)
return None
def _get_vpc_cidrs(
self, validator: Validator, instance: dict[str, Any]
) -> Iterator[tuple[IPv4Network | IPv6Network | None, Validator]]:
for key in [
"Ipv4IpamPoolId",
"Ipv6IpamPoolId",
"Ipv6Pool",
"AmazonProvidedIpv6CidrBlock",
]:
for value, value_validator in get_value_from_path(
validator,
instance,
deque([key]),
):
if value is None:
continue
yield None, value_validator
for key in ["CidrBlock", "Ipv6CidrBlock"]:
for cidr, cidr_validator in get_value_from_path(
validator,
instance,
deque([key]),
):
if cidr is None:
continue
yield self._create_network(cidr), cidr_validator
def validate(
self, validator: Validator, keywords: Any, instance: Any, schema: dict[str, Any]
) -> ValidationResult:
if not validator.cfn.graph:
return
vpc_ipv4_networks: list[IPv4Network] = []
vpc_ipv6_networks: list[IPv6Network] = []
for vpc_network, _ in self._get_vpc_cidrs(validator, instance):
if not vpc_network:
return
if isinstance(vpc_network, IPv4Network):
vpc_ipv4_networks.append(vpc_network)
# you can't specify IPV6 networks on a VPC
template_validator = validator.evolve(
context=validator.context.evolve(path=Path())
)
# dynamic vpc network (using IPAM or AWS provided)
# allows to validate subnet overlapping even if using
# dynamic networks
has_dynamic_network = False
for source, _ in validator.cfn.graph.graph.in_edges(
validator.context.path.path[1]
):
if (
validator.cfn.graph.graph.nodes[source].get("resource_type")
== "AWS::EC2::VPCCidrBlock"
):
for cidr_props, cidr_validator in get_value_from_path(
template_validator,
validator.cfn.template,
deque(["Resources", source, "Properties"]),
):
for cidr_network, _ in self._get_vpc_cidrs(
cidr_validator, cidr_props
):
if not cidr_network:
has_dynamic_network = True
continue
if isinstance(cidr_network, IPv4Network):
vpc_ipv4_networks.append(cidr_network)
else:
vpc_ipv6_networks.append(cidr_network)
subnets: list[tuple[IPv4Network | IPv6Network, deque]] = []
for source, _ in validator.cfn.graph.graph.in_edges(
validator.context.path.path[1]
):
if (
validator.cfn.graph.graph.nodes[source].get("resource_type")
== "AWS::EC2::Subnet"
):
for subnet_props, source_validator in get_value_from_path(
template_validator,
validator.cfn.template,
deque(["Resources", source, "Properties"]),
):
for subnet_network, subnet_validator in self._get_vpc_cidrs(
source_validator, subnet_props
):
if not subnet_network:
continue
subnets.append(
(subnet_network, subnet_validator.context.path.path)
)
if has_dynamic_network:
continue
if not any(
self._validate_subnets(
subnet_network,
vpc_network,
)
for vpc_network in vpc_ipv4_networks + vpc_ipv6_networks
):
if isinstance(subnet_network, IPv4Network):
# Every VPC has to have a ipv4 network
# we continue if there isn't one
if not vpc_ipv4_networks:
continue
reprs = (
"is not a valid subnet of "
f"{[f'{str(v)}' for v in vpc_ipv4_networks]!r}"
)
else:
if not vpc_ipv6_networks:
reprs = (
"is specified on a VPC that has "
"no ipv6 networks defined"
)
else:
reprs = (
"is not a valid subnet of "
f"{[f'{str(v)}' for v in vpc_ipv6_networks]!r}"
)
yield ValidationError(
(f"{str(subnet_network)!r} {reprs}"),
rule=self,
path_override=subnet_validator.context.path.path,
)
continue