Skip to content

Commit c15c93b

Browse files
author
Dmytro Rezchykov
committed
CDK: support nested refs resolving
1 parent 7a6da86 commit c15c93b

File tree

4 files changed

+108
-68
lines changed

4 files changed

+108
-68
lines changed

airbyte-cdk/python/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
## 0.1.20
4+
Resolve nested schema references and move externally referenced schemas into a single local `definitions` section of the resulting schema.
5+
36
## 0.1.19
47
No longer prints full config files on validation error to prevent exposing secrets to log file: https://github.com/airbytehq/airbyte/pull/5879
58

airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py

Lines changed: 60 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -23,77 +23,20 @@
2323
#
2424

2525

26+
import importlib
2627
import json
2728
import os
2829
import pkgutil
2930
from typing import Any, ClassVar, Dict, Mapping, Tuple
3031

31-
import pkg_resources
32+
import jsonref
3233
from airbyte_cdk.logger import AirbyteLogger
3334
from airbyte_cdk.models import ConnectorSpecification
34-
from jsonschema import RefResolver, validate
35+
from jsonschema import validate
3536
from jsonschema.exceptions import ValidationError
3637
from pydantic import BaseModel, Field
3738

3839

39-
class JsonSchemaResolver:
40-
"""Helper class to expand $ref items in json schema"""
41-
42-
def __init__(self, shared_schemas_path: str):
43-
self._shared_refs = self._load_shared_schema_refs(shared_schemas_path)
44-
45-
@staticmethod
46-
def _load_shared_schema_refs(shared_schemas_path: str):
47-
shared_file_names = [f.name for f in os.scandir(shared_schemas_path) if f.is_file()]
48-
shared_schema_refs = {}
49-
for shared_file in shared_file_names:
50-
with open(os.path.join(shared_schemas_path, shared_file)) as data_file:
51-
shared_schema_refs[shared_file] = json.load(data_file)
52-
53-
return shared_schema_refs
54-
55-
def _resolve_schema_references(self, schema: dict, resolver: RefResolver) -> dict:
56-
if "$ref" in schema:
57-
reference_path = schema.pop("$ref", None)
58-
resolved = resolver.resolve(reference_path)[1]
59-
schema.update(resolved)
60-
return self._resolve_schema_references(schema, resolver)
61-
62-
if "properties" in schema:
63-
for k, val in schema["properties"].items():
64-
schema["properties"][k] = self._resolve_schema_references(val, resolver)
65-
66-
if "patternProperties" in schema:
67-
for k, val in schema["patternProperties"].items():
68-
schema["patternProperties"][k] = self._resolve_schema_references(val, resolver)
69-
70-
if "items" in schema:
71-
schema["items"] = self._resolve_schema_references(schema["items"], resolver)
72-
73-
if "anyOf" in schema:
74-
for i, element in enumerate(schema["anyOf"]):
75-
schema["anyOf"][i] = self._resolve_schema_references(element, resolver)
76-
77-
return schema
78-
79-
def resolve(self, schema: dict, refs: Dict[str, dict] = None) -> dict:
80-
"""Resolves and replaces json-schema $refs with the appropriate dict.
81-
Recursively walks the given schema dict, converting every instance
82-
of $ref in a 'properties' structure with a resolved dict.
83-
This modifies the input schema and also returns it.
84-
Arguments:
85-
schema:
86-
the schema dict
87-
refs:
88-
a dict of <string, dict> which forms a store of referenced schemata
89-
Returns:
90-
schema
91-
"""
92-
refs = refs or {}
93-
refs = {**self._shared_refs, **refs}
94-
return self._resolve_schema_references(schema, RefResolver("", schema, store=refs))
95-
96-
9740
class ResourceSchemaLoader:
9841
"""JSONSchema loader from package resources"""
9942

@@ -124,10 +67,63 @@ def get_schema(self, name: str) -> dict:
12467
print(f"Invalid JSON file format for file {schema_filename}")
12568
raise
12669

127-
shared_schemas_folder = pkg_resources.resource_filename(self.package_name, "schemas/shared/")
128-
if os.path.exists(shared_schemas_folder):
129-
return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema)
130-
return raw_schema
70+
return self.__resolve_schema_references(raw_schema)
71+
72+
def __resolve_schema_references(self, raw_schema: dict) -> dict:
    """
    Resolve links to external references and move them to a local "definitions" map.

    :param raw_schema: JSON schema to scan for external links.
    :return: JSON serializable object whose references no longer depend on external files.
    """

    class JsonFileLoader:
        """
        Custom JSON file loader to resolve references to resources located in the "shared" directory.
        We need this for compatibility with existing schemas because all of them have references
        pointing to shared_schema.json instead of shared/shared_schema.json.
        """

        def __init__(self, uri_base: str, shared: str):
            self.shared = shared
            self.uri_base = uri_base

        def __call__(self, uri: str) -> Dict[str, Any]:
            # Redirect "<base>file.json" to "<base>/<shared>/file.json". The base is expected
            # to be the leading prefix of the uri, so rewrite the first occurrence only.
            uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/", 1)
            # Context manager closes the file handle instead of leaving it to the GC.
            with open(uri, encoding="utf-8") as schema_file:
                return json.load(schema_file)

    package = importlib.import_module(self.package_name)
    base = os.path.dirname(package.__file__) + "/"

    def create_definitions(obj: Any, definitions: dict) -> Any:
        """
        Scan the resolved schema and compose the definitions section, also converting
        jsonref.JsonRef objects to JSON serializable dicts.

        :param obj: JSON schema node with its ref fields resolved.
        :param definitions: object for storing generated definitions (filled in place).
        :return: JSON serializable object with references without external dependencies.
        """
        if isinstance(obj, jsonref.JsonRef):
            def_key = obj.__reference__["$ref"]
            def_key = def_key.replace("#/definitions/", "").replace(".json", "_")
            definition = create_definitions(obj.__subject__, definitions)
            # Omit the external resource's own "definitions" section since we don't need
            # it anymore. Only dicts can carry one; a reference may also resolve to a
            # list or a scalar, which must not be popped from.
            if isinstance(definition, dict):
                definition.pop("definitions", None)
            definitions[def_key] = definition
            return {"$ref": "#/definitions/" + def_key}
        elif isinstance(obj, dict):
            return {k: create_definitions(v, definitions) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [create_definitions(item, definitions) for item in obj]
        else:
            return obj

    resolved = jsonref.JsonRef.replace_refs(raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base)
    definitions = {}
    resolved = create_definitions(resolved, definitions)
    # Only attach the section when at least one reference was actually collected.
    if definitions:
        resolved["definitions"] = definitions
    return resolved
131127

132128

133129
def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: AirbyteLogger):

