Skip to content

Commit e029353

Browse files
authored
SimpleRetriever yield request and response as log messages (#18644)
* method yielding airbytemessage * move to Stream * update abstract source * reset * missing file * Yield request and response as log messages * only emit request and responses if the debug flag is on * add test docker image * script to run acceptance tests with local cdk * Update conftest to use a different image * extract to method * dont use a different image tag * Always install local cdk * break the cdk * get path from current working directory * or * ignore unit test * debug log * Revert "AMI change: ami-0f23be2f917510c26 -> ami-005924fb76f7477ce (#18689)" This reverts commit bf06dec. * build from the top * Update source-acceptance-test * fix * copy setup * some work on the gradle plugin * reset to master * delete unused file * delete unused file * reset to master * optional argument * delete dead code * use latest cdk with sendgrid * fix sendgrid dockerfile * break the cdk * use local file * Revert "break the cdk" This reverts commit 600c195. * dont raise an exception * reset to master * unit tests * missing test * more unit tests * remove deprecated comment * newline * reset to master * remove files * reset * Update abstract source * remove method from stream * convert to airbytemessage * unittests * Update * unit test * remove debug logs * Revert "remove debug logs" This reverts commit a1a139e. * Revert "Revert "remove debug logs"" This reverts commit b1d62cd. * Revert "reset to master" This reverts commit 3fa6a00. * fix * slightly better test * typing * extract method * Revert "Revert "reset to master"" This reverts commit 5dac7c2. * reset to master * reset to master * Revert "reset to master" This reverts commit 3fa6a00. * Comment * operate on the message * Revert "Revert "reset to master"" This reverts commit 5833c84. * comment * test * Revert "test" This reverts commit 2f91b80. * test * Revert "test" This reverts commit 62d95eb. * test * Revert "test" This reverts commit 27150ba. * format * format * symlink * Update setup * update path * reset to master * update * Add local files * greenhouse * format * symlink * try reordering * better error message * better log message * reset to master * Revert "merge for qa" This reverts commit ad7128f, reversing changes made to 7196c22. * reset to master * reset to master * reset to master * format * gradlew format * right type hints * reset to master * reset to master * gradlew format * a bunch of small fixes * Update output format * fixes from feedback * fixme comment * streams cannot return AirbyteRecordMessage * fix * format * only return logs when running on debug mode * move branching * update typing * remove dead code * fix simpleretriever name * i think this is better * log response.text * debug flag * comment * pass config * comments * run SATs * fix most of the unit tests * fix unit test * reset to master * runFromPath * Revert "runFromPath" This reverts commit 85979a8. * Revert "run SATs" This reverts commit a8a8a2d. * no need to convert to dict * fix test
1 parent 64b50ad commit e029353

File tree

13 files changed

+178
-94
lines changed

13 files changed

+178
-94
lines changed

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
2121
from airbyte_cdk.sources.source import Source
2222
from airbyte_cdk.sources.streams import Stream
23+
from airbyte_cdk.sources.streams.core import StreamData
2324
from airbyte_cdk.sources.streams.http.http import HttpStream
2425
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
2526
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
@@ -240,9 +241,7 @@ def _read_incremental(
240241
)
241242
record_counter = 0
242243
for message_counter, record_data_or_message in enumerate(records, start=1):
243-
message = stream_data_to_airbyte_message(
244-
stream_name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
245-
)
244+
message = self._get_message(record_data_or_message, stream_instance)
246245
yield message
247246
if message.type == MessageType.RECORD:
248247
record = message.record
@@ -287,9 +286,7 @@ def _read_full_refresh(
287286
cursor_field=configured_stream.cursor_field,
288287
)
289288
for record_data_or_message in record_data_or_messages:
290-
message = stream_data_to_airbyte_message(
291-
stream_instance.name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
292-
)
289+
message = self._get_message(record_data_or_message, stream_instance)
293290
yield message
294291
if message.type == MessageType.RECORD:
295292
total_records_counter += 1
@@ -315,3 +312,12 @@ def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: S
315312
"""
316313
if hasattr(logger, "level"):
317314
stream_instance.logger.setLevel(logger.level)
315+
316+
def _get_message(self, record_data_or_message: Union[StreamData, AirbyteMessage], stream: Stream):
317+
"""
318+
Converts the input to an AirbyteMessage if it is a StreamData. Returns the input as is if it is already an AirbyteMessage
319+
"""
320+
if isinstance(record_data_or_message, AirbyteMessage):
321+
return record_data_or_message
322+
else:
323+
return stream_data_to_airbyte_message(stream.name, record_data_or_message, stream.transformer, stream.get_json_schema())

airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,15 @@
88
import typing
99
from dataclasses import dataclass, fields
1010
from enum import Enum, EnumMeta
11-
from typing import Any, List, Mapping, Union
12-
13-
from airbyte_cdk.models import ConnectorSpecification
11+
from typing import Any, Iterator, List, Mapping, MutableMapping, Union
12+
13+
from airbyte_cdk.models import (
14+
AirbyteConnectionStatus,
15+
AirbyteMessage,
16+
AirbyteStateMessage,
17+
ConfiguredAirbyteCatalog,
18+
ConnectorSpecification,
19+
)
1420
from airbyte_cdk.sources.declarative.checks import CheckStream
1521
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1622
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
@@ -35,12 +41,14 @@ class ManifestDeclarativeSource(DeclarativeSource):
3541

3642
VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}
3743

38-
def __init__(self, source_config: ConnectionDefinition):
44+
def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
3945
"""
4046
:param source_config(Mapping[str, Any]): The manifest of low-code components that describe the source connector
47+
:param debug(bool): True if debug mode is enabled
4148
"""
4249
self.logger = logging.getLogger(f"airbyte.{self.name}")
4350
self._source_config = source_config
51+
self._debug = debug
4452
self._factory = DeclarativeComponentFactory()
4553

4654
self._validate_source()
@@ -73,6 +81,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
7381
will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
7482
in the project root.
7583
"""
84+
self._configure_logger_level(logger)
7685
self._emit_manifest_debug_message(extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)})
7786

