Skip to content

feat(Airbyte-ci): add command generate-erd-schema #43310

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions airbyte-ci/connectors/erd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# erd

A collection of utilities for generating ERDs.

# Setup

## Installation

`erd` tools use [Poetry](https://github.com/python-poetry/poetry) to manage dependencies,
and target Python 3.10 and higher.

Assuming you're in Airbyte repo root:

```bash
cd airbyte-ci/connectors/erd
poetry install
```

## Usage

Prerequisites:
* The environment variable `GENAI_API_KEY` must be set. An API key can be created at https://aistudio.google.com/app/apikey

`poetry run erd --source-path <source path>`

The script supports the option to ignore the LLM generation by passing parameter `--skip-llm-relationships`

## Contributing to `erd`

### Running tests

To run tests locally:

```bash
poetry run pytest
```

## Changelog
- 0.1.0: Initial commit
2,001 changes: 2,001 additions & 0 deletions airbyte-ci/connectors/erd/poetry.lock

Large diffs are not rendered by default.

46 changes: 46 additions & 0 deletions airbyte-ci/connectors/erd/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Build configuration: standard Poetry-core PEP 517 backend.
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "erd"
version = "0.1.0"
description = "Contains utilities for generating ERDs."
authors = ["Airbyte <[email protected]>"]
license = "MIT"
# NOTE(review): URL appears garbled by extraction — likely https://github.com/airbytehq/airbyte; confirm.
homepage = "https://github.com/airbytehq/airbyte"
readme = "README.md"
# src-layout: the `erd` package lives under src/.
packages = [
{ include = "erd", from = "src" },
]

[tool.poetry.dependencies]
# Upper bound excludes 3.12, matching `[tool.airbyte_ci] python_versions` below.
python = "^3.10,<3.12"
airbyte-cdk = "*"
click = "^8.1.3"
dpath = "^2.1.6"
google-generativeai = "^0.7.2"
markdown-it-py = ">=2.2.0"
pydbml = "^1.1.0"
# NOTE(review): pytest is declared as a runtime dependency — consider moving it to the dev group.
pytest = "^8.1.1"

[tool.poetry.group.dev.dependencies]
ruff = "^0.3.0"
mypy = "^1.8.0"

[tool.ruff.lint]
select = ["I", "F"]

[tool.ruff.lint.isort]
# NOTE(review): "connection-retriever" looks copied from another tool's config — should this be "erd"? Confirm.
known-first-party = ["connection-retriever"]

[tool.poe.tasks]
test = "pytest tests"
type_check = "mypy src --disallow-untyped-defs"
pre-push = []

# Console entry point: `poetry run erd ...`.
[tool.poetry.scripts]
erd = "erd.erd_service:main"

[tool.airbyte_ci]
python_versions = ["3.10"]
1 change: 1 addition & 0 deletions airbyte-ci/connectors/erd/src/erd/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
153 changes: 153 additions & 0 deletions airbyte-ci/connectors/erd/src/erd/dbml_assembler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from pathlib import Path
from typing import List, Set, Union

import yaml
from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from airbyte_protocol.models import AirbyteCatalog, AirbyteStream
from erd.relationships import Relationships
from pydbml import Database
from pydbml.classes import Column, Index, Reference, Table


class Source:
    """Wraps a connector source folder on disk and exposes heuristics about its streams."""

    def __init__(self, source_folder: Path) -> None:
        self._source_folder = source_folder

    def is_dynamic(self, stream_name: str) -> bool:
        """
        Very flaky heuristic to know if a stream is dynamic or not. A stream is considered *static* if:
        * The stream name appears in the schemas folder, or
        * The stream is within the manifest and its schema definition is `InlineSchemaLoader`
        Any other stream is considered dynamic.
        """
        manifest_static_streams = set()
        if self._has_manifest():
            with open(self._get_manifest_path()) as manifest_file:
                resolved_manifest = ManifestReferenceResolver().preprocess_manifest(yaml.safe_load(manifest_file))
            for stream in resolved_manifest["streams"]:
                if stream["schema_loader"]["type"] == "InlineSchemaLoader":
                    # The stream name may live at the top level or under `$parameters`; `$parameters` itself may be absent.
                    name = stream["name"] if "name" in stream else stream.get("$parameters", {}).get("name", None)
                    if not name:
                        print(f"Could not retrieve name for this stream: {stream}")
                        continue
                    manifest_static_streams.add(name)

        return stream_name not in manifest_static_streams | self._get_streams_from_schemas_folder()

    def _get_streams_from_schemas_folder(self) -> Set[str]:
        """Return stream names derived from the JSON files in the connector's schemas folder (empty set if none)."""
        schemas_folder = self._source_folder / self._source_folder.name.replace("-", "_") / "schemas"
        return {p.name.replace(".json", "") for p in schemas_folder.iterdir() if p.is_file()} if schemas_folder.exists() else set()

    def _get_manifest_path(self) -> Path:
        # Connector package folder uses underscores while the repo folder uses dashes.
        return self._source_folder / self._source_folder.name.replace("-", "_") / "manifest.yaml"

    def _has_manifest(self) -> bool:
        return self._get_manifest_path().exists()


class DbmlAssembler:
    """Builds a pydbml `Database` (ERD model) from a discovered catalog and a set of relationships."""

    def assemble(self, source: Source, discovered_catalog: AirbyteCatalog, relationships: Relationships) -> Database:
        """Create one DBML table per static stream, then wire up the provided relationships as references."""
        database = Database()
        for stream in discovered_catalog.streams:
            if source.is_dynamic(stream.name):
                print(f"Skipping stream {stream.name} as it is dynamic")
                continue

            database.add(self._create_table(stream))

        self._add_references(source, database, relationships)

        return database

    def _create_table(self, stream: AirbyteStream) -> Table:
        """Map a stream's JSON schema properties to DBML columns, flagging single-column and composite PKs."""
        dbml_table = Table(stream.name)
        # `.get("properties", {})` guards against schemas without a `properties` entry (was an AttributeError).
        for property_name, property_information in stream.json_schema.get("properties", {}).items():
            try:
                dbml_table.add_column(
                    Column(
                        name=property_name,
                        type=self._extract_type(property_information["type"]),
                        pk=self._is_pk(stream, property_name),
                    )
                )
            except (KeyError, ValueError) as exception:
                # Skip fields whose type is missing or ambiguous rather than failing the whole table.
                print(f"Ignoring field {property_name}: {exception}")
                continue

        # A composite PK cannot be expressed column-by-column; model it as a PK index instead.
        if stream.source_defined_primary_key and len(stream.source_defined_primary_key) > 1:
            if any(map(lambda key: len(key) != 1, stream.source_defined_primary_key)):
                raise ValueError(f"Does not support nested key as part of primary key `{stream.source_defined_primary_key}`")

            composite_key_columns = [
                column for key in stream.source_defined_primary_key for column in dbml_table.columns if column.name in key
            ]
            if len(composite_key_columns) < len(stream.source_defined_primary_key):
                raise ValueError("Unexpected error: missing PK column from dbml table")

            dbml_table.add_index(
                Index(
                    subjects=composite_key_columns,
                    pk=True,
                )
            )
        return dbml_table

    def _add_references(self, source: Source, database: Database, relationships: Relationships) -> None:
        """Add a reference per relationship, skipping any whose source or target stream is dynamic."""
        for stream in relationships["streams"]:
            # Loop-invariant check hoisted out of the per-column loop: a dynamic source stream
            # invalidates all of its relationships at once.
            if source.is_dynamic(stream["name"]):
                print(f"Skipping relationship as stream {stream['name']} from relationship is dynamic")
                continue

            for column_name, relationship in stream["relations"].items():
                try:
                    target_table_name, target_column_name = relationship.split(".")
                except ValueError as exception:
                    raise ValueError("If 'too many values to unpack', relationship to nested fields is not supported") from exception

                if source.is_dynamic(target_table_name):
                    print(f"Skipping relationship as target stream {target_table_name} is dynamic")
                    continue

                try:
                    database.add_reference(
                        Reference(
                            type="<>",  # we don't have the information of which relationship type it is so we assume many-to-many for now
                            col1=self._get_column(database, stream["name"], column_name),
                            col2=self._get_column(database, target_table_name, target_column_name),
                        )
                    )
                except ValueError as exception:
                    print(f"Skipping relationship: {exception}")

    def _extract_type(self, property_type: Union[str, List[str]]) -> str:
        """Reduce a JSON-schema `type` (string or list) to a single non-null type name.

        Raises ValueError if more than one non-null type remains.
        """
        if isinstance(property_type, str):
            return property_type

        types = list(property_type)
        if "null" in types:
            # As we flag everything as nullable (except PK and cursor field), there is little value in keeping the information in order to
            # show this in DBML
            types.remove("null")
        if len(types) != 1:
            raise ValueError(f"Expected only one type apart from `null` but got {len(types)}: {property_type}")
        return types[0]

    def _is_pk(self, stream: AirbyteStream, property_name: str) -> bool:
        """True only for a single-column primary key; composite keys are handled as an index in `_create_table`."""
        return stream.source_defined_primary_key == [[property_name]]

    def _get_column(self, database: Database, table_name: str, column_name: str) -> Column:
        """Look up a column by table and column name, raising ValueError on zero or multiple matches."""
        matching_tables = list(filter(lambda dbml_table: dbml_table.name == table_name, database.tables))
        if len(matching_tables) == 0:
            raise ValueError(f"Could not find table {table_name}")
        elif len(matching_tables) > 1:
            raise ValueError(f"Unexpected error: many tables found with name {table_name}")

        table: Table = matching_tables[0]
        matching_columns = list(filter(lambda column: column.name == column_name, table.columns))
        if len(matching_columns) == 0:
            raise ValueError(f"Could not find column {column_name} in table {table_name}. Columns are: {table.columns}")
        elif len(matching_columns) > 1:
            raise ValueError(f"Unexpected error: many columns found with name {column_name} for table {table_name}")

        return matching_columns[0]
Loading
Loading