
Commit 73627fd

Avoid scheduler crash with passing executor_config with CeleryExecutor (apache#47375)
* Scheduler should not crash when executor_config is provided with CE
* removing bad test
1 parent 5a5bae8 commit 73627fd

File tree

2 files changed: +34 -2 lines

providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py

Lines changed: 1 addition & 1 deletion
@@ -263,7 +263,7 @@ def send_task_to_executor(
     if AIRFLOW_V_3_0_PLUS:
         if TYPE_CHECKING:
             assert isinstance(args, workloads.BaseWorkload)
-        args = (args.model_dump_json(),)
+        args = (args.model_dump_json(exclude={"ti": {"executor_config"}}),)
     try:
         with timeout(seconds=OPERATION_TIMEOUT):
             result = task_to_run.apply_async(args=args, queue=queue)
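
The one-line fix above relies on pydantic's nested `exclude` support in `model_dump_json`: the workload's `ti` field is itself a model, and excluding its `executor_config` sub-field keeps values that may not be JSON-serializable (for example kubernetes client objects in a `pod_override`) out of the payload sent to the Celery worker. A minimal standalone sketch of that mechanism, using hypothetical `TI`/`Workload` stand-ins rather than the real Airflow models:

# Minimal sketch, not the Airflow models: TI and Workload are hypothetical
# stand-ins used only to show pydantic's nested `exclude` behaviour.
from typing import Any

from pydantic import BaseModel


class TI(BaseModel):
    task_id: str
    # In Airflow this field can carry arbitrary objects such as
    # kubernetes.client models, which JSON serialization may choke on.
    executor_config: Any = None


class Workload(BaseModel):
    ti: TI


w = Workload(ti=TI(task_id="t1", executor_config=object()))

# Excluding the nested field keeps the non-serializable value out of the JSON.
print(w.model_dump_json(exclude={"ti": {"executor_config"}}))
# -> {"ti":{"task_id":"t1"}}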

tests/integration/executors/test_celery_executor.py

Lines changed: 33 additions & 1 deletion
@@ -36,6 +36,7 @@
 from celery.backends.database import DatabaseBackend
 from celery.contrib.testing.worker import start_worker
 from kombu.asynchronous import set_event_loop
+from kubernetes.client import models as k8s
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowTaskTimeout
@@ -137,7 +138,37 @@ def _change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=Non
 
     @pytest.mark.flaky(reruns=3)
     @pytest.mark.parametrize("broker_url", _prepare_test_bodies())
-    def test_celery_integration(self, broker_url):
+    @pytest.mark.parametrize(
+        "executor_config",
+        [
+            pytest.param({}, id="no_executor_config"),
+            pytest.param(
+                {
+                    "pod_override": k8s.V1Pod(
+                        spec=k8s.V1PodSpec(
+                            containers=[
+                                k8s.V1Container(
+                                    name="base",
+                                    resources=k8s.V1ResourceRequirements(
+                                        requests={
+                                            "cpu": "100m",
+                                            "memory": "384Mi",
+                                        },
+                                        limits={
+                                            "cpu": 1,
+                                            "memory": "500Mi",
+                                        },
+                                    ),
+                                )
+                            ]
+                        )
+                    )
+                },
+                id="pod_override_executor_config",
+            ),
+        ],
+    )
+    def test_celery_integration(self, broker_url, executor_config):
         from airflow.providers.celery.executors import celery_executor, celery_executor_utils
 
         def fake_execute_workload(command):
@@ -157,6 +188,7 @@ def fake_execute_workload(command):
                 try_number=0,
                 priority_weight=1,
                 queue=celery_executor_utils.celery_configuration["task_default_queue"],
+                executor_config=executor_config,
             )
         keys = [
             TaskInstanceKey("id", "success", "abc", 0, -1),
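
For context, the `pod_override` used in the new test case mirrors how a DAG author would attach an executor_config to a task; when such a task was routed through the CeleryExecutor, the scheduler previously crashed while serializing the workload. Below is a sketch of that usage with a hypothetical DAG and task name (import paths may differ slightly across Airflow versions):

# Hypothetical DAG illustrating the executor_config shape exercised by the
# test above; the DAG id and task name are made up for this example.
import pendulum
from kubernetes.client import models as k8s

from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2025, 1, 1, tz="UTC"))
def executor_config_example():
    @task(
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": "100m", "memory": "384Mi"},
                                limits={"cpu": 1, "memory": "500Mi"},
                            ),
                        )
                    ]
                )
            )
        }
    )
    def resource_heavy():
        # The task body is irrelevant here; only the executor_config matters.
        return "done"

    resource_heavy()


executor_config_example()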
