Skip to content

Commit e2547ff

Browse files
author
Joe Reuter
authored
CDK: Add schema inferrer class (#20941)
* fix stuff * Update schema_inferrer.py * Update schema_inferrer.py * bump version * review comments * code style * fix formatting * improve tests
1 parent fc05f65 commit e2547ff

File tree

5 files changed

+165
-2
lines changed

5 files changed

+165
-2
lines changed

airbyte-cdk/python/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
## 0.17.0
4+
Add utility class to infer schemas from real records
5+
36
## 0.16.3
47
Do not eagerly refresh access token in `SingleUseRefreshTokenOauth2Authenticator` [#20923](https://github.com/airbytehq/airbyte/pull/20923)
58

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#
22
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
#
4+
5+
from .schema_inferrer import SchemaInferrer
46
from .traced_exception import AirbyteTracedException
57

6-
__all__ = ["AirbyteTracedException"]
8+
__all__ = ["AirbyteTracedException", "SchemaInferrer"]
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from collections import defaultdict
6+
from typing import Any, Dict, List, Optional, Union
7+
8+
from airbyte_cdk.models import AirbyteRecordMessage
9+
from genson import SchemaBuilder
10+
from genson.schema.strategies.object import Object
11+
12+
13+
class NoRequiredObj(Object):
14+
"""
15+
This class has Object behaviour, but it does not generate "required[]" fields
16+
every time it parses object. So we dont add unnecessary extra field.
17+
"""
18+
19+
def to_schema(self):
20+
schema = super(NoRequiredObj, self).to_schema()
21+
schema.pop("required", None)
22+
return schema
23+
24+
25+
class NoRequiredSchemaBuilder(SchemaBuilder):
26+
EXTRA_STRATEGIES = (NoRequiredObj,)
27+
28+
29+
# This type is inferred from the genson lib, but there is no alias provided for it - creating it here for type safety
30+
InferredSchema = Dict[str, Union[str, Any, List, List[Dict[str, Union[Any, List]]]]]
31+
32+
33+
class SchemaInferrer:
34+
"""
35+
This class is used to infer a JSON schema which fits all the records passed into it
36+
throughout its lifecycle via the accumulate method.
37+
38+
Instances of this class are stateful, meaning they build their inferred schemas
39+
from every record passed into the accumulate method.
40+
41+
"""
42+
43+
stream_to_builder: Dict[str, SchemaBuilder]
44+
45+
def __init__(self):
46+
self.stream_to_builder = defaultdict(NoRequiredSchemaBuilder)
47+
48+
def accumulate(self, record: AirbyteRecordMessage):
49+
"""Uses the input record to add to the inferred schemas maintained by this object"""
50+
self.stream_to_builder[record.stream].add_object(record.data)
51+
52+
def get_inferred_schemas(self) -> Dict[str, InferredSchema]:
53+
"""
54+
Returns the JSON schemas for all encountered streams inferred by inspecting all records
55+
passed via the accumulate method
56+
"""
57+
schemas = {}
58+
for stream_name, builder in self.stream_to_builder.items():
59+
schemas[stream_name] = builder.to_schema()
60+
return schemas
61+
62+
def get_stream_schema(self, stream_name: str) -> Optional[InferredSchema]:
63+
"""
64+
Returns the inferred JSON schema for the specified stream. Might be `None` if there were no records for the given stream name.
65+
"""
66+
return self.stream_to_builder[stream_name].to_schema() if stream_name in self.stream_to_builder else None

airbyte-cdk/python/setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
setup(
1717
name="airbyte-cdk",
18-
version="0.16.3",
18+
version="0.17.0",
1919
description="A framework for writing Airbyte Connectors.",
2020
long_description=README,
2121
long_description_content_type="text/markdown",
@@ -51,6 +51,7 @@
5151
"jsonschema~=3.2.0",
5252
"jsonref~=0.2",
5353
"pendulum",
54+
"genson==1.2.2",
5455
"pydantic~=1.9.2",
5556
"python-dateutil",
5657
"PyYAML~=5.4",
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from typing import List, Mapping
6+
7+
import pytest
8+
from airbyte_cdk.models.airbyte_protocol import AirbyteRecordMessage
9+
from airbyte_cdk.utils.schema_inferrer import SchemaInferrer
10+
11+
NOW = 1234567
12+
13+
14+
@pytest.mark.parametrize(
15+
"input_records,expected_schemas",
16+
[
17+
pytest.param(
18+
[
19+
{"stream": "my_stream", "data": {"field_A": "abc"}},
20+
{"stream": "my_stream", "data": {"field_A": "def"}},
21+
],
22+
{"my_stream": {"field_A": {"type": "string"}}},
23+
id="test_basic",
24+
),
25+
pytest.param(
26+
[
27+
{"stream": "my_stream", "data": {"field_A": 1.0}},
28+
{"stream": "my_stream", "data": {"field_A": "abc"}},
29+
],
30+
{"my_stream": {"field_A": {"type": ["number", "string"]}}},
31+
id="test_deriving_schema_refine",
32+
),
33+
pytest.param(
34+
[
35+
{"stream": "my_stream", "data": {"obj": {"data": [1.0, 2.0, 3.0]}}},
36+
{"stream": "my_stream", "data": {"obj": {"other_key": "xyz"}}},
37+
],
38+
{
39+
"my_stream": {
40+
"obj": {
41+
"type": "object",
42+
"properties": {
43+
"data": {"type": "array", "items": {"type": "number"}},
44+
"other_key": {"type": "string"},
45+
},
46+
}
47+
}
48+
},
49+
id="test_derive_schema_for_nested_structures",
50+
),
51+
],
52+
)
53+
def test_schema_derivation(input_records: List, expected_schemas: Mapping):
54+
inferrer = SchemaInferrer()
55+
for record in input_records:
56+
inferrer.accumulate(AirbyteRecordMessage(stream=record["stream"], data=record["data"], emitted_at=NOW))
57+
58+
for stream_name, expected_schema in expected_schemas.items():
59+
assert inferrer.get_inferred_schemas()[stream_name] == {
60+
"$schema": "http://json-schema.org/schema#",
61+
"type": "object",
62+
"properties": expected_schema,
63+
}
64+
65+
66+
def test_deriving_schema_multiple_streams():
67+
inferrer = SchemaInferrer()
68+
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream", data={"field_A": 1.0}, emitted_at=NOW))
69+
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream2", data={"field_A": "abc"}, emitted_at=NOW))
70+
inferred_schemas = inferrer.get_inferred_schemas()
71+
assert inferred_schemas["my_stream"] == {
72+
"$schema": "http://json-schema.org/schema#",
73+
"type": "object",
74+
"properties": {"field_A": {"type": "number"}},
75+
}
76+
assert inferred_schemas["my_stream2"] == {
77+
"$schema": "http://json-schema.org/schema#",
78+
"type": "object",
79+
"properties": {"field_A": {"type": "string"}},
80+
}
81+
82+
83+
def test_get_individual_schema():
84+
inferrer = SchemaInferrer()
85+
inferrer.accumulate(AirbyteRecordMessage(stream="my_stream", data={"field_A": 1.0}, emitted_at=NOW))
86+
assert inferrer.get_stream_schema("my_stream") == {
87+
"$schema": "http://json-schema.org/schema#",
88+
"type": "object",
89+
"properties": {"field_A": {"type": "number"}},
90+
}
91+
assert inferrer.get_stream_schema("another_stream") is None

0 commit comments

Comments
 (0)