Skip to content

Commit e0bac4a

Browse files
🐛 Fix normalization SCD partition by float columns errors with BigQuery (#9281)
1 parent 7727b86 commit e0bac4a

File tree

67 files changed

+980
-787
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+980
-787
lines changed

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/first_output/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql

+10-7
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ scd_data as (
2626
), '')) as
2727
string
2828
))) as _airbyte_unique_key,
29-
id,
30-
date,
31-
`partition`,
29+
id,
30+
date,
31+
`partition`,
3232
date as _airbyte_start_at,
3333
lag(date) over (
3434
partition by id
@@ -54,7 +54,10 @@ dedup_data as (
5454
-- we need to ensure de-duplicated rows for merge/update queries
5555
-- additionally, we generate a unique key for the scd table
5656
row_number() over (
57-
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
57+
partition by
58+
_airbyte_unique_key,
59+
_airbyte_start_at,
60+
_airbyte_emitted_at
5861
order by _airbyte_active_row desc, _airbyte_ab_id
5962
) as _airbyte_row_num,
6063
to_hex(md5(cast(concat(coalesce(cast(_airbyte_unique_key as
@@ -72,9 +75,9 @@ dedup_data as (
7275
select
7376
_airbyte_unique_key,
7477
_airbyte_unique_key_scd,
75-
id,
76-
date,
77-
`partition`,
78+
id,
79+
date,
80+
`partition`,
7881
_airbyte_start_at,
7982
_airbyte_end_at,
8083
_airbyte_active_row,

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql

+11-8
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ scd_data as (
5757
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
5858
select
5959
{{ dbt_utils.surrogate_key([
60-
'id',
60+
'id',
6161
]) }} as _airbyte_unique_key,
62-
id,
63-
date,
64-
{{ adapter.quote('partition') }},
62+
id,
63+
date,
64+
{{ adapter.quote('partition') }},
6565
date as _airbyte_start_at,
6666
lag(date) over (
6767
partition by id
@@ -87,7 +87,10 @@ dedup_data as (
8787
-- we need to ensure de-duplicated rows for merge/update queries
8888
-- additionally, we generate a unique key for the scd table
8989
row_number() over (
90-
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
90+
partition by
91+
_airbyte_unique_key,
92+
_airbyte_start_at,
93+
_airbyte_emitted_at
9194
order by _airbyte_active_row desc, _airbyte_ab_id
9295
) as _airbyte_row_num,
9396
{{ dbt_utils.surrogate_key([
@@ -101,9 +104,9 @@ dedup_data as (
101104
select
102105
_airbyte_unique_key,
103106
_airbyte_unique_key_scd,
104-
id,
105-
date,
106-
{{ adapter.quote('partition') }},
107+
id,
108+
date,
109+
{{ adapter.quote('partition') }},
107110
_airbyte_start_at,
108111
_airbyte_end_at,
109112
_airbyte_active_row,

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql

+20-17
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ scd_data as (
3030
), '')) as
3131
string
3232
))) as _airbyte_unique_key,
33-
id,
34-
currency,
35-
date,
36-
timestamp_col,
37-
HKD_special___characters,
38-
HKD_special___characters_1,
39-
NZD,
40-
USD,
33+
id,
34+
currency,
35+
date,
36+
timestamp_col,
37+
HKD_special___characters,
38+
HKD_special___characters_1,
39+
NZD,
40+
USD,
4141
date as _airbyte_start_at,
4242
lag(date) over (
4343
partition by id, currency, cast(NZD as
@@ -67,7 +67,10 @@ dedup_data as (
6767
-- we need to ensure de-duplicated rows for merge/update queries
6868
-- additionally, we generate a unique key for the scd table
6969
row_number() over (
70-
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
70+
partition by
71+
_airbyte_unique_key,
72+
_airbyte_start_at,
73+
_airbyte_emitted_at
7174
order by _airbyte_active_row desc, _airbyte_ab_id
7275
) as _airbyte_row_num,
7376
to_hex(md5(cast(concat(coalesce(cast(_airbyte_unique_key as
@@ -85,14 +88,14 @@ dedup_data as (
8588
select
8689
_airbyte_unique_key,
8790
_airbyte_unique_key_scd,
88-
id,
89-
currency,
90-
date,
91-
timestamp_col,
92-
HKD_special___characters,
93-
HKD_special___characters_1,
94-
NZD,
95-
USD,
91+
id,
92+
currency,
93+
date,
94+
timestamp_col,
95+
HKD_special___characters,
96+
HKD_special___characters_1,
97+
NZD,
98+
USD,
9699
_airbyte_start_at,
97100
_airbyte_end_at,
98101
_airbyte_active_row,

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql

+23-20
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,18 @@ scd_data as (
5959
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
6060
select
6161
{{ dbt_utils.surrogate_key([
62-
'id',
63-
'currency',
64-
'NZD',
62+
'id',
63+
'currency',
64+
'NZD',
6565
]) }} as _airbyte_unique_key,
66-
id,
67-
currency,
68-
date,
69-
timestamp_col,
70-
HKD_special___characters,
71-
HKD_special___characters_1,
72-
NZD,
73-
USD,
66+
id,
67+
currency,
68+
date,
69+
timestamp_col,
70+
HKD_special___characters,
71+
HKD_special___characters_1,
72+
NZD,
73+
USD,
7474
date as _airbyte_start_at,
7575
lag(date) over (
7676
partition by id, currency, cast(NZD as {{ dbt_utils.type_string() }})
@@ -96,7 +96,10 @@ dedup_data as (
9696
-- we need to ensure de-duplicated rows for merge/update queries
9797
-- additionally, we generate a unique key for the scd table
9898
row_number() over (
99-
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
99+
partition by
100+
_airbyte_unique_key,
101+
_airbyte_start_at,
102+
_airbyte_emitted_at
100103
order by _airbyte_active_row desc, _airbyte_ab_id
101104
) as _airbyte_row_num,
102105
{{ dbt_utils.surrogate_key([
@@ -110,14 +113,14 @@ dedup_data as (
110113
select
111114
_airbyte_unique_key,
112115
_airbyte_unique_key_scd,
113-
id,
114-
currency,
115-
date,
116-
timestamp_col,
117-
HKD_special___characters,
118-
HKD_special___characters_1,
119-
NZD,
120-
USD,
116+
id,
117+
currency,
118+
date,
119+
timestamp_col,
120+
HKD_special___characters,
121+
HKD_special___characters_1,
122+
NZD,
123+
USD,
121124
_airbyte_start_at,
122125
_airbyte_end_at,
123126
_airbyte_active_row,

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql

+23-20
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,18 @@ scd_data as (
5959
-- SQL model to build a Type 2 Slowly Changing Dimension (SCD) table for each record identified by their primary key
6060
select
6161
{{ dbt_utils.surrogate_key([
62-
'id',
63-
'currency',
64-
'NZD',
62+
'id',
63+
'currency',
64+
'NZD',
6565
]) }} as _airbyte_unique_key,
66-
id,
67-
currency,
68-
new_column,
69-
date,
70-
timestamp_col,
71-
HKD_special___characters,
72-
NZD,
73-
USD,
66+
id,
67+
currency,
68+
new_column,
69+
date,
70+
timestamp_col,
71+
HKD_special___characters,
72+
NZD,
73+
USD,
7474
date as _airbyte_start_at,
7575
lag(date) over (
7676
partition by cast(id as {{ dbt_utils.type_string() }}), currency, cast(NZD as {{ dbt_utils.type_string() }})
@@ -96,7 +96,10 @@ dedup_data as (
9696
-- we need to ensure de-duplicated rows for merge/update queries
9797
-- additionally, we generate a unique key for the scd table
9898
row_number() over (
99-
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at
99+
partition by
100+
_airbyte_unique_key,
101+
_airbyte_start_at,
102+
_airbyte_emitted_at
100103
order by _airbyte_active_row desc, _airbyte_ab_id
101104
) as _airbyte_row_num,
102105
{{ dbt_utils.surrogate_key([
@@ -110,14 +113,14 @@ dedup_data as (
110113
select
111114
_airbyte_unique_key,
112115
_airbyte_unique_key_scd,
113-
id,
114-
currency,
115-
new_column,
116-
date,
117-
timestamp_col,
118-
HKD_special___characters,
119-
NZD,
120-
USD,
116+
id,
117+
currency,
118+
new_column,
119+
date,
120+
timestamp_col,
121+
HKD_special___characters,
122+
NZD,
123+
USD,
121124
_airbyte_start_at,
122125
_airbyte_end_at,
123126
_airbyte_active_row,

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_incremental/scd/test_normalization/dedup_cdc_excluded_scd.sql

+25-21
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ input_data_with_active_row_num as (
2626
row_number() over (
2727
partition by id
2828
order by
29-
_airbyte_emitted_at is null asc,
30-
_airbyte_emitted_at desc,
31-
_airbyte_emitted_at desc, _ab_cdc_updated_at desc
29+
_ab_cdc_lsn is null asc,
30+
_ab_cdc_lsn desc,
31+
_ab_cdc_updated_at desc,
32+
_airbyte_emitted_at desc
3233
) as _airbyte_active_row_num
3334
from input_data
3435
),
@@ -40,21 +41,21 @@ scd_data as (
4041
toString(id)
4142

4243
))) as _airbyte_unique_key,
43-
id,
44-
name,
45-
_ab_cdc_lsn,
46-
_ab_cdc_updated_at,
47-
_ab_cdc_deleted_at,
48-
_airbyte_emitted_at as _airbyte_start_at,
44+
id,
45+
name,
46+
_ab_cdc_lsn,
47+
_ab_cdc_updated_at,
48+
_ab_cdc_deleted_at,
49+
_ab_cdc_lsn as _airbyte_start_at,
4950
case when _airbyte_active_row_num = 1 and _ab_cdc_deleted_at is null then 1 else 0 end as _airbyte_active_row,
50-
anyOrNull(_airbyte_emitted_at) over (
51+
anyOrNull(_ab_cdc_lsn) over (
5152
partition by id
5253
order by
53-
_airbyte_emitted_at is null asc,
54-
_airbyte_emitted_at desc,
55-
_airbyte_emitted_at desc, _ab_cdc_updated_at desc
56-
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING
57-
) as _airbyte_end_at,
54+
_ab_cdc_lsn is null asc,
55+
_ab_cdc_lsn desc,
56+
_ab_cdc_updated_at desc,
57+
_airbyte_emitted_at desc
58+
ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) as _airbyte_end_at,
5859
_airbyte_ab_id,
5960
_airbyte_emitted_at,
6061
_airbyte_dedup_cdc_excluded_hashid
@@ -65,7 +66,10 @@ dedup_data as (
6566
-- we need to ensure de-duplicated rows for merge/update queries
6667
-- additionally, we generate a unique key for the scd table
6768
row_number() over (
68-
partition by _airbyte_unique_key, _airbyte_start_at, _airbyte_emitted_at, accurateCastOrNull(_ab_cdc_deleted_at, 'String'), accurateCastOrNull(_ab_cdc_updated_at, 'String')
69+
partition by
70+
_airbyte_unique_key,
71+
_airbyte_start_at,
72+
_airbyte_emitted_at, accurateCastOrNull(_ab_cdc_deleted_at, 'String'), accurateCastOrNull(_ab_cdc_updated_at, 'String')
6973
order by _airbyte_active_row desc, _airbyte_ab_id
7074
) as _airbyte_row_num,
7175
assumeNotNull(hex(MD5(
@@ -91,11 +95,11 @@ dedup_data as (
9195
select
9296
_airbyte_unique_key,
9397
_airbyte_unique_key_scd,
94-
id,
95-
name,
96-
_ab_cdc_lsn,
97-
_ab_cdc_updated_at,
98-
_ab_cdc_deleted_at,
98+
id,
99+
name,
100+
_ab_cdc_lsn,
101+
_ab_cdc_updated_at,
102+
_ab_cdc_deleted_at,
99103
_airbyte_start_at,
100104
_airbyte_end_at,
101105
_airbyte_active_row,

0 commit comments

Comments
 (0)