Skip to content

Commit 6aadd76

Browse files
committed
Moved transformations to dynamic schema loader
1 parent 2061a62 commit 6aadd76

File tree

4 files changed

+153
-146
lines changed

4 files changed

+153
-146
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1767,6 +1767,17 @@ definitions:
17671767
- "$ref": "#/definitions/AsyncRetriever"
17681768
- "$ref": "#/definitions/CustomRetriever"
17691769
- "$ref": "#/definitions/SimpleRetriever"
1770+
transformations:
1771+
title: Transformations
1772+
description: A list of transformations to be applied to retrieved record.
1773+
type: array
1774+
items:
1775+
anyOf:
1776+
- "$ref": "#/definitions/AddFields"
1777+
- "$ref": "#/definitions/CustomTransformation"
1778+
- "$ref": "#/definitions/RemoveFields"
1779+
- "$ref": "#/definitions/KeysToLower"
1780+
- "$ref": "#/definitions/KeysToSnakeCase"
17701781
schema_type_identifier:
17711782
"$ref": "#/definitions/SchemaTypeIdentifier"
17721783
$parameters:
@@ -2526,16 +2537,6 @@ definitions:
25262537
schema_normalization:
25272538
"$ref": "#/definitions/SchemaNormalization"
25282539
default: None
2529-
transformations:
2530-
title: Transformations
2531-
description: A list of transformations to be applied to each output record.
2532-
type: array
2533-
items:
2534-
anyOf:
2535-
- "$ref": "#/definitions/AddFields"
2536-
- "$ref": "#/definitions/CustomTransformation"
2537-
- "$ref": "#/definitions/RemoveFields"
2538-
- "$ref": "#/definitions/KeysToLower"
25392540
$parameters:
25402541
type: object
25412542
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 73 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,9 @@ class OAuthAuthenticator(BaseModel):
528528
scopes: Optional[List[str]] = Field(
529529
None,
530530
description="List of scopes that should be granted to the access token.",
531-
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
531+
examples=[
532+
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
533+
],
532534
title="Scopes",
533535
)
534536
token_expiry_date: Optional[str] = Field(
@@ -822,7 +824,9 @@ class Config:
822824
access_token_headers: Optional[Dict[str, Any]] = Field(
823825
None,
824826
description="The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.",
825-
examples=[{"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}],
827+
examples=[
828+
{"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}
829+
],
826830
title="Access Token Headers",
827831
)
828832
access_token_params: Optional[Dict[str, Any]] = Field(
@@ -891,24 +895,28 @@ class OAuthConfigSpecification(BaseModel):
891895
class Config:
892896
extra = Extra.allow
893897

894-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
895-
None,
896-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
897-
examples=[
898-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
899-
{
900-
"app_id": {
901-
"type": "string",
902-
"path_in_connector_config": ["info", "app_id"],
903-
}
904-
},
905-
],
906-
title="OAuth user input",
898+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
899+
Field(
900+
None,
901+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
902+
examples=[
903+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
904+
{
905+
"app_id": {
906+
"type": "string",
907+
"path_in_connector_config": ["info", "app_id"],
908+
}
909+
},
910+
],
911+
title="OAuth user input",
912+
)
907913
)
908-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
909-
None,
910-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
911-
title="DeclarativeOAuth Connector Specification",
914+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
915+
Field(
916+
None,
917+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
918+
title="DeclarativeOAuth Connector Specification",
919+
)
912920
)
913921
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
914922
None,
@@ -926,7 +934,9 @@ class Config:
926934
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
927935
None,
928936
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
929-
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
937+
examples=[
938+
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
939+
],
930940
title="OAuth input specification",
931941
)
932942
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1490,13 +1500,6 @@ class RecordSelector(BaseModel):
14901500
title="Record Filter",
14911501
)
14921502
schema_normalization: Optional[SchemaNormalization] = SchemaNormalization.None_
1493-
transformations: Optional[
1494-
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
1495-
] = Field(
1496-
None,
1497-
description="A list of transformations to be applied to each output record.",
1498-
title="Transformations",
1499-
)
15001503
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
15011504

15021505

@@ -1644,12 +1647,16 @@ class Config:
16441647
description="Component used to coordinate how records are extracted across stream slices and request pages.",
16451648
title="Retriever",
16461649
)
1647-
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
1648-
None,
1649-
description="Component used to fetch data incrementally based on a time field in the data.",
1650-
title="Incremental Sync",
1650+
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = (
1651+
Field(
1652+
None,
1653+
description="Component used to fetch data incrementally based on a time field in the data.",
1654+
title="Incremental Sync",
1655+
)
1656+
)
1657+
name: Optional[str] = Field(
1658+
"", description="The stream name.", example=["Users"], title="Name"
16511659
)
1652-
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
16531660
primary_key: Optional[PrimaryKey] = Field(
16541661
"", description="The primary key of the stream.", title="Primary Key"
16551662
)
@@ -1838,6 +1845,21 @@ class DynamicSchemaLoader(BaseModel):
18381845
description="Component used to coordinate how records are extracted across stream slices and request pages.",
18391846
title="Retriever",
18401847
)
1848+
transformations: Optional[
1849+
List[
1850+
Union[
1851+
AddFields,
1852+
CustomTransformation,
1853+
RemoveFields,
1854+
KeysToLower,
1855+
KeysToSnakeCase,
1856+
]
1857+
]
1858+
] = Field(
1859+
None,
1860+
description="A list of transformations to be applied to retrieved record.",
1861+
title="Transformations",
1862+
)
18411863
schema_type_identifier: SchemaTypeIdentifier
18421864
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
18431865

