Skip to content

Commit 50f4965

Browse files
author
Anton Karpets
authored
File-based CDK: avoid error on empty stream when running discover (#38230)
1 parent e669832 commit 50f4965

File tree

3 files changed

+54
-8
lines changed

3 files changed

+54
-8
lines changed

airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
FileBasedSourceError,
1717
InvalidSchemaError,
1818
MissingSchemaError,
19-
NoFilesMatchingError,
2019
RecordParseError,
2120
SchemaInferenceError,
2221
StopSyncPerValidationPolicy,
@@ -172,7 +171,7 @@ def get_json_schema(self) -> JsonSchema:
172171
}
173172
try:
174173
schema = self._get_raw_json_schema()
175-
except (InvalidSchemaError, NoFilesMatchingError) as config_exception:
174+
except InvalidSchemaError as config_exception:
176175
self.logger.exception(FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value, exc_info=config_exception)
177176
raise AirbyteTracedException(
178177
internal_message="Please check the logged errors for more information.",
@@ -195,7 +194,8 @@ def _get_raw_json_schema(self) -> JsonSchema:
195194
total_n_files = len(files)
196195

197196
if total_n_files == 0:
198-
raise NoFilesMatchingError(FileBasedSourceError.EMPTY_STREAM, stream=self.name)
197+
self.logger.warning(msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream.")
198+
return schemaless_schema
199199

200200
max_n_files_for_schema_inference = self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
201201
if total_n_files > max_n_files_for_schema_inference:

airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py

+47-4
Original file line numberDiff line numberDiff line change
@@ -3029,7 +3029,6 @@
30293029
.set_file_type("csv")
30303030
)
30313031
.set_expected_check_status("FAILED")
3032-
.set_expected_check_error(AirbyteTracedException, FileBasedSourceError.EMPTY_STREAM.value)
30333032
.set_expected_catalog(
30343033
{
30353034
"streams": [
@@ -3038,10 +3037,9 @@
30383037
"json_schema": {
30393038
"type": "object",
30403039
"properties": {
3041-
"col1": {"type": "string"},
3042-
"col2": {"type": "string"},
30433040
"_ab_source_file_last_modified": {"type": "string"},
30443041
"_ab_source_file_url": {"type": "string"},
3042+
"data": {"type": "object"},
30453043
},
30463044
},
30473045
"name": "stream1",
@@ -3051,7 +3049,7 @@
30513049
]
30523050
}
30533051
)
3054-
.set_expected_discover_error(AirbyteTracedException, FileBasedSourceError.SCHEMA_INFERENCE_ERROR.value)
3052+
.set_expected_records(None)
30553053
).build()
30563054

30573055
csv_no_records_scenario: TestScenario[InMemoryFilesSource] = (
@@ -3109,3 +3107,48 @@
31093107
)
31103108
.set_expected_records([])
31113109
).build()
3110+
3111+
csv_no_files_scenario: TestScenario[InMemoryFilesSource] = (
3112+
TestScenarioBuilder[InMemoryFilesSource]()
3113+
.set_name("no_files_csv_stream")
3114+
.set_config(
3115+
{
3116+
"streams": [
3117+
{
3118+
"name": "stream1",
3119+
"format": {"filetype": "csv"},
3120+
"globs": ["*"],
3121+
"validation_policy": "Emit Record",
3122+
}
3123+
],
3124+
"start_date": "2023-06-10T03:54:07.000000Z",
3125+
}
3126+
)
3127+
.set_source_builder(
3128+
FileBasedSourceBuilder()
3129+
.set_files({})
3130+
.set_file_type("csv")
3131+
)
3132+
.set_expected_check_status("FAILED")
3133+
.set_expected_catalog(
3134+
{
3135+
"streams": [
3136+
{
3137+
"default_cursor_field": ["_ab_source_file_last_modified"],
3138+
"json_schema": {
3139+
"type": "object",
3140+
"properties": {
3141+
"_ab_source_file_last_modified": {"type": "string"},
3142+
"_ab_source_file_url": {"type": "string"},
3143+
"data": {"type": "object"},
3144+
},
3145+
},
3146+
"name": "stream1",
3147+
"source_defined_cursor": True,
3148+
"supported_sync_modes": ["full_refresh", "incremental"],
3149+
}
3150+
]
3151+
}
3152+
)
3153+
.set_expected_records(None)
3154+
).build()

airbyte-cdk/python/unit_tests/sources/file_based/test_file_based_scenarios.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
csv_multi_stream_scenario,
6363
csv_newline_in_values_not_quoted_scenario,
6464
csv_newline_in_values_quoted_value_scenario,
65+
csv_no_files_scenario,
6566
csv_no_records_scenario,
6667
csv_single_stream_scenario,
6768
csv_skip_after_header_scenario,
@@ -154,7 +155,6 @@
154155
from unit_tests.sources.file_based.test_scenarios import verify_check, verify_discover, verify_read, verify_spec
155156

156157
discover_failure_scenarios = [
157-
earlier_csv_scenario,
158158
empty_schema_inference_scenario,
159159
]
160160

@@ -263,6 +263,8 @@
263263
single_csv_input_state_is_earlier_scenario_concurrent,
264264
single_csv_input_state_is_later_scenario_concurrent,
265265
single_csv_no_input_state_scenario_concurrent,
266+
earlier_csv_scenario,
267+
csv_no_files_scenario,
266268
]
267269

268270
discover_scenarios = discover_failure_scenarios + discover_success_scenarios
@@ -296,6 +298,7 @@
296298
valid_single_stream_user_input_schema_scenario,
297299
single_avro_scenario,
298300
earlier_csv_scenario,
301+
csv_no_files_scenario,
299302
]
300303

301304

0 commit comments

Comments
 (0)