Skip to content

Commit 8396fd2

Browse files
authored
airbyte-cdk: Improve Error Handling in Legacy CDK (#37576)
1 parent 49fc60d commit 8396fd2

19 files changed

+1761
-582
lines changed

airbyte-cdk/python/airbyte_cdk/sources/streams/http/__init__.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
#
44

55
# Initialize Streams Package
6-
from .exceptions import UserDefinedBackoffException
6+
from .http_client import HttpClient
77
from .http import HttpStream, HttpSubStream
8+
from .exceptions import UserDefinedBackoffException
89

9-
__all__ = ["HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
10+
__all__ = ["HttpClient", "HttpStream", "HttpSubStream", "UserDefinedBackoffException"]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from .backoff_strategy import BackoffStrategy
6+
from .default_backoff_strategy import DefaultBackoffStrategy
7+
from .error_handler import ErrorHandler
8+
from .error_message_parser import ErrorMessageParser
9+
from .http_status_error_handler import HttpStatusErrorHandler
10+
from .json_error_message_parser import JsonErrorMessageParser
11+
from .response_models import ResponseAction, ErrorResolution
12+
13+
__all__ = [
14+
"BackoffStrategy",
15+
"DefaultBackoffStrategy",
16+
"ErrorHandler",
17+
"ErrorMessageParser",
18+
"HttpStatusErrorHandler",
19+
"JsonErrorMessageParser",
20+
"ResponseAction",
21+
"ErrorResolution"
22+
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from abc import ABC, abstractmethod
6+
from typing import Optional, Union
7+
8+
import requests
9+
10+
11+
class BackoffStrategy(ABC):
    """
    Interface for objects that decide how long to wait before a request is attempted again.
    """

    @abstractmethod
    def backoff_time(self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]]) -> Optional[float]:
        """
        Compute the backoff time for a request, e.g. by reading the X-Retry-After header.

        :param response_or_exception: the response received for the request, or the exception raised while making it (may be None)
        :return: how long to backoff in seconds. The value may be a floating point number for subsecond precision.
        Returning None defers backoff to the default backoff behavior (e.g. using an exponential algorithm).
        """
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
4+
from datetime import timedelta
5+
from typing import Optional, Union
6+
7+
import requests
8+
9+
from .backoff_strategy import BackoffStrategy
10+
11+
12+
class DefaultBackoffStrategy(BackoffStrategy):
    """
    Backoff strategy that never proposes a custom wait time.

    It only carries the retry budget (``max_retries`` and ``max_time``) as attributes.
    """

    def __init__(self, max_retries: int = 5, max_time: timedelta = timedelta(seconds=600)):
        """
        :param max_retries: stored as-is on ``self.max_retries``
        :param max_time: a timedelta; stored on ``self.max_time`` converted to a number of seconds
        """
        # Note the unit change: the attribute holds seconds (float), not the timedelta that was passed in.
        self.max_time = max_time.total_seconds()
        self.max_retries = max_retries

    def backoff_time(self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]]) -> Optional[float]:
        """
        Always return None, i.e. propose no explicit backoff time.

        :param response_or_exception: the response received or the exception raised for the request; not inspected here
        :return: None, which defers backoff to the default backoff behavior (e.g. using an exponential algorithm)
        """
        return None
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from abc import ABC, abstractmethod
4+
from typing import Optional, Union
5+
6+
import requests
7+
8+
from .response_models import ErrorResolution
9+
10+
11+
class ErrorHandler(ABC):
    """
    Interface for deciding how the outcome of an HTTP request should be handled.
    """

    @abstractmethod
    def interpret_response(self, response: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
        """
        Map a response (or the exception raised instead of one) to an ErrorResolution.

        :param response: The HTTP response object, or the exception raised during the request.
        :return: An ErrorResolution carrying the response action, failure type, and error message.
        """
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from abc import ABC, abstractmethod
6+
from typing import Optional
7+
8+
import requests
9+
10+
11+
class ErrorMessageParser(ABC):
    """
    Interface for extracting an error message from an HTTP response.
    """

    @abstractmethod
    def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
        """
        Extract an error message from the given response.

        :param response: response received for the request
        :return: the error message, or None
        """
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
import logging
4+
from typing import Mapping, Optional, Type, Union
5+
6+
import requests
7+
from airbyte_cdk.models import FailureType
8+
from requests import RequestException
9+
10+
from .error_handler import ErrorHandler
11+
from .response_models import ErrorResolution, ResponseAction
12+
13+
14+
class HttpStatusErrorHandler(ErrorHandler):
    """
    Error handler that resolves a response by its HTTP status code, or an exception by its class,
    using a mapping from status code / exception class to ErrorResolution.
    """

    _DEFAULT_ERROR_MAPPING: Mapping[Union[int, str, Type[Exception]], ErrorResolution] = {
        RequestException: ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message="An exception occurred when making the request.",
        ),
        400: ErrorResolution(
            response_action=ResponseAction.FAIL,
            failure_type=FailureType.system_error,
            error_message="Bad request. Please check your request parameters.",
        ),
        401: ErrorResolution(
            response_action=ResponseAction.FAIL,
            failure_type=FailureType.config_error,
            error_message="Unauthorized. Please ensure you are authenticated correctly.",
        ),
        403: ErrorResolution(
            response_action=ResponseAction.FAIL,
            failure_type=FailureType.config_error,
            error_message="Forbidden. You don't have permission to access this resource.",
        ),
        404: ErrorResolution(
            response_action=ResponseAction.FAIL,
            failure_type=FailureType.system_error,
            error_message="Not found. The requested resource was not found on the server.",
        ),
        405: ErrorResolution(
            response_action=ResponseAction.FAIL,
            failure_type=FailureType.system_error,
            error_message="Method not allowed. Please check your request method.",
        ),
        408: ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message="Request timeout.",
        ),
        429: ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message="Too many requests.",
        ),
        500: ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message="Internal server error.",
        ),
        502: ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message="Bad gateway.",
        ),
        503: ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message="Service unavailable.",
        ),
        504: ErrorResolution(
            response_action=ResponseAction.RETRY,
            failure_type=FailureType.transient_error,
            error_message="Gateway timeout.",
        ),
    }

    def __init__(
        self,
        logger: logging.Logger,
        error_mapping: Optional[Mapping[Union[int, str, Type[Exception]], ErrorResolution]] = None,
    ) -> None:
        """
        Initialize the HttpStatusErrorHandler.

        :param logger: Logger used to report responses and exceptions that have no entry in the mapping.
        :param error_mapping: Custom error mapping. When provided (and non-empty) it is used INSTEAD of the
        default mapping; the two are not merged. When omitted or empty, the default mapping is used.
        """
        self._logger = logger
        self._error_mapping = error_mapping or self._DEFAULT_ERROR_MAPPING

    def interpret_response(self, response_or_exception: Optional[Union[requests.Response, Exception]] = None) -> ErrorResolution:
        """
        Interpret the response or exception and return the corresponding ErrorResolution.

        :param response_or_exception: The HTTP response object, or the exception raised during the request.
        :return: An ErrorResolution carrying the response action, failure type, and error message.
        """

        if isinstance(response_or_exception, Exception):
            # Look the exception up by its own class first, then by each base class in MRO order.
            # An exact-class lookup alone would never match an entry keyed on a base class
            # (e.g. RequestException) when a subclass instance is passed in.
            for exception_class in type(response_or_exception).__mro__:
                mapped_error: Optional[ErrorResolution] = self._error_mapping.get(exception_class)
                if mapped_error is not None:
                    return mapped_error

            self._logger.error(f"Unexpected exception in error handler: {response_or_exception}")
            return ErrorResolution(
                response_action=ResponseAction.RETRY,
                failure_type=FailureType.system_error,
                error_message=f"Unexpected exception in error handler: {response_or_exception}",
            )

        if isinstance(response_or_exception, requests.Response):
            # Checked before `.ok` so a response without a status code gets its own resolution.
            if response_or_exception.status_code is None:
                self._logger.error("Response does not include an HTTP status code.")
                return ErrorResolution(
                    response_action=ResponseAction.RETRY,
                    failure_type=FailureType.transient_error,
                    error_message="Response does not include an HTTP status code.",
                )

            if response_or_exception.ok:
                return ErrorResolution(
                    response_action=ResponseAction.SUCCESS,
                    failure_type=None,
                    error_message=None,
                )

            error_key = response_or_exception.status_code
            mapped_error = self._error_mapping.get(error_key)
            if mapped_error is not None:
                return mapped_error

            # Unmapped status codes are retried rather than failed.
            self._logger.warning(f"Unexpected HTTP Status Code in error handler: '{error_key}'")
            return ErrorResolution(
                response_action=ResponseAction.RETRY,
                failure_type=FailureType.system_error,
                error_message=f"Unexpected HTTP Status Code in error handler: {error_key}",
            )

        # Neither an exception nor a response (this includes None): nothing to retry on.
        self._logger.error(f"Received unexpected response type: {type(response_or_exception)}")
        return ErrorResolution(
            response_action=ResponseAction.FAIL,
            failure_type=FailureType.system_error,
            error_message=f"Received unexpected response type: {type(response_or_exception)}",
        )
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#
2+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from typing import Optional
6+
7+
import requests
8+
from airbyte_cdk.sources.utils.types import JsonType
9+
10+
from .error_message_parser import ErrorMessageParser
11+
12+
13+
class JsonErrorMessageParser(ErrorMessageParser):
    """
    Error message parser that looks for a message inside a JSON response body.
    """

    def _try_get_error(self, value: Optional[JsonType]) -> Optional[str]:
        """
        Recursively search a decoded JSON value for an error message.

        Strings are returned unchanged, lists yield the ", "-joined messages of their items, and dicts are
        searched under a fixed, ordered set of keys. Any other value yields None.
        """
        if isinstance(value, str):
            return value

        if isinstance(value, list):
            item_messages = (self._try_get_error(item) for item in value)
            return ", ".join(message for message in item_messages if message is not None)

        if isinstance(value, dict):
            # Take the first key holding a truthy value; when none does, the value under the last key is used.
            candidate = None
            for key in ("message", "messages", "error", "errors", "failures", "failure", "detail"):
                candidate = value.get(key)
                if candidate:
                    break
            return self._try_get_error(candidate)

        return None

    def parse_response_error_message(self, response: requests.Response) -> Optional[str]:
        """
        Parses the raw response object from a failed request into a user-friendly error message.

        :param response: the response whose body is decoded as JSON
        :return: A user-friendly message that indicates the cause of the error, or None when the body is not valid JSON
        """
        try:
            body = response.json()
        except requests.exceptions.JSONDecodeError:
            return None
        return self._try_get_error(body)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
