Skip to content

Commit e97ece5

Browse files
brianjlai and girarda authored
[low-code cdk] break resolving reference preprocessing into its own class so it can be reused (#19517)
* break resolving reference preprocessing into its own class so it can be reused * move reference resolution into the ManifestDeclarativeSource and deprecate the parser * formatting * last formatting i promise * rename * bump version Co-authored-by: Alexandre Girard <[email protected]>
1 parent 1dbad96 commit e97ece5

File tree

10 files changed

+201
-218
lines changed

10 files changed

+201
-218
lines changed

airbyte-cdk/python/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
## 0.9.4
4+
Low-code: Fix reference resolution for connector builder
5+
36
## 0.9.3
47
Low-code: Avoid duplicate HTTP query in `simple_retriever`
58

airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
2424
from airbyte_cdk.sources.declarative.exceptions import InvalidConnectorDefinitionException
2525
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory
26+
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
2627
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
2728
from airbyte_cdk.sources.streams.core import Stream
2829
from dataclasses_jsonschema import JsonSchemaMixin
@@ -47,7 +48,10 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
4748
:param debug(bool): True if debug mode is enabled
4849
"""
4950
self.logger = logging.getLogger(f"airbyte.{self.name}")
50-
self._source_config = source_config
51+
52+
evaluated_manifest = {}
53+
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config, evaluated_manifest, "")
54+
self._source_config = resolved_source_config
5155
self._debug = debug
5256
self._factory = DeclarativeComponentFactory()
5357

@@ -135,8 +139,8 @@ def _stream_configs(self):
135139

136140
@staticmethod
137141
def generate_schema() -> str:
138-
expanded_source_definition = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
139-
expanded_schema = expanded_source_definition.json_schema()
142+
expanded_source_manifest = ManifestDeclarativeSource.expand_schema_interfaces(ConcreteDeclarativeSource, {})
143+
expanded_schema = expanded_source_manifest.json_schema()
140144
return json.dumps(expanded_schema, cls=SchemaEncoder)
141145

142146
@staticmethod

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/config_parser.py

Lines changed: 0 additions & 17 deletions
This file was deleted.

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/yaml_parser.py renamed to airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,12 @@
55
from copy import deepcopy
66
from typing import Any, Mapping, Tuple, Union
77

8-
import yaml
9-
from airbyte_cdk.sources.declarative.parsers.config_parser import ConnectionDefinitionParser
108
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
11-
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
129

1310

14-
class YamlParser(ConnectionDefinitionParser):
11+
class ManifestReferenceResolver:
1512
"""
16-
Parses a Yaml string to a ConnectionDefinition
17-
18-
In addition to standard Yaml parsing, the input_string can contain references to values previously defined.
13+
An incoming manifest can contain references to values previously defined.
1914
This resolver will dereference these values to produce a complete ConnectionDefinition.
2015
2116
References can be defined using a *ref(<arg>) string.
@@ -101,31 +96,20 @@ class YamlParser(ConnectionDefinitionParser):
10196

10297
ref_tag = "$ref"
10398

104-
def parse(self, connection_definition_str: str) -> ConnectionDefinition:
105-
"""
106-
Parses a yaml file and dereferences string in the form "*ref({reference)"
107-
to {reference}
108-
:param connection_definition_str: yaml string to parse
109-
:return: The ConnectionDefinition parsed from connection_definition_str
110-
"""
111-
input_mapping = yaml.safe_load(connection_definition_str)
112-
evaluated_definition = {}
113-
return self._preprocess_dict(input_mapping, evaluated_definition, "")
114-
115-
def _preprocess_dict(self, input_mapping: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]):
99+
def preprocess_manifest(self, manifest: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]):
116100

117101
"""
118-
:param input_mapping: mapping produced by parsing yaml
102+
:param manifest: incoming manifest that could have references to previously defined components
119103
:param evaluated_mapping: mapping produced by dereferencing the content of manifest
120104
:param path: current path in configuration traversal
121105
:return:
122106
"""
123107
d = {}
124-
if self.ref_tag in input_mapping:
125-
partial_ref_string = input_mapping[self.ref_tag]
108+
if self.ref_tag in manifest:
109+
partial_ref_string = manifest[self.ref_tag]
126110
d = deepcopy(self._preprocess(partial_ref_string, evaluated_mapping, path))
127111

