Skip to content

[low code connectors] generate complete json schema from classes #15647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 40 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
1efe2f0
draft: first pass at complete schema language generation and factory …
brianjlai Aug 11, 2022
30a73ae
Merge branch 'master' into brian/low_code_validate_schema
brianjlai Aug 14, 2022
5ca8e9e
actually a working validator and fixes to the schema that went uncaught
brianjlai Aug 14, 2022
385a593
remove extra spike file
brianjlai Aug 14, 2022
a29c048
fix formatting file
brianjlai Aug 14, 2022
4be31d8
Add method to generate the complete JSON schema of the low code decla…
brianjlai Aug 14, 2022
5a53476
add testing of a few components during schema gen
brianjlai Aug 15, 2022
41eea30
pr feedback and a little bit of refactoring
brianjlai Aug 16, 2022
e232f09
Merge branch 'master' into brian/low_code_validate_schema
brianjlai Aug 16, 2022
9ee6b68
Merge branch 'brian/low_code_validate_schema' into brian/generate_com…
brianjlai Aug 16, 2022
f55828f
test for schema version
brianjlai Aug 16, 2022
1111da5
Merge branch 'master' into brian/low_code_validate_schema
brianjlai Aug 16, 2022
2d71d85
fix some types that were erroneously marked as invalid schema
brianjlai Aug 17, 2022
65a7bd0
some comments
brianjlai Aug 17, 2022
8d3d1fc
Merge branch 'master' into brian/low_code_validate_schema
brianjlai Aug 17, 2022
3fb3732
Merge branch 'brian/low_code_validate_schema' into brian/generate_com…
brianjlai Aug 17, 2022
561a285
add jsonschemamixin to interfaces
brianjlai Aug 18, 2022
5d2e722
Merge branch 'master' into brian/low_code_validate_schema
brianjlai Aug 18, 2022
29e6e3f
Merge branch 'brian/low_code_validate_schema' into brian/generate_com…
brianjlai Aug 18, 2022
019165d
update tests now that interfaces are jsonschemamixin
brianjlai Aug 18, 2022
e378a7f
accidentally removed a mixin
brianjlai Aug 18, 2022
7eac17e
remove unneeded test
brianjlai Aug 18, 2022
33f98e1
make comment a little more clear
brianjlai Aug 18, 2022
2c3f720
Merge branch 'master' into brian/low_code_validate_schema
brianjlai Aug 18, 2022
6ef9e10
update changelog
brianjlai Aug 18, 2022
3612789
bump version
brianjlai Aug 18, 2022
687281d
Merge branch 'brian/low_code_validate_schema' into brian/generate_com…
brianjlai Aug 18, 2022
74e0e51
generic enum not enum class
brianjlai Aug 18, 2022
4cdccb4
Add method to generate the complete JSON schema of the low code decla…
brianjlai Aug 14, 2022
c60ce3a
add testing of a few components during schema gen
brianjlai Aug 15, 2022
743fbaf
test for schema version
brianjlai Aug 16, 2022
4eb1cc9
update tests now that interfaces are jsonschemamixin
brianjlai Aug 18, 2022
23d9dd1
accidentally removed a mixin
brianjlai Aug 18, 2022
9476b5c
remove unneeded test
brianjlai Aug 18, 2022
b1da2c2
make comment a little more clear
brianjlai Aug 18, 2022
364d2ca
generic enum not enum class
brianjlai Aug 18, 2022
d687e70
Merge branch 'brian/generate_complete_schema' of github.com:airbytehq…
brianjlai Aug 18, 2022
522aa67
Merge branch 'master' into brian/generate_complete_schema
brianjlai Aug 18, 2022
89105fc
add generated json file and update docs to reference it
brianjlai Aug 18, 2022
63d395d
verbage
brianjlai Aug 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from abc import ABC
from dataclasses import dataclass


@dataclass
class DeclarativeAuthenticator(ABC):
"""
Interface used to associate which authenticators can be used as part of the declarative framework
"""
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
from typing import Any, List, Mapping, Optional, Union

