Skip to content

Commit 34b7914

Browse files
artem1205 authored and jatinyadav-cc committed
Airbyte CDK: add filter to RemoveFields (airbytehq#35326)
Signed-off-by: Artem Inzhyyants <[email protected]>
1 parent 9d10818 commit 34b7914

File tree

6 files changed

+89
-17
lines changed

6 files changed

+89
-17
lines changed

airbyte-cdk/python/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pip install -e ".[dev]" # [dev] installs development-only dependencies
6565
If the iteration you are working on includes changes to the models, you might want to regenerate them. In order to do that, you can run:
6666

6767
```bash
68-
./gradlew :airbyte-cdk:python:format
68+
./gradlew :airbyte-cdk:python:build
6969
```
7070

7171
This will generate the files based on the schemas, add the license information and format the code. If you want to only do the former and rely on

airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+13
Original file line numberDiff line numberDiff line change
@@ -1847,6 +1847,19 @@ definitions:
18471847
type:
18481848
type: string
18491849
enum: [RemoveFields]
1850+
condition:
1851+
description: The predicate to filter a property by a property value. The property will be removed if the condition is empty OR the expression evaluates to True.
1852+
type: string
1853+
default: ""
1854+
interpolation_context:
1855+
- config
1856+
- property
1857+
- parameters
1858+
examples:
1859+
- "{{ property|string == '' }}"
1860+
- "{{ property is integer }}"
1861+
- "{{ property|length > 5 }}"
1862+
- "{{ property == 'some_string_to_match' }}"
18501863
field_pointers:
18511864
title: Field Paths
18521865
description: Array of paths defining the field to remove. Each item is an array whose fields describe the path of a field to remove.

airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+10
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,16 @@ class SchemaNormalization(Enum):
579579

580580
class RemoveFields(BaseModel):
581581
type: Literal['RemoveFields']
582+
condition: Optional[str] = Field(
583+
'',
584+
description="The predicate to filter a property by a property value. Property will be removed if it is empty OR expression is evaluated to True.",
585+
examples=[
586+
"{{ property|string == '' }}",
587+
'{{ property is integer }}',
588+
'{{ property|length > 5 }}',
589+
"{{ property == 'some_string_to_match' }}",
590+
],
591+
)
582592
field_pointers: List[List[str]] = Field(
583593
...,
584594
description='Array of paths defining the field to remove. Each item is an array whose field describe the path of a field to remove.',

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -909,7 +909,7 @@ def create_record_selector(
909909

910910
@staticmethod
911911
def create_remove_fields(model: RemoveFieldsModel, config: Config, **kwargs: Any) -> RemoveFields:
912-
return RemoveFields(field_pointers=model.field_pointers, parameters={})
912+
return RemoveFields(field_pointers=model.field_pointers, condition=model.condition or "", parameters={})
913913

914914
def create_selective_authenticator(self, model: SelectiveAuthenticatorModel, config: Config, **kwargs: Any) -> DeclarativeAuthenticator:
915915
authenticators = {name: self._create_component_from_model(model=auth, config=config) for name, auth in model.authenticators.items()}

airbyte-cdk/python/airbyte_cdk/sources/declarative/transformations/remove_fields.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import dpath.exceptions
99
import dpath.util
10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1011
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1112
from airbyte_cdk.sources.declarative.types import Config, FieldPointer, StreamSlice, StreamState
1213

@@ -40,6 +41,10 @@ class RemoveFields(RecordTransformation):
4041

4142
field_pointers: List[FieldPointer]
4243
parameters: InitVar[Mapping[str, Any]]
44+
condition: str = ""
45+
46+
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
47+
self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters=parameters)
4348

4449
def transform(
4550
self,
@@ -55,7 +60,11 @@ def transform(
5560
for pointer in self.field_pointers:
5661
# the dpath library by default doesn't delete fields from arrays
5762
try:
58-
dpath.util.delete(record, pointer)
63+
dpath.util.delete(
64+
record,
65+
pointer,
66+
afilter=(lambda x: self._filter_interpolator.eval(config or {}, property=x)) if self.condition else None,
67+
)
5968
except dpath.exceptions.PathNotFound:
6069
# if the (potentially nested) property does not exist, silently skip
6170
pass

airbyte-cdk/python/unit_tests/sources/declarative/transformations/test_remove_fields.py

+54-14
Original file line numberDiff line numberDiff line change
@@ -10,39 +10,79 @@
1010

1111

1212
@pytest.mark.parametrize(
13-
["input_record", "field_pointers", "expected"],
13+
["input_record", "field_pointers", "condition", "expected"],
1414
[
15-
pytest.param({"k1": "v", "k2": "v"}, [["k1"]], {"k2": "v"}, id="remove a field that exists (flat dict)"),
16-
pytest.param({"k1": "v", "k2": "v"}, [["k3"]], {"k1": "v", "k2": "v"}, id="remove a field that doesn't exist (flat dict)"),
17-
pytest.param({"k1": "v", "k2": "v"}, [["k1"], ["k2"]], {}, id="remove multiple fields that exist (flat dict)"),
15+
pytest.param({"k1": "v", "k2": "v"}, [["k1"]], None, {"k2": "v"}, id="remove a field that exists (flat dict), condition = None"),
16+
pytest.param({"k1": "v", "k2": "v"}, [["k1"]], "", {"k2": "v"}, id="remove a field that exists (flat dict)"),
17+
pytest.param({"k1": "v", "k2": "v"}, [["k3"]], "", {"k1": "v", "k2": "v"}, id="remove a field that doesn't exist (flat dict)"),
18+
pytest.param({"k1": "v", "k2": "v"}, [["k1"], ["k2"]], "", {}, id="remove multiple fields that exist (flat dict)"),
1819
# TODO: should we instead splice the element out of the array? I think that's the more intuitive solution
1920
# Otherwise one could just set the field's value to null.
20-
pytest.param({"k1": [1, 2]}, [["k1", 0]], {"k1": [None, 2]}, id="remove field inside array (int index)"),
21-
pytest.param({"k1": [1, 2]}, [["k1", "0"]], {"k1": [None, 2]}, id="remove field inside array (string index)"),
21+
pytest.param({"k1": [1, 2]}, [["k1", 0]], "", {"k1": [None, 2]}, id="remove field inside array (int index)"),
22+
pytest.param({"k1": [1, 2]}, [["k1", "0"]], "", {"k1": [None, 2]}, id="remove field inside array (string index)"),
2223
pytest.param(
2324
{"k1": "v", "k2": "v", "k3": [0, 1], "k4": "v"},
2425
[["k1"], ["k2"], ["k3", 0]],
26+
"",
2527
{"k3": [None, 1], "k4": "v"},
2628
id="test all cases (flat)",
2729
),
28-
pytest.param({"k1": [0, 1]}, [[".", "k1", 10]], {"k1": [0, 1]}, id="remove array index that doesn't exist (flat)"),
29-
pytest.param({".": {"k1": [0, 1]}}, [[".", "k1", 10]], {".": {"k1": [0, 1]}}, id="remove array index that doesn't exist (nested)"),
30-
pytest.param({".": {"k2": "v", "k1": "v"}}, [[".", "k1"]], {".": {"k2": "v"}}, id="remove nested field that exists"),
30+
pytest.param({"k1": [0, 1]}, [[".", "k1", 10]], "", {"k1": [0, 1]}, id="remove array index that doesn't exist (flat)"),
3131
pytest.param(
32-
{".": {"k2": "v", "k1": "v"}}, [[".", "k3"]], {".": {"k2": "v", "k1": "v"}}, id="remove field that doesn't exist (nested)"
32+
{".": {"k1": [0, 1]}}, [[".", "k1", 10]], "", {".": {"k1": [0, 1]}}, id="remove array index that doesn't exist (nested)"
3333
),
34-
pytest.param({".": {"k2": "v", "k1": "v"}}, [[".", "k1"], [".", "k2"]], {".": {}}, id="remove multiple fields that exist (nested)"),
34+
pytest.param({".": {"k2": "v", "k1": "v"}}, [[".", "k1"]], "", {".": {"k2": "v"}}, id="remove nested field that exists"),
3535
pytest.param(
36-
{".": {"k1": [0, 1]}}, [[".", "k1", 0]], {".": {"k1": [None, 1]}}, id="remove multiple fields that exist in arrays (nested)"
36+
{".": {"k2": "v", "k1": "v"}}, [[".", "k3"]], "", {".": {"k2": "v", "k1": "v"}}, id="remove field that doesn't exist (nested)"
37+
),
38+
pytest.param(
39+
{".": {"k2": "v", "k1": "v"}}, [[".", "k1"], [".", "k2"]], "", {".": {}}, id="remove multiple fields that exist (nested)"
40+
),
41+
pytest.param(
42+
{".": {"k1": [0, 1]}},
43+
[[".", "k1", 0]],
44+
"",
45+
{".": {"k1": [None, 1]}},
46+
id="remove multiple fields that exist in arrays (nested)",
3747
),
3848
pytest.param(
3949
{".": {"k1": [{"k2": "v", "k3": "v"}, {"k4": "v"}]}},
4050
[[".", "k1", 0, "k2"], [".", "k1", 1, "k4"]],
51+
"",
4152
{".": {"k1": [{"k3": "v"}, {}]}},
4253
id="remove fields that exist in arrays (deeply nested)",
4354
),
55+
pytest.param(
56+
{"k1": "v", "k2": "v"},
57+
[["**"]],
58+
"{{ False }}",
59+
{"k1": "v", "k2": "v"},
60+
id="do not remove any field if condition is boolean False",
61+
),
62+
pytest.param({"k1": "v", "k2": "v"}, [["**"]], "{{ True }}", {}, id="remove all field if condition is boolean True"),
63+
pytest.param(
64+
{"k1": "v", "k2": "v1", "k3": "v1", "k4": {"k_nested": "v1", "k_nested2": "v2"}},
65+
[["**"]],
66+
"{{ property == 'v1' }}",
67+
{"k1": "v", "k4": {"k_nested2": "v2"}},
68+
id="recursively remove any field that matches property condition and leave that does not",
69+
),
70+
pytest.param(
71+
{"k1": "v", "k2": "some_long_string", "k3": "some_long_string", "k4": {"k_nested": "v1", "k_nested2": "v2"}},
72+
[["**"]],
73+
"{{ property|length > 5 }}",
74+
{"k1": "v", "k4": {"k_nested": "v1", "k_nested2": "v2"}},
75+
id="remove any field that have length > 5 and leave that does not",
76+
),
77+
pytest.param(
78+
{"k1": 255, "k2": "some_string", "k3": "some_long_string", "k4": {"k_nested": 123123, "k_nested2": "v2"}},
79+
[["**"]],
80+
"{{ property is integer }}",
81+
{"k2": "some_string", "k3": "some_long_string", "k4": {"k_nested2": "v2"}},
82+
id="recursively remove any field that of type integer and leave that does not",
83+
),
4484
],
4585
)
46-
def test_remove_fields(input_record: Mapping[str, Any], field_pointers: List[FieldPointer], expected: Mapping[str, Any]):
47-
transformation = RemoveFields(field_pointers=field_pointers, parameters={})
86+
def test_remove_fields(input_record: Mapping[str, Any], field_pointers: List[FieldPointer], condition: str, expected: Mapping[str, Any]):
87+
transformation = RemoveFields(field_pointers=field_pointers, condition=condition, parameters={})
4888
assert transformation.transform(input_record) == expected

0 commit comments

Comments
 (0)