Skip to content

Change the storage of frame to use threadLocal rather than Dict #21993

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 4, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import os
import pickle
import signal
import threading
import warnings
from collections import defaultdict
from datetime import datetime, timedelta
from functools import partial
from inspect import currentframe
from tempfile import NamedTemporaryFile
from types import FrameType
from types import TracebackType
from typing import (
IO,
TYPE_CHECKING,
Expand All @@ -45,7 +46,6 @@
Union,
)
from urllib.parse import quote
from weakref import WeakKeyDictionary

import dill
import jinja2
Expand Down Expand Up @@ -129,7 +129,7 @@
from airflow.models.dagrun import DagRun
from airflow.models.operator import Operator

# Dict-based storage of the execution frame, weakly keyed by operator
# (the "Dict" that the change titled above replaces with thread-local storage).
_EXECUTION_FRAME_MAPPING: "WeakKeyDictionary[Operator, FrameType]" = WeakKeyDictionary()
# Thread-local holder whose ``frame`` attribute is assigned in the ``except`` clause of
# ``_execute_task`` and later read, then reset to ``None``, by ``get_truncated_error_traceback``.
_TASK_EXECUTION_FRAME_LOCAL_STORAGE = threading.local()


@contextlib.contextmanager
Expand Down Expand Up @@ -1537,7 +1537,7 @@ def _execute_task(self, context, task_copy):
else:
result = execute_callable(context=context)
except: # noqa: E722
_EXECUTION_FRAME_MAPPING[task_copy] = currentframe()
_TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = currentframe()
raise
# If the task returns a result, push an XCom containing it
if task_copy.do_xcom_push and result is not None:
Expand Down Expand Up @@ -1731,6 +1731,36 @@ def _handle_reschedule(
session.commit()
self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')

def get_truncated_error_traceback(self, error: BaseException) -> Optional[TracebackType]:
    """
    Trim the traceback of ``error`` so that it starts just below the saved execution frame.

    The frame is read from ``_TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame`` and that slot is
    reset to ``None`` once it has been read. If the slot was never assigned, a warning is
    logged and the traceback is returned untouched. The untouched traceback is also
    returned when the saved frame does not occur in it, or when no entry follows
    the saved frame.

    :param error: exception whose traceback should be trimmed
    :return: the trimmed traceback, or the full traceback when trimming is not possible
    """
    full_traceback = error.__traceback__
    try:
        saved_frame = _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame
    except AttributeError:
        # Logged from inside the handler so that exc_info=True records the AttributeError.
        self.log.warning(
            "We expected to get frame set in local storage but it was not."
            " Please report this as an issue with full logs"
            " at https://github.com/apache/airflow/issues/new",
            exc_info=True,
        )
        return full_traceback

    # Drop the reference now that it has been consumed.
    _TASK_EXECUTION_FRAME_LOCAL_STORAGE.frame = None

    # Advance to the traceback entry that belongs to the saved frame, if there is one.
    entry = full_traceback
    while entry is not None and entry.tb_frame is not saved_frame:
        entry = entry.tb_next

    # Saved frame not present in the traceback: nothing to cut off.
    if entry is None:
        return error.__traceback__

    # Everything after the saved frame is the interesting part; if nothing follows,
    # fall back to the complete traceback.
    remainder = entry.tb_next
    if remainder is None:
        return error.__traceback__
    return remainder

@provide_session
def handle_failure(
self,
Expand All @@ -1746,14 +1776,7 @@ def handle_failure(

if error:
if isinstance(error, BaseException):
execution_frame = _EXECUTION_FRAME_MAPPING.get(self.task)
tb = error.__traceback__
while tb is not None:
if tb.tb_frame is execution_frame:
tb = tb.tb_next
break
tb = tb.tb_next
tb = tb or error.__traceback__
tb = self.get_truncated_error_traceback(error)
self.log.error("Task failed with exception", exc_info=(type(error), error, tb))
else:
self.log.error("%s", error)
Expand Down