3
3
#
4
4
5
5
from abc import ABC , abstractmethod
6
- from multiprocessing import context
6
+ from enum import Enum
7
7
8
8
import jsonschema
9
9
from airbyte_cdk .models import ConnectorSpecification
13
13
from source_acceptance_test .utils import SecretDict
14
14
15
15
16
class BackwardIncompatibilityContext(Enum):
    """Identify which connector output a backward incompatibility was detected in."""

    # Used by the checker that compares connector specifications.
    SPEC = 1
    # Used by the checker that compares discovered catalogs.
    DISCOVER = 2
19
+
20
+
16
21
class NonBackwardCompatibleError(Exception):
    """Error raised when a backward compatibility check fails."""

    def __init__(self, error_message: str, context: BackwardIncompatibilityContext) -> None:
        """Store the failure description and the context it was detected in.

        :param error_message: description of the incompatibility; also forwarded to ``Exception`` as its only argument.
        :param context: the kind of connector output (spec or discover) the incompatibility was found in.
        """
        self.error_message = error_message
        self.context = context
        super().__init__(error_message)

    def __str__(self) -> str:
        """Return the error message prefixed with the context it was detected in."""
        return f"{self.context} - {self.error_message}"
18
29
19
30
20
31
class BaseDiffChecker(ABC):
    """Base class for backward compatibility checkers.

    A checker is built from a previous and a current version of some connector output.
    Subclasses compute one or more DeepDiff results in ``compute_diffs`` (called from the constructor)
    and run the relevant ``check_if_*`` methods on them in ``assert_is_backward_compatible``.
    """

    def __init__(self, previous: dict, current: dict) -> None:
        """Store both versions and immediately compute the diffs between them.

        :param previous: the previous version of the compared output.
        :param current: the current version of the compared output.
        """
        self._previous = previous
        self._current = current
        self.compute_diffs()

    def _raise_error(self, message: str, diff: DeepDiff):
        """Raise a NonBackwardCompatibleError carrying the message, the pretty-printed diff and the checker context.

        :param message: description of the detected incompatibility.
        :param diff: the diff in which the incompatibility was detected.
        :raises NonBackwardCompatibleError: always.
        """
        raise NonBackwardCompatibleError(f"{message}. Diff: {diff.pretty()}", self.context)

    @property
    @abstractmethod
    def context(self):  # pragma: no cover
        """The context attached to the errors raised by this checker."""
        pass

    @abstractmethod
    def compute_diffs(self):  # pragma: no cover
        """Compute the diffs between the previous and current versions and store them on the instance."""
        pass

    @abstractmethod
    def assert_is_backward_compatible(self):  # pragma: no cover
        """Run every check relevant to this checker; a failing check raises NonBackwardCompatibleError."""
        pass

    @staticmethod
    def _path_item(change, index: int):
        """Return the element at ``index`` of the change's path, or None when the path is too short.

        Paths can have a single element (e.g. a change on a top-level key of the compared object),
        in which case indexing with -2 directly would raise an IndexError.
        """
        path = change.path(output_format="list")
        try:
            return path[index]
        except IndexError:
            return None

    def check_if_value_of_type_field_changed(self, diff: DeepDiff):
        """Check if a type was changed

        :param diff: the diff to inspect.
        :raises NonBackwardCompatibleError: if the value of a 'type' field was changed.
        """
        values_changed = diff.get("values_changed", [])
        # Detect type value change in case type field is declared as a string (e.g "str" -> "int"):
        type_values_changed = [change for change in values_changed if self._path_item(change, -1) == "type"]

        # Detect type value change in case type field is declared as a single item list (e.g ["str"] -> ["int"]):
        type_values_changed_in_list = [change for change in values_changed if self._path_item(change, -2) == "type"]
        if type_values_changed or type_values_changed_in_list:
            self._raise_error("The'type' field value was changed.", diff)

    def check_if_new_type_was_added(self, diff: DeepDiff):  # pragma: no cover
        """Detect type value added to type list if new type value is not None (e.g ["str"] -> ["str", "int"])

        :param diff: the diff to inspect.
        :raises NonBackwardCompatibleError: if a non "null" value was added to a 'type' list.
        """
        new_values_in_type_list = [
            change
            for change in diff.get("iterable_item_added", [])
            if self._path_item(change, -2) == "type" and change.t2 != "null"
        ]
        if new_values_in_type_list:
            # The diff has to be forwarded: _raise_error requires it to build the error message.
            self._raise_error("A new value was added to a 'type' field", diff)

    def check_if_type_of_type_field_changed(self, diff: DeepDiff):
        """
        Detect the change of type of a type field
        e.g:
        - "str" -> ["str"] VALID
        - "str" -> ["str", "null"] VALID
        - "str" -> ["str", "int"] INVALID
        - "str" -> 1 INVALID
        - ["str"] -> "str" VALID
        - ["str"] -> "int" INVALID
        - ["str"] -> 1 INVALID

        :param diff: the diff to inspect.
        :raises NonBackwardCompatibleError: if a 'type' field was changed to an invalid value.
        """
        type_changes = [change for change in diff.get("type_changes", []) if self._path_item(change, -1) == "type"]
        for change in type_changes:
            # We only accept change on the type field if the new type for this field is list or string
            # This might be something already guaranteed by JSON schema validation.
            if isinstance(change.t1, str):
                if not isinstance(change.t2, list):
                    self._raise_error("A 'type' field was changed from string to an invalid value.", diff)
                # If the new type field is a list we want to make sure it only has the original type (t1) and null: e.g. "str" -> ["str", "null"]
                # We want to raise an error otherwise.
                t2_not_null_types = [_type for _type in change.t2 if _type != "null"]
                if not (len(t2_not_null_types) == 1 and t2_not_null_types[0] == change.t1):
                    self._raise_error("The 'type' field was changed to a list with multiple invalid values", diff)
            if isinstance(change.t1, list):
                if not isinstance(change.t2, str):
                    self._raise_error("The 'type' field was changed from a list to an invalid value", diff)
                # A list can only be collapsed to a string if it held that single value: e.g. ["str"] -> "str"
                if not (len(change.t1) == 1 and change.t2 == change.t1[0]):
                    self._raise_error("An element was removed from the list of 'type'", diff)