airbyte-cdk/python/setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
setup(
3737
name="airbyte-cdk",
38-
version="0.1.18",
38+
version="0.1.20",
3939
description="A framework for writing Airbyte Connectors.",
4040
long_description=README,
4141
long_description_content_type="text/markdown",
@@ -67,6 +67,7 @@
6767
install_requires=[
6868
"backoff",
6969
"jsonschema~=3.2.0",
70+
"jsonref~=0.2",
7071
"pendulum",
7172
"pydantic~=1.6",
7273
"PyYAML~=5.4",

airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from collections.abc import Mapping
3232
from pathlib import Path
3333

34+
import jsonref
3435
from airbyte_cdk.logger import AirbyteLogger
3536
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
3637
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit
@@ -45,8 +46,6 @@
4546
SCHEMAS_ROOT = "/".join(os.path.abspath(MODULE.__file__).split("/")[:-1]) / Path("schemas")
4647

4748

48-
# TODO (sherif) refactor ResourceSchemaLoader to completely separate the functionality for reading data from the package. See https://github.com/airbytehq/airbyte/issues/3222
49-
# and the functionality for resolving schemas. See https://github.com/airbytehq/airbyte/issues/3222
5049
@fixture(autouse=True, scope="session")
5150
def create_and_teardown_schemas_dir():
5251
os.mkdir(SCHEMAS_ROOT)
@@ -117,8 +116,9 @@ def test_shared_schemas_resolves():
117116
"properties": {
118117
"str": {"type": "string"},
119118
"int": {"type": "integer"},
120-
"obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}},
119+
"obj": {"$ref": "#/definitions/shared_schema_"},
121120
},
121+
"definitions": {"shared_schema_": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}},
122122
}
123123

124124
partial_schema = {
@@ -135,3 +135,43 @@ def test_shared_schemas_resolves():
135135

136136
actual_schema = resolver.get_schema("complex_schema")
137137
assert actual_schema == expected_schema
138+
139+
@staticmethod
def test_shared_schemas_resolves_nested():
    """
    A schema referencing a shared definition that itself references another shared
    definition must come back with a single local "definitions" entry and no
    external references left.
    """
    expected_schema = {
        "type": ["null", "object"],
        "properties": {
            "str": {"type": "string"},
            "int": {"type": "integer"},
            "one_of": {"oneOf": [{"type": "string"}, {"$ref": "#/definitions/shared_schema_type_one"}]},
            "obj": {"$ref": "#/definitions/shared_schema_type_one"},
        },
        "definitions": {"shared_schema_type_one": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}},
    }
    partial_schema = {
        "type": ["null", "object"],
        "properties": {
            "str": {"type": "string"},
            "int": {"type": "integer"},
            "one_of": {"oneOf": [{"type": "string"}, {"$ref": "shared_schema.json#/definitions/type_one"}]},
            "obj": {"$ref": "shared_schema.json#/definitions/type_one"},
        },
    }

    referenced_schema = {
        "definitions": {
            "type_one": {"$ref": "shared_schema.json#/definitions/type_nested"},
            "type_nested": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}},
        }
    }

    create_schema("complex_schema", partial_schema)
    create_schema("shared/shared_schema", referenced_schema)

    resolver = ResourceSchemaLoader(MODULE_NAME)

    actual_schema = resolver.get_schema("complex_schema")
    assert actual_schema == expected_schema
    # Make sure generated schema is JSON serializable
    assert json.dumps(actual_schema)
    # Make sure the generated local references can be dereferenced. jsonref proxies are
    # lazy, so compare the referenced nodes to force resolution instead of only checking
    # that replace_refs() returned a non-empty object.
    dereferenced = jsonref.JsonRef.replace_refs(actual_schema)
    assert dereferenced
    expected_definition = expected_schema["definitions"]["shared_schema_type_one"]
    assert dereferenced["properties"]["obj"] == expected_definition
    assert dereferenced["properties"]["one_of"]["oneOf"][1] == expected_definition

0 commit comments

Comments
 (0)