Skip to content

Commit 7963360

Browse files
Add a check for not templateable fields (#29821)
1 parent 6d2face commit 7963360

File tree

2 files changed

+23
-1
lines changed

2 files changed

+23
-1
lines changed

airflow/serialization/serialized_objects.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import collections.abc
2121
import datetime
2222
import enum
23+
import inspect
2324
import logging
2425
import warnings
2526
import weakref
@@ -702,6 +703,7 @@ def __init__(self, *args, **kwargs):
702703
def task_type(self) -> str:
703704
# Overwrites task_type of BaseOperator to use _task_type instead of
704705
# __class__.__name__.
706+
705707
return self._task_type
706708

707709
@task_type.setter
@@ -770,8 +772,12 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator, include_deps: bool)
770772

771773
# Store all template_fields as they are if there are JSON Serializable
772774
# If not, store them as strings
775+
# And raise an exception if the field is not templateable
776+
forbidden_fields = set(inspect.signature(BaseOperator.__init__).parameters.keys())
773777
if op.template_fields:
774778
for template_field in op.template_fields:
779+
if template_field in forbidden_fields:
780+
raise AirflowException(f"Cannot template BaseOperator fields: {template_field}")
775781
value = getattr(op, template_field, None)
776782
if not cls._is_excluded(value, template_field, op):
777783
serialize_op[template_field] = serialize_template_field(value)

tests/serialization/test_dag_serialization.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838

3939
import airflow
4040
from airflow.datasets import Dataset
41-
from airflow.exceptions import SerializationError
41+
from airflow.exceptions import AirflowException, SerializationError
4242
from airflow.hooks.base import BaseHook
4343
from airflow.kubernetes.pod_generator import PodGenerator
4444
from airflow.models import DAG, Connection, DagBag, Operator
@@ -2016,6 +2016,22 @@ def test_params_serialize_default(self):
20162016
assert param.description == "hello"
20172017
assert param.schema == {"type": "string"}
20182018

2019+
def test_not_templateable_fields_in_serialized_dag(
2020+
self,
2021+
):
2022+
"""
2023+
Test that when we use not templateable fields, an Airflow exception is raised.
2024+
"""
2025+
2026+
class TestOperator(BaseOperator):
2027+
template_fields = ("execution_timeout",)
2028+
2029+
dag = DAG("test_not_templateable_fields", start_date=datetime(2019, 8, 1))
2030+
with dag:
2031+
TestOperator(task_id="test", execution_timeout=timedelta(seconds=10))
2032+
with pytest.raises(AirflowException, match="Cannot template BaseOperator fields: execution_timeout"):
2033+
SerializedDAG.to_dict(dag)
2034+
20192035

20202036
def test_kubernetes_optional():
20212037
"""Serialisation / deserialisation continues to work without kubernetes installed"""

0 commit comments

Comments
 (0)