88
105
89
106
90
107
class SpecDiffChecker(BaseDiffChecker):
    """A class to perform backward compatibility checks on a connector specification diff"""

    context = BackwardIncompatibilityContext.SPEC

    def compute_diffs(self):
        """Diff the 'connectionSpecification' of the previous and current specs (tree view, order ignored)."""
        self.connection_specification_diff = DeepDiff(
            self._previous["connectionSpecification"],
            self._current["connectionSpecification"],
            view="tree",
            ignore_order=True,
        )

    def assert_is_backward_compatible(self):
        """Run all the specification checks on the connection specification diff.

        :raises NonBackwardCompatibleError: on the first failing check.
        """
        diff = self.connection_specification_diff
        self.check_if_declared_new_required_field(diff)
        self.check_if_added_a_new_required_property(diff)
        self.check_if_value_of_type_field_changed(diff)
        # check_if_new_type_was_added is deliberately not run: we want to allow type expansion atm
        self.check_if_type_of_type_field_changed(diff)
        self.check_if_field_was_made_not_nullable(diff)
        self.check_if_enum_was_narrowed(diff)
        self.check_if_declared_new_enum_field(diff)

    def check_if_declared_new_required_field(self, diff: DeepDiff):
        """Check if the new spec declared a 'required' field."""
        added_required_fields = [
            addition for addition in diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required"
        ]
        if added_required_fields:
            self._raise_error("A new 'required' field was declared.", diff)

    def check_if_added_a_new_required_property(self, diff: DeepDiff):
        """Check if the new spec added a property to the 'required' list"""
        # The parent (up) of an added list item is the list itself, whose path ends with its key.
        added_required_properties = [
            addition for addition in diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required"
        ]
        if added_required_properties:
            self._raise_error("A new property was added to 'required'", diff)

    def check_if_field_was_made_not_nullable(self, diff: DeepDiff):
        """Detect when field was made not nullable but is still a list: e.g ["string", "null"] -> ["string"]"""
        removed_nullable = [change for change in diff.get("iterable_item_removed", []) if change.path(output_format="list")[-2] == "type"]
        if removed_nullable:
            self._raise_error("A field type was narrowed or made a field not nullable", diff)

    def check_if_enum_was_narrowed(self, diff: DeepDiff):
        """Check if the list of values in a enum was shortened in a spec."""
        enum_removals = [
            enum_removal
            for enum_removal in diff.get("iterable_item_removed", [])
            if enum_removal.up.path(output_format="list")[-1] == "enum"
        ]
        if enum_removals:
            self._raise_error("An enum field was narrowed.", diff)

    def check_if_declared_new_enum_field(self, diff: DeepDiff):
        """Check if an 'enum' field was added to the spec."""
        enum_additions = [
            enum_addition
            for enum_addition in diff.get("dictionary_item_added", [])
            if enum_addition.path(output_format="list")[-1] == "enum"
        ]
        if enum_additions:
            self._raise_error("An 'enum' field was declared on an existing property", diff)
