live-tests: debug mode and initial regression tests framework #35624

Merged 23 commits on Mar 6, 2024.
1 change: 1 addition & 0 deletions .github/workflows/airbyte-ci-tests.yml
@@ -38,6 +38,7 @@ jobs:
- airbyte-ci/connectors/connector_ops/**
- airbyte-ci/connectors/connectors_qa/**
- airbyte-ci/connectors/ci_credentials/**
- airbyte-ci/connectors/live-tests/**
- airbyte-ci/connectors/metadata_service/lib/**
- airbyte-ci/connectors/metadata_service/orchestrator/**
- airbyte-integrations/bases/connector-acceptance-test/**
1 change: 1 addition & 0 deletions .gitignore
@@ -16,6 +16,7 @@ static_checker_reports/
# Logs
acceptance_tests_logs/
airbyte_ci_logs/
live_tests_debug_reports/

# Secrets
secrets
108 changes: 108 additions & 0 deletions airbyte-ci/connectors/live-tests/README.md
@@ -0,0 +1,108 @@
# Connector Live Testing

This project contains utilities for running connector tests against live data.

## Requirements
* `docker`
* `Python ^3.10`
* `pipx`
* `poetry`

## Install
```bash
# From airbyte-ci/connectors/live-tests
pipx install .
# To install in editable mode for development
pipx install . --force --editable
```

## Commands

### `debug`

```
Usage: live-tests debug [OPTIONS] COMMAND

  Run a specific command on one or multiple connectors and persists the
  outputs to local storage.

Options:
  -c, --connector-image TEXT      Docker image name of the connector to debug
                                  (e.g. `source-faker:latest`, `source-
                                  faker:dev`)  [required]
  -o, --output-directory DIRECTORY
                                  Directory in which connector output and test
                                  results should be stored.
                                  Defaults to the current directory.
  --config-path FILE              Path to the connector config.
  --catalog-path FILE             Path to the connector catalog.
  --state-path FILE               Path to the connector state.
  -hc, --http-cache               Use the HTTP cache for the connector.
  --help                          Show this message and exit.
```

The `debug` command runs any of the following connector commands against one or more connector images.

**Available connector commands:**
* `spec`
* `check`
* `discover`
* `read` or `read_with_state` (requires a `--state-path` to be passed)

It will write artifacts to an output directory:
* `stdout.log`: The collected standard output following the command execution
* `stderr.log`: The collected standard error following the command execution
* `http_dump.mitm`: A `mitmproxy` HTTP stream log. Can be consumed with `mitmweb` (version `9.0.1`) for debugging.

#### Example
Let's run `debug` to check the output of `read` on two different versions of the same connector:

```bash
live-tests debug read \
  --connector-image=airbyte/source-pokeapi:dev \
  --connector-image=airbyte/source-pokeapi:latest \
  --config-path=poke_config.json \
  --catalog-path=configured_catalog.json
```

It will store the results in a `live_tests_debug_reports` directory under the current working directory:

```
live_tests_debug_reports
└── 1709547771
    └── source-pokeapi
        └── read
            ├── dev
            │   ├── airbyte_messages
            │   │   ├── logs.jsonl
            │   │   ├── pokemon_records.jsonl
            │   │   └── traces.jsonl
            │   ├── http_dump.mitm
            │   ├── stderr.log
            │   └── stdout.log
            └── latest
                ├── airbyte_messages
                │   ├── logs.jsonl
                │   ├── pokemon_records.jsonl
                │   └── traces.jsonl
                ├── http_dump.mitm
                ├── stderr.log
                └── stdout.log

```
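
Because each run writes one JSONL file per message type or stream, a quick first comparison between two connector versions is to count the messages in each file. The sketch below is a hypothetical helper and not part of `live-tests`: the names `count_messages` and `diff_counts` are assumptions, and it only relies on the directory layout shown in the tree above.

```python
from pathlib import Path


def count_messages(version_dir: Path) -> dict[str, int]:
    """Count non-empty lines in each .jsonl file under <version_dir>/airbyte_messages."""
    counts: dict[str, int] = {}
    messages_dir = version_dir / "airbyte_messages"
    for jsonl_path in sorted(messages_dir.glob("*.jsonl")):
        with jsonl_path.open() as f:
            counts[jsonl_path.name] = sum(1 for line in f if line.strip())
    return counts


def diff_counts(left: dict[str, int], right: dict[str, int]) -> dict[str, tuple[int, int]]:
    """Return the files whose message counts differ (a missing file counts as 0)."""
    names = sorted(set(left) | set(right))
    return {
        name: (left.get(name, 0), right.get(name, 0))
        for name in names
        if left.get(name, 0) != right.get(name, 0)
    }
```

For the run above, one could call `count_messages` on the `read/dev` and `read/latest` directories and pass both results to `diff_counts`; an empty result means both versions emitted the same number of messages per file, which says nothing about whether the contents match.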

##### Consuming `http_dump.mitm`
You can install [`mitmproxy`](https://mitmproxy.org/):
```bash
pipx install mitmproxy
```

And run:
```bash
mitmweb --rfile=http_dump.mitm
```

## Changelog

### 0.1.0
Implement initial primitives and a `debug` command to run connector commands and persist the outputs to local storage.
1,061 changes: 1,061 additions & 0 deletions airbyte-ci/connectors/live-tests/poetry.lock

Large diffs are not rendered by default.

46 changes: 46 additions & 0 deletions airbyte-ci/connectors/live-tests/pyproject.toml
@@ -0,0 +1,46 @@
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "live-tests"
version = "0.1.0"
description = "Contains utilities for testing connectors against live data."
authors = ["Airbyte <[email protected]>"]
license = "MIT"
homepage = "https://github.com/airbytehq/airbyte"
readme = "README.md"
packages = [
{ include = "live_tests", from = "src" },
]

[tool.poetry.dependencies]
Review comment (Contributor): I had to add `click` as a dependency:
click = "^8.1.3"

Review comment (Contributor): After an update, I also needed to add:
asyncclick = "^8.1.3.2"

python = "^3.10"
airbyte-protocol-models = "<1.0.0"
cachetools = "~=5.3.3"
dagger-io = "==0.9.6"
pydantic = "*"
pytest = "~=8.0.2"
pytest-asyncio = "~=0.23.5"
pydash = "~=7.0.7"
docker = ">=6,<7"
asyncclick = "^8.1.7.1"

[tool.poetry.scripts]
live-tests = "live_tests.cli:live_tests"

[tool.poetry.group.dev.dependencies]
ruff = "^0.3.0"
mypy = "^1.8.0"
types-cachetools = "^5.3.0.7"

[tool.poe.tasks]
test = "pytest tests"
lint = "ruff check src"
type_check = "mypy src"

[tool.airbyte_ci]
poe_tasks = ["test", "lint", "type_check"]

[tool.pytest.ini_options]
pythonpath = ["src"]
@@ -0,0 +1 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
13 changes: 13 additions & 0 deletions airbyte-ci/connectors/live-tests/src/live_tests/cli.py
@@ -0,0 +1,13 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import asyncclick as click
from live_tests.debug.cli import debug_cmd


@click.group()
@click.pass_context
async def live_tests(ctx):
    pass


live_tests.add_command(debug_cmd)
@@ -0,0 +1 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
@@ -0,0 +1,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from .base_backend import BaseBackend
from .file_backend import FileBackend

__all__ = ["BaseBackend", "FileBackend"]
@@ -0,0 +1,16 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from abc import ABC, abstractmethod
from typing import Iterable

from airbyte_protocol.models import AirbyteMessage # type: ignore


class BaseBackend(ABC):
    """
    Interface to be shared between the file backend and the database backend(s)
    """

    @abstractmethod
    async def write(self, airbyte_messages: Iterable[AirbyteMessage]) -> None:
        ...
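
To illustrate how a second backend could plug into this interface, here is a minimal sketch of an in-memory implementation. Everything in it is an assumption made for illustration: `FakeMessage` is a stand-in for `AirbyteMessage` so the example needs no third-party imports, `BaseBackend` is re-declared against that stand-in, and `InMemoryBackend` does not exist in the PR.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterable


@dataclass
class FakeMessage:
    """Hypothetical stand-in for AirbyteMessage (type name plus payload)."""

    type: str
    payload: dict


class BaseBackend(ABC):
    """Same shape as the PR's interface, retyped for the stand-in message."""

    @abstractmethod
    async def write(self, messages: Iterable[FakeMessage]) -> None:
        ...


class InMemoryBackend(BaseBackend):
    """Hypothetical backend that groups message payloads by message type."""

    def __init__(self) -> None:
        self.messages_by_type: dict[str, list[dict]] = {}

    async def write(self, messages: Iterable[FakeMessage]) -> None:
        for message in messages:
            self.messages_by_type.setdefault(message.type, []).append(message.payload)


backend = InMemoryBackend()
asyncio.run(
    backend.write(
        [FakeMessage("RECORD", {"id": 1}), FakeMessage("LOG", {"message": "hi"})]
    )
)
```

Because `write` is abstract, instantiating `BaseBackend` directly raises `TypeError`, so every concrete backend has to provide its own `write`.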
@@ -0,0 +1,104 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

import json
from pathlib import Path
from typing import Iterable, TextIO, Tuple

import pydash
from airbyte_protocol.models import AirbyteMessage # type: ignore
from airbyte_protocol.models import Type as AirbyteMessageType
from cachetools import LRUCache, cached
from live_tests.commons.backends.base_backend import BaseBackend


class FileDescriptorLRUCache(LRUCache):
    def popitem(self):
        filepath, fd = LRUCache.popitem(self)
        fd.close()  # Close the file descriptor when it's evicted from the cache
        return filepath, fd


class FileBackend(BaseBackend):
    RELATIVE_CATALOGS_PATH = "catalog.jsonl"
    RELATIVE_CONNECTION_STATUS_PATH = "connection_status.jsonl"
    RELATIVE_RECORDS_PATH = "records.jsonl"
    RELATIVE_SPECS_PATH = "spec.jsonl"
    RELATIVE_STATES_PATH = "states.jsonl"
    RELATIVE_TRACES_PATH = "traces.jsonl"
    RELATIVE_LOGS_PATH = "logs.jsonl"
    RELATIVE_CONTROLS_PATH = "controls.jsonl"
    RECORD_PATHS_TO_POP = ["emitted_at"]
    CACHE = FileDescriptorLRUCache(maxsize=250)

    def __init__(self, output_directory: Path):
        self._output_directory = output_directory

    async def write(self, airbyte_messages: Iterable[AirbyteMessage]):
        """
        Write AirbyteMessages to the appropriate file.

        Catalogs, connection status messages, specs, trace messages, logs, and control messages are all written to their
        own file (e.g. "catalog.jsonl", "spec.jsonl").

        Records and state messages are further subdivided, with one file per stream (e.g. "my_stream_records.jsonl",
        "my_stream_states.jsonl"). Streams with global state are stored in a "_global_states.jsonl" file.

        We use an LRU cache here to manage open file objects, in order to limit the number of concurrently open file
        descriptors. This mitigates the risk of hitting limits on the number of open file descriptors, particularly for
        connections with a high number of streams. The cache is designed to automatically close files upon eviction.
        """

        @cached(cache=self.CACHE)
        def _open_file(path: Path) -> TextIO:
            return open(path, "a")

        try:
            for _message in airbyte_messages:
                if not isinstance(_message, AirbyteMessage):
                    continue
                filepath, message = self._get_filepath_and_message(_message)
                _open_file(self._output_directory / filepath).write(f"{message}\n")
        finally:
            for f in self.CACHE.values():
                f.close()

    def _get_filepath_and_message(self, message: AirbyteMessage) -> Tuple[str, str]:
        if message.type == AirbyteMessageType.CATALOG:
            return self.RELATIVE_CATALOGS_PATH, message.catalog.json()

        elif message.type == AirbyteMessageType.CONNECTION_STATUS:
            return self.RELATIVE_CONNECTION_STATUS_PATH, message.connectionStatus.json()

        elif message.type == AirbyteMessageType.RECORD:
            record = json.loads(message.record.json())
            # TODO: once we have a comparator and/or database backend implemented we can remove this
            for key_path in self.RECORD_PATHS_TO_POP:
                pydash.objects.unset(record, key_path)
            return f"{message.record.stream}_{self.RELATIVE_RECORDS_PATH}", json.dumps(record)

        elif message.type == AirbyteMessageType.SPEC:
            return self.RELATIVE_SPECS_PATH, message.spec.json()

        elif message.type == AirbyteMessageType.STATE:
            if message.state.stream and message.state.stream.stream_descriptor:
                stream_name = message.state.stream.stream_descriptor.name
                stream_namespace = message.state.stream.stream_descriptor.namespace
                filepath = (
                    f"{stream_name}_{stream_namespace}_{self.RELATIVE_STATES_PATH}"
                    if stream_namespace
                    else f"{stream_name}_{self.RELATIVE_STATES_PATH}"
                )
            else:
                filepath = f"_global_{self.RELATIVE_STATES_PATH}"
            return filepath, message.state.json()

        elif message.type == AirbyteMessageType.TRACE:
            return self.RELATIVE_TRACES_PATH, message.trace.json()

        elif message.type == AirbyteMessageType.LOG:
            return self.RELATIVE_LOGS_PATH, message.log.json()

        elif message.type == AirbyteMessageType.CONTROL:
            return self.RELATIVE_CONTROLS_PATH, message.control.json()

        raise NotImplementedError(f"No handling for AirbyteMessage type {message.type} has been implemented. This is unexpected.")
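
The docstring of `write` describes bounding the number of open file descriptors with an LRU cache that closes files on eviction. The sketch below shows that idea using only the standard library. It is not the PR's implementation, which subclasses `cachetools.LRUCache`: `FileHandleLRU` and `write_lines` are hypothetical names, and an `OrderedDict` stands in for the cache.

```python
import tempfile
from collections import OrderedDict
from pathlib import Path
from typing import TextIO


class FileHandleLRU:
    """Keep at most `maxsize` files open; close the least recently used one on eviction."""

    def __init__(self, maxsize: int) -> None:
        self._maxsize = maxsize
        self._handles: OrderedDict[Path, TextIO] = OrderedDict()

    def get(self, path: Path) -> TextIO:
        if path in self._handles:
            self._handles.move_to_end(path)  # mark as most recently used
            return self._handles[path]
        if len(self._handles) >= self._maxsize:
            # Evict the least recently used handle and close it.
            _, evicted = self._handles.popitem(last=False)
            evicted.close()
        handle = open(path, "a")  # append mode: a reopened file keeps earlier lines
        self._handles[path] = handle
        return handle

    def close_all(self) -> None:
        # Remove entries as they are closed so no closed handle stays cached.
        while self._handles:
            _, handle = self._handles.popitem()
            handle.close()


def write_lines(directory: Path, lines_by_file: list[tuple[str, str]], maxsize: int = 2) -> None:
    """Append each (filename, line) pair to its file with at most `maxsize` files open."""
    cache = FileHandleLRU(maxsize)
    try:
        for filename, line in lines_by_file:
            cache.get(directory / filename).write(f"{line}\n")
    finally:
        cache.close_all()
```

Opening files in append mode is what makes eviction safe here: if a file is evicted and later needed again, it is reopened and new lines land after the ones already written.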