3+
from dataclasses import dataclass
4+
from enum import Enum
5+
from typing import Optional
6+
7+
from airbyte_cdk.models import FailureType
8+
9+
10+
class ResponseAction(Enum):
    """
    Actions an error handler can resolve a response or exception to.
    """

    SUCCESS = "SUCCESS"
    RETRY = "RETRY"
    FAIL = "FAIL"
    IGNORE = "IGNORE"
15+
16+
17+
@dataclass
class ErrorResolution:
    """
    Outcome of interpreting a response or exception. Every field defaults to None.
    """

    # What to do about the response or exception.
    response_action: Optional[ResponseAction] = None
    # How the failure is categorized, if it is one.
    failure_type: Optional[FailureType] = None
    # Message describing the error, if any.
    error_message: Optional[str] = None

airbyte-cdk/python/airbyte_cdk/sources/streams/http/exceptions.py

+23-7
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,27 @@
33
#
44

55

6-
from typing import Union
6+
from typing import Optional, Union
77

88
import requests
99

1010

1111
class BaseBackoffException(requests.exceptions.HTTPError):
    """
    HTTPError raised for a request that is to be backed off, built from either a response or an exception.
    """

    def __init__(
        self,
        request: requests.PreparedRequest,
        response: Optional[Union[requests.Response, Exception]],
        error_message: str = "",
    ):
        """
        :param request: the request that triggered this exception
        :param response: the response received for the request, or the exception raised while making it
        :param error_message: explicit message; when empty, one is built from the request URL and the response/exception
        """
        if isinstance(response, requests.Response):
            default_message = f"Request URL: {request.url}, Response Code: {response.status_code}, Response Text: {response.text}"
            attached_response = response
        else:
            # An exception (or None) is not a response: it is only rendered into the message and never
            # forwarded to HTTPError as `response`.
            default_message = f"Request URL: {request.url}, Exception: {response}"
            attached_response = None

        super().__init__(error_message or default_message, request=request, response=attached_response)
1727

1828

1929
class RequestBodyException(Exception):
@@ -27,7 +37,13 @@ class UserDefinedBackoffException(BaseBackoffException):
2737
An exception that exposes how long it attempted to backoff
2838
"""
2939

30-
def __init__(self, backoff: Union[int, float], request: requests.PreparedRequest, response: requests.Response, error_message: str = ""):
40+
def __init__(
41+
self,
42+
backoff: Union[int, float],
43+
request: requests.PreparedRequest,
44+
response: Optional[Union[requests.Response, Exception]],
45+
error_message: str = "",
46+
):
3147
"""
3248
:param backoff: how long to backoff in seconds
3349
:param request: the request that triggered this backoff exception

0 commit comments

Comments
 (0)