|
23 | 23 | #
|
24 | 24 |
|
25 | 25 |
|
| 26 | +import importlib |
26 | 27 | import json
|
27 | 28 | import os
|
28 | 29 | import pkgutil
|
29 | 30 | from typing import Any, ClassVar, Dict, Mapping, Tuple
|
30 | 31 |
|
31 |
| -import pkg_resources |
| 32 | +import jsonref |
32 | 33 | from airbyte_cdk.logger import AirbyteLogger
|
33 | 34 | from airbyte_cdk.models import ConnectorSpecification
|
34 |
| -from jsonschema import RefResolver, validate |
| 35 | +from jsonschema import validate |
35 | 36 | from jsonschema.exceptions import ValidationError
|
36 | 37 | from pydantic import BaseModel, Field
|
37 | 38 |
|
38 | 39 |
|
39 |
| -class JsonSchemaResolver: |
40 |
| - """Helper class to expand $ref items in json schema""" |
41 |
| - |
42 |
| - def __init__(self, shared_schemas_path: str): |
43 |
| - self._shared_refs = self._load_shared_schema_refs(shared_schemas_path) |
44 |
| - |
45 |
| - @staticmethod |
46 |
| - def _load_shared_schema_refs(shared_schemas_path: str): |
47 |
| - shared_file_names = [f.name for f in os.scandir(shared_schemas_path) if f.is_file()] |
48 |
| - shared_schema_refs = {} |
49 |
| - for shared_file in shared_file_names: |
50 |
| - with open(os.path.join(shared_schemas_path, shared_file)) as data_file: |
51 |
| - shared_schema_refs[shared_file] = json.load(data_file) |
52 |
| - |
53 |
| - return shared_schema_refs |
54 |
| - |
55 |
| - def _resolve_schema_references(self, schema: dict, resolver: RefResolver) -> dict: |
56 |
| - if "$ref" in schema: |
57 |
| - reference_path = schema.pop("$ref", None) |
58 |
| - resolved = resolver.resolve(reference_path)[1] |
59 |
| - schema.update(resolved) |
60 |
| - return self._resolve_schema_references(schema, resolver) |
61 |
| - |
62 |
| - if "properties" in schema: |
63 |
| - for k, val in schema["properties"].items(): |
64 |
| - schema["properties"][k] = self._resolve_schema_references(val, resolver) |
65 |
| - |
66 |
| - if "patternProperties" in schema: |
67 |
| - for k, val in schema["patternProperties"].items(): |
68 |
| - schema["patternProperties"][k] = self._resolve_schema_references(val, resolver) |
69 |
| - |
70 |
| - if "items" in schema: |
71 |
| - schema["items"] = self._resolve_schema_references(schema["items"], resolver) |
72 |
| - |
73 |
| - if "anyOf" in schema: |
74 |
| - for i, element in enumerate(schema["anyOf"]): |
75 |
| - schema["anyOf"][i] = self._resolve_schema_references(element, resolver) |
76 |
| - |
77 |
| - return schema |
78 |
| - |
79 |
| - def resolve(self, schema: dict, refs: Dict[str, dict] = None) -> dict: |
80 |
| - """Resolves and replaces json-schema $refs with the appropriate dict. |
81 |
| - Recursively walks the given schema dict, converting every instance |
82 |
| - of $ref in a 'properties' structure with a resolved dict. |
83 |
| - This modifies the input schema and also returns it. |
84 |
| - Arguments: |
85 |
| - schema: |
86 |
| - the schema dict |
87 |
| - refs: |
88 |
| - a dict of <string, dict> which forms a store of referenced schemata |
89 |
| - Returns: |
90 |
| - schema |
91 |
| - """ |
92 |
| - refs = refs or {} |
93 |
| - refs = {**self._shared_refs, **refs} |
94 |
| - return self._resolve_schema_references(schema, RefResolver("", schema, store=refs)) |
95 |
| - |
96 |
| - |
97 | 40 | class ResourceSchemaLoader:
|
98 | 41 | """JSONSchema loader from package resources"""
|
99 | 42 |
|
@@ -124,10 +67,63 @@ def get_schema(self, name: str) -> dict:
|
124 | 67 | print(f"Invalid JSON file format for file {schema_filename}")
|
125 | 68 | raise
|
126 | 69 |
|
127 |
| - shared_schemas_folder = pkg_resources.resource_filename(self.package_name, "schemas/shared/") |
128 |
| - if os.path.exists(shared_schemas_folder): |
129 |
| - return JsonSchemaResolver(shared_schemas_folder).resolve(raw_schema) |
130 |
| - return raw_schema |
| 70 | + return self.__resolve_schema_references(raw_schema) |
| 71 | + |
| 72 | + def __resolve_schema_references(self, raw_schema: dict) -> dict: |
| 73 | + """ |
| 74 | + Resolve links to external references and move it to local "definitions" map. |
| 75 | + :param raw_schema jsonschema to lookup for external links. |
| 76 | + :return JSON serializable object with references without external dependencies. |
| 77 | + """ |
| 78 | + |
| 79 | + class JsonFileLoader: |
| 80 | + """ |
| 81 | + Custom json file loader to resolve references to resources located in "shared" directory. |
| 82 | + We need this for compatability with existing schemas cause all of them have references |
| 83 | + pointing to shared_schema.json file instead of shared/shared_schema.json |
| 84 | + """ |
| 85 | + |
| 86 | + def __init__(self, uri_base: str, shared: str): |
| 87 | + self.shared = shared |
| 88 | + self.uri_base = uri_base |
| 89 | + |
| 90 | + def __call__(self, uri: str) -> Dict[str, Any]: |
| 91 | + uri = uri.replace(self.uri_base, f"{self.uri_base}/{self.shared}/") |
| 92 | + return json.load(open(uri)) |
| 93 | + |
| 94 | + package = importlib.import_module(self.package_name) |
| 95 | + base = os.path.dirname(package.__file__) + "/" |
| 96 | + |
| 97 | + def create_definitions(obj: dict, definitions: dict) -> Dict[str, Any]: |
| 98 | + """ |
| 99 | + Scan resolved schema and compose definitions section, also convert |
| 100 | + jsonref.JsonRef object to JSON serializable dict. |
| 101 | + :param obj - jsonschema object with ref field resovled. |
| 102 | + :definitions - object for storing generated definitions. |
| 103 | + :return JSON serializable object with references without external dependencies. |
| 104 | + """ |
| 105 | + if isinstance(obj, jsonref.JsonRef): |
| 106 | + def_key = obj.__reference__["$ref"] |
| 107 | + def_key = def_key.replace("#/definitions/", "").replace(".json", "_") |
| 108 | + definition = create_definitions(obj.__subject__, definitions) |
| 109 | + # Omit existance definitions for extenal resource since |
| 110 | + # we dont need it anymore. |
| 111 | + definition.pop("definitions", None) |
| 112 | + definitions[def_key] = definition |
| 113 | + return {"$ref": "#/definitions/" + def_key} |
| 114 | + elif isinstance(obj, dict): |
| 115 | + return {k: create_definitions(v, definitions) for k, v in obj.items()} |
| 116 | + elif isinstance(obj, list): |
| 117 | + return [create_definitions(item, definitions) for item in obj] |
| 118 | + else: |
| 119 | + return obj |
| 120 | + |
| 121 | + resolved = jsonref.JsonRef.replace_refs(raw_schema, loader=JsonFileLoader(base, "schemas/shared"), base_uri=base) |
| 122 | + definitions = {} |
| 123 | + resolved = create_definitions(resolved, definitions) |
| 124 | + if definitions: |
| 125 | + resolved["definitions"] = definitions |
| 126 | + return resolved |
131 | 127 |
|
132 | 128 |
|
133 | 129 | def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: ConnectorSpecification, logger: AirbyteLogger):
|
|
0 commit comments