[low-code cdk] Allow for read command to be run on low code connector streams w/o a schema file #18532

Merged 5 commits on Nov 3, 2022
Changes from 1 commit
@@ -7,7 +7,7 @@

 from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
-from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
+from airbyte_cdk.sources.declarative.schema import MockSchemaLoader
 from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
 from airbyte_cdk.sources.declarative.transformations import RecordTransformation
 from airbyte_cdk.sources.declarative.types import Config, StreamSlice
@@ -48,7 +48,7 @@ class DeclarativeStream(Stream, JsonSchemaMixin):
     def __post_init__(self, options: Mapping[str, Any]):
         self.stream_cursor_field = self.stream_cursor_field or []
         self.transformations = self.transformations or []
-        self._schema_loader = self.schema_loader if self.schema_loader else JsonSchema(config=self.config, options=options)
+        self._schema_loader = self.schema_loader if self.schema_loader else MockSchemaLoader(config=self.config, options=options)

     @property
     def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
@@ -32,7 +32,8 @@
 from airbyte_cdk.sources.declarative.requesters.paginators.strategies.offset_increment import OffsetIncrement
 from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
 from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
-from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
+from airbyte_cdk.sources.declarative.schema import MockSchemaLoader
+from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
 from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
 from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
 from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
@@ -62,9 +63,10 @@
     "HttpRequester": HttpRequester,
     "InterpolatedBoolean": InterpolatedBoolean,
     "InterpolatedString": InterpolatedString,
-    "JsonSchema": JsonSchema,
Contributor

Is there a worry that this might break existing users?

Contributor Author

I scanned through our existing low-code implementations, and I'm going to migrate them to the new name, JsonFileSchemaLoader, after the CDK is published. However, on second thought, instead of doing the rename now, we'll bundle it into a series of cosmetic changes we have slated for November. Even though Hacktoberfest is over, some contributors are still making adjustments, and we don't want to add unnecessary friction if it can wait.
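One way the compatibility concern raised above could be softened is a deprecated alias in the type registry. The sketch below is only an illustration and is not what this PR does: `REGISTRY`, `DEPRECATED_ALIASES`, and `resolve_type` are hypothetical names, and the `JsonFileSchemaLoader` class here is a stand-in rather than the CDK class.

```python
import warnings
from typing import Dict, Type


class JsonFileSchemaLoader:
    """Stand-in for the renamed loader class (not the real CDK class)."""

    def __init__(self, file_path: str) -> None:
        self.file_path = file_path


# Hypothetical registry keyed by the `type` string used in YAML manifests.
REGISTRY: Dict[str, Type] = {"JsonFileSchemaLoader": JsonFileSchemaLoader}

# Hypothetical mapping from old type names to their new names.
DEPRECATED_ALIASES: Dict[str, str] = {"JsonSchema": "JsonFileSchemaLoader"}


def resolve_type(type_name: str) -> Type:
    """Look up a class by name, accepting deprecated names with a warning."""
    canonical = DEPRECATED_ALIASES.get(type_name, type_name)
    if canonical != type_name:
        warnings.warn(
            f"'{type_name}' is deprecated, use '{canonical}' instead",
            DeprecationWarning,
            stacklevel=2,
        )
    return REGISTRY[canonical]
```

With an alias like this, manifests that still say `type: JsonSchema` would keep resolving while connectors are migrated, at the cost of carrying the old name a little longer.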

+    "JsonFileSchemaLoader": JsonFileSchemaLoader,
     "ListStreamSlicer": ListStreamSlicer,
     "MinMaxDatetime": MinMaxDatetime,
+    "MockSchemaLoader": MockSchemaLoader,
Contributor

suggestion: I'd replace Mock with Empty in the name

     "NoAuth": NoAuth,
     "NoPagination": NoPagination,
     "OAuthAuthenticator": DeclarativeOauth2Authenticator,
@@ -30,7 +30,7 @@
 from airbyte_cdk.sources.declarative.requesters.requester import Requester
 from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
 from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
-from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
+from airbyte_cdk.sources.declarative.schema import MockSchemaLoader
 from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
 from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
 from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
@@ -58,7 +58,7 @@
     RequestOptionsProvider: InterpolatedRequestOptionsProvider,
     Requester: HttpRequester,
     Retriever: SimpleRetriever,
-    SchemaLoader: JsonSchema,
+    SchemaLoader: MockSchemaLoader,
     Stream: DeclarativeStream,
     StreamSlicer: SingleSlice,
 }
@@ -2,7 +2,8 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #

-from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
+from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
+from airbyte_cdk.sources.declarative.schema.mock_schema_loader import MockSchemaLoader
 from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader

-__all__ = ["JsonSchema", "SchemaLoader"]
+__all__ = ["JsonFileSchemaLoader", "MockSchemaLoader", "SchemaLoader"]
@@ -27,7 +27,7 @@ def _default_file_path() -> str:


 @dataclass
-class JsonSchema(SchemaLoader, JsonSchemaMixin):
+class JsonFileSchemaLoader(SchemaLoader, JsonSchemaMixin):
     """
     Loads the schema from a json file

@@ -0,0 +1,36 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+from dataclasses import InitVar, dataclass
+from typing import Any, Mapping
+
+from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
+from airbyte_cdk.sources.declarative.types import Config
+from dataclasses_jsonschema import JsonSchemaMixin
+
+
+@dataclass
+class MockSchemaLoader(SchemaLoader, JsonSchemaMixin):
+    """
+    Loads an empty schema for streams that have not defined their schema file yet.
+
+    Attributes:
+        config (Config): The user-provided configuration as specified by the source's spec
+        options (Mapping[str, Any]): Additional arguments to pass to the string interpolation if needed
+    """
+
+    config: Config
+    options: InitVar[Mapping[str, Any]]
+
+    def __post_init__(self, options: Mapping[str, Any]):
+        pass
+
+    def get_json_schema(self) -> Mapping[str, Any]:
+        """
+        Returns by default the empty schema.
+
+        :return: The empty schema
+        """
+
+        return {}
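
To make the effect of the new default concrete, here is a minimal, self-contained sketch of the fallback pattern used in `DeclarativeStream.__post_init__`. It does not import the CDK: `SketchSchemaLoader`, `SketchMockSchemaLoader`, `FixedSchemaLoader`, and `SketchStream` are hypothetical, heavily simplified stand-ins, so treat this as an illustration of the idea rather than the actual implementation.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Mapping, Optional


class SketchSchemaLoader(ABC):
    """Simplified stand-in for the SchemaLoader interface."""

    @abstractmethod
    def get_json_schema(self) -> Mapping[str, Any]:
        ...


@dataclass
class SketchMockSchemaLoader(SketchSchemaLoader):
    """Stand-in for MockSchemaLoader: always returns an empty schema."""

    config: Mapping[str, Any] = field(default_factory=dict)

    def get_json_schema(self) -> Mapping[str, Any]:
        return {}


@dataclass
class FixedSchemaLoader(SketchSchemaLoader):
    """Hypothetical loader that returns a schema given at construction time."""

    schema: Mapping[str, Any] = field(default_factory=dict)

    def get_json_schema(self) -> Mapping[str, Any]:
        return self.schema


@dataclass
class SketchStream:
    """Stand-in for DeclarativeStream, reduced to the schema-loader fallback."""

    name: str
    config: Mapping[str, Any] = field(default_factory=dict)
    schema_loader: Optional[SketchSchemaLoader] = None

    def __post_init__(self) -> None:
        # Same shape as the diff: keep the configured loader if there is one,
        # otherwise fall back to the loader that returns an empty schema.
        self._schema_loader = (
            self.schema_loader if self.schema_loader else SketchMockSchemaLoader(config=self.config)
        )

    def get_json_schema(self) -> Mapping[str, Any]:
        return self._schema_loader.get_json_schema()


no_schema_yet = SketchStream(name="lists")
with_schema = SketchStream(name="lists", schema_loader=FixedSchemaLoader(schema={"type": "object"}))
```

In this sketch, `no_schema_yet.get_json_schema()` yields an empty mapping instead of failing on a missing file, which is the behavior the PR title describes for running `read` before a schema file exists.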
@@ -3,7 +3,7 @@
 #

 import pytest
-from airbyte_cdk.sources.declarative.schema import JsonSchema
+from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader


 @pytest.mark.parametrize(
@@ -19,7 +19,7 @@
     ],
 )
 def test_extract_resource_and_schema_path(test_name, input_path, expected_resource, expected_path):
-    json_schema = JsonSchema(input_path, {}, {})
+    json_schema = JsonFileSchemaLoader({}, {}, input_path)
     actual_resource, actual_path = json_schema.extract_resource_and_schema_path(input_path)

     assert actual_resource == expected_resource
@@ -34,7 +34,8 @@
 )
 from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod
 from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
-from airbyte_cdk.sources.declarative.schema.json_schema import JsonSchema
+from airbyte_cdk.sources.declarative.schema import MockSchemaLoader
+from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
 from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
 from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
 from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
@@ -318,7 +319,7 @@ def test_full_config():
 partial_stream:
   class_name: "airbyte_cdk.sources.declarative.declarative_stream.DeclarativeStream"
   schema_loader:
-    class_name: airbyte_cdk.sources.declarative.schema.json_schema.JsonSchema
+    class_name: airbyte_cdk.sources.declarative.schema.json_file_schema_loader.JsonFileSchemaLoader
     file_path: "./source_sendgrid/schemas/{{ options.name }}.json"
   cursor_field: [ ]
 list_stream:
@@ -358,7 +359,7 @@ def test_full_config():
     assert type(stream) == DeclarativeStream
     assert stream.primary_key == "id"
     assert stream.name == "lists"
-    assert type(stream.schema_loader) == JsonSchema
+    assert type(stream.schema_loader) == JsonFileSchemaLoader
     assert type(stream.retriever) == SimpleRetriever
     assert stream.retriever.requester.http_method == HttpMethod.GET
     assert stream.retriever.requester.authenticator._token.eval(input_config) == "verysecrettoken"
@@ -558,13 +559,13 @@ def test_config_with_defaults():
     assert type(stream) == DeclarativeStream
     assert stream.primary_key == "id"
     assert stream.name == "lists"
-    assert type(stream.schema_loader) == JsonSchema
+    assert type(stream.schema_loader) == MockSchemaLoader
     assert type(stream.retriever) == SimpleRetriever
     assert stream.retriever.requester.http_method == HttpMethod.GET

     assert stream.retriever.requester.authenticator._token.eval(input_config) == "verysecrettoken"
     assert [fp.eval(input_config) for fp in stream.retriever.record_selector.extractor.field_pointer] == ["result"]
-    assert stream.schema_loader._get_json_filepath() == "./source_sendgrid/schemas/lists.yaml"
+    assert stream.schema_loader.get_json_schema() == {}
     assert isinstance(stream.retriever.paginator, DefaultPaginator)

     assert stream.retriever.paginator.url_base.string == "https://api.sendgrid.com"
@@ -268,7 +268,8 @@ def test_generate_schema():

     declarative_stream = schema["definitions"]["DeclarativeStream"]
     assert {"retriever", "config"}.issubset(declarative_stream["required"])
-    assert declarative_stream["properties"]["schema_loader"]["$ref"] == "#/definitions/JsonSchema"
+    assert {"$ref": "#/definitions/MockSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"]
+    assert {"$ref": "#/definitions/JsonFileSchemaLoader"} in declarative_stream["properties"]["schema_loader"]["anyOf"]
     assert declarative_stream["properties"]["retriever"]["$ref"] == "#/definitions/SimpleRetriever"
     assert declarative_stream["properties"]["name"]["type"] == "string"
     assert {"type": "array", "items": {"type": "string"}} in declarative_stream["properties"]["primary_key"]["anyOf"]
@@ -2,7 +2,7 @@ version: "0.1.0"

 definitions:
   schema_loader:
-    type: JsonSchema
+    type: JsonFileSchemaLoader
     file_path: "./source_courier/schemas/{{ options['name'] }}.json"

   root_selector:
@@ -2,7 +2,7 @@ version: "0.1.0"

 definitions:
   schema_loader:
-    type: JsonSchema
+    type: JsonFileSchemaLoader
     file_path: "./source_gocardless/schemas/{{ options['name'] }}.json"
   selector:
     type: RecordSelector
@@ -2,6 +2,7 @@ version: "0.1.0"

 definitions:
   schema_loader:
+    type: JsonFileSchemaLoader
     file_path: "./source_greenhouse/schemas/{{ options['name'] }}.json"
   selector:
     type: RecordSelector
@@ -4,7 +4,7 @@ definitions:
   step: "30d"

   schema_loader:
-    type: JsonSchema
+    type: JsonFileSchemaLoader
     file_path: "./source_sendgrid/schemas/{{ options.name }}.json"

   requester:
@@ -2,7 +2,7 @@ version: "0.1.0"
 definitions:
   page_size: 50
   schema_loader:
-    type: JsonSchema
+    type: JsonFileSchemaLoader
     file_path: "./source_sentry/schemas/{{ options.name }}.json"
   selector:
     type: RecordSelector