Skip to content

Commit 2be9063

Browse files
Merge branch 'master' into leti/experiment-add-source-in-signup
2 parents b591170 + 5f63618 commit 2be9063

File tree

467 files changed

+13468
-4542
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

467 files changed

+13468
-4542
lines changed

airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.airbyte.api.client.generated.SourceApi;
1717
import io.airbyte.api.client.generated.SourceDefinitionApi;
1818
import io.airbyte.api.client.generated.SourceDefinitionSpecificationApi;
19+
import io.airbyte.api.client.generated.StateApi;
1920
import io.airbyte.api.client.generated.WorkspaceApi;
2021
import io.airbyte.api.client.invoker.generated.ApiClient;
2122

@@ -46,8 +47,8 @@ public class AirbyteApiClient {
4647
private final WorkspaceApi workspaceApi;
4748
private final HealthApi healthApi;
4849
private final DbMigrationApi dbMigrationApi;
49-
5050
private final AttemptApi attemptApi;
51+
private final StateApi stateApi;
5152

5253
public AirbyteApiClient(final ApiClient apiClient) {
5354
connectionApi = new ConnectionApi(apiClient);
@@ -64,6 +65,7 @@ public AirbyteApiClient(final ApiClient apiClient) {
6465
healthApi = new HealthApi(apiClient);
6566
dbMigrationApi = new DbMigrationApi(apiClient);
6667
attemptApi = new AttemptApi(apiClient);
68+
stateApi = new StateApi(apiClient);
6769
}
6870

6971
public ConnectionApi getConnectionApi() {
@@ -122,4 +124,8 @@ public AttemptApi getAttemptApi() {
122124
return attemptApi;
123125
}
124126

127+
public StateApi getStateApi() {
128+
return stateApi;
129+
}
130+
125131
}

airbyte-api/src/main/openapi/config.yaml

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ tags:
6363
description: Export/Import Airbyte Configuration and Database resources.
6464
- name: attempt
6565
description: Interactions with attempt related resources.
66+
- name: state
67+
description: Interactions with state related resources.
6668

6769
paths:
6870
/v1/workspaces/create:
@@ -1389,7 +1391,7 @@ paths:
13891391
/v1/state/get:
13901392
post:
13911393
tags:
1392-
- connection
1394+
- state
13931395
summary: Fetch the current state for a connection.
13941396
operationId: getState
13951397
requestBody:
@@ -1412,7 +1414,7 @@ paths:
14121414
/v1/state/create_or_update:
14131415
post:
14141416
tags:
1415-
- connection
1417+
- state
14161418
- internal
14171419
summary: Create or update the state for a connection.
14181420
operationId: createOrUpdateState
@@ -1994,7 +1996,7 @@ paths:
19941996
/v1/web_backend/state/get_type:
19951997
post:
19961998
tags:
1997-
- connection
1999+
- web_backend
19982000
summary: Fetch the current state type for a connection.
19992001
operationId: getStateType
20002002
requestBody:
@@ -2169,6 +2171,26 @@ paths:
21692171
$ref: "#/components/responses/NotFoundResponse"
21702172
"422":
21712173
$ref: "#/components/responses/InvalidInputResponse"
2174+
/v1/jobs/get_normalization_status:
2175+
post:
2176+
tags:
2177+
- jobs
2178+
- internal
2179+
summary: Get normalization status to determine if we can bypass normalization phase
2180+
operationId: getAttemptNormalizationStatusesForJob
2181+
requestBody:
2182+
content:
2183+
application/json:
2184+
schema:
2185+
$ref: "#/components/schemas/JobIdRequestBody"
2186+
responses:
2187+
"200":
2188+
description: Successful operation
2189+
content:
2190+
application/json:
2191+
schema:
2192+
$ref: "#/components/schemas/AttemptNormalizationStatusReadList"
2193+
21722194
/v1/health:
21732195
get:
21742196
tags:
@@ -2242,6 +2264,7 @@ paths:
22422264
application/json:
22432265
schema:
22442266
$ref: "#/components/schemas/InternalOperationResult"
2267+
22452268
components:
22462269
securitySchemes:
22472270
bearerAuth:
@@ -4838,6 +4861,26 @@ components:
48384861
properties:
48394862
succeeded:
48404863
type: boolean
4864+
AttemptNormalizationStatusReadList:
4865+
type: object
4866+
properties:
4867+
attemptNormalizationStatuses:
4868+
type: array
4869+
items:
4870+
$ref: "#/components/schemas/AttemptNormalizationStatusRead"
4871+
AttemptNormalizationStatusRead:
4872+
type: object
4873+
properties:
4874+
attemptNumber:
4875+
$ref: "#/components/schemas/AttemptNumber"
4876+
hasRecordsCommitted:
4877+
type: boolean
4878+
recordsCommitted:
4879+
type: integer
4880+
format: int64
4881+
hasNormalizationFailed:
4882+
type: boolean
4883+
48414884
InvalidInputProperty:
48424885
type: object
48434886
required:

airbyte-bootloader/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ dependencies {
1111
implementation project(':airbyte-protocol:protocol-models')
1212
implementation project(':airbyte-persistence:job-persistence')
1313

14-
implementation 'io.temporal:temporal-sdk:1.8.1'
14+
implementation libs.temporal.sdk
1515
implementation libs.flyway.core
1616

1717
testImplementation libs.platform.testcontainers.postgresql

airbyte-cdk/python/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## 0.4.0
4+
Low-code: Add support for custom error messages on error response filters
5+
6+
## 0.3.0
7+
Publish python typehints via `py.typed` file.
8+
39
## 0.2.3
410
- Propagate options to InterpolatedRequestInputProvider
511

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ def __post_init__(self, options: Mapping[str, Any]):
4949
def max_retries(self) -> Union[int, None]:
5050
return self.error_handlers[0].max_retries
5151

52-
def should_retry(self, response: requests.Response) -> ResponseStatus:
52+
def interpret_response(self, response: requests.Response) -> ResponseStatus:
5353
should_retry = None
5454
for retrier in self.error_handlers:
55-
should_retry = retrier.should_retry(response)
55+
should_retry = retrier.interpret_response(response)
5656
if should_retry.action == ResponseAction.SUCCESS:
5757
return response_status.SUCCESS
5858
if should_retry == response_status.IGNORE or should_retry.action == ResponseAction.RETRY:

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/default_error_handler.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
1616
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
1717
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
18+
from airbyte_cdk.sources.declarative.types import Config
1819
from dataclasses_jsonschema import JsonSchemaMixin
1920

2021

@@ -91,6 +92,7 @@ class DefaultErrorHandler(ErrorHandler, JsonSchemaMixin):
9192

9293
DEFAULT_BACKOFF_STRATEGY = ExponentialBackoffStrategy
9394

95+
config: Config
9496
options: InitVar[Mapping[str, Any]]
9597
response_filters: Optional[List[HttpResponseFilter]] = None
9698
max_retries: Optional[int] = 5
@@ -102,9 +104,11 @@ def __post_init__(self, options: Mapping[str, Any]):
102104

103105
if not self.response_filters:
104106
self.response_filters.append(
105-
HttpResponseFilter(ResponseAction.RETRY, http_codes=HttpResponseFilter.DEFAULT_RETRIABLE_ERRORS, options={})
107+
HttpResponseFilter(
108+
ResponseAction.RETRY, http_codes=HttpResponseFilter.DEFAULT_RETRIABLE_ERRORS, config=self.config, options={}
109+
)
106110
)
107-
self.response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, options={}))
111+
self.response_filters.append(HttpResponseFilter(ResponseAction.IGNORE, config={}, options={}))
108112

109113
if not self.backoff_strategies:
110114
self.backoff_strategies = [DefaultErrorHandler.DEFAULT_BACKOFF_STRATEGY()]
@@ -122,20 +126,20 @@ def max_retries(self, value: Union[int, None]):
122126
if not isinstance(value, property):
123127
self._max_retries = value
124128

125-
def should_retry(self, response: requests.Response) -> ResponseStatus:
129+
def interpret_response(self, response: requests.Response) -> ResponseStatus:
126130
request = response.request
127131

128132
if request not in self._last_request_to_attempt_count:
129133
self._last_request_to_attempt_count = {request: 1}
130134
else:
131135
self._last_request_to_attempt_count[request] += 1
132136
for response_filter in self.response_filters:
133-
filter_action = response_filter.matches(response)
134-
if filter_action is not None:
135-
if filter_action == ResponseAction.RETRY:
136-
return ResponseStatus(ResponseAction.RETRY, self._backoff_time(response, self._last_request_to_attempt_count[request]))
137-
else:
138-
return ResponseStatus(filter_action)
137+
matched_status = response_filter.matches(
138+
response=response, backoff_time=self._backoff_time(response, self._last_request_to_attempt_count[request])
139+
)
140+
if matched_status is not None:
141+
return matched_status
142+
139143
if response.ok:
140144
return response_status.SUCCESS
141145
# Fail if the response matches no filters

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/error_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def max_retries(self) -> Union[int, None]:
2626
pass
2727

2828
@abstractmethod
29-
def should_retry(self, response: requests.Response) -> ResponseStatus:
29+
def interpret_response(self, response: requests.Response) -> ResponseStatus:
3030
"""
3131
Evaluate response status describing whether a failing request should be retried or ignored.
3232

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/http_response_filter.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@
66
from typing import Any, Mapping, Optional, Set, Union
77

88
import requests
9+
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
910
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1011
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_action import ResponseAction
12+
from airbyte_cdk.sources.declarative.requesters.error_handlers.response_status import ResponseStatus
13+
from airbyte_cdk.sources.declarative.types import Config
1114
from airbyte_cdk.sources.streams.http.http import HttpStream
1215
from dataclasses_jsonschema import JsonSchemaMixin
1316

@@ -22,25 +25,43 @@ class HttpResponseFilter(JsonSchemaMixin):
2225
http_codes (Set[int]): http code of matching requests
2326
error_message_contains (str): error substring of matching requests
2427
predicate (str): predicate to apply to determine if a request is matching
28+
error_message (Union[InterpolatedString, str): error message to display if the response matches the filter
2529
"""
2630

2731
TOO_MANY_REQUESTS_ERRORS = {429}
2832
DEFAULT_RETRIABLE_ERRORS = set([x for x in range(500, 600)]).union(TOO_MANY_REQUESTS_ERRORS)
2933

3034
action: Union[ResponseAction, str]
35+
config: Config
3136
options: InitVar[Mapping[str, Any]]
3237
http_codes: Set[int] = None
3338
error_message_contains: str = None
3439
predicate: Union[InterpolatedBoolean, str] = ""
40+
error_message: Union[InterpolatedString, str] = ""
3541

3642
def __post_init__(self, options: Mapping[str, Any]):
3743
if isinstance(self.action, str):
3844
self.action = ResponseAction[self.action]
3945
self.http_codes = self.http_codes or set()
4046
if isinstance(self.predicate, str):
4147
self.predicate = InterpolatedBoolean(condition=self.predicate, options=options)
48+
self.error_message = InterpolatedString.create(string_or_interpolated=self.error_message, options=options)
4249

43-
def matches(self, response: requests.Response) -> Optional[ResponseAction]:
50+
def matches(self, response: requests.Response, backoff_time: Optional[float] = None) -> Optional[ResponseStatus]:
51+
filter_action = self._matches_filter(response)
52+
if filter_action is not None:
53+
error_message = self._create_error_message(response)
54+
if filter_action == ResponseAction.RETRY:
55+
return ResponseStatus(
56+
response_action=ResponseAction.RETRY,
57+
retry_in=backoff_time,
58+
error_message=error_message,
59+
)
60+
else:
61+
return ResponseStatus(filter_action, error_message=error_message)
62+
return None
63+
64+
def _matches_filter(self, response: requests.Response) -> Optional[ResponseAction]:
4465
"""
4566
Apply the filter on the response and return the action to execute if it matches
4667
:param response: The HTTP response to evaluate
@@ -55,6 +76,14 @@ def matches(self, response: requests.Response) -> Optional[ResponseAction]:
5576
else:
5677
return None
5778

79+
def _create_error_message(self, response: requests.Response) -> str:
80+
"""
81+
Construct an error message based on the specified message template of the filter.
82+
:param response: The HTTP response which can be used during interpolation
83+
:return: The evaluated error message string to be emitted
84+
"""
85+
return self.error_message.eval(self.config, response=response.json(), headers=response.headers)
86+
5887
def _response_matches_predicate(self, response: requests.Response) -> bool:
5988
return self.predicate and self.predicate.eval(None, response=response.json(), headers=response.headers)
6089

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/error_handlers/response_status.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,22 @@
99

1010
class ResponseStatus:
1111
"""
12-
ResponseAction amended with backoff time if a action is RETRY
12+
ResponseAction amended with backoff time if an action is RETRY
1313
"""
1414

15-
def __init__(self, response_action: Union[ResponseAction, str], retry_in: Optional[float] = None):
15+
def __init__(self, response_action: Union[ResponseAction, str], retry_in: Optional[float] = None, error_message: str = ""):
1616
"""
1717
:param response_action: response action to execute
1818
:param retry_in: backoff time (if action is RETRY)
19+
:param error_message: the error to be displayed back to the customer
1920
"""
2021
if isinstance(response_action, str):
2122
response_action = ResponseAction[response_action]
2223
if retry_in and response_action != ResponseAction.RETRY:
2324
raise ValueError(f"Unexpected backoff time ({retry_in} for non-retryable response action {response_action}")
2425
self._retry_in = retry_in
2526
self._action = response_action
27+
self._error_message = error_message
2628

2729
@property
2830
def action(self):
@@ -34,6 +36,11 @@ def retry_in(self) -> Optional[float]:
3436
"""How long to backoff before retrying a response. None if no wait required."""
3537
return self._retry_in
3638

39+
@property
40+
def error_message(self) -> str:
41+
"""The message to be displayed when an error response is received"""
42+
return self._error_message
43+
3744
@classmethod
3845
def retry(cls, retry_in: Optional[float]) -> "ResponseStatus":
3946
"""

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/http_requester.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def __post_init__(self, options: Mapping[str, Any]):
6060
if type(self.http_method) == str:
6161
self.http_method = HttpMethod[self.http_method]
6262
self._method = self.http_method
63-
self.error_handler = self.error_handler or DefaultErrorHandler(options=options)
63+
self.error_handler = self.error_handler or DefaultErrorHandler(options=options, config=self.config)
6464
self._options = options
6565

6666
# We are using an LRU cache in should_retry() method which requires all incoming arguments (including self) to be hashable.
@@ -88,9 +88,9 @@ def get_method(self):
8888
# use a tiny cache to limit the memory footprint. It doesn't have to be large because we mostly
8989
# only care about the status of the last response received
9090
@lru_cache(maxsize=10)
91-
def should_retry(self, response: requests.Response) -> ResponseStatus:
91+
def interpret_response_status(self, response: requests.Response) -> ResponseStatus:
9292
# Cache the result because the HttpStream first checks if we should retry before looking at the backoff time
93-
return self.error_handler.should_retry(response)
93+
return self.error_handler.interpret_response(response)
9494

9595
def get_request_params(
9696
self,

airbyte-cdk/python/airbyte_cdk/sources/declarative/requesters/requester.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ def get_request_params(
7070
"""
7171

7272
@abstractmethod
73-
def should_retry(self, response: requests.Response) -> ResponseStatus:
73+
def interpret_response_status(self, response: requests.Response) -> ResponseStatus:
7474
"""
75-
Specifies conditions for backoff based on the response from the server.
75+
Specifies conditions for backoff, error handling and reporting based on the response from the server.
7676
7777
By default, back off on the following HTTP response statuses:
7878
- 429 (Too Many Requests) indicating rate limiting

0 commit comments

Comments
 (0)