148
171
149
172
150
173
def validate_previous_configs (
@@ -163,26 +186,45 @@ def check_fake_previous_config_against_actual_spec(fake_previous_config):
163
186
try :
164
187
jsonschema .validate (instance = filtered_fake_previous_config , schema = actual_connector_spec .connectionSpecification )
165
188
except jsonschema .exceptions .ValidationError as err :
166
- raise NonBackwardCompatibleError (err )
189
+ raise NonBackwardCompatibleError (err , BackwardIncompatibilityContext . SPEC )
167
190
168
191
check_fake_previous_config_against_actual_spec ()
169
192
170
193
171
194
class CatalogDiffChecker(BaseDiffChecker):
    """A class to perform backward compatibility checks on a discovered catalog diff"""

    context = BackwardIncompatibilityContext.DISCOVER

    @staticmethod
    def _streams_attribute(streams: dict, attribute: str) -> dict:
        """Map each stream name to the value of ``attribute`` in the stream's dict() representation."""
        return {stream_name: airbyte_stream.dict()[attribute] for stream_name, airbyte_stream in streams.items()}

    def compute_diffs(self):
        """Compute the per-stream diffs of JSON schemas and of default cursor fields (tree view)."""
        self.streams_json_schemas_diff = DeepDiff(
            self._streams_attribute(self._previous, "json_schema"),
            self._streams_attribute(self._current, "json_schema"),
            view="tree",
            ignore_order=True,
        )
        # NOTE(review): unlike the schemas diff, order is not ignored here - presumably because the order
        # of the elements of a cursor field is meaningful; confirm.
        self.streams_cursor_fields_diff = DeepDiff(
            self._streams_attribute(self._previous, "default_cursor_field"),
            self._streams_attribute(self._current, "default_cursor_field"),
            view="tree",
        )

    def assert_is_backward_compatible(self):
        """Run all the catalog checks on the computed diffs.

        :raises NonBackwardCompatibleError: on the first failing check.
        """
        self.check_if_stream_was_removed(self.streams_json_schemas_diff)
        self.check_if_value_of_type_field_changed(self.streams_json_schemas_diff)
        self.check_if_type_of_type_field_changed(self.streams_json_schemas_diff)
        self.check_if_cursor_field_was_changed(self.streams_cursor_fields_diff)

    def check_if_stream_was_removed(self, diff: DeepDiff):
        """Check if a stream was removed from the catalog."""
        # Only removals whose parent is the root of the diffed mapping are streams; the first path element is the stream name.
        removed_streams = [
            removal.path(output_format="list")[0]
            for removal in diff.get("dictionary_item_removed", [])
            if removal.path() != "root" and removal.up.path() == "root"
        ]
        if removed_streams:
            self._raise_error(f"The following streams were removed: {','.join(removed_streams)}", diff)

    def check_if_cursor_field_was_changed(self, diff: DeepDiff):
        """Check if a default cursor field value was changed.

        A stream that only exists in the current catalog shows up in the diff as a 'dictionary_item_added'
        entry. Adding a stream does not change the cursor field of any existing stream, so this report
        type alone must not fail the check.
        NOTE(review): this assumes 'default_cursor_field' values are lists or None, so that
        'dictionary_item_added' can only refer to a whole stream - confirm against the stream model.

        :param diff: the diff of the per-stream default cursor fields.
        :raises NonBackwardCompatibleError: if the diff reports anything other than added streams.
        """
        if any(report_type != "dictionary_item_added" for report_type in diff):
            self._raise_error("The value of 'default_cursor_field' was changed", diff)
0 commit comments