Skip to content

Commit 6130993

Browse files
authored
Fix KubernetesPodOperator duplicating logs when interrupted (#33500)
1 parent 79b8cfc commit 6130993

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

airflow/providers/cncf/kubernetes/utils/pod_manager.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ def consume_logs(
398398
399399
Returns the last timestamp observed in logs.
400400
"""
401-
timestamp = None
401+
last_captured_timestamp = None
402402
try:
403403
logs = self.read_pod_logs(
404404
pod=pod,
@@ -412,7 +412,9 @@ def consume_logs(
412412
)
413413
for raw_line in logs:
414414
line = raw_line.decode("utf-8", errors="backslashreplace")
415-
timestamp, message = self.parse_log_line(line)
415+
line_timestamp, message = self.parse_log_line(line)
416+
if line_timestamp is not None:
417+
last_captured_timestamp = line_timestamp
416418
self.log.info("[%s] %s", container_name, message)
417419
except BaseHTTPError as e:
418420
self.log.warning(
@@ -426,7 +428,7 @@ def consume_logs(
426428
pod.metadata.name,
427429
exc_info=True,
428430
)
429-
return timestamp or since_time
431+
return last_captured_timestamp or since_time
430432

431433
# note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to
432434
# loop as we do here. But in a long-running process we might temporarily lose connectivity.

tests/providers/cncf/kubernetes/utils/test_pod_manager.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from datetime import datetime
2121
from json.decoder import JSONDecodeError
2222
from types import SimpleNamespace
23+
from typing import cast
2324
from unittest import mock
2425
from unittest.mock import MagicMock
2526

@@ -254,6 +255,19 @@ def test_parse_log_line(self):
254255
assert timestamp == pendulum.parse(real_timestamp)
255256
assert line == log_message
256257

258+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
259+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
260+
def test_fetch_container_logs_returning_last_timestamp(
261+
self, mock_read_pod_logs, mock_container_is_running
262+
):
263+
timestamp_string = "2020-10-08T14:16:17.793417674Z"
264+
mock_read_pod_logs.return_value = [bytes(f"{timestamp_string} message", "utf-8"), b"notimestamp"]
265+
mock_container_is_running.side_effect = [True, False]
266+
267+
status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True)
268+
269+
assert status.last_log_time == cast(DateTime, pendulum.parse(timestamp_string))
270+
257271
def test_parse_invalid_log_line(self, caplog):
258272
with caplog.at_level(logging.INFO):
259273
self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n")

0 commit comments

Comments
 (0)