@@ -1900,7 +1922,11 @@ class SimpleRetriever(BaseModel):
19001922
CustomPartitionRouter,
19011923
ListPartitionRouter,
19021924
SubstreamPartitionRouter,
1903-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
1925+
List[
1926+
Union[
1927+
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
1928+
]
1929+
],
19041930
]
19051931
] = Field(
19061932
[],
@@ -1942,7 +1968,9 @@ class AsyncRetriever(BaseModel):
19421968
)
19431969
download_extractor: Optional[
19441970
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
1945-
] = Field(None, description="Responsible for fetching the records from provided urls.")
1971+
] = Field(
1972+
None, description="Responsible for fetching the records from provided urls."
1973+
)
19461974
creation_requester: Union[CustomRequester, HttpRequester] = Field(
19471975
...,
19481976
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -1972,7 +2000,11 @@ class AsyncRetriever(BaseModel):
19722000
CustomPartitionRouter,
19732001
ListPartitionRouter,
19742002
SubstreamPartitionRouter,
1975-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2003+
List[
2004+
Union[
2005+
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2006+
]
2007+
],
19762008
]
19772009
] = Field(
19782010
[],
@@ -2036,10 +2068,12 @@ class DynamicDeclarativeStream(BaseModel):
20362068
stream_template: DeclarativeStream = Field(
20372069
..., description="Reference to the stream template.", title="Stream Template"
20382070
)
2039-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2040-
...,
2041-
description="Component resolve and populates stream templates with components values.",
2042-
title="Components Resolver",
2071+
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
2072+
Field(
2073+
...,
2074+
description="Component resolve and populates stream templates with components values.",
2075+
title="Components Resolver",
2076+
)
20432077
)
20442078

20452079

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1638,13 +1638,20 @@ def create_dynamic_schema_loader(
16381638
model.retriever, stream_slicer
16391639
)
16401640

1641+
transformations = []
1642+
if model.transformations:
1643+
for transformation_model in model.transformations:
1644+
transformations.append(
1645+
self._create_component_from_model(model=transformation_model, config=config)
1646+
)
1647+
16411648
retriever = self._create_component_from_model(
16421649
model=model.retriever,
16431650
config=config,
16441651
name="",
16451652
primary_key=None,
16461653
stream_slicer=combined_slicers,
1647-
transformations=[],
1654+
transformations=transformations,
16481655
)
16491656
schema_type_identifier = self._create_component_from_model(
16501657
model.schema_type_identifier, config=config, parameters=model.parameters or {}
@@ -1923,12 +1930,6 @@ def create_record_selector(
19231930
SCHEMA_TRANSFORMER_TYPE_MAPPING[model.schema_normalization]
19241931
)
19251932

1926-
if model.transformations:
1927-
for transformation_model in model.transformations:
1928-
transformations.append(
1929-
self._create_component_from_model(model=transformation_model, config=config)
1930-
)
1931-
19321933
return RecordSelector(
19331934
extractor=extractor,
19341935
name=name,

0 commit comments

Comments
 (0)