
Commit 9c4ce94

Merge branch 'master' into leti/refactor-suggested-destinations-component
* master:
  🪟 🎉 Enable frontend validation for <1hr syncs (cloud) #19028
  Stream returns AirbyteMessages (#18572)
  🎉 New Source - Recruitee [low-code SDK] (#18671)
  🎉 New source: Breezometer [low-code cdk] (#18650)
  Check disabled connections after protocol update (#18990)
  Simple default replication worker refactor (#19002)
  🎉 New Source: Visma e-conomic (#18595)
  🎉 New Source: Fastbill (#18593)
  Bmoric/extract state api (#18980)
  🎉 New Source: Zapier Supported Storage (#18442)
  🎉 New source: Klarna (#18385)
  `AirbyteEstimateTraceMessage` (#18875)
  Extract source definition api (#18977)
  [low-code cdk] Allow for spec file to be defined in the yaml manifest instead of an external file (#18411)
  🐛 Source HubSpot: fix property scopes (#18624)
  Ensure that only 6-character hex values are passed to monaco editor (#18943)
2 parents 5040772 + 7067977 commit 9c4ce94

File tree

212 files changed: +10683 -590 lines changed


airbyte-cdk/python/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.7.0
+Low-code: Allow connector specifications to be defined in the manifest
+
 ## 0.6.0
 Low-code: Add support for monthly and yearly incremental updates for `DatetimeStreamSlicer`

airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py

Lines changed: 27 additions & 0 deletions
@@ -81,6 +81,7 @@ class Config:
 
 class TraceType(Enum):
     ERROR = "ERROR"
+    ESTIMATE = "ESTIMATE"
 
 
 class FailureType(Enum):
@@ -98,6 +99,28 @@ class Config:
     failure_type: Optional[FailureType] = Field(None, description="The type of error")
 
 
+class EstimateType(Enum):
+    STREAM = "STREAM"
+    SYNC = "SYNC"
+
+
+class AirbyteEstimateTraceMessage(BaseModel):
+    class Config:
+        extra = Extra.allow
+
+    name: str = Field(..., description="The name of the stream")
+    type: EstimateType = Field(..., description="The type of estimate", title="estimate type")
+    namespace: Optional[str] = Field(None, description="The namespace of the stream")
+    row_estimate: Optional[int] = Field(
+        None,
+        description="The estimated number of rows to be emitted by this sync for this stream",
+    )
+    byte_estimate: Optional[int] = Field(
+        None,
+        description="The estimated number of bytes to be emitted by this sync for this stream",
+    )
+
+
 class OrchestratorType(Enum):
     CONNECTOR_CONFIG = "CONNECTOR_CONFIG"
 
@@ -213,6 +236,10 @@ class Config:
     type: TraceType = Field(..., description="the type of trace message", title="trace type")
     emitted_at: float = Field(..., description="the time in ms that the message was emitted")
     error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")
+    estimate: Optional[AirbyteEstimateTraceMessage] = Field(
+        None,
+        description="Estimate trace message: a guess at how much data will be produced in this sync",
+    )
 
 
 class AirbyteControlMessage(BaseModel):
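
The new `ESTIMATE` trace type and `AirbyteEstimateTraceMessage` model let a source announce roughly how much data a sync is expected to produce. A minimal sketch of emitting one, importing the models from `airbyte_cdk.models.airbyte_protocol` as defined above; the stream name and figures are purely illustrative:

```python
import time

from airbyte_cdk.models.airbyte_protocol import (
    AirbyteEstimateTraceMessage,
    AirbyteMessage,
    AirbyteTraceMessage,
    EstimateType,
    TraceType,
    Type,
)

# Per-stream estimate: roughly 10k rows / 5 MB expected for a hypothetical "users" stream.
estimate = AirbyteEstimateTraceMessage(
    name="users",
    type=EstimateType.STREAM,
    namespace="public",
    row_estimate=10_000,
    byte_estimate=5_000_000,
)

message = AirbyteMessage(
    type=Type.TRACE,
    trace=AirbyteTraceMessage(
        type=TraceType.ESTIMATE,
        emitted_at=time.time() * 1000,  # the protocol expects milliseconds
        estimate=estimate,
    ),
)
print(message.json(exclude_unset=True))
```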

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

Lines changed: 33 additions & 47 deletions
@@ -4,15 +4,12 @@
 
 import logging
 from abc import ABC, abstractmethod
-from datetime import datetime
-from functools import lru_cache
 from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Union
 
 from airbyte_cdk.models import (
     AirbyteCatalog,
     AirbyteConnectionStatus,
     AirbyteMessage,
-    AirbyteRecordMessage,
     AirbyteStateMessage,
     ConfiguredAirbyteCatalog,
     ConfiguredAirbyteStream,
@@ -24,8 +21,8 @@
 from airbyte_cdk.sources.source import Source
 from airbyte_cdk.sources.streams import Stream
 from airbyte_cdk.sources.streams.http.http import HttpStream
+from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
 from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
-from airbyte_cdk.sources.utils.transform import TypeTransformer
 from airbyte_cdk.utils.event_timing import create_timer
 from airbyte_cdk.utils.traced_exception import AirbyteTracedException

@@ -241,20 +238,27 @@ def _read_incremental(
                 stream_state=stream_state,
                 cursor_field=configured_stream.cursor_field or None,
             )
-            for record_counter, record_data in enumerate(records, start=1):
-                yield self._as_airbyte_record(stream_name, record_data)
-                stream_state = stream_instance.get_updated_state(stream_state, record_data)
-                checkpoint_interval = stream_instance.state_checkpoint_interval
-                if checkpoint_interval and record_counter % checkpoint_interval == 0:
-                    yield self._checkpoint_state(stream_instance, stream_state, state_manager)
-
-                total_records_counter += 1
-                # This functionality should ideally live outside of this method
-                # but since state is managed inside this method, we keep track
-                # of it here.
-                if self._limit_reached(internal_config, total_records_counter):
-                    # Break from slice loop to save state and exit from _read_incremental function.
-                    break
+            record_counter = 0
+            for message_counter, record_data_or_message in enumerate(records, start=1):
+                message = stream_data_to_airbyte_message(
+                    stream_name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
+                )
+                yield message
+                if message.type == MessageType.RECORD:
+                    record = message.record
+                    stream_state = stream_instance.get_updated_state(stream_state, record.data)
+                    checkpoint_interval = stream_instance.state_checkpoint_interval
+                    record_counter += 1
+                    if checkpoint_interval and record_counter % checkpoint_interval == 0:
+                        yield self._checkpoint_state(stream_instance, stream_state, state_manager)
+
+                    total_records_counter += 1
+                    # This functionality should ideally live outside of this method
+                    # but since state is managed inside this method, we keep track
+                    # of it here.
+                    if self._limit_reached(internal_config, total_records_counter):
+                        # Break from slice loop to save state and exit from _read_incremental function.
+                        break
 
         yield self._checkpoint_state(stream_instance, stream_state, state_manager)
         if self._limit_reached(internal_config, total_records_counter):
@@ -277,50 +281,32 @@ def _read_full_refresh(
         total_records_counter = 0
         for _slice in slices:
             logger.debug("Processing stream slice", extra={"slice": _slice})
-            records = stream_instance.read_records(
+            record_data_or_messages = stream_instance.read_records(
                 stream_slice=_slice,
                 sync_mode=SyncMode.full_refresh,
                 cursor_field=configured_stream.cursor_field,
             )
-            for record in records:
-                yield self._as_airbyte_record(configured_stream.stream.name, record)
-                total_records_counter += 1
-                if self._limit_reached(internal_config, total_records_counter):
-                    return
+            for record_data_or_message in record_data_or_messages:
+                message = stream_data_to_airbyte_message(
+                    stream_instance.name, record_data_or_message, stream_instance.transformer, stream_instance.get_json_schema()
+                )
+                yield message
+                if message.type == MessageType.RECORD:
+                    total_records_counter += 1
+                    if self._limit_reached(internal_config, total_records_counter):
+                        return
 
     def _checkpoint_state(self, stream: Stream, stream_state, state_manager: ConnectorStateManager):
         # First attempt to retrieve the current state using the stream's state property. We receive an AttributeError if the state
         # property is not implemented by the stream instance and as a fallback, use the stream_state retrieved from the stream
         # instance's deprecated get_updated_state() method.
         try:
             state_manager.update_state_for_stream(stream.name, stream.namespace, stream.state)
+
         except AttributeError:
             state_manager.update_state_for_stream(stream.name, stream.namespace, stream_state)
         return state_manager.create_state_message(stream.name, stream.namespace, send_per_stream_state=self.per_stream_state_enabled)
 
-    @lru_cache(maxsize=None)
-    def _get_stream_transformer_and_schema(self, stream_name: str) -> Tuple[TypeTransformer, Mapping[str, Any]]:
-        """
-        Lookup stream's transform object and jsonschema based on stream name.
-        This function would be called a lot so using caching to save on costly
-        get_json_schema operation.
-        :param stream_name name of stream from catalog.
-        :return tuple with stream transformer object and discover json schema.
-        """
-        stream_instance = self._stream_to_instance_map[stream_name]
-        return stream_instance.transformer, stream_instance.get_json_schema()
-
-    def _as_airbyte_record(self, stream_name: str, data: Mapping[str, Any]):
-        now_millis = int(datetime.now().timestamp() * 1000)
-        transformer, schema = self._get_stream_transformer_and_schema(stream_name)
-        # Transform object fields according to config. Most likely you will
-        # need it to normalize values against json schema. By default no action
-        # taken unless configured. See
-        # docs/connector-development/cdk-python/schemas.md for details.
-        transformer.transform(data, schema)  # type: ignore
-        message = AirbyteRecordMessage(stream=stream_name, data=data, emitted_at=now_millis)
-        return AirbyteMessage(type=MessageType.RECORD, record=message)
-
     @staticmethod
     def _apply_log_level_to_stream_logger(logger: logging.Logger, stream_instance: Stream):
         """

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/class_types_registry.py

Lines changed: 2 additions & 0 deletions
@@ -33,6 +33,7 @@
 from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
 from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
 from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
+from airbyte_cdk.sources.declarative.spec import Spec
 from airbyte_cdk.sources.declarative.stream_slicers.cartesian_product_stream_slicer import CartesianProductStreamSlicer
 from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer
 from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
@@ -75,6 +76,7 @@
     "RemoveFields": RemoveFields,
     "SimpleRetriever": SimpleRetriever,
     "SingleSlice": SingleSlice,
+    "Spec": Spec,
     "SubstreamSlicer": SubstreamSlicer,
     "WaitUntilTimeFromHeader": WaitUntilTimeFromHeaderBackoffStrategy,
     "WaitTimeFromHeader": WaitTimeFromHeaderBackoffStrategy,

airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/factory.py

Lines changed: 5 additions & 1 deletion
@@ -246,7 +246,11 @@ def is_object_definition_with_class_name(definition):
 
     @staticmethod
     def is_object_definition_with_type(definition):
-        return isinstance(definition, dict) and "type" in definition
+        # The `type` field is an overloaded term in the context of the low-code manifest. As part of the language, `type` is shorthand
+        # for convenience to avoid defining the entire classpath. For the connector specification, `type` is a part of the spec schema.
+        # For spec parsing, as part of this check, when the type is set to object, we want it to remain a mapping. But when type is
+        # defined any other way, then it should be parsed as a declarative component in the manifest.
+        return isinstance(definition, dict) and "type" in definition and definition["type"] != "object"
 
     @staticmethod
     def get_default_type(parameter_name, parent_class):
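
The extra `!= "object"` guard keeps JSON-schema fragments, where `type: object` is data, from being treated as declarative components, while shorthand component references such as `type: Spec` still are. A quick illustration, assuming the enclosing factory class is `DeclarativeComponentFactory` as in the published CDK; the field values are made up:

```python
from airbyte_cdk.sources.declarative.parsers.factory import DeclarativeComponentFactory

# Shorthand component reference: parsed as a declarative component to build.
component = {"type": "Spec", "documentation_url": "https://example.com/docs", "connection_specification": {}}
print(DeclarativeComponentFactory.is_object_definition_with_type(component))  # True

# JSON-schema fragment inside a connection_specification: left as a plain mapping.
schema_fragment = {"type": "object", "properties": {"api_key": {"type": "string"}}}
print(DeclarativeComponentFactory.is_object_definition_with_type(schema_fragment))  # False
```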

airbyte-cdk/python/airbyte_cdk/sources/declarative/spec/__init__.py

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+from airbyte_cdk.sources.declarative.spec.spec import Spec
+
+__all__ = ["Spec"]

airbyte-cdk/python/airbyte_cdk/sources/declarative/spec/spec.py

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+from dataclasses import InitVar, dataclass
+from typing import Any, Mapping
+
+from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
+from dataclasses_jsonschema import JsonSchemaMixin
+
+
+@dataclass
+class Spec(JsonSchemaMixin):
+    """
+    Returns a connection specification made up of information about the connector and how it can be configured
+
+    Attributes:
+        documentation_url (str): The link the Airbyte documentation about this connector
+        connection_specification (Mapping[str, Any]): information related to how a connector can be configured
+    """
+
+    documentation_url: str
+    connection_specification: Mapping[str, Any]
+    options: InitVar[Mapping[str, Any]]
+
+    def generate_spec(self) -> ConnectorSpecification:
+        """
+        Returns the connector specification according the spec block defined in the low code connector manifest.
+        """
+
+        # We remap these keys to camel case because that's the existing format expected by the rest of the platform
+        return ConnectorSpecification.parse_obj(
+            {"documentationUrl": self.documentation_url, "connectionSpecification": self.connection_specification}
+        )
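
A minimal usage sketch of the new `Spec` component; the documentation URL and schema below are illustrative:

```python
from airbyte_cdk.sources.declarative.spec import Spec

spec = Spec(
    documentation_url="https://docs.airbyte.com/integrations/sources/example",
    connection_specification={
        "type": "object",
        "required": ["api_key"],
        "properties": {"api_key": {"type": "string", "airbyte_secret": True}},
    },
    options={},
)

connector_spec = spec.generate_spec()
print(connector_spec.documentationUrl)         # camelCased for the platform
print(connector_spec.connectionSpecification)  # passed through unchanged
```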

airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py

Lines changed: 4 additions & 2 deletions
@@ -5,7 +5,6 @@
 import datetime
 import re
 from dataclasses import InitVar, dataclass, field
-from dateutil.relativedelta import relativedelta
 from typing import Any, Iterable, Mapping, Optional, Union
 
 from airbyte_cdk.models import SyncMode
@@ -17,6 +16,7 @@
 from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
 from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
 from dataclasses_jsonschema import JsonSchemaMixin
+from dateutil.relativedelta import relativedelta
 
 
 @dataclass
@@ -71,7 +71,9 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
     stream_state_field_end: Optional[str] = None
     lookback_window: Optional[Union[InterpolatedString, str]] = None
 
-    timedelta_regex = re.compile(r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
+    timedelta_regex = re.compile(
+        r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$"
+    )
 
     def __post_init__(self, options: Mapping[str, Any]):
         if not isinstance(self.start_datetime, MinMaxDatetime):
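
The reformatted `timedelta_regex` (behaviour unchanged) splits duration strings such as `"1m"` or `"2w3d"` into named parts, which the slicer turns into a `relativedelta`. A standalone sketch roughly mirroring what the slicer does with the match:

```python
import re

from dateutil.relativedelta import relativedelta

timedelta_regex = re.compile(
    r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$"
)

# "2w3d" -> {"weeks": "2", "days": "3"}; unmatched units come back as None.
parts = timedelta_regex.match("2w3d").groupdict()
delta = relativedelta(**{unit: float(value) for unit, value in parts.items() if value})
print(delta)  # relativedelta(days=+17), i.e. two weeks and three days
```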

airbyte-cdk/python/airbyte_cdk/sources/declarative/yaml_declarative_source.py

Lines changed: 24 additions & 1 deletion
@@ -11,6 +11,7 @@
 from enum import Enum, EnumMeta
 from typing import Any, List, Mapping, Union
 
+from airbyte_cdk.models import ConnectorSpecification
 from airbyte_cdk.sources.declarative.checks import CheckStream
 from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
 from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
@@ -33,7 +34,7 @@ class ConcreteDeclarativeSource(JsonSchemaMixin):
 class YamlDeclarativeSource(DeclarativeSource):
     """Declarative source defined by a yaml file"""
 
-    VALID_TOP_LEVEL_FIELDS = {"definitions", "streams", "check", "version"}
+    VALID_TOP_LEVEL_FIELDS = {"check", "definitions", "spec", "streams", "version"}
 
     def __init__(self, path_to_yaml):
         """
@@ -69,6 +70,28 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
             self._apply_log_level_to_stream_logger(self.logger, stream)
         return source_streams
 
+    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
+        """
+        Returns the connector specification (spec) as defined in the Airbyte Protocol. The spec is an object describing the possible
+        configurations (e.g: username and password) which can be configured when running this connector. For low-code connectors, this
+        will first attempt to load the spec from the manifest's spec block, otherwise it will load it from "spec.yaml" or "spec.json"
+        in the project root.
+        """
+
+        self.logger.debug(
+            "parsed YAML into declarative source",
+            extra={"path_to_yaml_file": self._path_to_yaml, "source_name": self.name, "parsed_config": json.dumps(self._source_config)},
+        )
+
+        spec = self._source_config.get("spec")
+        if spec:
+            if "class_name" not in spec:
+                spec["class_name"] = "airbyte_cdk.sources.declarative.spec.Spec"
+            spec_component = self._factory.create_component(spec, dict())()
+            return spec_component.generate_spec()
+        else:
+            return super().spec(logger)
+
     def _read_and_parse_yaml_file(self, path_to_yaml_file):
         package = self.__class__.__module__.split(".")[0]
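
With this change, the spec can live directly in the manifest under a top-level `spec` key instead of a separate `spec.yaml`/`spec.json`. A hedged sketch of what that might look like and how it would be read; the file path, URL, and field names are illustrative. Note the nested `type: object` is JSON schema, which the factory change above deliberately leaves as a plain mapping, while the block itself defaults to the `Spec` component when no `class_name` is given.

```python
import logging

from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource

# A manifest `spec` block, shown here as YAML in comments:
# spec:
#   documentation_url: https://docs.airbyte.com/integrations/sources/example
#   connection_specification:
#     type: object
#     required: ["api_key"]
#     properties:
#       api_key: {type: string, airbyte_secret: true}

# Assuming ./source_example/example.yaml also defines the usual version,
# definitions, streams, and check sections.
source = YamlDeclarativeSource("./source_example/example.yaml")
print(source.spec(logging.getLogger("airbyte")).documentationUrl)
```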

airbyte-cdk/python/airbyte_cdk/sources/streams/core.py

Lines changed: 13 additions & 2 deletions
@@ -6,14 +6,24 @@
 import inspect
 import logging
 from abc import ABC, abstractmethod
+from functools import lru_cache
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
 
 import airbyte_cdk.sources.utils.casing as casing
-from airbyte_cdk.models import AirbyteStream, SyncMode
+from airbyte_cdk.models import AirbyteLogMessage, AirbyteRecordMessage, AirbyteStream, AirbyteTraceMessage, SyncMode
+
+# list of all possible HTTP methods which can be used for sending of request bodies
 from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 from deprecated.classic import deprecated
 
+# A stream's read method can return one of the following types:
+# Mapping[str, Any]: The content of an AirbyteRecordMessage
+# AirbyteRecordMessage: An AirbyteRecordMessage
+# AirbyteLogMessage: A log message
+# AirbyteTraceMessage: A trace message
+StreamData = Union[Mapping[str, Any], AirbyteRecordMessage, AirbyteLogMessage, AirbyteTraceMessage]
+
 
 def package_name_from_class(cls: object) -> str:
     """Find the package name given a class name"""
@@ -94,11 +104,12 @@ def read_records(
         cursor_field: List[str] = None,
         stream_slice: Mapping[str, Any] = None,
         stream_state: Mapping[str, Any] = None,
-    ) -> Iterable[Mapping[str, Any]]:
+    ) -> Iterable[StreamData]:
         """
         This method should be overridden by subclasses to read records based on the inputs
         """
 
+    @lru_cache(maxsize=None)
     def get_json_schema(self) -> Mapping[str, Any]:
         """
         :return: A dict of the JSON schema representing this stream.
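
With `read_records` widened to `Iterable[StreamData]`, a stream can interleave log or trace messages with its record mappings, and the refactored read loops in `AbstractSource` forward them as-is while counting only RECORDs. A hedged sketch (the stream name, fields, and inline schema are illustrative; a real connector would usually load its schema from a resource file):

```python
from typing import Any, Iterable, List, Mapping

from airbyte_cdk.models import AirbyteLogMessage, Level, SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import StreamData


class Users(Stream):
    primary_key = "id"

    def get_json_schema(self) -> Mapping[str, Any]:
        # Inline schema so the sketch is self-contained.
        return {"type": "object", "properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[StreamData]:
        # A log message emitted mid-stream is forwarded as an AirbyteMessage of type LOG.
        yield AirbyteLogMessage(level=Level.INFO, message="fetching page 1")
        yield {"id": 1, "name": "ada"}
        yield {"id": 2, "name": "grace"}
```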

airbyte-cdk/python/airbyte_cdk/sources/utils/__init__.py

Lines changed: 3 additions & 1 deletion
@@ -1,5 +1,7 @@
 #
-# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
 # Initialize Utils Package
+
+__all__ = ["record_helper"]
