Feat/kafka schema registry integration #1959

Merged

1 change: 1 addition & 0 deletions README.md
@@ -125,6 +125,7 @@ Please visit [Architecture](https://www.amundsen.io/amundsen/architecture/) for
- [Elasticsearch](https://www.elastic.co/)
- [Google BigQuery](https://cloud.google.com/bigquery)
- [IBM DB2](https://www.ibm.com/analytics/db2)
- [Kafka Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html)
- [Microsoft SQL Server](https://www.microsoft.com/en-us/sql-server/default.aspx)
- [MySQL](https://www.mysql.com/)
- [Oracle](https://www.oracle.com/index.html) (through dbapi or sql_alchemy)
22 changes: 22 additions & 0 deletions databuilder/README.md
@@ -1655,6 +1655,28 @@ job = DefaultJob(
job.launch()
```

#### [KafkaSchemaRegistryExtractor](https://github.com/amundsen-io/amundsen/blob/main/databuilder/databuilder/extractor/kafka_schema_registry_extractor.py "KafkaSchemaRegistryExtractor")

An extractor that extracts Avro schema metadata from a Confluent Kafka Schema Registry.

A sample job config is shown below.

```python
job_config = ConfigFactory.from_dict({
f"extractor.kafka_schema_registry.{KafkaSchemaRegistryExtractor.REGISTRY_URL_KEY}": "http://localhost:8081",
f"extractor.kafka_schema_registry.{KafkaSchemaRegistryExtractor.REGISTRY_USERNAME_KEY}": "username",
f"extractor.kafka_schema_registry.{KafkaSchemaRegistryExtractor.REGISTRY_PASSWORD_KEY}": "password",
})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=KafkaSchemaRegistryExtractor(),
loader=AnyLoader()))
job.launch()
```

**Note: username and password are not mandatory. Provide them only if your Schema Registry requires authentication.**
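
The extractor depends on the `python-schema-registry-client` library, which is exposed through the `schema_registry` extra in `setup.py`. If your registry does not require authentication, only the URL key is needed. A minimal sketch with the imports spelled out (`AnyLoader` remains a placeholder for whichever loader you use):

```python
from pyhocon import ConfigFactory

from databuilder.extractor.kafka_schema_registry_extractor import KafkaSchemaRegistryExtractor
from databuilder.job.job import DefaultJob
from databuilder.task.task import DefaultTask

job_config = ConfigFactory.from_dict({
    # Only the registry URL is required when no authentication is configured
    f"extractor.kafka_schema_registry.{KafkaSchemaRegistryExtractor.REGISTRY_URL_KEY}": "http://localhost:8081",
})
job = DefaultJob(
    conf=job_config,
    task=DefaultTask(
        extractor=KafkaSchemaRegistryExtractor(),
        loader=AnyLoader()))  # placeholder: substitute a concrete loader
job.launch()
```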

## List of transformers

Transformers are implemented by subclassing [Transformer](https://github.com/amundsen-io/amundsen/blob/main/databuilder/databuilder/transformer/base_transformer.py#L12 "Transformer") and implementing `transform(self, record)`. A transformer can:
181 changes: 181 additions & 0 deletions databuilder/databuilder/extractor/kafka_schema_registry_extractor.py
@@ -0,0 +1,181 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import (
Any, Dict, Iterator, List, Optional, Union,
)

from pyhocon import ConfigTree
from schema_registry.client import Auth, SchemaRegistryClient
from schema_registry.client.utils import SchemaVersion

from databuilder.extractor.base_extractor import Extractor
from databuilder.models.table_metadata import ColumnMetadata, TableMetadata

LOGGER = logging.getLogger(__name__)


class KafkaSchemaRegistryExtractor(Extractor):
"""
Extracts the latest version of all schemas from a given
Kafka Schema Registry URL
"""

REGISTRY_URL_KEY = "registry_url"
REGISTRY_USERNAME_KEY = "registry_username"
REGISTRY_PASSWORD_KEY = "registry_password"

def init(self, conf: ConfigTree) -> None:
self._registry_base_url = conf.get(
KafkaSchemaRegistryExtractor.REGISTRY_URL_KEY
)

self._registry_username = conf.get(
KafkaSchemaRegistryExtractor.REGISTRY_USERNAME_KEY, None
)

self._registry_password = conf.get(
KafkaSchemaRegistryExtractor.REGISTRY_PASSWORD_KEY, None
)

# Add authentication if user and password are provided
if all((self._registry_username, self._registry_password)):
self._client = SchemaRegistryClient(
url=self._registry_base_url,
auth=Auth(
username=self._registry_username,
password=self._registry_password
)
)
else:
self._client = SchemaRegistryClient(
url=self._registry_base_url,
)

self._extract_iter: Union[None, Iterator] = None

def extract(self) -> Union[TableMetadata, None]:
if not self._extract_iter:
self._extract_iter = self._get_extract_iter()
try:
return next(self._extract_iter)
except StopIteration:
return None
except Exception as e:
            LOGGER.error(f'Failed to generate next table: {e}')
return None

def get_scope(self) -> str:
return 'extractor.kafka_schema_registry'

    def _get_extract_iter(self) -> Iterator[TableMetadata]:
"""
Return an iterator generating TableMetadata for all of the schemas.
"""
for schema_version in self._get_raw_extract_iter():
subject = schema_version.subject
schema = schema_version.schema.raw_schema
LOGGER.info((f'Subject: {subject}, '
f'Schema: {schema}'))

try:
yield KafkaSchemaRegistryExtractor._create_table(
schema=schema,
subject_name=subject,
cluster_name=schema.get(
'namespace', 'kafka-schema-registry'
),
schema_name=schema.get('name', ''),
schema_description=schema.get('doc', None),
)
except Exception as e:
                LOGGER.warning(f'Failed to generate table for {subject}: {e}')
continue

def _get_raw_extract_iter(self) -> Iterator[SchemaVersion]:
"""
        Return an iterator of the latest schema version for each subject in the registry
"""
subjects = self._client.get_subjects()

LOGGER.info(f'Number of extracted subjects: {len(subjects)}')
LOGGER.info(f'Extracted subjects: {subjects}')

for subj in subjects:
subj_schema = self._client.get_schema(subj)
LOGGER.info(f'Subject <{subj}> max version: {subj_schema.version}')

yield subj_schema

@staticmethod
def _create_table(
schema: Dict[str, Any],
subject_name: str,
cluster_name: str,
schema_name: str,
schema_description: str,
) -> Optional[TableMetadata]:
"""
Create TableMetadata based on given schema and names
"""
columns: List[ColumnMetadata] = []

for i, field in enumerate(schema['fields']):
columns.append(
ColumnMetadata(
name=field['name'],
description=field.get('doc', None),
col_type=KafkaSchemaRegistryExtractor._get_property_type(
field
),
sort_order=i,
)
)

return TableMetadata(
database='kafka_schema_registry',
cluster=cluster_name,
schema=subject_name,
name=schema_name,
description=schema_description,
columns=columns,
)

@staticmethod
def _get_property_type(schema: Dict) -> str:
"""
Return type of the given schema.
        It also works for nested schema types.
"""
if 'type' not in schema:
return 'object'

        if isinstance(schema['type'], dict):
return KafkaSchemaRegistryExtractor._get_property_type(
schema['type']
)

# If schema can have multiple types
        if isinstance(schema['type'], list):
return '|'.join(schema['type'])

if schema['type'] == 'record':
properties = [
f"{field['name']}:"
f"{KafkaSchemaRegistryExtractor._get_property_type(field)}"
for field in schema.get('fields', {})
]
if len(properties) > 0:
if 'name' in schema:
return schema['name'] + \
':struct<' + ','.join(properties) + '>'
return 'struct<' + ','.join(properties) + '>'
return 'struct<object>'
elif schema['type'] == 'array':
items = KafkaSchemaRegistryExtractor._get_property_type(
schema.get("items", {})
)
return 'array<' + items + '>'
else:
return schema['type']
7 changes: 6 additions & 1 deletion databuilder/setup.py
@@ -95,9 +95,13 @@
'teradatasqlalchemy==17.0.0.0'
]

schema_registry = [
'python-schema-registry-client==2.4.0'
]

all_deps = requirements + requirements_dev + kafka + cassandra + glue + snowflake + athena + \
bigquery + jsonpath + db2 + dremio + druid + spark + feast + neptune + rds \
+ atlas + salesforce + oracle + teradata
+ atlas + salesforce + oracle + teradata + schema_registry

setup(
name='amundsen-databuilder',
@@ -132,6 +136,7 @@
'salesforce': salesforce,
'oracle': oracle,
'teradata': teradata,
'schema_registry': schema_registry,
},
classifiers=[
'Programming Language :: Python :: 3.7',