Skip to content

Commit 010543c

Browse files
committed
Merge master to branch
2 parents 193d59f + f222fcc commit 010543c

File tree

7 files changed

+143
-4
lines changed

7 files changed

+143
-4
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1863,6 +1863,19 @@ definitions:
18631863
$parameters:
18641864
type: object
18651865
additionalProperties: true
1866+
FlattenFields:
1867+
title: Flatten Fields
1868+
description: A transformation that flatten record to single level format.
1869+
type: object
1870+
required:
1871+
- type
1872+
properties:
1873+
type:
1874+
type: string
1875+
enum: [FlattenFields]
1876+
$parameters:
1877+
type: object
1878+
additionalProperties: true
18661879
IterableDecoder:
18671880
title: Iterable Decoder
18681881
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,11 @@ class KeysToSnakeCase(BaseModel):
715715
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
716716

717717

718+
class FlattenFields(BaseModel):
719+
type: Literal["FlattenFields"]
720+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
721+
722+
718723
class IterableDecoder(BaseModel):
719724
type: Literal["IterableDecoder"]
720725

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,9 @@
197197
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
198198
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
199199
)
200+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
201+
FlattenFields as FlattenFieldsModel,
202+
)
200203
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
201204
GzipJsonDecoder as GzipJsonDecoderModel,
202205
)
@@ -390,6 +393,9 @@
390393
RemoveFields,
391394
)
392395
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
396+
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
397+
FlattenFields,
398+
)
393399
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
394400
KeysToLowerTransformation,
395401
)
@@ -479,6 +485,7 @@ def _init_mappings(self) -> None:
479485
GzipJsonDecoderModel: self.create_gzipjson_decoder,
480486
KeysToLowerModel: self.create_keys_to_lower_transformation,
481487
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
488+
FlattenFieldsModel: self.create_flatten_fields,
482489
IterableDecoderModel: self.create_iterable_decoder,
483490
XmlDecoderModel: self.create_xml_decoder,
484491
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
@@ -599,6 +606,11 @@ def create_keys_to_snake_transformation(
599606
) -> KeysToSnakeCaseTransformation:
600607
return KeysToSnakeCaseTransformation()
601608

609+
def create_flatten_fields(
610+
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
611+
) -> FlattenFields:
612+
return FlattenFields()
613+
602614
@staticmethod
603615
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
604616
if not value_type:
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
from typing import Any, Dict, Optional
7+
8+
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
9+
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
10+
11+
12+
@dataclass
13+
class FlattenFields(RecordTransformation):
14+
def transform(
15+
self,
16+
record: Dict[str, Any],
17+
config: Optional[Config] = None,
18+
stream_state: Optional[StreamState] = None,
19+
stream_slice: Optional[StreamSlice] = None,
20+
) -> None:
21+
transformed_record = self.flatten_record(record)
22+
record.clear()
23+
record.update(transformed_record)
24+
25+
def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
26+
stack = [(record, "_")]
27+
transformed_record: Dict[str, Any] = {}
28+
force_with_parent_name = False
29+
30+
while stack:
31+
current_record, parent_key = stack.pop()
32+
33+
if isinstance(current_record, dict):
34+
for current_key, value in current_record.items():
35+
new_key = (
36+
f"{parent_key}.{current_key}"
37+
if (current_key in transformed_record or force_with_parent_name)
38+
else current_key
39+
)
40+
stack.append((value, new_key))
41+
42+
elif isinstance(current_record, list):
43+
for i, item in enumerate(current_record):
44+
force_with_parent_name = True
45+
stack.append((item, f"{parent_key}.{i}"))
46+
47+
else:
48+
transformed_record[parent_key] = current_record
49+
50+
return transformed_record

airbyte_cdk/sources/streams/http/http_client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ def _send_with_retry(
262262
user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(
263263
self._send
264264
)
265-
rate_limit_backoff_handler = rate_limit_default_backoff_handler()
265+
rate_limit_backoff_handler = rate_limit_default_backoff_handler(max_tries=max_tries)
266266
backoff_handler = http_client_default_backoff_handler(
267267
max_tries=max_tries, max_time=max_time
268268
)
@@ -472,7 +472,9 @@ def _handle_error_resolution(
472472

473473
elif retry_endlessly:
474474
raise RateLimitBackoffException(
475-
request=request, response=response or exc, error_message=error_message
475+
request=request,
476+
response=(response if response is not None else exc),
477+
error_message=error_message,
476478
)
477479

478480
raise DefaultBackoffException(
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#
2+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import pytest
6+
7+
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
8+
FlattenFields,
9+
)
10+
11+
12+
@pytest.mark.parametrize(
13+
"input_record, expected_output",
14+
[
15+
({"FirstName": "John", "LastName": "Doe"}, {"FirstName": "John", "LastName": "Doe"}),
16+
({"123Number": 123, "456Another123": 456}, {"123Number": 123, "456Another123": 456}),
17+
(
18+
{
19+
"NestedRecord": {"FirstName": "John", "LastName": "Doe"},
20+
"456Another123": 456,
21+
},
22+
{
23+
"FirstName": "John",
24+
"LastName": "Doe",
25+
"456Another123": 456,
26+
},
27+
),
28+
(
29+
{"ListExample": [{"A": "a"}, {"A": "b"}]},
30+
{"ListExample.0.A": "a", "ListExample.1.A": "b"},
31+
),
32+
(
33+
{
34+
"MixedCase123": {
35+
"Nested": [{"Key": {"Value": "test1"}}, {"Key": {"Value": "test2"}}]
36+
},
37+
"SimpleKey": "SimpleValue",
38+
},
39+
{
40+
"Nested.0.Key.Value": "test1",
41+
"Nested.1.Key.Value": "test2",
42+
"SimpleKey": "SimpleValue",
43+
},
44+
),
45+
(
46+
{"List": ["Item1", "Item2", "Item3"]},
47+
{"List.0": "Item1", "List.1": "Item2", "List.2": "Item3"},
48+
),
49+
],
50+
)
51+
def test_flatten_fields(input_record, expected_output):
52+
flattener = FlattenFields()
53+
flattener.transform(input_record)
54+
assert input_record == expected_output

unit_tests/sources/streams/http/test_http_client.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from airbyte_cdk.sources.streams.http.exceptions import (
2222
DefaultBackoffException,
23+
RateLimitBackoffException,
2324
RequestBodyException,
2425
UserDefinedBackoffException,
2526
)
@@ -690,10 +691,12 @@ def backoff_time(self, *args, **kwargs):
690691

691692
@pytest.mark.parametrize(
692693
"exit_on_rate_limit, expected_call_count, expected_error",
693-
[[True, 6, DefaultBackoffException], [False, 38, OverflowError]],
694+
[[True, 6, DefaultBackoffException], [False, 6, RateLimitBackoffException]],
694695
)
695696
@pytest.mark.usefixtures("mock_sleep")
696-
def test_backoff_strategy_endless(exit_on_rate_limit, expected_call_count, expected_error):
697+
def test_backoff_strategy_endless(
698+
exit_on_rate_limit: bool, expected_call_count: int, expected_error: Exception
699+
):
697700
http_client = HttpClient(
698701
name="test", logger=MagicMock(), error_handler=HttpStatusErrorHandler(logger=MagicMock())
699702
)

0 commit comments

Comments
 (0)