
feat(Airbyte-ci): add command generate-erd-schema #43310


Merged on Aug 20, 2024 · 24 commits
56 changes: 56 additions & 0 deletions airbyte-ci/connectors/pipelines/README.md
@@ -652,6 +652,62 @@ Do it just for a few connectors:
You can also set or set/change the title or body of the PR:
`airbyte-ci connectors --name source-aha --name source-quickbooks pull-request -m "upgrading connectors" -b ci_update/round2 --title "New title" --body "full body\n\ngoes here"`

### <a id="connectors-generate-erd-command"></a>`connectors generate-erd` command

Generates several files and publishes a new ERD to dbdocs. The generated files are:
* `<source code_directory>/erd/discovered_catalog.json`: the catalog used to generate the estimated relationships and the DBML file
* `<source code_directory>/erd/estimated_relationships.json`: the output of the LLM trying to figure out the relationships between the different streams
* `<source code_directory>/erd/source.dbml`: the file used to upload the ERD to dbdocs

Pre-requisites:
* The config file used to discover the catalog must be available at `<source code_directory>/secrets/config.json`
* The following secrets must be copy/pasted into `airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/generate_erd/pipeline.py`:
  * Gemini API key: can be found [here](https://aistudio.google.com/app/apikey)
  * dbdocs token: for Airbyte employees, use `integration-test` in our secret manager

#### Creating the initial diagram or updating it on a connector's schema change

Steps
* Ensure the pre-requisites mentioned above are met
* Run `connectors --name=<source name> generate-erd` (see the full example after these steps)
* Create a PR with files `<source code_directory>/erd/estimated_relationships.json` and `<source code_directory>/erd/source.dbml` for documentation purposes
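For example, for a hypothetical connector named `source-example` (substitute your own connector name), the full command would look like:
`airbyte-ci connectors --name=source-example generate-erd`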

Expected Outcome
* The diagram is available in dbdocs
* `<source code_directory>/erd/estimated_relationships.json` and `<source code_directory>/erd/source.dbml` are updated on master

#### Updating the diagram after manually validating relationships

Steps
* If it does not exist, create the file `<source code_directory>/erd/confirmed_relationships.json` with the following format (a filled-in example is shown after these steps), where:
* `relations` describes the relationships that we know exist
* `false_positives` describes the relationships the LLM found that we know do not exist
```
{
  "streams": [
    {
      "name": <stream_name>,
      "relations": {
        <stream_name property>: "<target stream>.<target stream column>"
      },
      "false_positives": {
        <stream_name property>: "<target stream>.<target stream column>"
      }
    },
    <...>
  ]
}
```
* Ensure the pre-requisites mentioned above are met
* Run `connectors --name=<source name> generate-erd -x llm_relationships`
* Create a PR with files `<source code_directory>/erd/confirmed_relationships.json` and `<source code_directory>/erd/source.dbml` for documentation purposes
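As an illustration, a filled-in `confirmed_relationships.json` for a hypothetical source with `users` and `orders` streams (all stream and column names below are invented for the example) could look like:
```
{
  "streams": [
    {
      "name": "orders",
      "relations": {
        "user_id": "users.id"
      },
      "false_positives": {
        "customer_ref": "campaigns.id"
      }
    }
  ]
}
```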

#### Options

| Option | Required | Default | Mapped environment variable | Description |
|------------------|----------| ------- |-----------------------------|-------------------------------------------------------------|
| `--skip-step/-x` | False | | | Skip steps by id e.g. `-x llm_relationships -x publish_erd` |

### <a id="format-subgroup"></a>`format` command subgroup

Available commands:
@@ -148,6 +148,7 @@ def validate_environment(is_local: bool) -> None:
"migrate-to-poetry": "pipelines.airbyte_ci.connectors.migrate_to_poetry.commands.migrate_to_poetry",
"migrate-to-inline_schemas": "pipelines.airbyte_ci.connectors.migrate_to_inline_schemas.commands.migrate_to_inline_schemas",
"migrate-to-logging-logger": "pipelines.airbyte_ci.connectors.migrate_to_logging_logger.commands.migrate_to_logging_logger",
"generate-erd": "pipelines.airbyte_ci.connectors.generate_erd.commands.generate_erd",
"upgrade-cdk": "pipelines.airbyte_ci.connectors.upgrade_cdk.commands.upgrade_cdk",
"up-to-date": "pipelines.airbyte_ci.connectors.up_to_date.commands.up_to_date",
"pull-request": "pipelines.airbyte_ci.connectors.pull_request.commands.pull_request",
@@ -36,6 +36,9 @@ class CONNECTOR_TEST_STEP_ID(str, Enum):
INLINE_MIGRATION = "migration_to_inline_schemas.migration"
AIRBYTE_LOGGER_CANDIDATE = "migration_to_logging_logger.candidate"
AIRBYTE_LOGGER_MIGRATION = "migration_to_logging_logger.migration"
LLM_RELATIONSHIPS = "llm_relationships"
DBML_FILE = "dbml_file"
PUBLISH_ERD = "publish_erd"
PULL_REQUEST_CREATE = "pull_request.create"
PULL_REQUEST_UPDATE = "pull_request.update"

@@ -0,0 +1 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
@@ -0,0 +1,43 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
from typing import List

import asyncclick as click

from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID
from pipelines.airbyte_ci.connectors.generate_erd.pipeline import run_connector_generate_erd_pipeline
from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand
from pipelines.helpers.connectors.command import run_connector_pipeline
from pipelines.helpers.connectors.format import verify_formatters


@click.command(
cls=DaggerPipelineCommand,
short_help="Generate ERD",
)
@click.option(
"--report",
is_flag=True,
type=bool,
default=False,
help="Auto open report browser.",
)
@click.option(
"--skip-step",
"-x",
"skip_steps",
multiple=True,
type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]),
help="Skip a step by name. Can be used multiple times to skip multiple steps.",
)
@click.pass_context
async def generate_erd(ctx: click.Context, report: bool, skip_steps: List[str]) -> bool:
verify_formatters()
return await run_connector_pipeline(
ctx,
"Generate ERD schema",
report,
run_connector_generate_erd_pipeline,
skip_steps,
)
@@ -0,0 +1,155 @@
import yaml

from typing import Set, List, Union

from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver
from pathlib import Path

from airbyte_protocol.models import AirbyteStream, AirbyteCatalog
from pydbml import Database
from pydbml.classes import Column, Index, Reference, Table

from pipelines.airbyte_ci.connectors.generate_erd.relationships import Relationships


class Source:
def __init__(self, source_folder: Path) -> None:
self._source_folder = source_folder

def is_dynamic(self, stream_name: str) -> bool:
"""
This method is a very flaky heuristic to know if a stream is dynamic or not. A stream will be considered dynamic if:
* The stream name is in the schemas folder
* The stream is within the manifest and the schema definition is `InlineSchemaLoader`
"""
manifest_static_streams = set()
if self._has_manifest():
with open(self._get_manifest_path()) as manifest_file:
resolved_manifest = ManifestReferenceResolver().preprocess_manifest(yaml.safe_load(manifest_file))
for stream in resolved_manifest["streams"]:
if stream["schema_loader"]["type"] == "InlineSchemaLoader":
name = stream["name"] if "name" in stream else stream.get("$parameters").get("name", None)
if not name:
print(f"Could not retrieve name for this stream: {stream}")
continue
                        manifest_static_streams.add(name)

return stream_name not in manifest_static_streams | self._get_streams_from_schemas_folder()

def _get_streams_from_schemas_folder(self) -> Set[str]:
schemas_folder = self._source_folder / self._source_folder.name.replace("-", "_") / "schemas"
return {p.name.replace(".json", "") for p in schemas_folder.iterdir() if p.is_file()} if schemas_folder.exists() else set()

def _get_manifest_path(self) -> Path:
return self._source_folder / self._source_folder.name.replace("-", "_") / "manifest.yaml"

def _has_manifest(self) -> bool:
return self._get_manifest_path().exists()


class DbmlAssembler:

def assemble(self, source: Source, discovered_catalog: AirbyteCatalog, relationships: Relationships) -> Database:
database = Database()
for stream in discovered_catalog.streams:
if source.is_dynamic(stream.name):
print(f"Skipping stream {stream.name} as it is dynamic")
continue

database.add(self._create_table(stream))

self._add_references(source, database, relationships)

return database

def _create_table(self, stream: AirbyteStream) -> Table:
dbml_table = Table(stream.name)
for property_name, property_information in stream.json_schema.get("properties").items():
try:
dbml_table.add_column(
Column(
name=property_name,
type=self._extract_type(property_information["type"]),
pk=self._is_pk(stream, property_name),
)
)
except (KeyError, ValueError) as exception:
print(f"Ignoring field {property_name}: {exception}")
continue

if stream.source_defined_primary_key and len(stream.source_defined_primary_key) > 1:
if any(map(lambda key: len(key) != 1, stream.source_defined_primary_key)):
raise ValueError(f"Does not support nested key as part of primary key `{stream.source_defined_primary_key}`")

composite_key_columns = [
column for key in stream.source_defined_primary_key for column in dbml_table.columns if column.name in key
]
if len(composite_key_columns) < len(stream.source_defined_primary_key):
raise ValueError("Unexpected error: missing PK column from dbml table")

dbml_table.add_index(
Index(
subjects=composite_key_columns,
pk=True,
)
)
return dbml_table

def _add_references(self, source: Source, database: Database, relationships: Relationships) -> None:
for stream in relationships["streams"]:
for column_name, relationship in stream["relations"].items():
if source.is_dynamic(stream["name"]):
print(f"Skipping relationship as stream {stream['name']} from relationship is dynamic")
continue

try:
target_table_name, target_column_name = relationship.split(".")
except ValueError as exception:
raise ValueError("If 'too many values to unpack', relationship to nested fields is not supported") from exception

if source.is_dynamic(target_table_name):
print(f"Skipping relationship as target stream {target_table_name} is dynamic")
continue

try:
database.add_reference(
Reference(
type="<>", # we don't have the information of which relationship type it is so we assume many-to-many for now
col1=self._get_column(database, stream["name"], column_name),
col2=self._get_column(database, target_table_name, target_column_name),
)
)
except ValueError as exception:
print(f"Skipping relationship: {exception}")

def _extract_type(self, property_type: Union[str, List[str]]) -> str:
if isinstance(property_type, str):
return property_type

types = list(property_type)
if "null" in types:
# As we flag everything as nullable (except PK and cursor field), there is little value in keeping the information in order to
# show this in DBML
types.remove("null")
if len(types) != 1:
raise ValueError(f"Expected only one type apart from `null` but got {len(types)}: {property_type}")
return types[0]

def _is_pk(self, stream: AirbyteStream, property_name: str) -> bool:
return stream.source_defined_primary_key == [[property_name]]

def _get_column(self, database: Database, table_name: str, column_name: str) -> Column:
matching_tables = list(filter(lambda dbml_table: dbml_table.name == table_name, database.tables))
if len(matching_tables) == 0:
raise ValueError(f"Could not find table {table_name}")
elif len(matching_tables) > 1:
raise ValueError(f"Unexpected error: many tables found with name {table_name}")

table: Table = matching_tables[0]
matching_columns = list(filter(lambda column: column.name == column_name, table.columns))
if len(matching_columns) == 0:
raise ValueError(f"Could not find column {column_name} in table {table_name}. Columns are: {table.columns}")
elif len(matching_columns) > 1:
raise ValueError(f"Unexpected error: many columns found with name {column_name} for table {table_name}")

return matching_columns[0]
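
The `Source` and `DbmlAssembler` classes above are driven by the generate-erd pipeline steps, but a minimal sketch of how they fit together is shown below. The connector path is hypothetical, and the sketch assumes the `erd/` file layout described in the README and that pydbml's `Database` exposes its rendered text through its `dbml` property.

```
# Illustrative sketch only; the actual orchestration lives in the generate_erd pipeline steps.
import json
from pathlib import Path

from airbyte_protocol.models import AirbyteCatalog

source_folder = Path("airbyte-integrations/connectors/source-example")  # hypothetical connector
erd_folder = source_folder / "erd"

# Load the discovered catalog and the estimated (or manually confirmed) relationships.
catalog = AirbyteCatalog.parse_obj(json.loads((erd_folder / "discovered_catalog.json").read_text()))
relationships = json.loads((erd_folder / "estimated_relationships.json").read_text())

# Assemble the DBML database and write it out so it can be pushed to dbdocs.
database = DbmlAssembler().assemble(Source(source_folder), catalog, relationships)
(erd_folder / "source.dbml").write_text(database.dbml)
```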