import pendulum
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_oauth import AbstractOauth2Authenticator
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixin):
class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Generates OAuth2.0 access tokens from an OAuth2.0 refresh token and client credentials based on
a declarative connector configuration file. Credentials can be defined explicitly or via interpolation
Expand All @@ -40,7 +41,7 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, JsonSchemaMixi
options: InitVar[Mapping[str, Any]]
scopes: Optional[List[str]] = None
token_expiry_date: Optional[Union[InterpolatedString, str]] = None
_token_expiry_date: pendulum.DateTime = field(init=False, repr=False)
_token_expiry_date: pendulum.DateTime = field(init=False, repr=False, default=None)
access_token_name: Union[InterpolatedString, str] = "access_token"
expires_in_name: Union[InterpolatedString, str] = "expires_in"
refresh_request_body: Optional[Mapping[str, Any]] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
from dataclasses import InitVar, dataclass
from typing import Any, Mapping, Union

from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config
from airbyte_cdk.sources.streams.http.requests_native_auth.abstract_token import AbstractHeaderAuthenticator
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class ApiKeyAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
class ApiKeyAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
ApiKeyAuth sets a request header on the HTTP requests sent.

Expand Down Expand Up @@ -51,7 +52,7 @@ def token(self) -> str:


@dataclass
class BearerAuthenticator(AbstractHeaderAuthenticator, JsonSchemaMixin):
class BearerAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Authenticator that sets the Authorization header on the HTTP requests sent.

Expand Down Expand Up @@ -81,7 +82,7 @@ def token(self) -> str:


@dataclass
class BasicHttpAuthenticator(AbstractHeaderAuthenticator):
class BasicHttpAuthenticator(AbstractHeaderAuthenticator, DeclarativeAuthenticator, JsonSchemaMixin):
"""
Builds auth based off the basic authentication scheme as defined by RFC 7617, which transmits credentials as USER ID/password pairs, encoded using base64
https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ class DeclarativeStream(Stream, JsonSchemaMixin):
config: Config
options: InitVar[Mapping[str, Any]]
name: str
_name: str = field(init=False, repr=False)
_name: str = field(init=False, repr=False, default="")
primary_key: Optional[Union[str, List[str], List[List[str]]]]
_primary_key: str = field(init=False, repr=False)
_primary_key: str = field(init=False, repr=False, default="")
stream_cursor_field: Optional[List[str]] = None
transformations: List[RecordTransformation] = None
checkpoint_interval: Optional[int] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@

import requests
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class JsonDecoder(Decoder):
class JsonDecoder(Decoder, JsonSchemaMixin):
"""
Decoder strategy that returns the json-encoded content of a response, if any.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Optional

from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
Expand All @@ -20,7 +20,7 @@ class RecordFilter(JsonSchemaMixin):
"""

options: InitVar[Mapping[str, Any]]
config: Config = field(default=dict)
config: Config
condition: str = ""

def __post_init__(self, options: Mapping[str, Any]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
import copy
import enum
import importlib
import typing
from dataclasses import fields
from typing import Any, List, Literal, Mapping, Type, Union, get_args, get_origin, get_type_hints

from airbyte_cdk.sources.declarative.create_partial import OPTIONS_STR, create
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
from airbyte_cdk.sources.declarative.parsers.class_types_registry import CLASS_TYPES_REGISTRY
from airbyte_cdk.sources.declarative.parsers.default_implementation_registry import DEFAULT_IMPLEMENTATIONS_REGISTRY
from airbyte_cdk.sources.declarative.types import Config
from jsonschema.validators import validate

ComponentDefinition: Union[Literal, Mapping, List]

Expand Down Expand Up @@ -99,7 +102,7 @@ class DeclarativeComponentFactory:
def __init__(self):
self._interpolator = JinjaInterpolation()

def create_component(self, component_definition: ComponentDefinition, config: Config):
def create_component(self, component_definition: ComponentDefinition, config: Config, instantiate: bool = True):
"""
Create a component defined by `component_definition`.

Expand All @@ -115,20 +118,43 @@ def create_component(self, component_definition: ComponentDefinition, config: Co
class_name = CLASS_TYPES_REGISTRY[kwargs.pop("type")]
else:
raise ValueError(f"Failed to create component because it has no class_name or type. Definition: {component_definition}")
return self.build(class_name, config, **kwargs)
kwargs.pop("config", None)
return self.build(
class_name,
config,
instantiate,
**kwargs,
)

def build(self, class_or_class_name: Union[str, Type], config, **kwargs):
def build(self, class_or_class_name: Union[str, Type], config, instantiate: bool = True, **kwargs):
if isinstance(class_or_class_name, str):
class_ = self._get_class_from_fully_qualified_class_name(class_or_class_name)
else:
class_ = class_or_class_name

# create components in options before propagating them
if OPTIONS_STR in kwargs:
kwargs[OPTIONS_STR] = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs[OPTIONS_STR].items()}
kwargs[OPTIONS_STR] = {
k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs[OPTIONS_STR].items()
}

updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_) for k, v in kwargs.items()}
return create(class_, config=config, **updated_kwargs)
updated_kwargs = {k: self._create_subcomponent(k, v, kwargs, config, class_, instantiate) for k, v in kwargs.items()}

if instantiate:
return create(class_, config=config, **updated_kwargs)
else:
# Because the component's data fields definitions use interfaces, we need to resolve the underlying types into the
# concrete classes that implement the interface before generating the schema
DeclarativeComponentFactory._transform_interface_to_union(class_)
schema = class_.json_schema()

component_definition = {
**updated_kwargs,
**{k: v for k, v in updated_kwargs.get(OPTIONS_STR, {}).items() if k not in updated_kwargs},
"config": config,
}
validate(component_definition, schema)
return lambda: component_definition

@staticmethod
def _get_class_from_fully_qualified_class_name(class_name: str):
Expand All @@ -141,7 +167,7 @@ def _get_class_from_fully_qualified_class_name(class_name: str):
def _merge_dicts(d1, d2):
return {**d1, **d2}

def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
def _create_subcomponent(self, key, definition, kwargs, config, parent_class, instantiate: bool = True):
"""
There are 5 ways to define a component.
1. dict with "class_name" field -> create an object of type "class_name"
Expand All @@ -153,14 +179,14 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
if self.is_object_definition_with_class_name(definition):
# propagate kwargs to inner objects
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
elif self.is_object_definition_with_type(definition):
# If type is set instead of class_name, get the class_name from the CLASS_TYPES_REGISTRY
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
object_type = definition.pop("type")
class_name = CLASS_TYPES_REGISTRY[object_type]
definition["class_name"] = class_name
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
elif isinstance(definition, dict):
# Try to infer object type
expected_type = self.get_default_type(key, parent_class)
Expand All @@ -169,17 +195,22 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
if expected_type and not self._is_builtin_type(expected_type):
definition["class_name"] = expected_type
definition[OPTIONS_STR] = self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), definition.get(OPTIONS_STR, dict()))
return self.create_component(definition, config)()
return self.create_component(definition, config, instantiate)()
else:
return definition
elif isinstance(definition, list):
return [
self._create_subcomponent(
key, sub, self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)), config, parent_class
key,
sub,
self._merge_dicts(kwargs.get(OPTIONS_STR, dict()), self._get_subcomponent_options(sub)),
config,
parent_class,
instantiate,
)
for sub in definition
]
else:
elif instantiate:
expected_type = self.get_default_type(key, parent_class)
if expected_type and not isinstance(definition, expected_type):
# call __init__(definition) if definition is not a dict and is not of the expected type
Expand All @@ -193,8 +224,7 @@ def _create_subcomponent(self, key, definition, kwargs, config, parent_class):
return expected_type(definition, options=options)
except Exception as e:
raise Exception(f"failed to instantiate type {expected_type}. {e}")
else:
return definition
return definition

@staticmethod
def is_object_definition_with_class_name(definition):
Expand Down Expand Up @@ -238,3 +268,40 @@ def _is_builtin_type(cls) -> bool:
if not cls:
return False
return cls.__module__ == "builtins"

@staticmethod
def _transform_interface_to_union(cls: type):
copy_cls = type(cls.__name__ + "Copy", cls.__bases__, dict(cls.__dict__))
class_fields = fields(copy_cls)
for field in class_fields:
unpacked_field_types = DeclarativeComponentFactory.unpack(field.type)
copy_cls.__annotations__[field.name] = unpacked_field_types

@staticmethod
def unpack(field_type: type):
"""
Recursive function that takes in a field type and unpacks the underlying fields (if it is a generic) or
returns the field type if it is not in a generic container
:param field_type: The current set of field types to unpack
:return: A list of unpacked types
"""
generic_type = typing.get_origin(field_type)
if generic_type is None:
# Functions as the base case since the origin is none for non-typing classes. If it is an interface then we derive
# and return the union of its subclasses or return the original type if it is a concrete class or a primitive type
module = field_type.__module__
if module != "builtins" and module != "typing":
subclasses = field_type.__subclasses__()
if subclasses:
return Union[tuple(subclasses)]
return field_type
elif generic_type is list or generic_type is Union:
unpacked_types = [DeclarativeComponentFactory.unpack(underlying_type) for underlying_type in typing.get_args(field_type)]
if generic_type is list:
# For lists we extract the underlying list type and attempt to unpack it again since it could be another container
return List[Union[tuple(unpacked_types)]]
elif generic_type is Union:
# For Unions (and Options which evaluate into a Union of types and NoneType) we unpack the underlying type since it could
# be another container
return Union[tuple(unpacked_types)]
return field_type
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
from typing import Any, Mapping, MutableMapping, Optional, Union

import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
from airbyte_cdk.sources.declarative.auth.token import BasicHttpAuthenticator
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.error_handler import ErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import (
InterpolatedRequestOptionsProvider,
)
from airbyte_cdk.sources.declarative.requesters.request_options.request_options_provider import RequestOptionsProvider
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, NoAuth
from dataclasses_jsonschema import JsonSchemaMixin


Expand All @@ -28,33 +28,35 @@ class HttpRequester(Requester, JsonSchemaMixin):

Attributes:
name (str): Name of the stream. Only used for request/response caching
url_base (InterpolatedString): Base url to send requests to
path (InterpolatedString): Path to send requests to
url_base (Union[InterpolatedString, str]): Base url to send requests to
path (Union[InterpolatedString, str]): Path to send requests to
http_method (Union[str, HttpMethod]): HTTP method to use when sending requests
request_options_provider (Optional[RequestOptionsProvider]): request option provider defining the options to set on outgoing requests
authenticator (HttpAuthenticator): Authenticator defining how to authenticate to the source
request_options_provider (Optional[InterpolatedRequestOptionsProvider]): request option provider defining the options to set on outgoing requests
authenticator (DeclarativeAuthenticator): Authenticator defining how to authenticate to the source
error_handler (Optional[ErrorHandler]): Error handler defining how to detect and handle errors
config (Config): The user-provided configuration as specified by the source's spec
"""

