Skip to content

[airbyte-ci] Format using a poe task #38043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ def create_source(config: Mapping[str, Any], limits: TestReadLimits) -> Manifest


def read_stream(
source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog, state: List[AirbyteStateMessage], limits: TestReadLimits
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
limits: TestReadLimits,
) -> AirbyteMessage:
try:
handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def get_message_groups(
raise ValueError(f"Unknown message group type: {type(message_group)}")

try:
configured_stream = configured_catalog.streams[0] # The connector builder currently only supports reading from a single stream at a time
# The connector builder currently only supports reading from a single stream at a time
configured_stream = configured_catalog.streams[0]
schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)
except SchemaValidationException as exception:
for validation_error in exception.validation_errors:
Expand Down Expand Up @@ -183,7 +184,11 @@ def _get_message_groups(
and message.type == MessageType.LOG
and message.log.message.startswith(SliceLogger.SLICE_LOG_PREFIX)
):
yield StreamReadSlices(pages=current_slice_pages, slice_descriptor=current_slice_descriptor, state=[latest_state_message] if latest_state_message else [])
yield StreamReadSlices(
pages=current_slice_pages,
slice_descriptor=current_slice_descriptor,
state=[latest_state_message] if latest_state_message else [],
)
current_slice_descriptor = self._parse_slice_description(message.log.message)
current_slice_pages = []
at_least_one_page_in_group = False
Expand Down Expand Up @@ -230,7 +235,11 @@ def _get_message_groups(
else:
if current_page_request or current_page_response or current_page_records:
self._close_page(current_page_request, current_page_response, current_slice_pages, current_page_records)
yield StreamReadSlices(pages=current_slice_pages, slice_descriptor=current_slice_descriptor, state=[latest_state_message] if latest_state_message else [])
yield StreamReadSlices(
pages=current_slice_pages,
slice_descriptor=current_slice_descriptor,
state=[latest_state_message] if latest_state_message else [],
)

@staticmethod
def _need_to_close_page(at_least_one_page_in_group: bool, message: AirbyteMessage, json_message: Optional[Dict[str, Any]]) -> bool:
Expand Down Expand Up @@ -281,8 +290,11 @@ def _close_page(
current_page_records.clear()

def _read_stream(
self, source: DeclarativeSource, config: Mapping[str, Any], configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage]
self,
source: DeclarativeSource,
config: Mapping[str, Any],
configured_catalog: ConfiguredAirbyteCatalog,
state: List[AirbyteStateMessage],
) -> Iterator[AirbyteMessage]:
# the generator can raise an exception
# iterate over the generated messages. if next raise an exception, catch it and yield it as an AirbyteLogMessage
Expand Down
6 changes: 5 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/test/catalog_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ def with_stream(self, name: Union[str, ConfiguredAirbyteStreamBuilder], sync_mod

# to avoid a breaking change, `name` needs to stay in the API but this can be either a name or a builder
name_or_builder = name
builder = name_or_builder if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder) else ConfiguredAirbyteStreamBuilder().with_name(name_or_builder).with_sync_mode(sync_mode)
builder = (
name_or_builder
if isinstance(name_or_builder, ConfiguredAirbyteStreamBuilder)
else ConfiguredAirbyteStreamBuilder().with_name(name_or_builder).with_sync_mode(sync_mode)
)
self._streams.append(builder)
return self

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,7 @@ def build(self) -> Dict[str, Any]:

class HttpResponseBuilder:
def __init__(
self,
template: Dict[str, Any],
records_path: Union[FieldPath, NestedPath],
pagination_strategy: Optional[PaginationStrategy]
self, template: Dict[str, Any], records_path: Union[FieldPath, NestedPath], pagination_strategy: Optional[PaginationStrategy]
):
self._response = template
self._records: List[RecordBuilder] = []
Expand Down Expand Up @@ -198,16 +195,16 @@ def create_record_builder(
try:
record_template = records_path.extract(response_template)[0]
if not record_template:
raise ValueError(f"Could not extract any record from template at path `{records_path}`. "
f"Please fix the template to provide a record sample or fix `records_path`.")
raise ValueError(
f"Could not extract any record from template at path `{records_path}`. "
f"Please fix the template to provide a record sample or fix `records_path`."
)
return RecordBuilder(record_template, record_id_path, record_cursor_path)
except (IndexError, KeyError):
raise ValueError(f"Error while extracting records at path `{records_path}` from response template `{response_template}`")


def create_response_builder(
response_template: Dict[str, Any],
records_path: Union[FieldPath, NestedPath],
pagination_strategy: Optional[PaginationStrategy] = None
response_template: Dict[str, Any], records_path: Union[FieldPath, NestedPath], pagination_strategy: Optional[PaginationStrategy] = None
) -> HttpResponseBuilder:
return HttpResponseBuilder(response_template, records_path, pagination_strategy)
12 changes: 3 additions & 9 deletions airbyte-cdk/python/airbyte_cdk/test/state_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,9 @@ def __init__(self) -> None:
self._state: List[AirbyteStateMessage] = []

def with_stream_state(self, stream_name: str, state: Any) -> "StateBuilder":
self._state.append(AirbyteStateMessage.parse_obj({
"type": "STREAM",
"stream": {
"stream_state": state,
"stream_descriptor": {
"name": stream_name
}
}
}))
self._state.append(
AirbyteStateMessage.parse_obj({"type": "STREAM", "stream": {"stream_state": state, "stream_descriptor": {"name": stream_name}}})
)
return self

def build(self) -> List[AirbyteStateMessage]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ class FormatConfiguration:
Formatter.PYTHON,
["**/*.py"],
format_python_container,
["poetry run isort --settings-file pyproject.toml .", "poetry run black --config pyproject.toml ."],
["poetry run poe format"],
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"**/pnpm-lock.yaml", # This file is generated and should not be formatted
"**/normalization_test_output",
"**/source-amplitude/unit_tests/api_data/zipped.json", # Zipped file presents as non-UTF-8 making spotless sad
"**/tools/git_hooks/tests/test_spec_linter.py",
"airbyte-cdk/python/airbyte_cdk/sources/declarative/models/**", # These files are generated and should not be formatted
"airbyte-ci/connectors/metadata_service/lib/metadata_service/models/generated/**", # These files are generated and should not be formatted
"**/airbyte-ci/connectors/metadata_service/lib/tests/fixtures/**/invalid", # This is a test directory with invalid and sometimes unformatted code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from dagger import Container



async def pre_connector_install(base_image_container: Container) -> Container:
"""This function will run before the connector installation.
We set these environment variable to match what was originally in the Dockerfile.
Expand Down
76 changes: 53 additions & 23 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 38 additions & 19 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,30 @@ python = "~3.10"
isort = "5.6.4"
black = "~22.3.0"
ruff = "^0.4"
poethepoet = "^0.26.1"

[tool.poe.tasks]
isort = { cmd = "poetry run isort --settings-file pyproject.toml ." }
black = { cmd = "poetry run black --config pyproject.toml ." }
format = { sequence = [
"isort",
"black",
], help = "Format Python code in the repository. This command is invoked in airbyte-ci format." }

[tool.black]
line-length = 140
target-version = ["py310"]
extend-exclude = "(build|integration_tests|unit_tests|generated)"

[tool.coverage.report]
fail_under = 0
skip_empty = true
sort = "-cover"
omit = [
".venv/*",
"main.py",
"setup.py",
"unit_tests/*",
"integration_tests/*",
"**/generated/*",
]
extend-exclude = """
/(
build
| integration_tests
| unit_tests
| generated
| airbyte-cdk/python/airbyte_cdk/sources/declarative/models
| invalid
| non_formatted_code
)/
"""

[tool.flake8]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meh, let's nuke this in favor of Ruff (soon). Happy to extract into a separate one, but thought I'd remove it while I was there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly putting this back solves the formatting mismatch in CDK, so perhaps I should scope this PR only on formatting, and not linting, and that might be easier.

extend-exclude = [
Expand All @@ -37,15 +43,12 @@ extend-exclude = [
"build",
"models",
".eggs",
"airbyte-cdk/python/airbyte_cdk/models/__init__.py",
"airbyte-cdk/python/airbyte_cdk/sources/declarative/models/__init__.py",
".tox",
"airbyte_api_client",
"**/__init__.py",
"**/generated/*",
"**/declarative/models/*",
]
max-complexity = 20
max-line-length = 140
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically removing this line results in a BARRAGE of problems in python CDK because it tries to do flake check.

There are two paths forward — either remove the flake check before this PR, or remove it from this PR and do after.


extend-ignore = [
"E203", # whitespace before ':' (conflicts with Black)
"E231", # Bad trailing comma (conflicts with Black)
Expand All @@ -54,6 +57,19 @@ extend-ignore = [
"F811", # TODO: ella fix after pflake8 version update
]

[tool.coverage.report]
fail_under = 0
skip_empty = true
sort = "-cover"
omit = [
".venv/*",
"main.py",
"setup.py",
"unit_tests/*",
"integration_tests/*",
"**/generated/*",
]

# TODO: This will be removed in favor of the section below.
[tool.isort]
profile = "black"
Expand All @@ -65,6 +81,9 @@ include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
skip_glob = [
"airbyte-cdk/python/airbyte_cdk/sources/declarative/models/**",
"**/invalid/**",
"**/non_formatted_code/**",
"**/connector_builder/generated/**",
# TODO: Remove this after we move to Ruff. Ruff is mono-repo-aware and
# correctly handles first-party imports in subdirectories.
Expand Down
Loading