Skip to content

feat(airbyte-cdk): Have better fallback error message #43399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@

import requests
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION, ErrorResolution, ResponseAction
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
ErrorResolution,
ResponseAction,
create_fallback_error_resolution,
)


@dataclass
Expand Down Expand Up @@ -69,4 +73,4 @@ def interpret_response(self, response_or_exception: Optional[Union[requests.Resp
if matched_error_resolution:
return matched_error_resolution

return DEFAULT_ERROR_RESOLUTION
return create_fallback_error_resolution(response_or_exception)
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
from airbyte_cdk.sources.streams.http.error_handlers import BackoffStrategy, ErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION, SUCCESS_RESOLUTION, ErrorResolution
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
SUCCESS_RESOLUTION,
ErrorResolution,
create_fallback_error_resolution,
)
from airbyte_cdk.sources.types import Config


Expand Down Expand Up @@ -114,7 +118,11 @@ def interpret_response(self, response_or_exception: Optional[Union[requests.Resp
default_reponse_filter = DefaultHttpResponseFilter(parameters={}, config=self.config)
default_response_filter_resolution = default_reponse_filter.matches(response_or_exception)

return default_response_filter_resolution if default_response_filter_resolution else DEFAULT_ERROR_RESOLUTION
return (
default_response_filter_resolution
if default_response_filter_resolution
else create_fallback_error_resolution(response_or_exception)
)

def backoff_time(
self, response_or_exception: Optional[Union[requests.Response, requests.RequestException]], attempt_count: int = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import requests
from airbyte_cdk.sources.declarative.requesters.error_handlers.http_response_filter import HttpResponseFilter
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION, ErrorResolution
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, create_fallback_error_resolution


class DefaultHttpResponseFilter(HttpResponseFilter):
Expand All @@ -25,4 +25,6 @@ def matches(self, response_or_exception: Optional[Union[requests.Response, Excep

default_mapped_error_resolution = DEFAULT_ERROR_MAPPING.get(mapped_key)

return default_mapped_error_resolution if default_mapped_error_resolution else DEFAULT_ERROR_RESOLUTION
return (
default_mapped_error_resolution if default_mapped_error_resolution else create_fallback_error_resolution(response_or_exception)
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

from dataclasses import dataclass
from enum import Enum
from typing import Optional
from typing import Optional, Union

import requests
from airbyte_cdk.models import FailureType
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from requests import HTTPError


class ResponseAction(Enum):
Expand All @@ -22,10 +25,34 @@ class ErrorResolution:
error_message: Optional[str] = None


DEFAULT_ERROR_RESOLUTION = ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.system_error,
error_message="The request failed due to an unknown error.",
)
def _format_exception_error_message(exception: Exception) -> str:
return f"{type(exception).__name__}: {str(exception)}"


def _format_response_error_message(response: requests.Response) -> str:
try:
response.raise_for_status()
except HTTPError as exception:
return filter_secrets(f"Response was not ok: `{str(exception)}`. Response content is: {response.text}")
# We purposefully do not add the response.content because the response is "ok" so there might be sensitive information in the payload.
# Feel free the
return f"Unexpected response with HTTP status {response.status_code}"


def create_fallback_error_resolution(response_or_exception: Optional[Union[requests.Response, Exception]]) -> ErrorResolution:
if response_or_exception is None:
# We do not expect this case to happen but if it does, it would be good to understand the cause and improve the error message
error_message = "Unexpected argument while handling error"
elif isinstance(response_or_exception, Exception):
error_message = _format_exception_error_message(response_or_exception)
else:
error_message = _format_response_error_message(response_or_exception)

return ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.system_error,
error_message=error_message,
)


SUCCESS_RESOLUTION = ErrorResolution(response_action=ResponseAction.SUCCESS, failure_type=None, error_message=None)
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import CompositeErrorHandler
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, ResponseAction
from airbyte_protocol.models import FailureType

SOME_BACKOFF_TIME = 60

Expand Down Expand Up @@ -97,6 +98,24 @@ def test_composite_error_handler(test_name, first_handler_behavior, second_handl
assert retrier.interpret_response(response_mock) == expected_behavior


def test_given_unmatched_response_or_exception_then_return_default_error_resolution():
composite_error_handler = CompositeErrorHandler(
error_handlers=[
DefaultErrorHandler(
response_filters=[],
parameters={},
config={},
)
],
parameters={},
)

error_resolution = composite_error_handler.interpret_response(ValueError("Any error"))

assert error_resolution.response_action == ResponseAction.RETRY
assert error_resolution.failure_type == FailureType.system_error


def test_composite_error_handler_no_handlers():
try:
CompositeErrorHandler(error_handlers=[], parameters={})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@
)
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_error_handler import DefaultErrorHandler, HttpResponseFilter
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
DEFAULT_ERROR_RESOLUTION,
ErrorResolution,
FailureType,
ResponseAction,
)
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, FailureType, ResponseAction

SOME_BACKOFF_TIME = 60

Expand Down Expand Up @@ -55,7 +50,7 @@
ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.system_error,
error_message="The request failed due to an unknown error.",
error_message="Unexpected response with HTTP status 418",
),
)
],
Expand Down Expand Up @@ -246,7 +241,9 @@ def test_default_error_handler_with_unmapped_http_code():
response_mock.ok = False
response_mock.headers = {}
actual_error_resolution = error_handler.interpret_response(response_mock)
assert actual_error_resolution == DEFAULT_ERROR_RESOLUTION
assert actual_error_resolution
assert actual_error_resolution.failure_type == FailureType.system_error
assert actual_error_resolution.response_action == ResponseAction.RETRY


def test_predicate_takes_precedent_over_default_mapped_error():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import pytest
from airbyte_cdk.sources.declarative.requesters.error_handlers.default_http_response_filter import DefaultHttpResponseFilter
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from airbyte_cdk.sources.streams.http.error_handlers.response_models import DEFAULT_ERROR_RESOLUTION
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from airbyte_protocol.models import FailureType
from requests import RequestException, Response


Expand Down Expand Up @@ -69,4 +70,6 @@ def test_unmapped_http_status_code_returns_default_error_resolution():
)

actual_error_resolution = response_filter.matches(response)
assert actual_error_resolution == DEFAULT_ERROR_RESOLUTION
assert actual_error_resolution
assert actual_error_resolution.failure_type == FailureType.system_error
assert actual_error_resolution.response_action == ResponseAction.RETRY
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from unittest import TestCase

import requests
import requests_mock
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction, create_fallback_error_resolution
from airbyte_cdk.utils.airbyte_secrets_utils import update_secrets
from airbyte_protocol.models import FailureType

_A_SECRET = "a-secret"
_A_URL = "https://a-url.com"

class DefaultErrorResolutionTest(TestCase):

def setUp(self) -> None:
update_secrets([_A_SECRET])

def tearDown(self) -> None:
# to avoid other tests being impacted by added secrets
update_secrets([])

def test_given_none_when_create_fallback_error_resolution_then_return_error_resolution(self) -> None:
error_resolution = create_fallback_error_resolution(None)

assert error_resolution.failure_type == FailureType.system_error
assert error_resolution.response_action == ResponseAction.RETRY
assert error_resolution.error_message == "Unexpected argument while handling error"

def test_given_exception_when_create_fallback_error_resolution_then_return_error_resolution(self) -> None:
exception = ValueError("This is an exception")

error_resolution = create_fallback_error_resolution(exception)

assert error_resolution.failure_type == FailureType.system_error
assert error_resolution.response_action == ResponseAction.RETRY
assert error_resolution.error_message
assert "ValueError" in error_resolution.error_message
assert str(exception) in error_resolution.error_message

def test_given_response_can_raise_for_status_when_create_fallback_error_resolution_then_error_resolution(self) -> None:
response = self._create_response(512)

error_resolution = create_fallback_error_resolution(response)

assert error_resolution.failure_type == FailureType.system_error
assert error_resolution.response_action == ResponseAction.RETRY
assert error_resolution.error_message and "512 Server Error: None for url: https://a-url.com/" in error_resolution.error_message

def test_given_response_is_ok_when_create_fallback_error_resolution_then_error_resolution(self) -> None:
response = self._create_response(205)

error_resolution = create_fallback_error_resolution(response)

assert error_resolution.failure_type == FailureType.system_error
assert error_resolution.response_action == ResponseAction.RETRY
assert error_resolution.error_message and str(response.status_code) in error_resolution.error_message

def _create_response(self, status_code: int) -> requests.Response:
with requests_mock.Mocker() as http_mocker:
http_mocker.get(_A_URL, status_code=status_code)
return requests.get(_A_URL)
Loading