7887
spec = self._source_config.get("spec")
@@ -84,6 +93,27 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
8493
else:
8594
return super().spec(logger)
8695

96+
def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
97+
self._configure_logger_level(logger)
98+
return super().check(logger, config)
99+
100+
def read(
101+
self,
102+
logger: logging.Logger,
103+
config: Mapping[str, Any],
104+
catalog: ConfiguredAirbyteCatalog,
105+
state: Union[List[AirbyteStateMessage], MutableMapping[str, Any]] = None,
106+
) -> Iterator[AirbyteMessage]:
107+
self._configure_logger_level(logger)
108+
yield from super().read(logger, config, catalog, state)
109+
110+
def _configure_logger_level(self, logger: logging.Logger):
111+
"""
112+
Set the log level to logging.DEBUG if debug mode is enabled
113+
"""
114+
if self._debug:
115+
logger.setLevel(logging.DEBUG)
116+
87117
def _validate_source(self):
88118
full_config = {}
89119
if "version" in self._source_config:

airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/retriever.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
from typing import Iterable, List, Optional
88

99
from airbyte_cdk.models import SyncMode
10-
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
10+
from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState
11+
from airbyte_cdk.sources.streams.core import StreamData
1112
from dataclasses_jsonschema import JsonSchemaMixin
1213

1314

