Commit 2caf390

🎉 MySQL destination: normalization (#4163)
* Add mysql dbt package
* Add mysql normalization support in java
* Add mysql normalization support in python
* Fix unit tests
* Update readme
* Setup mysql container in integration test
* Add macros
* Depend on dbt-mysql from git repo
* Remove mysql limitation test
* Test normalization
* Revert protocol format change
* Fix mysql json macros
* Fix two more macros
* Fix table name length
* Fix array macro
* Fix equality test macro
* Update replace-identifiers
* Add more identifiers to replace
* Fix unnest macro
* Fix equality macro
* Check in mysql test output
* Update column limit test for mysql
* Escape parentheses
* Remove unnecessary mysql test
* Remove mysql output for easier code review
* Remove unnecessary mysql test
* Remove parentheses
* Update dependencies
* Skip mysql instead of manually write out types
* Bump version
* Check in unit test for mysql name transformer
* Fix type conversion
* Use json_value to extract scalar json fields
* Move dbt-mysql to Dockerfile (#4459)
* Format code
* Check in mysql dbt output
* Remove unnecessary quote
* Update mysql equality test to match 0.19.0
* Check in schema_test update
* Update readme
* Bump base normalization version
* Update document

Co-authored-by: Christophe Duong <[email protected]>
1 parent 9531c4d commit 2caf390

File tree: 124 files changed, +2880 -76 lines changed


airbyte-integrations/bases/base-normalization/Dockerfile

Lines changed: 3 additions & 2 deletions
@@ -1,4 +1,4 @@
-FROM fishtownanalytics/dbt:0.19.1
+FROM fishtownanalytics/dbt:0.19.0
 COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte
 
 WORKDIR /airbyte
@@ -14,6 +14,7 @@ RUN pip install .
 
 WORKDIR /airbyte/normalization_code
 RUN pip install .
+RUN pip install git+https://github.com/dbeatty10/dbt-mysql@96655ea9f7fca7be90c9112ce8ffbb5aac1d3716#egg=dbt-mysql
 
 WORKDIR /airbyte/normalization_code/dbt-template/
 # Download external dbt dependencies
@@ -23,5 +24,5 @@ WORKDIR /airbyte
 ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
 ENTRYPOINT ["/airbyte/entrypoint.sh"]
 
-LABEL io.airbyte.version=0.1.34
+LABEL io.airbyte.version=0.1.35
 LABEL io.airbyte.name=airbyte/normalization

airbyte-integrations/bases/base-normalization/README.md

Lines changed: 10 additions & 0 deletions
@@ -54,6 +54,7 @@ allowed characters, if quotes are needed or not, and the length limitations:
 - [postgres](../../../docs/integrations/destinations/postgres.md)
 - [redshift](../../../docs/integrations/destinations/redshift.md)
 - [snowflake](../../../docs/integrations/destinations/snowflake.md)
+- [mysql](../../../docs/integrations/destinations/mysql.md)
 
 Rules about truncations, for example for both of these strings which are too long for the postgres 64 limit:
 - `Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iiii`
@@ -216,6 +217,15 @@ A nice improvement would be to add csv/json seed files as expected output data f
 The integration tests would verify that the content of such tables in the destination would match
 these seed files or fail.
 
+### Debug dbt operations with local database
+This only works for testing databases launched in local containers (e.g. postgres and mysql).
+
+- In `dbt_integration_test.py`, comment out the `tear_down_db` method so that the relevant database container is not deleted.
+- Find the name of the database container in the logs (e.g. by searching `Executing`).
+- Connect to the container by running `docker exec -it <container-name> bash` in the commandline.
+- Connect to the database inside the container (e.g. `mysql -u root` for mysql).
+- Test the generated dbt operations directly in the database.
+
 ## Standard Destination Tests
 
 Generally, to invoke standard destination tests, you run with gradle using:
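The debug loop added to the README ends at a database prompt. As a rough sketch of that last step (the schema name follows the test conventions above; the table and column names are hypothetical and not part of this diff), one might run from inside the MySQL container:

```sql
-- Sanity-check the normalization test database after `mysql -u root`.
show databases;
use test_normalization;
show tables;

-- Paste the SQL of a generated dbt model here to debug it directly, e.g.
-- a raw-table extraction like the ones the macros in this commit produce:
select json_extract(_airbyte_data, '$."currency"') as currency
from _airbyte_raw_exchange_rate
limit 10;
```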

airbyte-integrations/bases/base-normalization/build.gradle

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
     dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker'
     dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
     dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
+    dependsOn ':airbyte-integrations:connectors:destination-mysql:airbyteDocker'
 }
 
 integrationTest.dependsOn("customIntegrationTestPython")

airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/array.sql

Lines changed: 43 additions & 1 deletion
@@ -28,6 +28,10 @@
     ) as _airbyte_nested_data
 {%- endmacro %}
 
+{% macro mysql__cross_join_unnest(stream_name, array_col) -%}
+    left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid
+{%- endmacro %}
+
 {% macro redshift__cross_join_unnest(stream_name, array_col) -%}
     left join joined on _airbyte_{{ stream_name }}_hashid = joined._airbyte_hashid
 {%- endmacro %}
@@ -36,7 +40,7 @@
     cross join table(flatten({{ array_col }})) as {{ array_col }}
 {%- endmacro %}
 
-{# unnested_column_value ------------------------------------------------- #}
+{# unnested_column_value -- this macro is related to unnest_cte #}
 
 {% macro unnested_column_value(column_col) -%}
     {{ adapter.dispatch('unnested_column_value')(column_col) }}
@@ -58,6 +62,10 @@
     _airbyte_nested_data
 {%- endmacro %}
 
+{% macro mysql__unnested_column_value(column_col) -%}
+    _airbyte_nested_data
+{%- endmacro %}
+
 {# unnest_cte ------------------------------------------------- #}
 
 {% macro unnest_cte(table_name, stream_name, column_col) -%}
@@ -97,3 +105,37 @@ joined as (
     where numbers.generated_number <= json_array_length({{ column_col }}, true)
 )
 {%- endmacro %}
+
+{% macro mysql__unnest_cte(table_name, stream_name, column_col) -%}
+    {%- if not execute -%}
+        {{ return('') }}
+    {% endif %}
+
+    {%- call statement('max_json_array_length', fetch_result=True) -%}
+        with max_value as (
+            select max(json_length({{ column_col }})) as max_number_of_items
+            from {{ ref(table_name) }}
+        )
+        select
+            case when max_number_of_items is not null and max_number_of_items > 1
+            then max_number_of_items
+            else 1 end as max_number_of_items
+        from max_value
+    {%- endcall -%}
+
+    {%- set max_length = load_result('max_json_array_length') -%}
+with numbers as (
+    {{ dbt_utils.generate_series(max_length["data"][0][0]) }}
+),
+joined as (
+    select
+        _airbyte_{{ stream_name }}_hashid as _airbyte_hashid,
+        {# -- json_extract(column_col, '$[i][0]') as _airbyte_nested_data #}
+        json_extract({{ column_col }}, concat("$[", numbers.generated_number - 1, "][0]")) as _airbyte_nested_data
+    from {{ ref(table_name) }}
+    cross join numbers
+    -- only generate the number of records in the cross join that corresponds
+    -- to the number of items in {{ table_name }}.{{ column_col }}
+    where numbers.generated_number <= json_length({{ column_col }})
+)
+{%- endmacro %}
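MySQL has no `unnest` or `flatten`, so `mysql__unnest_cte` works in two passes: a `statement` call first fetches the longest JSON array in the column, then a generated number series is cross joined against the table to index into each row's array. A sketch of roughly what the macro renders to, for a hypothetical stream `children` with a JSON array column `data`:

```sql
-- Approximate rendered SQL; stream and column names are illustrative only.
with numbers as (
    -- stands in for dbt_utils.generate_series(max_length), i.e. 1..N
    select 1 as generated_number union all select 2 union all select 3
),
joined as (
    select
        _airbyte_children_hashid as _airbyte_hashid,
        -- extract the (generated_number - 1)-th element of the array
        json_extract(data, concat("$[", numbers.generated_number - 1, "][0]")) as _airbyte_nested_data
    from children
    cross join numbers
    -- emit only as many rows per record as the array has items
    where numbers.generated_number <= json_length(data)
)
select * from joined;
```

`mysql__cross_join_unnest` then left joins `joined` back on the hashid, which is why it emits a join condition instead of a true cross join unnest.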

airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql

Lines changed: 41 additions & 0 deletions
@@ -19,3 +19,44 @@
 {% macro snowflake__type_json() %}
     variant
 {% endmacro %}
+
+{%- macro mysql__type_json() -%}
+    json
+{%- endmacro -%}
+
+
+{# string ------------------------------------------------- #}
+
+{%- macro mysql__type_string() -%}
+    char
+{%- endmacro -%}
+
+
+{# float ------------------------------------------------- #}
+{% macro mysql__type_float() %}
+    float
+{% endmacro %}
+
+
+{# int ------------------------------------------------- #}
+{% macro default__type_int() %}
+    signed
+{% endmacro %}
+
+
+{# bigint ------------------------------------------------- #}
+{% macro mysql__type_bigint() %}
+    signed
+{% endmacro %}
+
+
+{# numeric ------------------------------------------------- #}
+{% macro mysql__type_numeric() %}
+    float
+{% endmacro %}
+
+
+{# timestamp ------------------------------------------------- #}
+{% macro mysql__type_timestamp() %}
+    time
+{% endmacro %}
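The MySQL values above look odd next to the other adapters because these macros are used as `cast(...)` targets, and MySQL's `CAST` vocabulary (`signed`, `char`, `json`) differs from its column DDL types (`int`, `varchar`). A quick illustration, assumed rather than taken from this commit:

```sql
-- MySQL CAST targets vs. the DDL type names one might expect:
select
    cast('123' as signed)    as an_int,   -- not cast(... as int)
    cast(45.6 as char)       as a_string, -- not cast(... as varchar)
    cast('{"a": 1}' as json) as a_json;
```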
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+{% macro mysql__except() %}
+{% do exceptions.warn("MySQL does not support EXCEPT operator") %}
+{% endmacro %}
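This warning exists because MySQL, as of this commit, has no `EXCEPT` set operator, so relation diffs have to be emulated. The usual workaround, which the MySQL equality test later in this commit relies on, is a `not in` anti-join over a row constructor; a minimal sketch with hypothetical tables `a` and `b`:

```sql
-- Emulates: select id, name from a EXCEPT select id, name from b
select id, name
from a
where (id, name) not in (select id, name from b);
```

Note that `not in` handles `NULL`s differently from a true `EXCEPT` (a `NULL` in the subquery filters out every row), one reason this stays a test-only workaround.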

airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/json_operations.sql

Lines changed: 19 additions & 1 deletion
@@ -2,8 +2,9 @@
 Adapter Macros for the following functions:
 - Bigquery: JSON_EXTRACT(json_string_expr, json_path_format) -> https://cloud.google.com/bigquery/docs/reference/standard-sql/json_functions
 - Snowflake: JSON_EXTRACT_PATH_TEXT( <column_identifier> , '<path_name>' ) -> https://docs.snowflake.com/en/sql-reference/functions/json_extract_path_text.html
-- Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
+- Redshift: json_extract_path_text('json_string', 'path_elem' [,'path_elem'[, ...] ] [, null_if_invalid ] ) -> https://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
 - Postgres: json_extract_path_text(<from_json>, 'path' [, 'path' [, ...}}) -> https://www.postgresql.org/docs/12/functions-json.html
+- MySQL: JSON_EXTRACT(json_doc, 'path' [, 'path'] ...) -> https://dev.mysql.com/doc/refman/8.0/en/json-search-functions.html
 #}
 
 {# format_json_path -------------------------------------------------- #}
@@ -23,6 +24,11 @@
     {{ "'" ~ json_path_list|join("','") ~ "'" }}
 {%- endmacro %}
 
+{% macro mysql__format_json_path(json_path_list) -%}
+    {# -- '$."x"."y"."z"' #}
+    {{ "'$.\"" ~ json_path_list|join(".") ~ "\"'" }}
+{%- endmacro %}
+
 {% macro redshift__format_json_path(json_path_list) -%}
     {{ "'" ~ json_path_list|join("','") ~ "'" }}
 {%- endmacro %}
@@ -49,6 +55,10 @@
     jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
 {%- endmacro %}
 
+{% macro mysql__json_extract(json_column, json_path_list) -%}
+    json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
+{%- endmacro %}
+
 {% macro redshift__json_extract(json_column, json_path_list) -%}
     case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
 {%- endmacro %}
@@ -75,6 +85,10 @@
     jsonb_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }})
 {%- endmacro %}
 
+{% macro mysql__json_extract_scalar(json_column, json_path_list) -%}
+    json_value({{ json_column }}, {{ format_json_path(json_path_list) }})
+{%- endmacro %}
+
 {% macro redshift__json_extract_scalar(json_column, json_path_list) -%}
     case when json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) != '' then json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true) end
 {%- endmacro %}
@@ -101,6 +115,10 @@
     jsonb_extract_path({{ json_column }}, {{ format_json_path(json_path_list) }})
 {%- endmacro %}
 
+{% macro mysql__json_extract_array(json_column, json_path_list) -%}
+    json_extract({{ json_column }}, {{ format_json_path(json_path_list) }})
+{%- endmacro %}
+
 {% macro redshift__json_extract_array(json_column, json_path_list) -%}
     json_extract_path_text({{ json_column }}, {{ format_json_path(json_path_list) }}, true)
 {%- endmacro %}
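The split in the macros above, `json_extract` for objects and arrays versus `json_value` for scalars (the commit's "Use json_value to extract scalar json fields"), matters because `json_extract` returns JSON, quotes included, while `json_value` (available in recent MySQL 8 releases) returns a plain scalar. A hedged example; the table and column are illustrative, not from this diff:

```sql
-- Assume _airbyte_data holds '{"age": "31", "tags": [1, 2]}'; for a
-- one-element path, mysql__format_json_path(["age"]) renders '$."age"'.
select
    json_extract(_airbyte_data, '$."age"')  as age_json,   -- '"31"'  (still JSON, quoted)
    json_value(_airbyte_data, '$."age"')    as age_scalar, -- '31'    (plain scalar)
    json_extract(_airbyte_data, '$."tags"') as tags_array  -- '[1, 2]'
from _airbyte_raw_exchange_rate;
```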
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
+{#
+-- Adapted from https://github.com/dbt-labs/dbt-utils/blob/0-19-0-updates/macros/schema_tests/equality.sql
+-- dbt-utils version: 0.6.4
+-- This macro needs to be updated accordingly when dbt-utils is upgraded.
+-- This is needed because MySQL does not support the EXCEPT operator!
+#}
+
+{% macro mysql__test_equality(model, compare_model, compare_columns=None) %}
+
+    {%- if not execute -%}
+        {{ return('') }}
+    {% endif %}
+
+    {%- do dbt_utils._is_relation(model, 'test_equality') -%}
+
+    {%- if not compare_columns -%}
+        {%- do dbt_utils._is_ephemeral(model, 'test_equality') -%}
+        {%- set compare_columns = adapter.get_columns_in_relation(model) | map(attribute='quoted') -%}
+    {%- endif -%}
+
+    {% set compare_cols_csv = compare_columns | join(', ') %}
+
+    with a as (
+        select * from {{ model }}
+    ),
+
+    b as (
+        select * from {{ compare_model }}
+    ),
+
+    a_minus_b as (
+        select {{ compare_cols_csv }} from a
+        where ({{ compare_cols_csv }}) not in
+            (select {{ compare_cols_csv }} from b)
+    ),
+
+    b_minus_a as (
+        select {{ compare_cols_csv }} from b
+        where ({{ compare_cols_csv }}) not in
+            (select {{ compare_cols_csv }} from a)
+    ),
+
+    unioned as (
+        select * from a_minus_b
+        union all
+        select * from b_minus_a
+    ),
+
+    final as (
+        select (select count(*) from unioned) +
+            (select abs(
+                (select count(*) from a_minus_b) -
+                (select count(*) from b_minus_a)
+            ))
+            as count
+    )
+
+    select count from final
+
+{% endmacro %}
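Rendered against concrete relations, the test computes a symmetric difference with those two anti-joins. A simplified sketch using the models named in this commit's schema tests (the column list is hypothetical); the real macro additionally adds an `abs(count(a_minus_b) - count(b_minus_a))` term because `not in` ignores duplicate multiplicity:

```sql
with a as (select * from exchange_rate),
b as (select * from dedup_exchange_rate_scd),
a_minus_b as (
    select id, currency, rate from a
    where (id, currency, rate) not in (select id, currency, rate from b)
),
b_minus_a as (
    select id, currency, rate from b
    where (id, currency, rate) not in (select id, currency, rate from a)
),
unioned as (
    select * from a_minus_b
    union all
    select * from b_minus_a
)
-- dbt treats a non-zero result as a test failure
select count(*) from unioned;
```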

airbyte-integrations/bases/base-normalization/integration_tests/dbt_integration_test.py

Lines changed: 50 additions & 8 deletions
@@ -42,12 +42,17 @@
 class DbtIntegrationTest(object):
     def __init__(self):
         self.target_schema = "test_normalization"
-        self.container_name = "test_normalization_db_" + self.random_string(3)
+        self.container_prefix = f"test_normalization_db_{self.random_string(3)}"
+        self.db_names = ["postgres", "mysql"]
 
     @staticmethod
     def random_string(length: int) -> str:
         return "".join(random.choice(string.ascii_lowercase) for i in range(length))
 
+    def setup_db(self):
+        self.setup_postgres_db()
+        self.setup_mysql_db()
+
     def setup_postgres_db(self):
         print("Starting localhost postgres container for tests")
         port = self.find_free_port()
@@ -64,7 +69,7 @@ def setup_postgres_db(self):
             "run",
             "--rm",
             "--name",
-            f"{self.container_name}",
+            f"{self.container_prefix}_postgres",
             "-e",
             f"POSTGRES_USER={config['username']}",
             "-e",
@@ -81,6 +86,42 @@ def setup_postgres_db(self):
         with open("../secrets/postgres.json", "w") as fh:
             fh.write(json.dumps(config))
 
+    def setup_mysql_db(self):
+        print("Starting localhost mysql container for tests")
+        port = self.find_free_port()
+        config = {
+            "type": "mysql",
+            "host": "localhost",
+            "port": port,
+            "database": self.target_schema,
+            "username": "root",
+            "password": "",
+        }
+        commands = [
+            "docker",
+            "run",
+            "--rm",
+            "--name",
+            f"{self.container_prefix}_mysql",
+            "-e",
+            "MYSQL_ALLOW_EMPTY_PASSWORD=yes",
+            "-e",
+            "MYSQL_INITDB_SKIP_TZINFO=yes",
+            "-e",
+            f"MYSQL_DATABASE={config['database']}",
+            "-p",
+            f"{config['port']}:3306",
+            "-d",
+            "mysql",
+        ]
+        print("Executing: ", " ".join(commands))
+        subprocess.call(commands)
+
+        if not os.path.exists("../secrets"):
+            os.makedirs("../secrets")
+        with open("../secrets/mysql.json", "w") as fh:
+            fh.write(json.dumps(config))
+
     @staticmethod
     def find_free_port():
         """
@@ -92,12 +133,13 @@ def find_free_port():
         s.close()
         return addr[1]
 
-    def tear_down_postgres_db(self):
-        print("Stopping localhost postgres container for tests")
-        try:
-            subprocess.call(["docker", "kill", f"{self.container_name}"])
-        except Exception as e:
-            print(f"WARN: Exception while shutting down postgres db: {e}")
+    def tear_down_db(self):
+        for db_name in self.db_names:
+            print(f"Stopping localhost {db_name} container for tests")
+            try:
+                subprocess.call(["docker", "kill", f"{self.container_prefix}_{db_name}"])
+            except Exception as e:
+                print(f"WARN: Exception while shutting down {db_name}: {e}")
 
     @staticmethod
     def change_current_test_dir(request):
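On the MySQL side, `setup_mysql_db` starts a container that allows a passwordless root login (`MYSQL_ALLOW_EMPTY_PASSWORD=yes`) and pre-creates the target database (`MYSQL_DATABASE`). A minimal readiness probe one could run once `mysqld` accepts connections; illustrative only, not part of the test code:

```sql
-- Confirm the server is up and the test database exists:
select version();
show databases like 'test_normalization';
```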

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_primary_key_streams/models/dbt_schema_tests/schema_test.yml

Lines changed: 5 additions & 5 deletions
@@ -4,11 +4,11 @@ models:
   - name: exchange_rate
     tests:
       - dbt_utils.equality:
-          description: check_streams_are_equal
-          In this integration test, we are sending the same records to both streams
-          exchange_rate and dedup_exchange_rate.
-          The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror
-          the final table with append or overwrite mode from exchange_rate.
+          # description: check_streams_are_equal
+          # In this integration test, we are sending the same records to both streams
+          # exchange_rate and dedup_exchange_rate.
+          # The SCD table of dedup_exchange_rate in append_dedup mode should therefore mirror
+          # the final table with append or overwrite mode from exchange_rate.
           compare_model: ref('dedup_exchange_rate_scd')
           compare_columns:
             - id
