Skip to content

Commit 23679f5

Browse files
authored
instantiate a declarative connector and allow for reads to be invoked from the connector builder server (#19333)
* instantiate a declarative connector and allow for reads to be invoked from the connector builder server * various pr feedback and cleaning up the code a bit * refactor grouping logic into a separate function to illustrate how groups are being emitted * fix the webapp to also pass config to the stream list endpoint * fix dereference field * replace error message handling with default FastAPI HTTPException * pr feedback: more error messaging and some code reuse * formatting * regenerate open api spec
1 parent 15c3d62 commit 23679f5

File tree

11 files changed

+876
-9
lines changed

11 files changed

+876
-9
lines changed

airbyte-connector-builder-server/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ airbytePython {
1313
task generateOpenApiPythonServer(type: GenerateTask){
1414
outputs.upToDateWhen { false }
1515

16-
def generatedCodeDir = "$buildDir/server"
16+
def generatedCodeDir = "$buildDir/airbyte_connector_builder_server"
1717
inputSpec = "$rootDir.absolutePath/airbyte-connector-builder-server/src/main/openapi/openapi.yaml"
18-
outputDir = "$buildDir/airbyte_connector_builder_server"
18+
outputDir = generatedCodeDir
1919

2020
generatorName = "python-fastapi"
2121
configFile = "$projectDir/openapi/generator_config.yaml"
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
# coding: utf-8
6+
7+
from __future__ import annotations
8+
from datetime import date, datetime # noqa: F401
9+
10+
import re # noqa: F401
11+
from typing import Any, Dict, List, Optional # noqa: F401
12+
13+
from pydantic import AnyUrl, BaseModel, EmailStr, validator # noqa: F401
14+
15+
16+
class StreamReadSliceDescriptor(BaseModel):
    """NOTE: This class is auto generated by OpenAPI Generator (https://openapi-generator.tech).

    Do not edit the class manually.

    StreamReadSliceDescriptor - a model defined in OpenAPI

        start_datetime: The start_datetime of this StreamReadSliceDescriptor [Optional].
        list_item: The list_item of this StreamReadSliceDescriptor [Optional].
    """

    # Both fields are optional and default to None when they are not supplied.
    start_datetime: Optional[datetime] = None
    list_item: Optional[str] = None
29+
30+
StreamReadSliceDescriptor.update_forward_refs()

airbyte-connector-builder-server/connector_builder/generated/models/streams_list_read_streams.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ class StreamsListReadStreams(BaseModel):
2525
"""
2626

2727
name: str
28-
url: AnyUrl
28+
url: str
2929

3030
StreamsListReadStreams.update_forward_refs()

airbyte-connector-builder-server/connector_builder/generated/models/streams_list_request_body.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ class StreamsListRequestBody(BaseModel):
2121
StreamsListRequestBody - a model defined in OpenAPI
2222
2323
manifest: The manifest of this StreamsListRequestBody.
24+
config: The config of this StreamsListRequestBody.
2425
"""
2526

2627
manifest: Dict[str, Any]
28+
config: Dict[str, Any]
2729

2830
StreamsListRequestBody.update_forward_refs()

airbyte-connector-builder-server/connector_builder/impl/default_api.py

Lines changed: 136 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,31 @@
22
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
#
44

5+
import json
6+
import logging
7+
from json import JSONDecodeError
8+
from typing import Any, Dict, Iterable, Optional, Union
9+
from urllib.parse import parse_qs, urljoin, urlparse
510

11+
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Type
612
from connector_builder.generated.apis.default_api_interface import DefaultApi
13+
from connector_builder.generated.models.http_request import HttpRequest
14+
from connector_builder.generated.models.http_response import HttpResponse
715
from connector_builder.generated.models.stream_read import StreamRead
16+
from connector_builder.generated.models.stream_read_pages import StreamReadPages
817
from connector_builder.generated.models.stream_read_request_body import StreamReadRequestBody
18+
from connector_builder.generated.models.stream_read_slices import StreamReadSlices
919
from connector_builder.generated.models.streams_list_read import StreamsListRead
20+
from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams
1021
from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody
11-
from fastapi import Body
22+
from connector_builder.impl.low_code_cdk_adapter import LowCodeSourceAdapter
23+
from fastapi import Body, HTTPException
24+
from jsonschema import ValidationError
1225

1326

1427
class DefaultApiImpl(DefaultApi):
28+
logger = logging.getLogger("airbyte.connector-builder")
29+
1530
async def get_manifest_template(self) -> str:
1631
return """version: "0.1.0"
1732
@@ -65,7 +80,125 @@ async def get_manifest_template(self) -> str:
6580
"""
6681

6782
async def list_streams(self, streams_list_request_body: StreamsListRequestBody = Body(None, description="")) -> StreamsListRead:
    """
    Takes in a low code manifest and a config to resolve the list of streams that are available for testing
    :param streams_list_request_body: Input parameters to retrieve the list of available streams
    :return: Stream objects made up of a stream name and the HTTP URL it will send requests to
    :raises HTTPException: With status code 400 when the manifest is invalid or the streams cannot be resolved
    """
    adapter = self._create_low_code_adapter(manifest=streams_list_request_body.manifest)

    try:
        stream_list_read = [
            StreamsListReadStreams(
                name=http_stream.name,
                url=urljoin(http_stream.url_base, http_stream.path()),
            )
            for http_stream in adapter.get_http_streams(streams_list_request_body.config)
        ]
    except Exception as error:
        # An exception raised without arguments has an empty ``args`` tuple; indexing it would raise an
        # IndexError and hide the real failure, so fall back to the exception itself.
        reason = error.args[0] if error.args else error
        raise HTTPException(status_code=400, detail=f"Could not list streams with error: {reason}") from error
    return StreamsListRead(streams=stream_list_read)
69102

70103
async def read_stream(self, stream_read_request_body: StreamReadRequestBody = Body(None, description="")) -> StreamRead:
    """
    Using the provided manifest and config, invokes a sync for the specified stream and returns groups of Airbyte messages
    that are produced during the read operation
    :param stream_read_request_body: Input parameters to trigger the read operation for a stream
    :return: Airbyte record messages produced by the sync grouped by slice and page
    :raises HTTPException: With status code 400 when the manifest is invalid or the read operation fails
    """
    adapter = self._create_low_code_adapter(manifest=stream_read_request_body.manifest)

    # All pages are currently reported under one slice
    single_slice = StreamReadSlices(pages=[])
    log_messages = []
    try:
        for message_group in self._get_message_groups(
            adapter.read_stream(stream_read_request_body.stream, stream_read_request_body.config)
        ):
            if isinstance(message_group, AirbyteLogMessage):
                log_messages.append({"message": message_group.message})
            else:
                single_slice.pages.append(message_group)
    except Exception as error:
        # TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
        # An exception raised without arguments has an empty ``args`` tuple; indexing it would raise an
        # IndexError and hide the real failure, so fall back to the exception itself.
        reason = error.args[0] if error.args else error
        raise HTTPException(status_code=400, detail=f"Could not perform read with error: {reason}") from error

    return StreamRead(logs=log_messages, slices=[single_slice])
127+
128+
def _get_message_groups(self, messages: Iterable[AirbyteMessage]) -> Iterable[Union[StreamReadPages, AirbyteLogMessage]]:
    """
    Message groups are partitioned according to when request log messages are received. Subsequent response log messages
    and record messages belong to the prior request log message and when we encounter another request, append the latest
    message group.

    Messages received from the CDK read operation will always arrive in the following order:
    {type: LOG, log: {message: "request: ..."}}
    {type: LOG, log: {message: "response: ..."}}
    ... 0 or more record messages
    {type: RECORD, record: {data: ...}}
    {type: RECORD, record: {data: ...}}
    Repeats for each request/response made

    Note: The exception is that normal log messages can be received at any time which are not incorporated into grouping

    :param messages: Airbyte messages produced by the read operation
    :return: One StreamReadPages per request/response pair, interleaved with the plain log messages as they arrive
    :raises ValueError: When a page is about to be emitted without both a parsed request and a parsed response
    """
    first_page = True
    current_records = []
    current_page_request: Optional[HttpRequest] = None
    current_page_response: Optional[HttpResponse] = None
    for message in messages:
        if message.type == Type.RECORD:
            current_records.append(message.record.data)
        elif message.type != Type.LOG:
            # Other message types are not part of the grouping
            continue
        elif message.log.message.startswith("request:"):
            if first_page:
                first_page = False
            else:
                # A new request closes the page that was being accumulated
                if current_page_request is None or current_page_response is None:
                    raise ValueError("Every message grouping should have at least one request and response")
                yield StreamReadPages(request=current_page_request, response=current_page_response, records=current_records)
                current_records = []
                # Reset the response so the next page cannot silently reuse the one that belongs to the previous request
                current_page_response = None
            current_page_request = self._create_request_from_log_message(message.log)
        elif message.log.message.startswith("response:"):
            current_page_response = self._create_response_from_log_message(message.log)
        else:
            yield message.log

    # Emit the last page once the stream of messages is exhausted
    if current_page_request is None or current_page_response is None:
        raise ValueError("Every message grouping should have at least one request and response")
    yield StreamReadPages(request=current_page_request, response=current_page_response, records=current_records)
169+
170+
def _create_request_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpRequest]:
    """
    Parses the JSON payload that follows the "request:" marker of a log message into an HttpRequest
    :param log_message: Log message whose text contains "request:" followed by a JSON object
    :return: The parsed request, or None when the payload is not valid JSON
    """
    # TODO: As a temporary stopgap, the CDK emits request data as a log message string. Ideally this should come in the
    #  form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate while the
    #  protocol change is worked on.
    raw_request = log_message.message.partition("request:")[2]
    try:
        request = json.loads(raw_request)
    except JSONDecodeError as error:
        self.logger.warning(f"Failed to parse log message into request object with error: {error}")
        return None

    raw_url = request.get("url", "")
    if raw_url:
        url = urlparse(raw_url)
        # Take the host from netloc without any "user:password@" prefix. Unlike ``hostname`` this keeps the port,
        # the original casing and the brackets around IPv6 literals.
        host = url.netloc.rpartition("@")[2]
        full_path = f"{url.scheme}://{host}{url.path}"
        parameters = parse_qs(url.query) or None
    else:
        # A ParseResult is a non-empty tuple and therefore always truthy, so the missing-URL case has to be
        # detected on the raw value instead of on the parsed result.
        full_path = ""
        parameters = None
    return HttpRequest(url=full_path, headers=request.get("headers"), parameters=parameters, body=request.get("body"))
184+
185+
def _create_response_from_log_message(self, log_message: AirbyteLogMessage) -> Optional[HttpResponse]:
    """
    Parses the JSON payload that follows the "response:" marker of a log message into an HttpResponse
    :param log_message: Log message whose text contains "response:" followed by a JSON object
    :return: The parsed response, or None when the payload or its "body" field is not valid JSON
    """
    # TODO: As a temporary stopgap, the CDK emits response data as a log message string. Ideally this should come in the
    #  form of a custom message object defined in the Airbyte protocol, but this unblocks us in the immediate while the
    #  protocol change is worked on.
    _, _, payload = log_message.message.partition("response:")
    try:
        parsed = json.loads(payload)
        # The body is itself a JSON-encoded string; a missing body is treated as an empty object
        decoded_body = json.loads(parsed.get("body", "{}"))
    except JSONDecodeError as error:
        self.logger.warning(f"Failed to parse log message into response object with error: {error}")
        return None
    return HttpResponse(status=parsed.get("status_code"), body=decoded_body, headers=parsed.get("headers"))
197+
198+
@staticmethod
def _create_low_code_adapter(manifest: Dict[str, Any]) -> LowCodeSourceAdapter:
    """
    Builds the adapter for the given manifest
    :param manifest: The connector manifest contents
    :return: An adapter wrapping the source described by the manifest
    :raises HTTPException: With status code 400 when building the adapter raises a ValidationError
    """
    try:
        adapter = LowCodeSourceAdapter(manifest=manifest)
    except ValidationError as error:
        # TODO: We're temporarily using FastAPI's default exception model. Ideally we should use exceptions defined in the OpenAPI spec
        detail = f"Invalid connector manifest with error: {error.message}"
        raise HTTPException(status_code=400, detail=detail)
    return adapter
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from typing import Any, Dict, Iterable, List
6+
7+
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
8+
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
9+
from airbyte_cdk.sources.declarative.yaml_declarative_source import ManifestDeclarativeSource
10+
from airbyte_cdk.sources.streams.http import HttpStream
11+
12+
13+
class LowCodeSourceAdapter:
    """Wraps a ManifestDeclarativeSource built from a manifest so that its streams can be listed and read."""

    def __init__(self, manifest: Dict[str, Any]):
        # Request and response messages are only emitted by sources that have debug turned on
        self._source = ManifestDeclarativeSource(manifest, debug=True)

    def get_http_streams(self, config: Dict[str, Any]) -> List[HttpStream]:
        """
        Collects the retriever of every stream of the source for the given config
        :param config: The config passed to the source when resolving its streams
        :return: The retrievers of the source's streams, in the order the source returns them
        :raises TypeError: When a stream is not a DeclarativeStream or its retriever is not an HttpStream
        """
        retrievers = []
        for candidate in self._source.streams(config=config):
            if not isinstance(candidate, DeclarativeStream):
                raise TypeError(f"A declarative source should only contain streams of type DeclarativeStream, but received: {candidate.__class__}")
            retriever = candidate.retriever
            if not isinstance(retriever, HttpStream):
                raise TypeError(
                    f"A declarative stream should only have a retriever of type HttpStream, but received: {retriever.__class__}")
            retrievers.append(retriever)
        return retrievers

    def read_stream(self, stream: str, config: Dict[str, Any]) -> Iterable[AirbyteMessage]:
        """
        Reads a single stream of the source using a full refresh sync
        :param stream: Name of the stream to read
        :param config: The config passed to the source for the read
        :return: The messages yielded by the source's read operation
        """
        stream_descriptor = {
            "name": stream,
            "json_schema": {},
            "supported_sync_modes": ["full_refresh", "incremental"],
        }
        configured_stream = {
            "stream": stream_descriptor,
            "sync_mode": "full_refresh",
            "destination_sync_mode": "overwrite",
        }
        configured_catalog = ConfiguredAirbyteCatalog.parse_obj({"streams": [configured_stream]})
        yield from self._source.read(logger=self._source.logger, config=config, catalog=configured_catalog)

airbyte-connector-builder-server/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
},
4242
packages=find_packages(exclude=("unit_tests", "integration_tests", "docs")),
4343
package_data={},
44-
install_requires=["fastapi", "uvicorn"],
44+
install_requires=["airbyte-cdk~=0.8", "fastapi", "uvicorn"],
4545
python_requires=">=3.9.11",
4646
extras_require={
4747
"tests": ["MyPy~=0.812", "pytest~=6.2.5", "pytest-cov", "pytest-mock", "pytest-recording", "requests-mock", "pre-commit"],

airbyte-connector-builder-server/src/main/openapi/openapi.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,15 @@ components:
189189
type: object
190190
required:
191191
- manifest
192+
- config
192193
properties:
193194
manifest:
194195
type: object
195196
description: The config-based connector manifest contents
196197
# $ref: "#/components/schemas/ConnectorManifest"
198+
config:
199+
type: object
200+
description: The config blob containing the user inputs for testing
197201
StreamsListRead:
198202
type: object
199203
required:
@@ -213,7 +217,6 @@ components:
213217
description: The name of the stream
214218
url:
215219
type: string
216-
format: uri
217220
description: The URL to which read requests will be made for this stream
218221
# --- Potential addition for a later phase ---
219222
# slices:

0 commit comments

Comments
 (0)