@@ -24,7 +25,7 @@ def read_records(
2425
cursor_field: Optional[List[str]] = None,
2526
stream_slice: Optional[StreamSlice] = None,
2627
stream_state: Optional[StreamState] = None,
27-
) -> Iterable[Record]:
28+
) -> Iterable[StreamData]:
2829
"""
2930
Fetch a stream's records from an HTTP API source
3031

airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,28 @@
22
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
#
44

5+
import json
6+
import logging
57
from dataclasses import InitVar, dataclass, field
68
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
79

810
import requests
9-
from airbyte_cdk.models import SyncMode
11+
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode
12+
from airbyte_cdk.models import Type as MessageType
1013
from airbyte_cdk.sources.declarative.exceptions import ReadException
1114
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
15+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1216
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
1317
from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination
1418
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator
1519
from airbyte_cdk.sources.declarative.requesters.requester import Requester
1620
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1721
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice
1822
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
19-
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
23+
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
24+
from airbyte_cdk.sources.streams.core import StreamData
2025
from airbyte_cdk.sources.streams.http import HttpStream
26+
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
2127
from dataclasses_jsonschema import JsonSchemaMixin
2228

2329

@@ -46,9 +52,10 @@ class SimpleRetriever(Retriever, HttpStream, JsonSchemaMixin):
4652

4753
requester: Requester
4854
record_selector: HttpSelector
55+
config: Config
4956
options: InitVar[Mapping[str, Any]]
5057
name: str
51-
_name: str = field(init=False, repr=False, default="")
58+
_name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
5259
primary_key: Optional[Union[str, List[str], List[List[str]]]]
5360
_primary_key: str = field(init=False, repr=False, default="")
5461
paginator: Optional[Paginator] = None
@@ -59,13 +66,15 @@ def __post_init__(self, options: Mapping[str, Any]):
5966
HttpStream.__init__(self, self.requester.get_authenticator())
6067
self._last_response = None
6168
self._last_records = None
69+
self._options = options
70+
self.name = InterpolatedString(self._name, options=options)
6271

6372
@property
6473
def name(self) -> str:
6574
"""
6675
:return: Stream name
6776
"""
68-
return self._name
77+
return self._name.eval(self.config)
6978

7079
@name.setter
7180
def name(self, value: str) -> None:
@@ -347,17 +356,23 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
347356
def read_records(
348357
self,
349358
sync_mode: SyncMode,
350-
cursor_field: List[str] = None,
359+
cursor_field: Optional[List[str]] = None,
351360
stream_slice: Optional[StreamSlice] = None,
352361
stream_state: Optional[StreamState] = None,
353-
) -> Iterable[Mapping[str, Any]]:
362+
) -> Iterable[StreamData]:
354363
# Warning: use self.state instead of the stream_state passed as argument!
355364
stream_slice = stream_slice or {} # None-check
356365
self.paginator.reset()
357-
records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state)
358-
for r in records_generator:
359-
self.stream_slicer.update_cursor(stream_slice, last_record=r)
360-
yield r
366+
records_generator = self._read_pages(
367+
lambda req, res, state, _slice: self.parse_records_and_emit_request_and_responses(
368+
req, res, stream_slice=_slice, stream_state=state
369+
),
370+
stream_slice,
371+
stream_state,
372+
)
373+
for record in records_generator:
374+
self.stream_slicer.update_cursor(stream_slice, last_record=record)
375+
yield record
361376
else:
362377
last_record = self._last_records[-1] if self._last_records else None
363378
self.stream_slicer.update_cursor(stream_slice, last_record=last_record)
@@ -385,3 +400,26 @@ def state(self) -> MutableMapping[str, Any]:
385400
def state(self, value: StreamState):
386401
"""State setter, accept state serialized by state getter."""
387402
self.stream_slicer.update_cursor(value)
403+
404+
def parse_records_and_emit_request_and_responses(self, request, response, stream_slice, stream_state) -> Iterable[StreamData]:
405+
# Only emit requests and responses when running in debug mode
406+
if self.logger.isEnabledFor(logging.DEBUG):
407+
yield self._create_trace_message_from_request(request)
408+
yield self._create_trace_message_from_response(response)
409+
# Not great to need to call _read_pages which is a private method
410+
# A better approach would be to extract the HTTP client from the HttpStream and call it directly from the HttpRequester
411+
yield from self._read_pages(
412+
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
413+
)
414+
415+
def _create_trace_message_from_request(self, request: requests.PreparedRequest):
416+
# FIXME: this should return some sort of trace message
417+
request_dict = {"url": request.url, "headers": dict(request.headers), "body": request.body}
418+
log_message = filter_secrets(f"request:{json.dumps(request_dict)}")
419+
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message))
420+
421+
def _create_trace_message_from_response(self, response: requests.Response):
422+
# FIXME: this should return some sort of trace message
423+
response_dict = {"body": response.text, "headers": dict(response.headers), "status_code": response.status_code}
424+
log_message = filter_secrets(f"response:{json.dumps(response_dict)}")
425+
return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message))

airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from dataclasses import InitVar, dataclass
66
from typing import Any, Iterable, List, Mapping, Optional
77

8-
from airbyte_cdk.models import SyncMode
8+
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
99
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
1010
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
1111
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
@@ -138,6 +138,12 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Itera
138138
for parent_record in parent_stream.read_records(
139139
sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
140140
):
141+
# Skip non-records (eg AirbyteLogMessage)
142+
if isinstance(parent_record, AirbyteMessage):
143+
if parent_record.type == Type.RECORD:
144+
parent_record = parent_record.record.data
145+
else:
146+
continue
141147
empty_parent_slice = False
142148
stream_state_value = parent_record.get(parent_field)
143149
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice}

airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
class YamlDeclarativeSource(ManifestDeclarativeSource):
1313
"""Declarative source defined by a yaml file"""
1414

15-
def __init__(self, path_to_yaml):
15+
def __init__(self, path_to_yaml, debug: bool = False):
1616
"""
1717
:param path_to_yaml: Path to the yaml file describing the source
1818
"""
1919
self._path_to_yaml = path_to_yaml
2020
source_config = self._read_and_parse_yaml_file(path_to_yaml)
21-
super().__init__(source_config)
21+
super().__init__(source_config, debug)
2222

2323
def _read_and_parse_yaml_file(self, path_to_yaml_file) -> ConnectionDefinition:
2424
package = self.__class__.__module__.split(".")[0]

airbyte-cdk/python/airbyte_cdk/sources/streams/core.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
1111

1212
import airbyte_cdk.sources.utils.casing as casing
13-
from airbyte_cdk.models import AirbyteLogMessage, AirbyteRecordMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
13+
from airbyte_cdk.models import AirbyteLogMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
1414

1515
# list of all possible HTTP methods which can be used for sending of request bodies
1616
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
@@ -22,7 +22,7 @@
2222
# AirbyteRecordMessage: An AirbyteRecordMessage
2323
# AirbyteLogMessage: A log message
2424
# AirbyteTraceMessage: A trace message
25-
StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage]
25+
StreamData = Union[Mapping[str, Any], AirbyteLogMessage, AirbyteTraceMessage]
2626

2727

2828
def package_name_from_class(cls: object) -> str:

airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
import os
88
from abc import ABC, abstractmethod
99
from contextlib import suppress
10-
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
10+
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
1111
from urllib.parse import urljoin
1212

1313
import requests
1414
import requests_cache
1515
from airbyte_cdk.models import SyncMode
16-
from airbyte_cdk.sources.streams.core import Stream
16+
from airbyte_cdk.sources.streams.core import Stream, StreamData
1717
from requests.auth import AuthBase
1818
from requests_cache.session import CachedSession
1919

@@ -408,24 +408,25 @@ def read_records(
408408
cursor_field: List[str] = None,
409409
stream_slice: Mapping[str, Any] = None,
410410
stream_state: Mapping[str, Any] = None,
411-
) -> Iterable[Mapping[str, Any]]:
411+
) -> Iterable[StreamData]:
412+
yield from self._read_pages(
413+
lambda req, res, state, _slice: self.parse_response(res, stream_slice=_slice, stream_state=state), stream_slice, stream_state
414+
)
415+
416+
def _read_pages(
417+
self,
418+
records_generator_fn: Callable[
419+
[requests.PreparedRequest, requests.Response, Mapping[str, Any], Mapping[str, Any]], Iterable[StreamData]
420+
],
421+
stream_slice: Mapping[str, Any] = None,
422+
stream_state: Mapping[str, Any] = None,
423+
) -> Iterable[StreamData]:
412424
stream_state = stream_state or {}
413425
pagination_complete = False
414-
415426
next_page_token = None
416427
while not pagination_complete:
417-
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
418-
request = self._create_prepared_request(
419-
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
420-
headers=dict(request_headers, **self.authenticator.get_auth_header()),
421-
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
422-
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
423-
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
424-
)
425-
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
426-
427-
response = self._send_request(request, request_kwargs)
428-
yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)
428+
request, response = self._fetch_next_page(stream_slice, stream_state, next_page_token)
429+
yield from records_generator_fn(request, response, stream_state, stream_slice)
429430

430431
next_page_token = self.next_page_token(response)
431432
if not next_page_token:
@@ -434,6 +435,22 @@ def read_records(
434435
# Always return an empty generator just in case no records were ever yielded
435436
yield from []
436437

438+
def _fetch_next_page(
439+
self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
440+
) -> Tuple[requests.PreparedRequest, requests.Response]:
441+
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
442+
request = self._create_prepared_request(
443+
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
444+
headers=dict(request_headers, **self.authenticator.get_auth_header()),
445+
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
446+
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
447+
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
448+
)
449+
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
450+
451+
response = self._send_request(request, request_kwargs)
452+
return request, response
453+
437454

438455
class HttpSubStream(HttpStream, ABC):
439456
def __init__(self, parent: HttpStream, **kwargs):

0 commit comments

Comments
 (0)