-
Notifications
You must be signed in to change notification settings - Fork 135
Use 3-level namespace when catalog is set. #94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b8152b6
6841705
5cf7488
6b6d35f
eef40f6
2e8d66f
cfa1d5e
804dc91
d2e491e
377342c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -143,3 +143,23 @@ | |
{% endif %} | ||
{% endfor %} | ||
{% endmacro %} | ||
|
||
{% macro databricks__generate_database_name(custom_database_name=none, node=none) -%} | ||
{%- set default_database = target.database -%} | ||
{%- if custom_database_name is none -%} | ||
{{ return(default_database) }} | ||
{%- else -%} | ||
{{ return(custom_database_name) }} | ||
{%- endif -%} | ||
{%- endmacro %} | ||
|
||
{% macro databricks__make_temp_relation(base_relation, suffix) %} | ||
{% set tmp_identifier = base_relation.identifier ~ suffix %} | ||
{% set tmp_relation = base_relation.incorporate(path = { | ||
"identifier": tmp_identifier, | ||
"schema": None, | ||
"database": None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we don't specify There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Makes sense. Thanks for the explanation! |
||
}) -%} | ||
|
||
{% do return(tmp_relation) %} | ||
{% endmacro %} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
insert into {schema}.seed values (0, 'Cathy', '2022-03-01'); | ||
insert into {database_schema}.seed values (0, 'Cathy', '2022-03-01'); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
insert into {schema}.seed values (3, null, '2022-03-01'); | ||
insert into {database_schema}.seed values (3, null, '2022-03-01'); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,11 @@ def project_config(self): | |
} | ||
|
||
def check_constraints(self, model_name: str, expected: Dict[str, str]): | ||
rows = self.run_sql(f"show tblproperties {self.unique_schema()}.{model_name}", fetch="all") | ||
rows = self.run_sql( | ||
"show tblproperties {database_schema}.{model_name}", | ||
fetch="all", | ||
kwargs=dict(model_name=model_name), | ||
) | ||
constraints = { | ||
row.key: row.value for row in rows if row.key.startswith("delta.constraints") | ||
} | ||
|
@@ -36,9 +40,7 @@ def run_and_check_failure(self, model_name: str, err_msg: str): | |
assert err_msg in res.message | ||
|
||
def check_staging_table_cleaned(self): | ||
tmp_tables = self.run_sql( | ||
f"SHOW TABLES IN {self.unique_schema()} LIKE '*__dbt_tmp'", fetch="all" | ||
) | ||
tmp_tables = self.run_sql("SHOW TABLES IN {database_schema} LIKE '*__dbt_tmp'", fetch="all") | ||
assert len(tmp_tables) == 0 | ||
|
||
|
||
|
@@ -89,27 +91,25 @@ def test_incremental_constraints(self): | |
self.run_dbt(["run", "--select", model_name, "--full-refresh"]) | ||
self.check_constraints(model_name, {"delta.constraints.id_greater_than_zero": "id > 0"}) | ||
|
||
schema = self.unique_schema() | ||
|
||
# Insert a row into the seed model with an invalid id. | ||
self.run_sql_file("insert_invalid_id.sql") | ||
self.run_and_check_failure( | ||
model_name, | ||
err_msg="CHECK constraint id_greater_than_zero", | ||
) | ||
self.check_staging_table_cleaned() | ||
self.run_sql(f"delete from {schema}.seed where id = 0") | ||
self.run_sql("delete from {database_schema}.seed where id = 0") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting, so if we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually I made some changes in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 it's in |
||
|
||
# Insert a row into the seed model with an invalid name. | ||
self.run_sql_file("insert_invalid_name.sql") | ||
self.run_and_check_failure( | ||
model_name, err_msg="NOT NULL constraint violated for column: name" | ||
) | ||
self.check_staging_table_cleaned() | ||
self.run_sql(f"delete from {schema}.seed where id = 3") | ||
self.run_sql("delete from {database_schema}.seed where id = 3") | ||
|
||
# Insert a valid row into the seed model. | ||
self.run_sql(f"insert into {schema}.seed values (3, 'Cathy', '2022-03-01')") | ||
self.run_sql("insert into {database_schema}.seed values (3, 'Cathy', '2022-03-01')") | ||
self.run_dbt(["run", "--select", model_name]) | ||
expected_model_name = "expected_incremental_model" | ||
self.run_dbt(["run", "--select", expected_model_name]) | ||
|
@@ -134,7 +134,7 @@ def test_databricks_uc_sql_endpoint(self): | |
|
||
class TestSnapshotConstraints(TestConstraints): | ||
def check_snapshot_results(self, num_rows: int): | ||
results = self.run_sql(f"select * from {self.unique_schema()}.my_snapshot", fetch="all") | ||
results = self.run_sql("select * from {database_schema}.my_snapshot", fetch="all") | ||
self.assertEqual(len(results), num_rows) | ||
|
||
def test_snapshot(self): | ||
|
@@ -237,7 +237,7 @@ def test_delta_constraints_disabled(self): | |
self.check_constraints(model_name, {}) | ||
|
||
# Insert a row into the seed model with the name being null. | ||
self.run_sql(f"insert into {self.unique_schema()}.seed values (3, null, '2022-03-01')") | ||
self.run_sql("insert into {database_schema}.seed values (3, null, '2022-03-01')") | ||
|
||
# Check the table can be created without failure. | ||
self.run_dbt(["run", "--select", model_name]) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to add a new method in dbt-spark that returns the catalog field to avoid duplicate code in the future (but we need to know the release schedule for dbt-spark).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can ask
dbt-spark
to make the change. Let's have them here and work on it in a separate PR for now.