name: str
url_base: InterpolatedString
path: InterpolatedString
url_base: Union[InterpolatedString, str]
path: Union[InterpolatedString, str]
config: Config
options: InitVar[Mapping[str, Any]]
http_method: Union[str, HttpMethod] = HttpMethod.GET
request_options_provider: Optional[RequestOptionsProvider] = None
authenticator: HttpAuthenticator = None
request_options_provider: Optional[InterpolatedRequestOptionsProvider] = None
authenticator: DeclarativeAuthenticator = None
error_handler: Optional[ErrorHandler] = None

def __post_init__(self, options: Mapping[str, Any]):
self.url_base = InterpolatedString.create(self.url_base, options=options)
self.path = InterpolatedString.create(self.path, options=options)
if self.request_options_provider is None:
self._request_options_provider = InterpolatedRequestOptionsProvider(config=self.config, options=options)
elif isinstance(self.request_options_provider, dict):
self._request_options_provider = InterpolatedRequestOptionsProvider(config=self.config, **self.request_options_provider)
else:
self._request_options_provider = self.request_options_provider
self.authenticator = self.authenticator or NoAuth()
self.authenticator = self.authenticator or BasicHttpAuthenticator("", config=self.config, options={})
if type(self.http_method) == str:
self.http_method = HttpMethod[self.http_method]
self._method = self.http_method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
import requests
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class NoPagination(Paginator):
class NoPagination(Paginator, JsonSchemaMixin):
"""
Pagination implementation that never returns a next page.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
from typing import Any, List, Mapping, Optional

import requests
from dataclasses_jsonschema import JsonSchemaMixin


@dataclass
class PaginationStrategy(JsonSchemaMixin):
class PaginationStrategy:
"""
Defines how to get the next page token
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class SimpleRetriever(Retriever, HttpStream, JsonSchemaMixin):
record_selector: HttpSelector
options: InitVar[Mapping[str, Any]]
name: str
_name: str = field(init=False, repr=False)
_name: str = field(init=False, repr=False, default="")
primary_key: Optional[Union[str, List[str], List[List[str]]]]
_primary_key: str = field(init=False, repr=False)
_primary_key: str = field(init=False, repr=False, default="")
paginator: Optional[Paginator] = None
stream_slicer: Optional[StreamSlicer] = SingleSlice(options={})

Expand Down
Loading