Skip to content

Concept Refs #11658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/dbt/artifacts/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dbt.artifacts.resources.v1.components import (
ColumnInfo,
CompiledResource,
ConceptArgs,
Contract,
DeferRelation,
DependsOn,
Expand All @@ -19,6 +20,12 @@
RefArgs,
Time,
)
from dbt.artifacts.resources.v1.concept import (
Concept,
ConceptColumn,
ConceptConfig,
ConceptJoin,
)
from dbt.artifacts.resources.v1.config import (
Hook,
NodeAndTestConfig,
Expand Down
1 change: 1 addition & 0 deletions core/dbt/artifacts/resources/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class NodeType(StrEnum):
Macro = "macro"
Exposure = "exposure"
Metric = "metric"
Concept = "concept"
Group = "group"
SavedQuery = "saved_query"
SemanticModel = "semantic_model"
Expand Down
17 changes: 17 additions & 0 deletions core/dbt/artifacts/resources/v1/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,22 @@ def keyword_args(self) -> Dict[str, Optional[NodeVersion]]:
return {}


@dataclass
class ConceptArgs(dbtClassMixin):
"""Arguments for referencing a concept"""

name: str
package: Optional[str] = None
columns: List[str] = field(default_factory=list)

@property
def positional_args(self) -> List[str]:
if self.package:
return [self.package, self.name]
else:
return [self.name]


@dataclass
class ColumnInfo(AdditionalPropertiesMixin, ExtensibleDbtClassMixin):
"""Used in all ManifestNodes and SourceDefinition"""
Expand Down Expand Up @@ -241,6 +257,7 @@ class CompiledResource(ParsedResource):
refs: List[RefArgs] = field(default_factory=list)
sources: List[List[str]] = field(default_factory=list)
metrics: List[List[str]] = field(default_factory=list)
concepts: List[ConceptArgs] = field(default_factory=list) # For tracking concept dependencies
depends_on: DependsOn = field(default_factory=DependsOn)
compiled_path: Optional[str] = None
compiled: bool = False
Expand Down
55 changes: 55 additions & 0 deletions core/dbt/artifacts/resources/v1/concept.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union

from dbt.artifacts.resources.base import GraphResource
from dbt.artifacts.resources.v1.components import DependsOn
from dbt_common.dataclass_schema import dbtClassMixin


@dataclass
class ConceptJoin(dbtClassMixin):
"""Represents a join relationship in a concept definition."""

name: str # name of the model or concept to join
base_key: str # column in base model for join
foreign_key: Optional[str] = None # column in joined model (defaults to primary_key)
alias: Optional[str] = None # alias for the joined table
columns: List[str] = field(default_factory=list) # columns to expose from join
join_type: str = "left" # type of join (left, inner, etc.)


@dataclass
class ConceptColumn(dbtClassMixin):
"""Represents a column definition in a concept."""

name: str
description: Optional[str] = None
alias: Optional[str] = None # optional alias for the column


@dataclass
class ConceptConfig(dbtClassMixin):
"""Configuration for a concept."""

enabled: bool = True
meta: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Concept(GraphResource):
"""A concept resource definition."""

name: str
base_model: str # reference to the base model
description: str = ""
primary_key: Union[str, List[str]] = "id" # primary key column(s)
columns: List[ConceptColumn] = field(default_factory=list)
joins: List[ConceptJoin] = field(default_factory=list)
config: ConceptConfig = field(default_factory=ConceptConfig)
meta: Dict[str, Any] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
depends_on: DependsOn = field(default_factory=DependsOn)


# Type alias for concept resource
ConceptResource = Concept
198 changes: 198 additions & 0 deletions core/dbt/context/providers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import os
from copy import deepcopy
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -799,6 +800,183 @@ def resolve(self, target_name: str, target_package: Optional[str] = None) -> Met
return ResolvedMetricReference(target_metric, self.manifest)


# `cref` implementations.
@dataclass
class ConceptReference:
name: str
package: Optional[str] = None
columns: Optional[List[str]] = None

def __post_init__(self):
if self.columns is None:
self.columns = []


class BaseConceptResolver:
def __init__(
self,
db_wrapper: BaseDatabaseWrapper,
model: Resource,
config: RuntimeConfig,
manifest: Manifest,
) -> None:
self.db_wrapper = db_wrapper
self.model = model
self.config = config
self.manifest = manifest
self.current_project = config.project_name
self.Relation = db_wrapper.Relation

def __call__(
self, concept_name: str, columns: List[str], package: Optional[str] = None
) -> str:
"""Entry point for cref() calls from Jinja templates."""
return self.resolve(concept_name, columns, package)

@abc.abstractmethod
def resolve(self, concept_name: str, columns: List[str], package: Optional[str] = None) -> str:
"""Abstract method to resolve concept references."""
pass

def _repack_args(
self, name: str, package: Optional[str], columns: Optional[List[str]]
) -> ConceptReference:
return ConceptReference(name, package, columns)


class ParseConceptResolver(BaseConceptResolver):
def resolve(self, name: str, columns: List[str], package: Optional[str] = None) -> str:
from dbt.artifacts.resources import ConceptArgs

# During parsing, we just track the dependency and return a placeholder
concept_args = ConceptArgs(name=name, package=package, columns=columns)

# Only nodes that inherit from CompiledResource have the concepts attribute
if hasattr(self.model, "concepts"):
self.model.concepts.append(concept_args)

# Return a placeholder that will be replaced during compilation
return f"/* cref placeholder for {name} */"


class RuntimeConceptResolver(BaseConceptResolver):
def resolve(self, concept_name: str, columns: List[str], package: Optional[str] = None) -> str:
# Resolve the concept from the manifest
target_concept = self.manifest.resolve_concept(
concept_name,
package,
self.current_project,
self.model.package_name,
)

if target_concept is None:
raise TargetNotFoundError(
node=self.model,
target_name=concept_name,
target_kind="concept",
target_package=package,
)

# Generate the SQL for the concept reference
return self._generate_concept_sql(target_concept, columns)

def _generate_concept_sql(self, concept, requested_columns: List[str]) -> str:
"""Generate the SQL subquery for a concept reference."""

# Validate that all requested columns are available in the concept
available_columns = self._get_available_columns(concept)
for col in requested_columns:
if col not in available_columns:
raise CompilationError(
f"Column '{col}' is not available in concept '{concept.name}'. "
f"Available columns: {', '.join(sorted(available_columns.keys()))}"
)

# Determine which joins are needed based on requested columns
required_joins = self._determine_required_joins(concept, requested_columns)

# Build the SQL
sql_parts = []

# SELECT clause
select_columns = []
for col in requested_columns:
column_info = available_columns[col]
if column_info["source"] == "base":
select_columns.append(f"base.{col}")
else:
alias = column_info["alias"]
select_columns.append(f"{alias}.{col}")

sql_parts.append("SELECT")
sql_parts.append(" " + ",\n ".join(select_columns))

# FROM clause (base model)
base_ref = f"{{{{ref('{concept.base_model}')}}}}"
sql_parts.append(f"FROM {base_ref} AS base")

# JOIN clauses
for join in required_joins:
join_sql = self._generate_join_sql(join, concept)
sql_parts.append(join_sql)

return "(\n" + "\n".join(sql_parts) + "\n)"

def _get_available_columns(self, concept) -> Dict[str, Dict[str, str]]:
"""Get all available columns from the concept and its joins."""
available = {}

# Add base model columns
for col in concept.columns:
available[col.name] = {"source": "base", "alias": "base", "original_name": col.name}

# Add columns from joins
for join in concept.joins:
alias = join.alias or join.name
for col in join.columns:
available[col.name] = {"source": "join", "alias": alias, "original_name": col.name}

return available

def _determine_required_joins(self, concept, requested_columns: List[str]) -> List:
"""Determine which joins are needed for the requested columns."""
available_columns = self._get_available_columns(concept)
needed_joins = set()

for col in requested_columns:
column_info = available_columns[col]
if column_info["source"] == "join":
# Find the join that provides this column
for join in concept.joins:
alias = join.alias or join.name
if alias == column_info["alias"]:
needed_joins.add(id(join)) # Use id to ensure uniqueness
break

# Return the actual join objects
required_joins = []
for join in concept.joins:
if id(join) in needed_joins:
required_joins.append(join)

return required_joins

def _generate_join_sql(self, join, concept) -> str:
"""Generate SQL for a single join to a model."""
join_alias = join.alias or join.name
foreign_key = join.foreign_key or concept.primary_key

# Handle model references - joins only support models, not other concepts
if join.name.startswith("ref("):
# Direct model reference (e.g., "ref('stg_customers')")
join_ref = join.name
else:
# Model name that needs ref() wrapping (e.g., "stg_customers")
join_ref = f"{{{{ref('{join.name}')}}}}"

return f"LEFT JOIN {join_ref} AS {join_alias} ON base.{join.base_key} = {join_alias}.{foreign_key}"


# `var` implementations.
class ModelConfiguredVar(Var):
def __init__(
Expand Down Expand Up @@ -871,6 +1049,7 @@ class Provider(Protocol):
ref: Type[BaseRefResolver]
source: Type[BaseSourceResolver]
metric: Type[BaseMetricResolver]
cref: Type[BaseConceptResolver]


class ParseProvider(Provider):
Expand All @@ -881,6 +1060,7 @@ class ParseProvider(Provider):
ref = ParseRefResolver
source = ParseSourceResolver
metric = ParseMetricResolver
cref = ParseConceptResolver


class GenerateNameProvider(Provider):
Expand All @@ -891,6 +1071,7 @@ class GenerateNameProvider(Provider):
ref = ParseRefResolver
source = ParseSourceResolver
metric = ParseMetricResolver
cref = ParseConceptResolver


class RuntimeProvider(Provider):
Expand All @@ -901,6 +1082,7 @@ class RuntimeProvider(Provider):
ref = RuntimeRefResolver
source = RuntimeSourceResolver
metric = RuntimeMetricResolver
cref = RuntimeConceptResolver


class RuntimeUnitTestProvider(Provider):
Expand All @@ -911,6 +1093,7 @@ class RuntimeUnitTestProvider(Provider):
ref = RuntimeUnitTestRefResolver
source = RuntimeUnitTestSourceResolver
metric = RuntimeMetricResolver
cref = RuntimeConceptResolver


class OperationProvider(RuntimeProvider):
Expand Down Expand Up @@ -1153,6 +1336,21 @@ def source(self) -> Callable:
def metric(self) -> Callable:
return self.provider.metric(self.db_wrapper, self.model, self.config, self.manifest)

@contextproperty()
def cref(self) -> Callable:
"""The `cref()` function allows you to reference a concept and select
specific columns from it. A concept defines a base model and its
joinable features, allowing for dynamic SQL generation based on the
columns you request.

Usage:
select * from {{ cref('orders', ['order_id', 'customer_name', 'total_amount']) }}

This will generate a subquery that includes only the necessary joins
to provide the requested columns from the 'orders' concept.
"""
return self.provider.cref(self.db_wrapper, self.model, self.config, self.manifest)

@contextproperty("config")
def ctx_config(self) -> Config:
"""The `config` variable exists to handle end-user configuration for
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ class SchemaSourceFile(BaseSourceFile):
sources: List[str] = field(default_factory=list)
exposures: List[str] = field(default_factory=list)
metrics: List[str] = field(default_factory=list)
concepts: List[str] = field(default_factory=list)
snapshots: List[str] = field(default_factory=list)
# The following field will no longer be used. Leaving
# here to avoid breaking existing projects. To be removed
Expand Down
Loading
Loading