Skip to content

Commit 1bf395b

Browse files
authored
fix(ingest): split merge statements correctly (datahub-project#12989)
1 parent a61a585 commit 1bf395b

File tree

4 files changed

+102
-14
lines changed

4 files changed

+102
-14
lines changed

metadata-ingestion/src/datahub/sql_parsing/split_statements.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,10 @@ def _process_normal(self, most_recent_real_char: str) -> Iterator[str]:
237237
),
238238
)
239239
if (
240-
is_force_new_statement_keyword and most_recent_real_char != ")"
241-
): # usually we'd have a close paren that closes a CTE
240+
is_force_new_statement_keyword
241+
and not self._has_preceding_cte(most_recent_real_char)
242+
and not self._is_part_of_merge_query()
243+
):
242244
# Force termination of current statement
243245
yield from self._yield_if_complete()
244246

@@ -251,6 +253,14 @@ def _process_normal(self, most_recent_real_char: str) -> Iterator[str]:
251253
else:
252254
self.current_statement.append(c)
253255

256+
def _has_preceding_cte(self, most_recent_real_char: str) -> bool:
257+
# usually we'd have a close paren that closes a CTE
258+
return most_recent_real_char == ")"
259+
260+
def _is_part_of_merge_query(self) -> bool:
261+
# In merge statement we'd have `when matched then` or `when not matched then"
262+
return "".join(self.current_statement).strip().lower().endswith("then")
263+
254264

255265
def split_statements(sql: str) -> Iterator[str]:
256266
"""

metadata-ingestion/tests/unit/sql_parsing/test_split_statements.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ def test_split_statement_with_quotes_in_sting_in_query():
180180
assert statements == expected
181181

182182

183-
def test_split_statement_with_merge_query_fails():
183+
def test_split_statement_with_merge_query():
184184
test_sql = """\
185185
MERGE INTO myTable AS t
186186
USING myTable2 AS s
@@ -191,13 +191,4 @@ def test_split_statement_with_merge_query_fails():
191191
INSERT (a, b) VALUES (s.a, s.b)"""
192192
statements = [statement.strip() for statement in split_statements(test_sql)]
193193
expected = [test_sql]
194-
assert statements != expected
195-
assert statements == [
196-
"""MERGE INTO myTable AS t
197-
USING myTable2 AS s
198-
ON t.a = s.a
199-
WHEN MATCHED THEN""",
200-
"""UPDATE SET t.b = s.b
201-
WHEN NOT MATCHED THEN""",
202-
"""INSERT (a, b) VALUES (s.a, s.b)""",
203-
]
194+
assert statements == expected

metadata-ingestion/tests/unit/stored_procedure/golden_files/snowflake/procedure_with_multi_statements.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,38 @@
77
"aspect": {
88
"json": {
99
"inputDatasets": [
10+
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table1,PROD)",
1011
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table3,PROD)",
1112
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table2,PROD)"
1213
],
1314
"outputDatasets": [
15+
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table,PROD)",
1416
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_delete,PROD)",
1517
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table_insert,PROD)"
1618
],
1719
"fineGrainedLineages": [
20+
{
21+
"upstreamType": "FIELD_SET",
22+
"upstreams": [
23+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table1,PROD),id)"
24+
],
25+
"downstreamType": "FIELD",
26+
"downstreams": [
27+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table,PROD),id)"
28+
],
29+
"confidenceScore": 0.2
30+
},
31+
{
32+
"upstreamType": "FIELD_SET",
33+
"upstreams": [
34+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table1,PROD),column1)"
35+
],
36+
"downstreamType": "FIELD",
37+
"downstreams": [
38+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table,PROD),column1)"
39+
],
40+
"confidenceScore": 0.2
41+
},
1842
{
1943
"upstreamType": "FIELD_SET",
2044
"upstreams": [
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,66 @@
11
[
2-
2+
{
3+
"entityType": "dataJob",
4+
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(snowflake,default_db.default_schema.stored_procedures,PROD),procedure_with_multitable_lineage.sql)",
5+
"changeType": "UPSERT",
6+
"aspectName": "dataJobInputOutput",
7+
"aspect": {
8+
"json": {
9+
"inputDatasets": [
10+
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table1,PROD)",
11+
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table2,PROD)",
12+
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table3,PROD)"
13+
],
14+
"outputDatasets": [
15+
"urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table,PROD)"
16+
],
17+
"fineGrainedLineages": [
18+
{
19+
"upstreamType": "FIELD_SET",
20+
"upstreams": [
21+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table1,PROD),id)"
22+
],
23+
"downstreamType": "FIELD",
24+
"downstreams": [
25+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table,PROD),id)"
26+
],
27+
"confidenceScore": 0.2
28+
},
29+
{
30+
"upstreamType": "FIELD_SET",
31+
"upstreams": [
32+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table1,PROD),column1)"
33+
],
34+
"downstreamType": "FIELD",
35+
"downstreams": [
36+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table,PROD),column1)"
37+
],
38+
"confidenceScore": 0.2
39+
},
40+
{
41+
"upstreamType": "FIELD_SET",
42+
"upstreams": [
43+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table2,PROD),column2)"
44+
],
45+
"downstreamType": "FIELD",
46+
"downstreams": [
47+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table,PROD),column2)"
48+
],
49+
"confidenceScore": 0.2
50+
},
51+
{
52+
"upstreamType": "FIELD_SET",
53+
"upstreams": [
54+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.source_table3,PROD),column3)"
55+
],
56+
"downstreamType": "FIELD",
57+
"downstreams": [
58+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:snowflake,default_db.default_schema.target_table,PROD),column3)"
59+
],
60+
"confidenceScore": 0.2
61+
}
62+
]
63+
}
64+
}
65+
}
366
]

0 commit comments

Comments
 (0)