8
8
from collections import defaultdict
9
9
from typing import Any , Dict , List , Mapping
10
10
11
- import dpath .util
12
11
import pendulum
13
12
from airbyte_protocol .models import AirbyteRecordMessage , ConfiguredAirbyteCatalog
14
13
from jsonschema import Draft7Validator , FormatChecker , FormatError , ValidationError , validators
26
25
Draft7ValidatorWithStrictInteger = validators .extend (Draft7Validator , type_checker = strict_integer_type_checker )
27
26
28
27
28
+ class NoAdditionalPropertiesValidator (Draft7Validator ):
29
+ def __init__ (self , schema , ** kwargs ):
30
+ schema = self ._enforce_false_additional_properties (schema )
31
+ super ().__init__ (schema , ** kwargs )
32
+
33
+ @staticmethod
34
+ def _enforce_false_additional_properties (json_schema : Dict [str , Any ]) -> Dict [str , Any ]:
35
+ """Create a copy of the schema in which `additionalProperties` is set to False for all non-null object properties.
36
+
37
+ This method will override the value of `additionalProperties` if it is set,
38
+ or will create the property and set it to False if it does not exist.
39
+ """
40
+ new_schema = copy .deepcopy (json_schema )
41
+ new_schema ["additionalProperties" ] = False
42
+
43
+ def add_properties (properties ):
44
+ for prop_name , prop_value in properties .items ():
45
+ if "type" in prop_value and "object" in prop_value ["type" ] and len (prop_value .get ("properties" , [])):
46
+ prop_value ["additionalProperties" ] = False
47
+ add_properties (prop_value .get ("properties" , {}))
48
+ elif "type" in prop_value and "array" in prop_value ["type" ]:
49
+ if (
50
+ prop_value .get ("items" )
51
+ and "object" in prop_value .get ("items" , {}).get ("type" )
52
+ and len (prop_value .get ("items" , {}).get ("properties" , []))
53
+ ):
54
+ prop_value ["items" ]["additionalProperties" ] = False
55
+ if prop_value .get ("items" , {}).get ("properties" ):
56
+ add_properties (prop_value ["items" ]["properties" ])
57
+
58
+ add_properties (new_schema .get ("properties" , {}))
59
+ return new_schema
60
+
61
+
29
62
class CustomFormatChecker (FormatChecker ):
30
63
@staticmethod
31
64
def check_datetime (value : str ) -> bool :
@@ -46,17 +79,6 @@ def check(self, instance, format):
46
79
return super ().check (instance , format )
47
80
48
81
49
- def _enforce_no_additional_top_level_properties (json_schema : Dict [str , Any ]):
50
- """Create a copy of the schema in which `additionalProperties` is set to False for the dict of top-level properties.
51
-
52
- This method will override the value of `additionalProperties` if it is set,
53
- or will create the property and set it to False if it does not exist.
54
- """
55
- enforced_schema = copy .deepcopy (json_schema )
56
- dpath .util .new (enforced_schema , "additionalProperties" , False )
57
- return enforced_schema
58
-
59
-
60
82
def verify_records_schema (
61
83
records : List [AirbyteRecordMessage ], catalog : ConfiguredAirbyteCatalog , fail_on_extra_columns : bool
62
84
) -> Mapping [str , Mapping [str , ValidationError ]]:
@@ -66,11 +88,8 @@ def verify_records_schema(
66
88
stream_validators = {}
67
89
for stream in catalog .streams :
68
90
schema_to_validate_against = stream .stream .json_schema
69
- if fail_on_extra_columns :
70
- schema_to_validate_against = _enforce_no_additional_top_level_properties (schema_to_validate_against )
71
- stream_validators [stream .stream .name ] = Draft7ValidatorWithStrictInteger (
72
- schema_to_validate_against , format_checker = CustomFormatChecker ()
73
- )
91
+ validator = NoAdditionalPropertiesValidator if fail_on_extra_columns else Draft7ValidatorWithStrictInteger
92
+ stream_validators [stream .stream .name ] = validator (schema_to_validate_against , format_checker = CustomFormatChecker ())
74
93
stream_errors = defaultdict (dict )
75
94
for record in records :
76
95
validator = stream_validators .get (record .stream )
0 commit comments