Skip to content

Commit 44a3494

Browse files
girarda authored and maxi297 committed
file-based CDK: Configurable strings_can_be_null (airbytehq#29298)
* [ISSUE airbytehq#28893] infer csv schema * [ISSUE airbytehq#28893] align with pyarrow * Automated Commit - Formatting Changes * [ISSUE airbytehq#28893] legacy inference and infer only when needed * [ISSUE airbytehq#28893] fix scenario tests * [ISSUE airbytehq#28893] using discovered schema as part of read * [ISSUE airbytehq#28893] self-review + cleanup * [ISSUE airbytehq#28893] fix test * [ISSUE airbytehq#28893] code review part #1 * [ISSUE airbytehq#28893] code review part #2 * Fix test * formatcdk * first pass * [ISSUE airbytehq#28893] code review * fix mypy issues * comment * rename for clarity * Add a scenario test case * this isn't optional anymore * FIX test log level * Re-adding failing tests * [ISSUE airbytehq#28893] improve inferrence to consider multiple types per value * Automated Commit - Formatting Changes * [ISSUE airbytehq#28893] remove InferenceType.PRIMITIVE_AND_COMPLEX_TYPES * Code review * Automated Commit - Formatting Changes * fix unit tests --------- Co-authored-by: maxi297 <[email protected]> Co-authored-by: maxi297 <[email protected]>
1 parent 8c4e7c7 commit 44a3494

File tree

5 files changed

+652
-513
lines changed

5 files changed

+652
-513
lines changed

airbyte-cdk/python/airbyte_cdk/sources/file_based/config/csv_format.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ class Config:
6363
default=[],
6464
description="A set of case-sensitive strings that should be interpreted as null values. For example, if the value 'NA' should be interpreted as null, enter 'NA' in this field.",
6565
)
66+
strings_can_be_null: bool = Field(
67+
title="Strings Can Be Null",
68+
default=True,
69+
description="Whether strings can be interpreted as null values. If true, strings that match the null_values set will be interpreted as null. If false, strings that match the null_values set will be interpreted as the string itself.",
70+
)
6671
skip_rows_before_header: int = Field(
6772
title="Skip Rows Before Header",
6873
default=0,

airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/csv_parser.py

Lines changed: 54 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -160,10 +160,15 @@ def parse_records(
160160
discovered_schema: Optional[Mapping[str, SchemaType]],
161161
) -> Iterable[Dict[str, Any]]:
162162
config_format = _extract_format(config)
163-
cast_fn = CsvParser._get_cast_function(discovered_schema, config_format, logger)
163+
if discovered_schema:
164+
property_types = {col: prop["type"] for col, prop in discovered_schema["properties"].items()} # type: ignore # discovered_schema["properties"] is known to be a mapping
165+
deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
166+
else:
167+
deduped_property_types = {}
168+
cast_fn = CsvParser._get_cast_function(deduped_property_types, config_format, logger)
164169
data_generator = self._csv_reader.read_data(config, file, stream_reader, logger, self.file_read_mode)
165170
for row in data_generator:
166-
yield CsvParser._to_nullable(cast_fn(row), config_format.null_values)
171+
yield CsvParser._to_nullable(cast_fn(row), deduped_property_types, config_format.null_values, config_format.strings_can_be_null)
167172
data_generator.close()
168173

169174
@property
@@ -172,24 +177,62 @@ def file_read_mode(self) -> FileReadMode:
172177

173178
@staticmethod
174179
def _get_cast_function(
175-
schema: Optional[Mapping[str, SchemaType]], config_format: CsvFormat, logger: logging.Logger
180+
deduped_property_types: Mapping[str, str], config_format: CsvFormat, logger: logging.Logger
176181
) -> Callable[[Mapping[str, str]], Mapping[str, str]]:
177182
# Only cast values if the schema is provided
178-
if schema:
179-
property_types = {col: prop["type"] for col, prop in schema["properties"].items()}
180-
return partial(CsvParser._cast_types, property_types=property_types, config_format=config_format, logger=logger)
183+
if deduped_property_types:
184+
return partial(CsvParser._cast_types, deduped_property_types=deduped_property_types, config_format=config_format, logger=logger)
181185
else:
182186
# If no schema is provided, yield the rows as they are
183187
return _no_cast
184188

185189
@staticmethod
186-
def _to_nullable(row: Mapping[str, str], null_values: Set[str]) -> Dict[str, Optional[str]]:
187-
nullable = row | {k: None if v in null_values else v for k, v in row.items()}
190+
def _to_nullable(
191+
row: Mapping[str, str], deduped_property_types: Mapping[str, str], null_values: Set[str], strings_can_be_null: bool
192+
) -> Dict[str, Optional[str]]:
193+
nullable = row | {
194+
k: None if CsvParser._value_is_none(v, deduped_property_types.get(k), null_values, strings_can_be_null) else v
195+
for k, v in row.items()
196+
}
188197
return nullable
189198

199+
@staticmethod
200+
def _value_is_none(value: Any, deduped_property_type: Optional[str], null_values: Set[str], strings_can_be_null: bool) -> bool:
201+
return value in null_values and (strings_can_be_null or deduped_property_type != "string")
202+
203+
@staticmethod
204+
def _pre_propcess_property_types(property_types: Dict[str, Any]) -> Mapping[str, str]:
205+
"""
206+
Transform the property types to be non-nullable and remove duplicate types if any.
207+
Sample input:
208+
{
209+
"col1": ["string", "null"],
210+
"col2": ["string", "string", "null"],
211+
"col3": "integer"
212+
}
213+
214+
Sample output:
215+
{
216+
"col1": "string",
217+
"col2": "string",
218+
"col3": "integer",
219+
}
220+
"""
221+
output = {}
222+
for prop, prop_type in property_types.items():
223+
if isinstance(prop_type, list):
224+
prop_type_distinct = set(prop_type)
225+
prop_type_distinct.remove("null")
226+
if len(prop_type_distinct) != 1:
227+
raise ValueError(f"Could not get non nullable type from {prop_type}")
228+
output[prop] = next(iter(prop_type_distinct))
229+
else:
230+
output[prop] = prop_type
231+
return output
232+
190233
@staticmethod
191234
def _cast_types(
192-
row: Dict[str, str], property_types: Dict[str, Any], config_format: CsvFormat, logger: logging.Logger
235+
row: Dict[str, str], deduped_property_types: Dict[str, str], config_format: CsvFormat, logger: logging.Logger
193236
) -> Dict[str, Any]:
194237
"""
195238
Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.
@@ -202,17 +245,10 @@ def _cast_types(
202245
result = {}
203246

204247
for key, value in row.items():
205-
prop_type = property_types.get(key)
248+
prop_type = deduped_property_types.get(key)
206249
cast_value: Any = value
207250

208-
if isinstance(prop_type, list):
209-
prop_type_distinct = set(prop_type)
210-
prop_type_distinct.remove("null")
211-
if len(prop_type_distinct) != 1:
212-
raise ValueError(f"Could not get non nullable type from {prop_type}")
213-
(prop_type,) = prop_type_distinct
214-
215-
if prop_type in TYPE_PYTHON_MAPPING:
251+
if prop_type in TYPE_PYTHON_MAPPING and prop_type is not None:
216252
_, python_type = TYPE_PYTHON_MAPPING[prop_type]
217253

218254
if python_type is None:

0 commit comments

Comments (0)