Skip to content

Commit 83efcaa

Browse files
authored
always push ECS task ARN to xcom in EcsRunTaskOperator (#33703)
* always push ECS task ARN to xcom in EcsRunTaskOperator
1 parent 36b06a7 commit 83efcaa

File tree

3 files changed

+13
-6
lines changed

3 files changed

+13
-6
lines changed

airflow/providers/amazon/aws/operators/ecs.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,9 @@ def execute(self, context):
543543
# start the task except if we reattached to an existing one just before.
544544
self._start_task()
545545

546+
if self.do_xcom_push:
547+
self.xcom_push(context, key="ecs_task_arn", value=self.arn)
548+
546549
if self.deferrable:
547550
self.defer(
548551
trigger=TaskDoneTrigger(

tests/providers/amazon/aws/operators/test_ecs.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ def test_template_fields_overrides(self):
265265
],
266266
],
267267
)
268+
@mock.patch.object(EcsRunTaskOperator, "xcom_push")
268269
@mock.patch.object(EcsRunTaskOperator, "_wait_for_task_ended")
269270
@mock.patch.object(EcsRunTaskOperator, "_check_success_task")
270271
@mock.patch.object(EcsBaseOperator, "client")
@@ -273,6 +274,7 @@ def test_execute_without_failures(
273274
client_mock,
274275
check_mock,
275276
wait_mock,
277+
xcom_mock,
276278
launch_type,
277279
capacity_provider_strategy,
278280
platform_version,
@@ -626,28 +628,31 @@ def test_reattach_save_task_arn_xcom(
626628
assert self.ecs.arn == f"arn:aws:ecs:us-east-1:012345678910:task/{TASK_ID}"
627629
assert "No active previously launched task found to reattach" in caplog.messages
628630

631+
@mock.patch.object(EcsRunTaskOperator, "xcom_push")
629632
@mock.patch.object(EcsBaseOperator, "client")
630633
@mock.patch("airflow.providers.amazon.aws.utils.task_log_fetcher.AwsTaskLogFetcher")
631-
def test_execute_xcom_with_log(self, log_fetcher_mock, client_mock):
634+
def test_execute_xcom_with_log(self, log_fetcher_mock, client_mock, xcom_mock):
632635
self.ecs.do_xcom_push = True
633636
self.ecs.task_log_fetcher = log_fetcher_mock
634637

635638
log_fetcher_mock.get_last_log_message.return_value = "Log output"
636639

637640
assert self.ecs.execute(None) == "Log output"
638641

642+
@mock.patch.object(EcsRunTaskOperator, "xcom_push")
639643
@mock.patch.object(EcsBaseOperator, "client")
640644
@mock.patch("airflow.providers.amazon.aws.utils.task_log_fetcher.AwsTaskLogFetcher")
641-
def test_execute_xcom_with_no_log(self, log_fetcher_mock, client_mock):
645+
def test_execute_xcom_with_no_log(self, log_fetcher_mock, client_mock, xcom_mock):
642646
self.ecs.do_xcom_push = True
643647
self.ecs.task_log_fetcher = log_fetcher_mock
644648

645649
log_fetcher_mock.get_last_log_message.return_value = None
646650

647651
assert self.ecs.execute(None) is None
648652

653+
@mock.patch.object(EcsRunTaskOperator, "xcom_push")
649654
@mock.patch.object(EcsBaseOperator, "client")
650-
def test_execute_xcom_with_no_log_fetcher(self, client_mock):
655+
def test_execute_xcom_with_no_log_fetcher(self, client_mock, xcom_mock):
651656
self.ecs.do_xcom_push = True
652657
assert self.ecs.execute(None) is None
653658

@@ -657,8 +662,9 @@ def test_execute_xcom_disabled(self, log_fetcher_mock, client_mock):
657662
self.ecs.do_xcom_push = False
658663
assert self.ecs.execute(None) is None
659664

665+
@mock.patch.object(EcsRunTaskOperator, "xcom_push")
660666
@mock.patch.object(EcsRunTaskOperator, "client")
661-
def test_with_defer(self, client_mock):
667+
def test_with_defer(self, client_mock, xcom_mock):
662668
self.ecs.deferrable = True
663669

664670
client_mock.run_task.return_value = RESPONSE_WITHOUT_FAILURES

tests/system/providers/amazon/aws/example_ecs_fargate.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ def delete_cluster(cluster_name: str) -> None:
122122
"assignPublicIp": "ENABLED",
123123
},
124124
},
125-
# You must set `reattach=True` in order to get ecs_task_arn if you plan to use a Sensor.
126-
reattach=True,
127125
)
128126
# [END howto_operator_ecs]
129127

0 commit comments

Comments
 (0)