Skip to content

Commit 5477231

Browse files
clnollalafanechere
authored andcommitted
live-tests: debug mode and initial regression tests framework (#35624)
Co-authored-by: alafanechere <[email protected]> Co-authored-by: Augustin <[email protected]>
1 parent 30b8fef commit 5477231

File tree

22 files changed

+2097
-2
lines changed

22 files changed

+2097
-2
lines changed

.github/workflows/airbyte-ci-tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ jobs:
3838
- airbyte-ci/connectors/connector_ops/**
3939
- airbyte-ci/connectors/connectors_qa/**
4040
- airbyte-ci/connectors/ci_credentials/**
41+
- airbyte-ci/connectors/live-tests/**
4142
- airbyte-ci/connectors/metadata_service/lib/**
4243
- airbyte-ci/connectors/metadata_service/orchestrator/**
4344
- airbyte-integrations/bases/connector-acceptance-test/**

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ static_checker_reports/
1616
# Logs
1717
acceptance_tests_logs/
1818
airbyte_ci_logs/
19+
live_tests_debug_reports/
1920

2021
# Secrets
2122
secrets
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
# Connector Live Testing
2+
3+
This project contains utilities for running connector tests against live data.
4+
5+
## Requirements
6+
* `docker`
7+
* `Python ^3.10`
8+
* `pipx`
9+
* `poetry`
10+
11+
## Install
12+
```bash
13+
# From airbyte-ci/connectors/live-tests
14+
pipx install .
15+
# To install in editable mode for development
16+
pipx install . --force --editable
17+
```
18+
19+
## Commands
20+
21+
### `debug`
22+
23+
```
24+
Usage: live-tests debug [OPTIONS] COMMAND
25+
26+
Run a specific command on one or multiple connectors and persists the
27+
outputs to local storage.
28+
29+
Options:
30+
-c, --connector-image TEXT Docker image name of the connector to debug
31+
(e.g. `source-faker:latest`, `source-
32+
faker:dev`) [required]
33+
-o, --output-directory DIRECTORY
34+
Directory in which connector output and test
35+
results should be stored.
36+
Defaults to the current directory.
37+
--config-path FILE Path to the connector config.
38+
--catalog-path FILE Path to the connector catalog.
39+
--state-path FILE Path to the connector state.
40+
-hc, --http-cache Use the HTTP cache for the connector.
41+
--help Show this message and exit.
42+
```
43+
44+
This command is made to run any of the following connector commands against one or multiple connector images.
45+
46+
**Available connector commands:**
47+
* `spec`
48+
* `check`
49+
* `discover`
50+
* `read` or `read_with_state` (requires a `--state-path` to be passed)
51+
52+
It will write artifacts to an output directory:
53+
* `stdout.log`: The collected standard output following the command execution
54+
* `stderr.log`: The collected standard error following the c
55+
* `http_dump.txt`: An `mitmproxy` http stream log. Can be consumed with `mitmweb` (version `9.0.1`) for debugging.
56+
57+
#### Example
58+
Let's run `debug` to check the output of `read` on two different versions of the same connector:
59+
60+
```bash
61+
live-tests debug read \
62+
--connector-image=airbyte/source-pokeapi:dev \
63+
--connector-image=airbyte/source-pokeapi:latest \
64+
--config-path=poke_config.json \
65+
--catalog-path=configured_catalog.json
66+
```
67+
68+
It will store the results in a `live_test_debug_reports` directory under the current working directory:
69+
70+
```
71+
live_tests_debug_reports
72+
└── 1709547771
73+
└── source-pokeapi
74+
└── read
75+
├── dev
76+
│   ├── airbyte_messages
77+
│   │   ├── logs.jsonl
78+
│   │   ├── pokemon_records.jsonl
79+
│   │   └── traces.jsonl
80+
│   ├── http_dump.mitm
81+
│   ├── stderr.log
82+
│   └── stdout.log
83+
└── latest
84+
├── airbyte_messages
85+
│   ├── logs.jsonl
86+
│   ├── pokemon_records.jsonl
87+
│   └── traces.jsonl
88+
├── http_dump.mitm
89+
├── stderr.log
90+
└── stdout.log
91+
92+
```
93+
94+
##### Consuming `http_dump.mitm`
95+
You can install [`mitmproxy`](https://mitmproxy.org/):
96+
```bash
97+
pipx install mitmproxy
98+
```
99+
100+
And run:
101+
```bash
102+
mitmweb --rfile=http_dump.mitm
103+
```
104+
105+
## Changelog
106+
107+
### 0.1.0
108+
Implement initial primitives and a `debug` command to run connector commands and persist the outputs to local storage.

airbyte-ci/connectors/live-tests/poetry.lock

Lines changed: 1061 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
[build-system]
2+
requires = ["poetry-core>=1.0.0"]
3+
build-backend = "poetry.core.masonry.api"
4+
5+
[tool.poetry]
6+
name = "live-tests"
7+
version = "0.1.0"
8+
description = "Contains utilities for testing connectors against live data."
9+
authors = ["Airbyte <[email protected]>"]
10+
license = "MIT"
11+
homepage = "https://github.com/airbytehq/airbyte"
12+
readme = "README.md"
13+
packages = [
14+
{ include = "live_tests", from = "src" },
15+
]
16+
17+
[tool.poetry.dependencies]
18+
python = "^3.10"
19+
airbyte-protocol-models = "<1.0.0"
20+
cachetools = "~=5.3.3"
21+
dagger-io = "==0.9.6"
22+
pydantic = "*"
23+
pytest = "~=8.0.2"
24+
pytest-asyncio = "~=0.23.5"
25+
pydash = "~=7.0.7"
26+
docker = ">=6,<7"
27+
asyncclick = "^8.1.7.1"
28+
29+
[tool.poetry.scripts]
30+
live-tests = "live_tests.cli:live_tests"
31+
32+
[tool.poetry.group.dev.dependencies]
33+
ruff = "^0.3.0"
34+
mypy = "^1.8.0"
35+
types-cachetools = "^5.3.0.7"
36+
37+
[tool.poe.tasks]
38+
test = "pytest tests"
39+
lint = "ruff check src"
40+
type_check = "mypy src"
41+
42+
[tool.airbyte_ci]
43+
poe_tasks = ["test", "lint", "type_check"]
44+
45+
[tool.pytest.ini_options]
46+
pythonpath = ["src"]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
import asyncclick as click
4+
from live_tests.debug.cli import debug_cmd
5+
6+
7+
@click.group()
8+
@click.pass_context
9+
async def live_tests(ctx):
10+
pass
11+
12+
13+
live_tests.add_command(debug_cmd)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
from .base_backend import BaseBackend
4+
from .file_backend import FileBackend
5+
6+
__all__ = ["BaseBackend", "FileBackend"]
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
from abc import ABC, abstractmethod
4+
from typing import Iterable
5+
6+
from airbyte_protocol.models import AirbyteMessage # type: ignore
7+
8+
9+
class BaseBackend(ABC):
10+
"""
11+
Interface to be shared between the file backend and the database backend(s)
12+
"""
13+
14+
@abstractmethod
15+
async def write(self, airbyte_messages: Iterable[AirbyteMessage]) -> None:
16+
...
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
3+
import json
4+
from pathlib import Path
5+
from typing import Iterable, TextIO, Tuple
6+
7+
import pydash
8+
from airbyte_protocol.models import AirbyteMessage # type: ignore
9+
from airbyte_protocol.models import Type as AirbyteMessageType
10+
from cachetools import LRUCache, cached
11+
from live_tests.commons.backends.base_backend import BaseBackend
12+
13+
14+
class FileDescriptorLRUCache(LRUCache):
15+
def popitem(self):
16+
filepath, fd = LRUCache.popitem(self)
17+
fd.close() # Close the file descriptor when it's evicted from the cache
18+
return filepath, fd
19+
20+
21+
class FileBackend(BaseBackend):
22+
RELATIVE_CATALOGS_PATH = "catalog.jsonl"
23+
RELATIVE_CONNECTION_STATUS_PATH = "connection_status.jsonl"
24+
RELATIVE_RECORDS_PATH = "records.jsonl"
25+
RELATIVE_SPECS_PATH = "spec.jsonl"
26+
RELATIVE_STATES_PATH = "states.jsonl"
27+
RELATIVE_TRACES_PATH = "traces.jsonl"
28+
RELATIVE_LOGS_PATH = "logs.jsonl"
29+
RELATIVE_CONTROLS_PATH = "controls.jsonl"
30+
RECORD_PATHS_TO_POP = ["emitted_at"]
31+
CACHE = FileDescriptorLRUCache(maxsize=250)
32+
33+
def __init__(self, output_directory: Path):
34+
self._output_directory = output_directory
35+
36+
async def write(self, airbyte_messages: Iterable[AirbyteMessage]):
37+
"""
38+
Write AirbyteMessages to the appropriate file.
39+
40+
Catalogs, connection status messages, specs, trace messages, logs, and control messages are all written to their
41+
own file (e.g. "catalog.jsonl", "spec.jsonl").
42+
43+
Records and state messages are further subdivided, with one file per stream (e.g. "my_stream_records.jsonl",
44+
"my_stream_states.jsonl"). Streams with global state are stored in a "_global_states.jsonl" file.
45+
46+
We use an LRU cache here to manage open file objects, in order to limit the number of concurrently open file
47+
descriptors. This mitigates the risk of hitting limits on the number of open file descriptors, particularly for
48+
connections with a high number of streams. The cache is designed to automatically close files upon eviction.
49+
"""
50+
51+
@cached(cache=self.CACHE)
52+
def _open_file(path: Path) -> TextIO:
53+
return open(path, "a")
54+
55+
try:
56+
for _message in airbyte_messages:
57+
if not isinstance(_message, AirbyteMessage):
58+
continue
59+
filepath, message = self._get_filepath_and_message(_message)
60+
_open_file(self._output_directory / filepath).write(f"{message}\n")
61+
finally:
62+
for f in self.CACHE.values():
63+
f.close()
64+
65+
def _get_filepath_and_message(self, message: AirbyteMessage) -> Tuple[str, str]:
66+
if message.type == AirbyteMessageType.CATALOG:
67+
return self.RELATIVE_CATALOGS_PATH, message.catalog.json()
68+
69+
elif message.type == AirbyteMessageType.CONNECTION_STATUS:
70+
return self.RELATIVE_CONNECTION_STATUS_PATH, message.connectionStatus.json()
71+
72+
elif message.type == AirbyteMessageType.RECORD:
73+
record = json.loads(message.record.json())
74+
# TODO: once we have a comparator and/or database backend implemented we can remove this
75+
for key_path in self.RECORD_PATHS_TO_POP:
76+
pydash.objects.unset(record, key_path)
77+
return f"{message.record.stream}_{self.RELATIVE_RECORDS_PATH}", json.dumps(record)
78+
79+
elif message.type == AirbyteMessageType.SPEC:
80+
return self.RELATIVE_SPECS_PATH, message.spec.json()
81+
82+
elif message.type == AirbyteMessageType.STATE:
83+
if message.state.stream and message.state.stream.stream_descriptor:
84+
stream_name = message.state.stream.stream_descriptor.name
85+
stream_namespace = message.state.stream.stream_descriptor.namespace
86+
filepath = (
87+
f"{stream_name}_{stream_namespace}_{self.RELATIVE_STATES_PATH}"
88+
if stream_namespace
89+
else f"{stream_name}_{self.RELATIVE_STATES_PATH}"
90+
)
91+
else:
92+
filepath = f"_global_{self.RELATIVE_STATES_PATH}"
93+
return filepath, message.state.json()
94+
95+
elif message.type == AirbyteMessageType.TRACE:
96+
return self.RELATIVE_TRACES_PATH, message.trace.json()
97+
98+
elif message.type == AirbyteMessageType.LOG:
99+
return self.RELATIVE_LOGS_PATH, message.log.json()
100+
101+
elif message.type == AirbyteMessageType.CONTROL:
102+
return self.RELATIVE_CONTROLS_PATH, message.control.json()
103+
104+
raise NotImplementedError(f"No handling for AirbyteMessage type {message.type} has been implemented. This is unexpected.")

0 commit comments

Comments
 (0)