5
5
import logging
6
6
import tempfile
7
7
from collections import defaultdict
8
+ from collections .abc import Iterable , Iterator , MutableMapping
8
9
from dataclasses import dataclass , field
9
10
from enum import Enum
10
11
from pathlib import Path
11
- from typing import Any , Dict , Iterable , Iterator , List , MutableMapping , Optional , Type
12
+ from typing import Any , Optional
12
13
13
14
import _collections_abc
14
15
import dagger
15
16
import requests
16
-
17
- # type: ignore
18
- from airbyte_protocol .models import AirbyteCatalog , AirbyteMessage , ConfiguredAirbyteCatalog # type: ignore
17
+ from airbyte_protocol . models import AirbyteCatalog # type: ignore
18
+ from airbyte_protocol . models import AirbyteMessage # type: ignore
19
+ from airbyte_protocol .models import ConfiguredAirbyteCatalog # type: ignore
19
20
from airbyte_protocol .models import Type as AirbyteMessageType
20
21
from genson import SchemaBuilder # type: ignore
21
22
from live_tests .commons .backends import DuckDbBackend , FileBackend
@@ -174,7 +175,7 @@ def actor_type(self) -> ActorType:
174
175
175
176
@classmethod
176
177
async def from_image_name (
177
- cls : Type [ConnectorUnderTest ],
178
+ cls : type [ConnectorUnderTest ],
178
179
dagger_client : dagger .Client ,
179
180
image_name : str ,
180
181
target_or_control : TargetOrControl ,
@@ -191,8 +192,8 @@ class ExecutionInputs:
191
192
command : Command
192
193
config : Optional [SecretDict ] = None
193
194
configured_catalog : Optional [ConfiguredAirbyteCatalog ] = None
194
- state : Optional [Dict ] = None
195
- environment_variables : Optional [Dict ] = None
195
+ state : Optional [dict ] = None
196
+ environment_variables : Optional [dict ] = None
196
197
duckdb_path : Optional [Path ] = None
197
198
198
199
def raise_if_missing_attr_for_command (self , attribute : str ) -> None :
@@ -232,8 +233,8 @@ class ExecutionResult:
232
233
success : bool
233
234
executed_container : Optional [dagger .Container ]
234
235
http_dump : Optional [dagger .File ] = None
235
- http_flows : List [http .HTTPFlow ] = field (default_factory = list )
236
- stream_schemas : Optional [Dict [str , Any ]] = None
236
+ http_flows : list [http .HTTPFlow ] = field (default_factory = list )
237
+ stream_schemas : Optional [dict [str , Any ]] = None
237
238
backend : Optional [FileBackend ] = None
238
239
239
240
HTTP_DUMP_FILE_NAME = "http_dump.mitm"
@@ -253,7 +254,7 @@ def duckdb_schema(self) -> Iterable[str]:
253
254
254
255
@classmethod
255
256
async def load (
256
- cls : Type [ExecutionResult ],
257
+ cls : type [ExecutionResult ],
257
258
connector_under_test : ConnectorUnderTest ,
258
259
actor_id : str ,
259
260
command : Command ,
@@ -286,7 +287,7 @@ async def load_http_flows(self) -> None:
286
287
def parse_airbyte_messages_from_command_output (
287
288
self , command_output_path : Path , log_validation_errors : bool = False
288
289
) -> Iterable [AirbyteMessage ]:
289
- with open (command_output_path , "r" ) as command_output :
290
+ with open (command_output_path ) as command_output :
290
291
for line in command_output :
291
292
try :
292
293
yield AirbyteMessage .parse_raw (line )
@@ -302,9 +303,9 @@ def get_records(self) -> Iterable[AirbyteMessage]:
302
303
if message .type is AirbyteMessageType .RECORD :
303
304
yield message
304
305
305
- def generate_stream_schemas (self ) -> Dict [str , Any ]:
306
+ def generate_stream_schemas (self ) -> dict [str , Any ]:
306
307
self .logger .info ("Generating stream schemas" )
307
- stream_builders : Dict [str , SchemaBuilder ] = {}
308
+ stream_builders : dict [str , SchemaBuilder ] = {}
308
309
for record in self .get_records ():
309
310
stream = record .record .stream
310
311
if stream not in stream_builders :
@@ -328,8 +329,8 @@ def get_records_per_stream(self, stream: str) -> Iterator[AirbyteMessage]:
328
329
if message .type is AirbyteMessageType .RECORD :
329
330
yield message
330
331
331
- def get_message_count_per_type (self ) -> Dict [AirbyteMessageType , int ]:
332
- message_count : Dict [AirbyteMessageType , int ] = defaultdict (int )
332
+ def get_message_count_per_type (self ) -> dict [AirbyteMessageType , int ]:
333
+ message_count : dict [AirbyteMessageType , int ] = defaultdict (int )
333
334
for message in self .airbyte_messages :
334
335
message_count [message .type ] += 1
335
336
return message_count
@@ -376,7 +377,7 @@ async def save_artifacts(self, output_dir: Path, duckdb_path: Optional[Path] = N
376
377
self .save_stream_schemas (output_dir )
377
378
self .logger .info ("All artifacts saved to disk" )
378
379
379
- def get_updated_configuration (self , control_message_path : Path ) -> Optional [Dict [str , Any ]]:
380
+ def get_updated_configuration (self , control_message_path : Path ) -> Optional [dict [str , Any ]]:
380
381
"""Iterate through the control messages to find CONNECTOR_CONFIG message and return the last updated configuration."""
381
382
if not control_message_path .exists ():
382
383
return None
@@ -403,7 +404,7 @@ def update_configuration(self) -> None:
403
404
payload = {
404
405
"configuration" : {
405
406
** updated_configuration ,
406
- ** { f"{ self .connector_under_test .actor_type .value } Type" : self .connector_under_test .name_without_type_prefix } ,
407
+ f"{ self .connector_under_test .actor_type .value } Type" : self .connector_under_test .name_without_type_prefix ,
407
408
}
408
409
}
409
410
headers = {
@@ -427,7 +428,7 @@ class ConnectionObjects:
427
428
destination_config : Optional [SecretDict ]
428
429
configured_catalog : Optional [ConfiguredAirbyteCatalog ]
429
430
catalog : Optional [AirbyteCatalog ]
430
- state : Optional [Dict ]
431
+ state : Optional [dict ]
431
432
workspace_id : Optional [str ]
432
433
source_id : Optional [str ]
433
434
destination_id : Optional [str ]
0 commit comments