
Use 3-level namespace when catalog is set. #94


Merged: 10 commits, Jun 9, 2022
Changes from 8 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@

## dbt-databricks 1.1.1 (Release TBD)

### Features
- Support for Databricks CATALOG as a DATABASE in DBT compilations ([#95](https://github.com/databricks/dbt-databricks/issues/95), [#89](https://github.com/databricks/dbt-databricks/pull/89), [#94](https://github.com/databricks/dbt-databricks/pull/94))

### Fixes
- Block taking jinja2.runtime.Undefined into DatabricksAdapter ([#98](https://github.com/databricks/dbt-databricks/pull/98))
- Avoid using Cursor.schema API when database is None ([#100](https://github.com/databricks/dbt-databricks/pull/100))
131 changes: 127 additions & 4 deletions dbt/adapters/databricks/impl.py
@@ -1,19 +1,32 @@
from dataclasses import dataclass
from typing import Optional, List, Dict, Tuple, Union
import re
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

from agate import Table
from agate import Row, Table

from dbt.contracts.connection import AdapterResponse
from dbt.adapters.base import AdapterConfig
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.spark.impl import SparkAdapter, LIST_SCHEMAS_MACRO_NAME
from dbt.adapters.spark.impl import (
SparkAdapter,
KEY_TABLE_OWNER,
KEY_TABLE_STATISTICS,
LIST_RELATIONS_MACRO_NAME,
LIST_SCHEMAS_MACRO_NAME,
)
from dbt.contracts.connection import AdapterResponse
from dbt.contracts.relation import RelationType
import dbt.exceptions
from dbt.events import AdapterLogger

from dbt.adapters.databricks.column import DatabricksColumn
from dbt.adapters.databricks.connections import DatabricksConnectionManager
from dbt.adapters.databricks.relation import DatabricksRelation
from dbt.adapters.databricks.utils import undefined_proof


logger = AdapterLogger("Databricks")


@dataclass
class DatabricksConfig(AdapterConfig):
file_format: str = "delta"
@@ -67,3 +80,113 @@ def execute(
finally:
if staging_table is not None:
self.drop_relation(staging_table)

def list_relations_without_caching(
Collaborator:

It would be nice to add a new method in dbt-spark that returns the catalog field to avoid duplicate code in the future (but we need to know the release schedule for dbt-spark).

Collaborator Author:

Yes, we can ask dbt-spark to make that change.
Let's keep these here for now and work on it in a separate PR.

self, schema_relation: DatabricksRelation
) -> List[DatabricksRelation]:
kwargs = {"schema_relation": schema_relation}
try:
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
except dbt.exceptions.RuntimeException as e:
errmsg = getattr(e, "msg", "")
if f"Database '{schema_relation}' not found" in errmsg:
return []
else:
description = "Error while retrieving information about"
logger.debug(f"{description} {schema_relation}: {e.msg}")
return []

relations = []
for row in results:
if len(row) != 4:
raise dbt.exceptions.RuntimeException(
f'Invalid value from "show table extended ...", '
f"got {len(row)} values, expected 4"
)
_schema, name, _, information = row
rel_type = RelationType.View if "Type: VIEW" in information else RelationType.Table
is_delta = "Provider: delta" in information
is_hudi = "Provider: hudi" in information
relation = self.Relation.create(
database=schema_relation.database,
schema=_schema,
identifier=name,
type=rel_type,
information=information,
is_delta=is_delta,
is_hudi=is_hudi,
)
relations.append(relation)

return relations

def get_relation(
self, database: Optional[str], schema: str, identifier: str
) -> Optional[BaseRelation]:
return super(SparkAdapter, self).get_relation(database, schema, identifier)

def parse_describe_extended(
self, relation: DatabricksRelation, raw_rows: List[Row]
) -> List[DatabricksColumn]:
# Convert the Row to a dict
dict_rows = [dict(zip(row._keys, row._values)) for row in raw_rows]
# Find the separator between the rows and the metadata provided
# by the DESCRIBE TABLE EXTENDED statement
pos = self.find_table_information_separator(dict_rows)

# Remove rows that start with a hash, they are comments
rows = [row for row in raw_rows[0:pos] if not row["col_name"].startswith("#")]
metadata = {col["col_name"]: col["data_type"] for col in raw_rows[pos + 1 :]}

raw_table_stats = metadata.get(KEY_TABLE_STATISTICS)
table_stats = DatabricksColumn.convert_table_stats(raw_table_stats)
return [
DatabricksColumn(
table_database=relation.database,
table_schema=relation.schema,
table_name=relation.name,
table_type=relation.type,
table_owner=str(metadata.get(KEY_TABLE_OWNER)),
table_stats=table_stats,
column=column["col_name"],
column_index=idx,
dtype=column["data_type"],
)
for idx, column in enumerate(rows)
]

def parse_columns_from_information(
self, relation: DatabricksRelation
) -> List[DatabricksColumn]:
owner_match = re.findall(self.INFORMATION_OWNER_REGEX, relation.information)
owner = owner_match[0] if owner_match else None
matches = re.finditer(self.INFORMATION_COLUMNS_REGEX, relation.information)
columns = []
stats_match = re.findall(self.INFORMATION_STATISTICS_REGEX, relation.information)
raw_table_stats = stats_match[0] if stats_match else None
table_stats = DatabricksColumn.convert_table_stats(raw_table_stats)
for match_num, match in enumerate(matches):
column_name, column_type, nullable = match.groups()
column = DatabricksColumn(
table_database=relation.database,
table_schema=relation.schema,
table_name=relation.table,
table_type=relation.type,
column_index=match_num,
table_owner=owner,
column=column_name,
dtype=column_type,
table_stats=table_stats,
)
columns.append(column)
return columns

def _get_columns_for_catalog(self, relation: DatabricksRelation) -> Iterable[Dict[str, Any]]:
columns = self.parse_columns_from_information(relation)

for column in columns:
# convert DatabricksRelation into catalog dicts
as_dict = column.to_column_dict()
as_dict["column_name"] = as_dict.pop("column", None)
as_dict["column_type"] = as_dict.pop("dtype")
yield as_dict
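
The review thread above suggests moving the `show table extended` parsing into dbt-spark so this catalog-aware override would not have to duplicate it. As a rough illustration only, not part of this PR and not dbt-spark's actual API, such a shared helper might look like the sketch below (the name, return type, and example row are hypothetical):

```python
from typing import NamedTuple


class ParsedTableInfo(NamedTuple):
    """Hypothetical container for one row of `show table extended` output."""

    schema: str
    name: str
    information: str
    is_view: bool
    is_delta: bool
    is_hudi: bool


def parse_show_table_extended_row(row) -> ParsedTableInfo:
    # Mirrors the unpacking done in list_relations_without_caching above:
    # each row is (schema, table name, a flag the adapter ignores, information).
    _schema, name, _, information = row
    return ParsedTableInfo(
        schema=_schema,
        name=name,
        information=information,
        is_view="Type: VIEW" in information,
        is_delta="Provider: delta" in information,
        is_hudi="Provider: hudi" in information,
    )


# Example with a fabricated row tuple:
row = ("analytics", "orders", False, "Type: VIEW\nProvider: delta\n")
print(parse_show_table_extended_row(row))
```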
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/relation.py
@@ -9,7 +9,7 @@

@dataclass
class DatabricksIncludePolicy(Policy):
database: bool = False # TODO: should be True
database: bool = True
schema: bool = True
identifier: bool = True
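
Flipping `database` to `True` in the include policy is what turns on 3-level (`catalog.schema.identifier`) naming when a catalog is configured. A minimal standalone sketch of the effect follows; it uses no dbt imports and all names are made up:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TinyRelation:
    """Toy stand-in for a dbt relation; only illustrates name rendering."""

    database: Optional[str]  # the Databricks catalog, when one is set
    schema: str
    identifier: str
    include_database: bool = True  # mirrors DatabricksIncludePolicy.database

    def render(self) -> str:
        parts = []
        if self.include_database and self.database is not None:
            parts.append(self.database)
        parts += [self.schema, self.identifier]
        return ".".join(parts)


# With a catalog set, the relation renders with the full 3-level namespace:
print(TinyRelation("main", "analytics", "orders").render())  # main.analytics.orders

# Without a catalog (database is None), the old 2-level name is still produced:
print(TinyRelation(None, "analytics", "orders").render())  # analytics.orders
```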

20 changes: 20 additions & 0 deletions dbt/include/databricks/macros/adapters.sql
@@ -143,3 +143,23 @@
{% endif %}
{% endfor %}
{% endmacro %}

{% macro databricks__generate_database_name(custom_database_name=none, node=none) -%}
{%- set default_database = target.database -%}
{%- if custom_database_name is none -%}
{{ return(default_database) }}
{%- else -%}
{{ return(custom_database_name) }}
{%- endif -%}
{%- endmacro %}

{% macro databricks__make_temp_relation(base_relation, suffix) %}
{% set tmp_identifier = base_relation.identifier ~ suffix %}
{% set tmp_relation = base_relation.incorporate(path = {
"identifier": tmp_identifier,
"schema": None,
"database": None
Collaborator:

If we don't specify "database": None here, what's the default value for it?

Collaborator Author:

The tmp_relation would keep the database of base_relation, which results in database.identifier as the temp relation name when base_relation has a database set.

Collaborator:

Makes sense. Thanks for the explanation!

}) -%}

{% do return(tmp_relation) %}
{% endmacro %}
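
The thread above asks what happens if `"database": None` is omitted in `databricks__make_temp_relation`. Here is a standalone sketch of the naming consequence; it uses no dbt imports and only mimics how a relation's dotted name is assembled from its non-None path parts, with made-up names:

```python
def render_name(database, schema, identifier):
    # A relation's rendered name is the dot-joined list of its non-None parts.
    return ".".join(part for part in (database, schema, identifier) if part is not None)


base = {"database": "main", "schema": "analytics", "identifier": "orders"}
tmp_identifier = base["identifier"] + "__dbt_tmp"

# If only "schema" were cleared, the temp relation would inherit the catalog and
# render as database.identifier, as described in the review thread:
print(render_name(base["database"], None, tmp_identifier))  # main.orders__dbt_tmp

# Clearing both "schema" and "database" leaves a plain temporary name:
print(render_name(None, None, tmp_identifier))  # orders__dbt_tmp
```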
27 changes: 21 additions & 6 deletions dbt/include/databricks/macros/materializations/snapshot.sql
@@ -1,3 +1,22 @@
{% macro databricks_build_snapshot_staging_table(strategy, sql, target_relation) %}
{% set tmp_identifier = target_relation.identifier ~ '__dbt_tmp' %}

{%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
schema=target_relation.schema,
database=target_relation.database,
type='view') -%}

{% set select = snapshot_staging_table(strategy, sql, target_relation) %}

{# needs to be a non-temp view so that its columns can be ascertained via `describe` #}
{% call statement('build_snapshot_staging_relation') %}
{{ create_view_as(tmp_relation, select) }}
{% endcall %}

{% do return(tmp_relation) %}
{% endmacro %}


{% materialization snapshot, adapter='databricks' %}
{%- set config = model['config'] -%}

@@ -8,7 +27,7 @@
{%- set file_format = config.get('file_format', 'delta') -%}

{% set target_relation_exists, target_relation = get_or_create_relation(
database=none,
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}
@@ -30,10 +49,6 @@
{% endif %}
{% endif %}

{% if not adapter.check_schema_exists(model.database, model.schema) %}
{% do create_schema(model.database, model.schema) %}
{% endif %}

{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}
@@ -58,7 +73,7 @@

{{ adapter.valid_snapshot_target(target_relation) }}

{% set staging_table = spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
{% set staging_table = databricks_build_snapshot_staging_table(strategy, sql, target_relation) %}
Collaborator:

Not urgent but should we use the dispatch pattern here to make it backward compatible?

Collaborator Author:

That's a good point.
Actually, I think that if users override the spark_build_snapshot_staging_table macro, it will very likely break catalog support.
How about using spark_build_snapshot_staging_table only when target_relation.database is None?
Also, maybe we should mention this in the changelog and the release notes.


-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
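
The last thread above floats a backward-compatibility fallback: keep calling the old spark_build_snapshot_staging_table macro when no catalog is involved. A tiny standalone sketch of that decision rule follows; it is hypothetical, and the real change would live in the Jinja materialization rather than in Python:

```python
def staging_macro_for(target_database):
    """Pick the snapshot staging macro, falling back to the dbt-spark one
    only when the target relation has no catalog (database) set."""
    if target_database is None:
        return "spark_build_snapshot_staging_table"
    return "databricks_build_snapshot_staging_table"


print(staging_macro_for(None))    # spark_build_snapshot_staging_table
print(staging_macro_for("main"))  # databricks_build_snapshot_staging_table
```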
29 changes: 24 additions & 5 deletions tests/integration/base.py
@@ -360,11 +360,19 @@ def _get_schema_fqn(self, database, schema):
return schema_fqn

def _create_schema_named(self, database, schema):
self.run_sql("CREATE SCHEMA {schema}")
self.run_sql(
"CREATE SCHEMA {database_schema}", kwargs=dict(database=database, schema=schema)
)

def _drop_schema_named(self, database, schema):
self.run_sql("DROP SCHEMA IF EXISTS {schema} CASCADE")
self.run_sql("DROP SCHEMA IF EXISTS {schema}_dbt_test__audit CASCADE")
self.run_sql(
"DROP SCHEMA IF EXISTS {database_schema} CASCADE",
kwargs=dict(database=database, schema=schema),
)
self.run_sql(
"DROP SCHEMA IF EXISTS {database_schema}_dbt_test__audit CASCADE",
kwargs=dict(database=database, schema=schema),
)

def _create_schemas(self):
schema = self.unique_schema()
@@ -435,11 +443,22 @@ def transform_sql(self, query, kwargs=None):

base_kwargs = {
"schema": self.unique_schema(),
"database": self.adapter.quote(self.default_database),
"database": (
self.adapter.quote(self.default_database)
if self.default_database is not None
else None
),
}
if kwargs is None:
kwargs = {}
base_kwargs.update(kwargs)
base_kwargs.update({key: value for key, value in kwargs.items() if value is not None})
if "database_schema" not in base_kwargs:
if base_kwargs["database"] is not None:
base_kwargs[
"database_schema"
] = f"{base_kwargs['database']}.{base_kwargs['schema']}"
else:
base_kwargs["database_schema"] = base_kwargs["schema"]

to_return = to_return.format(**base_kwargs)

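The test-harness change above is what makes the `{database_schema}` placeholder, used in the updated integration tests and seed files below, resolve to either a 2-level or 3-level name. A standalone sketch of that resolution, assuming the database is already quoted when present and using a made-up schema name:

```python
def resolve_database_schema(database, schema):
    # Mirrors the branch added to transform_sql: prepend the catalog only when set.
    return f"{database}.{schema}" if database is not None else schema


print(resolve_database_schema("`main`", "test_schema_123"))  # `main`.test_schema_123
print(resolve_database_schema(None, "test_schema_123"))      # test_schema_123

# So "insert into {database_schema}.seed ..." targets `main`.test_schema_123.seed
# on a catalog-enabled workspace and test_schema_123.seed otherwise.
```
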
@@ -1 +1 @@
insert into {schema}.seed values (0, 'Cathy', '2022-03-01');
insert into {database_schema}.seed values (0, 'Cathy', '2022-03-01');
@@ -1 +1 @@
insert into {schema}.seed values (3, null, '2022-03-01');
insert into {database_schema}.seed values (3, null, '2022-03-01');
22 changes: 11 additions & 11 deletions tests/integration/persist_constraints/test_persist_constraints.py
@@ -21,7 +21,11 @@ def project_config(self):
}

def check_constraints(self, model_name: str, expected: Dict[str, str]):
rows = self.run_sql(f"show tblproperties {self.unique_schema()}.{model_name}", fetch="all")
rows = self.run_sql(
"show tblproperties {database_schema}.{model_name}",
fetch="all",
kwargs=dict(model_name=model_name),
)
constraints = {
row.key: row.value for row in rows if row.key.startswith("delta.constraints")
}
@@ -36,9 +40,7 @@ def run_and_check_failure(self, model_name: str, err_msg: str):
assert err_msg in res.message

def check_staging_table_cleaned(self):
tmp_tables = self.run_sql(
f"SHOW TABLES IN {self.unique_schema()} LIKE '*__dbt_tmp'", fetch="all"
)
tmp_tables = self.run_sql("SHOW TABLES IN {database_schema} LIKE '*__dbt_tmp'", fetch="all")
assert len(tmp_tables) == 0


@@ -89,27 +91,25 @@ def test_incremental_constraints(self):
self.run_dbt(["run", "--select", model_name, "--full-refresh"])
self.check_constraints(model_name, {"delta.constraints.id_greater_than_zero": "id > 0"})

schema = self.unique_schema()

# Insert a row into the seed model with an invalid id.
self.run_sql_file("insert_invalid_id.sql")
self.run_and_check_failure(
model_name,
err_msg="CHECK constraint id_greater_than_zero",
)
self.check_staging_table_cleaned()
self.run_sql(f"delete from {schema}.seed where id = 0")
self.run_sql("delete from {database_schema}.seed where id = 0")
Collaborator:

Interesting, so if we use {database_schema} in the query, will dbt automatically change it to use the target database and target schema?

Collaborator Author:

Actually I made some changes in tests/integration/base.py to handle database_schema.

Collaborator:

👍 it's in transform_sql


# Insert a row into the seed model with an invalid name.
self.run_sql_file("insert_invalid_name.sql")
self.run_and_check_failure(
model_name, err_msg="NOT NULL constraint violated for column: name"
)
self.check_staging_table_cleaned()
self.run_sql(f"delete from {schema}.seed where id = 3")
self.run_sql("delete from {database_schema}.seed where id = 3")

# Insert a valid row into the seed model.
self.run_sql(f"insert into {schema}.seed values (3, 'Cathy', '2022-03-01')")
self.run_sql("insert into {database_schema}.seed values (3, 'Cathy', '2022-03-01')")
self.run_dbt(["run", "--select", model_name])
expected_model_name = "expected_incremental_model"
self.run_dbt(["run", "--select", expected_model_name])
@@ -134,7 +134,7 @@ def test_databricks_uc_sql_endpoint(self):

class TestSnapshotConstraints(TestConstraints):
def check_snapshot_results(self, num_rows: int):
results = self.run_sql(f"select * from {self.unique_schema()}.my_snapshot", fetch="all")
results = self.run_sql("select * from {database_schema}.my_snapshot", fetch="all")
self.assertEqual(len(results), num_rows)

def test_snapshot(self):
Expand Down Expand Up @@ -237,7 +237,7 @@ def test_delta_constraints_disabled(self):
self.check_constraints(model_name, {})

# Insert a row into the seed model with the name being null.
self.run_sql(f"insert into {self.unique_schema()}.seed values (3, null, '2022-03-01')")
self.run_sql("insert into {database_schema}.seed values (3, null, '2022-03-01')")

# Check the table can be created without failure.
self.run_dbt(["run", "--select", model_name])
10 changes: 2 additions & 8 deletions tests/integration/set_tblproperties/test_set_tblproperties.py
@@ -19,21 +19,15 @@ def test_set_tblproperties(self):
self.assertTablesEqual("set_tblproperties_to_view", "expected")

results = self.run_sql(
"show tblproperties {schema}.{table}".format(
schema=self.unique_schema(), table="set_tblproperties"
),
fetch="all",
"show tblproperties {database_schema}.set_tblproperties", fetch="all"
)
tblproperties = [result[0] for result in results]

assert "delta.autoOptimize.optimizeWrite" in tblproperties
assert "delta.autoOptimize.autoCompact" in tblproperties

results = self.run_sql(
"show tblproperties {schema}.{table}".format(
schema=self.unique_schema(), table="set_tblproperties_to_view"
),
fetch="all",
"show tblproperties {database_schema}.set_tblproperties_to_view", fetch="all"
)
tblproperties = [result[0] for result in results]