128-
for key, value in input_mapping.items():
112+
for key, value in manifest.items():
129113
if key == self.ref_tag:
130114
continue
131115
full_path = self._resolve_value(key, path)
@@ -180,7 +164,7 @@ def _preprocess(self, value, evaluated_config: Mapping[str, Any], path):
180164
key = *key[:-1], split[0], ".".join(split[1:])
181165
raise UndefinedReferenceException(path, ref_key)
182166
elif isinstance(value, dict):
183-
return self._preprocess_dict(value, evaluated_config, path)
167+
return self.preprocess_manifest(value, evaluated_config, path)
184168
elif type(value) == list:
185169
evaluated_list = [
186170
# pass in elem's path instead of the list's path

airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
import pkgutil
66

7+
import yaml
78
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
8-
from airbyte_cdk.sources.declarative.parsers.yaml_parser import YamlParser
99
from airbyte_cdk.sources.declarative.types import ConnectionDefinition
1010

1111

@@ -25,8 +25,18 @@ def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition:
2525

2626
yaml_config = pkgutil.get_data(package, path_to_yaml_file)
2727
decoded_yaml = yaml_config.decode()
28-
return YamlParser().parse(decoded_yaml)
28+
return self._parse(decoded_yaml)
2929

3030
def _emit_manifest_debug_message(self, extra_args: dict):
3131
extra_args["path_to_yaml"] = self._path_to_yaml
3232
self.logger.debug("declarative source created from parsed YAML manifest", extra=extra_args)
33+
34+
@staticmethod
35+
def _parse(connection_definition_str: str) -> ConnectionDefinition:
36+
"""
37+
Parses a yaml file into a manifest. Component references still exist in the manifest which will be
38+
resolved during the creation of the DeclarativeSource.
39+
:param connection_definition_str: yaml string to parse
40+
:return: The ConnectionDefinition parsed from connection_definition_str
41+
"""
42+
return yaml.safe_load(connection_definition_str)

airbyte-cdk/python/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
setup(
1717
name="airbyte-cdk",
18-
version="0.9.3",
18+
version="0.9.4",
1919
description="A framework for writing Airbyte Connectors.",
2020
long_description=README,
2121
long_description_content_type="text/markdown",
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import pytest
6+
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
7+
from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
8+
9+
resolver = ManifestReferenceResolver()
10+
11+
12+
def test_get_ref():
13+
s = "*ref(limit)"
14+
ref_key = resolver._get_ref_key(s)
15+
assert ref_key == "limit"
16+
17+
18+
def test_get_ref_no_ref():
19+
s = "limit: 50"
20+
21+
ref_key = resolver._get_ref_key(s)
22+
assert ref_key is None
23+
24+
25+
def test_refer():
26+
content = {
27+
"limit": 50,
28+
"limit_ref": "*ref(limit)"
29+
}
30+
config = resolver.preprocess_manifest(content, {}, "")
31+
assert config["limit_ref"] == 50
32+
33+
34+
def test_refer_to_inner():
35+
content = {
36+
"dict": {
37+
"limit": 50
38+
},
39+
"limit_ref": "*ref(dict.limit)"
40+
}
41+
config = resolver.preprocess_manifest(content, {}, "")
42+
assert config["limit_ref"] == 50
43+
44+
45+
def test_refer_to_non_existant_struct():
46+
content = {
47+
"dict": {
48+
"limit": 50
49+
},
50+
"limit_ref": "*ref(not_dict)"
51+
}
52+
with pytest.raises(UndefinedReferenceException):
53+
resolver.preprocess_manifest(content, {}, "")
54+
55+
56+
def test_refer_in_dict():
57+
content = {
58+
"limit": 50,
59+
"offset_request_parameters": {
60+
"offset": "{{ next_page_token['offset'] }}",
61+
"limit": "*ref(limit)"
62+
}
63+
}
64+
config = resolver.preprocess_manifest(content, {}, "")
65+
assert config["offset_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}"
66+
assert config["offset_request_parameters"]["limit"] == 50
67+
68+
69+
def test_refer_to_dict():
70+
content = {
71+
"limit": 50,
72+
"offset_request_parameters": {
73+
"offset": "{{ next_page_token['offset'] }}",
74+
"limit": "*ref(limit)"
75+
},
76+
"offset_pagination_request_parameters": {
77+
"class": "InterpolatedRequestParameterProvider",
78+
"request_parameters": "*ref(offset_request_parameters)"
79+
}
80+
}
81+
config = resolver.preprocess_manifest(content, {}, "")
82+
assert config["limit"] == 50
83+
assert config["offset_request_parameters"]["limit"] == 50
84+
assert len(config["offset_pagination_request_parameters"]) == 2
85+
assert config["offset_pagination_request_parameters"]["request_parameters"]["limit"] == 50
86+
assert config["offset_pagination_request_parameters"]["request_parameters"]["offset"] == "{{ next_page_token['offset'] }}"
87+
88+
89+
def test_refer_and_overwrite():
90+
content = {
91+
"limit": 50,
92+
"custom_limit": 25,
93+
"offset_request_parameters": {
94+
"offset": "{{ next_page_token['offset'] }}",
95+
"limit": "*ref(limit)"
96+
},
97+
"custom_request_parameters": {
98+
"$ref": "*ref(offset_request_parameters)",
99+
"limit": "*ref(custom_limit)"
100+
}
101+
}
102+
config = resolver.preprocess_manifest(content, {}, "")
103+
assert config["offset_request_parameters"]["limit"] == 50
104+
assert config["custom_request_parameters"]["limit"] == 25
105+
106+
assert config["offset_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}"
107+
assert config["custom_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}"
108+
109+
110+
def test_collision():
111+
content = {
112+
"example": {
113+
"nested":{
114+
"path": "first one",
115+
"more_nested": {
116+
"value": "found it!"
117+
}
118+
},
119+
"nested.path": "uh oh",
120+
},
121+
"reference_to_nested_path": {
122+
"$ref": "*ref(example.nested.path)"
123+
},
124+
"reference_to_nested_nested_value": {
125+
"$ref": "*ref(example.nested.more_nested.value)"
126+
}
127+
}
128+
config = resolver.preprocess_manifest(content, {}, "")
129+
assert config["example"]["nested"]["path"] == "first one"
130+
assert config["example"]["nested.path"] == "uh oh"
131+
assert config["reference_to_nested_path"] == "uh oh"
132+
assert config["example"]["nested"]["more_nested"]["value"] == "found it!"
133+
assert config["reference_to_nested_nested_value"] == "found it!"
134+
135+
136+
def test_list():
137+
content = {
138+
"list": ["A", "B"],
139+
"elem_ref": "*ref(list[0])"
140+
}
141+
config = resolver.preprocess_manifest(content, {}, "")
142+
elem_ref = config["elem_ref"]
143+
assert elem_ref == "A"

0 commit comments

Comments
 (0)