Skip to content

Create metrics to track Scheduled->Queued->Running task state transition times #30612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 28, 2023

Conversation

vandonr-amz
Copy link
Contributor

@vandonr-amz vandonr-amz commented Apr 12, 2023

The goal of this PR is to provide metrics that are more insightful than the often-misunderstood and very wide "task landing time"

I made a little graph to illustrate the different timings we currently have access to:
image
the landing time in purple is only available in the airflow UI. In black are the metrics currently being sent, and I'm proposing to add the orange one in this PR (actually it's not adoption time and it'll be 2 metrics but I don't want to re-do the graph).

@ferruzzi
Copy link
Contributor

ferruzzi commented Apr 12, 2023

I want to suggest finding a home for that diagram in the docs, I really like that.

[EDIT] Just saw this brought up on the Slack server. +1

@jedcunningham
Copy link
Member

I think adding this metric is a good idea. However, that's not "adoption" time. Adoption refers to tasks a scheduler adopts from a no-longer-running scheduler.

It also may make sense to be more granular here and tract scheduled -> queued and queued -> running separately.

@potiuk
Copy link
Member

potiuk commented Apr 13, 2023

I want to suggest finding a home for that diagram in the docs, I really like that.

Big +1 (of course it should be "correct" version). As mentioned on slack I am advocating for more similar diagrams describing our code/architecture/decisions following the diagrams that can be generated automatically from sources with mermaid being at clear advantage as being supported by GitHub natively (See https://github.com/apache/airflow/blob/main/airflow/jobs/JOB_LIFECYCLE.md)

@ferruzzi
Copy link
Contributor

However, that's not "adoption" time. Adoption refers to tasks a scheduler adopts from a no-longer-running scheduler.

Sometimes it feels like naming things is the hardest part of the job. Do you have any suggestions?

@jedcunningham
Copy link
Member

Absolutely! I've been involved with discussions recently around caching and now naming. Makes me think of two things.

If we split them, maybe scheduled_duration and queued_duration?

@potiuk
Copy link
Member

potiuk commented Apr 13, 2023

Absolutely! I've been involved with discussions recently around caching and now naming. Makes me think of two things.

I think @jedcunningham you meant THREE things (or was it off-by-one?)

@vandonr-amz
Copy link
Contributor Author

I need to check, but I think splitting this into 2 metrics might make the job a tad harder...
I'm not really sure where the scheduled -> queued transition takes place, I need to insert some code there, and also "remember" when it happens and carry that date to where the "running" transition happen.
I was really happy to see that TaskInstance already saved the date where it was scheduled here 😬

@shubham22
Copy link

shubham22 commented Apr 14, 2023

I am personally inclined towards one metric. Users generally care about the lag between when there task was ready for execution and when the task actually started executing, not about time it took at each stage inside Airflow. We can have one metric task_execution_latency or task_start_delay that conveys the meaning without requiring knowledge about what is scheduled state and what is queued state. Not a strong opinion!

I know cost is not something that we like to talk about when deciding, but having 1 metric does reduce the cost incurred in half :)

@potiuk
Copy link
Member

potiuk commented Apr 14, 2023

Yeah. I think scheduled -> queued and queued -> RUNNING would both be useful as @jedcunningham mentioned. They indicate two different transitions and they might indicate different kind of problems.

BTW. One more thing that we should have is a nice description of "What might be the likely reason when you see this and that metrics high". It would be really great (of coure not now) to see not only production of the metrics and their description but also educated guess on what could be likely causes and where to look, even if we cannot have a good and perfect answer but some kind of "usually when this metrics is high it means that there are not enough workers etc. etc." . I think that would help not only the users, but also us - maintainers to be able to reason better on what's going on when we got queries from users.

@vandonr-amz
Copy link
Contributor Author

I'm working on splitting this in 2, and I have a question:
in case a state is skipped, for instance here:

self.log.debug("Sending %s to executor", ti)
# Skip scheduled state, we are executing immediately
ti.state = TaskInstanceState.QUEUED
ti.queued_by_job_id = self.job.id
ti.queued_dttm = timezone.utcnow()

should we emit a metric that the task spent 0s in scheduled state ? Or should we not emit at all ?
I'm taking the "no metric" approach for now, but I'm not convinced it's the best.

@vandonr-amz vandonr-amz changed the title Create a metric for task adoption time Create a metrics to track Scheduled->Queued->Running task state transition times Apr 21, 2023
@vandonr-amz vandonr-amz changed the title Create a metrics to track Scheduled->Queued->Running task state transition times Create metrics to track Scheduled->Queued->Running task state transition times Apr 21, 2023
@potiuk
Copy link
Member

potiuk commented Apr 22, 2023

No metrics would be my choice too.

Copy link
Contributor

@o-nikolas o-nikolas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, left a few nits 👍

return
timing = (timezone.utcnow() - self.start_date).total_seconds()
else:
raise NotImplementedError
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a string to the error that provides a bit more context and info. you can add which state was provided and which states are supported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a bit more context, but it's an error for developers, not for customers, so I don't think too much info is needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a developer I appreciate helpful tracebacks 🙂

Comment on lines +1435 to +1436
Stats.timing(f"dag.{self.dag_id}.{self.task_id}.{metric_name}", timing)
Stats.timing(f"task.{metric_name}", timing, tags={"task_id": self.task_id, "dag_id": self.dag_id})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

@ferruzzi ferruzzi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a question out of curiosity, but no blocking requests.

Comment on lines +167 to +169
``dag.<dag_id>.<task_id>.duration`` Seconds taken to run a task
``dag.<dag_id>.<task_id>.scheduled_duration`` Seconds a task spends in the Scheduled state, before being Queued
``dag.<dag_id>.<task_id>.queued_duration`` Seconds a task spends in the Queued state, before being Running
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Non-blocking) Just curious, are these lines manual changes or from some automated change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added them manually, there is nothing enforcing sync between this doc and the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, that was what I thought. I went looking for something like this when I started on the OTel work and didn't find it. That's buried pretty deep. 👍

@htpawel
Copy link
Contributor

htpawel commented Mar 6, 2024

@vandonr-amz @potiuk Hi,
Above change has two bugs. One is that Stats.timing expects milliseconds instead of seconds (PR with fix: 37936) and second one is that scheduled_duration metric does not work as self.start_date is always None at time of execution of this. It is because self.start_date is not corresponding to time of schedule (real start) but to actual running start (after scheduled and queued states), see comment here: 34771#discussion_r1432601056. I think something with naming convention should be done as it can be very misleading, and also we need some scheduled_dttm variable like stated in the comment.

@ferruzzi
Copy link
Contributor

ferruzzi commented Mar 6, 2024

@htpawel This is quite old, would you please open a new Issue with your bug report?

helperbot-recidiviz pushed a commit to Recidiviz/pulse-data that referenced this pull request May 29, 2024
…iviz/recidiviz-data#29712)

## Description of the change

Recidiviz/recidiviz-data#27852 upgraded to `composer-2.6.2-airflow-2.6.3` which failed during
the tf step and was reverted in Recidiviz/recidiviz-data#29696

The commits in this PR are as follows --
1. un-reverts Recidiviz/recidiviz-data#27852
2. upgrades to `composer-2.7.1-airflow-2.7.3`, copying requirements over
from [version
page](https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#images)
3.  airflow pipenv lock workflow
4. removes a few imports and checks since
[Recidiviz/recidiviz-data#32731](apache/airflow#32731) opened by our
very own @ohaibbq is included in 2.7.1

read through the [release
notes](https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-7-0-2023-08-18),
nothing in our usage seems to be deprecated; there are a few things that
might be of interest to folks:
- _Create metrics to track Scheduled->Queued->Running task state
transition times
([Recidiviz/recidiviz-data#30612](apache/airflow#30612 will be nice
for us to be able to see this re: k8s spin up time
- _Add OpenTelemetry to Airflow
([AIP-49](https://github.com/apache/airflow/pulls?q=is%3Apr+is%3Amerged+label%3AAIP-49+milestone%3A%22Airflow+2.7.0%22))_
- _The trigger UI form is skipped in web UI if no parameters are defined
in a DAG ([Recidiviz/recidiviz-data#33351](apache/airflow#33351: not
sure if folks run ad-hoc dags using the UI instead of
`trigger_state_specific_calculation/ingest_dag` but we need to add
`show_trigger_form_if_no_params` if we want to still be able to trigger
via the UI

would love a second set of eyes around these warning logs that occur
after tests run, see
[here](https://github.com/Recidiviz/recidiviz-data/actions/runs/9071731702/job/24926024866?pr=29712#step:6:53).
also occurred locally but i couldn't isolate the cause of the issue
```
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.11.8/x64/lib/python3.11/logging/__init__.py", line 1113, in emit
    stream.write(msg + self.terminator)
ValueError: I/O operation on closed file.
Call stack:
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 509, in <lambda>
    and _finalize_fairy(
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 791, in _finalize_fairy
    pool.logger.error(
Message: 'Exception during reset or similar'
Arguments: ()
--- Logging error ---
Traceback (most recent call last):
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 763, in _finalize_fairy
    fairy._reset(pool, transaction_was_reset)
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 1038, in _reset
    pool._dialect.do_rollback(self)
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 683, in do_rollback
    dbapi_connection.rollback()
psycopg2.OperationalError: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.11.8/x64/lib/python3.11/logging/__init__.py", line 1113, in emit
    stream.write(msg + self.terminator)
ValueError: I/O operation on closed file.
Call stack:
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 509, in <lambda>
    and _finalize_fairy(
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 791, in _finalize_fairy
    pool.logger.error(
Message: 'Exception during reset or similar'
Arguments: ()
```

## Type of change

> All pull requests must have at least one of the following labels
applied (otherwise the PR will fail):

| Label | Description |
|-----------------------------
|-----------------------------------------------------------------------------------------------------------
|
| Type: Bug | non-breaking change that fixes an issue |
| Type: Feature | non-breaking change that adds functionality |
| Type: Breaking Change | fix or feature that would cause existing
functionality to not work as expected |
| Type: Non-breaking refactor | change addresses some tech debt item or
prepares for a later change, but does not change functionality |
| Type: Configuration Change | adjusts configuration to achieve some end
related to functionality, development, performance, or security |
| Type: Dependency Upgrade | upgrades a project dependency - these
changes are not included in release notes |

## Related issues

Closes Recidiviz/recidiviz-data#29554

## Checklists

### Development

**This box MUST be checked by the submitter prior to merging**:
- [x] **Double- and triple-checked that there is no Personally
Identifiable Information (PII) being mistakenly added in this pull
request**

These boxes should be checked by the submitter prior to merging:
- [ ] Tests have been written to cover the code changed/added as part of
this pull request

### Code review

These boxes should be checked by reviewers prior to merging:

- [x] This pull request has a descriptive title and information useful
to a reviewer
- [x] Potential security implications or infrastructural changes have
been considered, if relevant

---------

Co-authored-by: Helper Bot <[email protected]>
GitOrigin-RevId: 355d050265f490df1779ff773533f5aefd41de6b
helperbot-recidiviz pushed a commit to Recidiviz/pulse-data that referenced this pull request Feb 27, 2025
…iviz/recidiviz-data#29712)

## Description of the change

Recidiviz/recidiviz-data#27852 upgraded to `composer-2.6.2-airflow-2.6.3` which failed during
the tf step and was reverted in Recidiviz/recidiviz-data#29696

The commits in this PR are as follows --
1. un-reverts Recidiviz/recidiviz-data#27852
2. upgrades to `composer-2.7.1-airflow-2.7.3`, copying requirements over
from [version
page](https://cloud.google.com/composer/docs/concepts/versioning/composer-versions#images)
3.  airflow pipenv lock workflow
4. removes a few imports and checks since
[Recidiviz/recidiviz-data#32731](apache/airflow#32731) opened by our
very own @ohaibbq is included in 2.7.1

read through the [release
notes](https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-7-0-2023-08-18),
nothing in our usage seems to be deprecated; there are a few things that
might be of interest to folks:
- _Create metrics to track Scheduled->Queued->Running task state
transition times
([Recidiviz/recidiviz-data#30612](apache/airflow#30612 will be nice
for us to be able to see this re: k8s spin up time
- _Add OpenTelemetry to Airflow
([AIP-49](https://github.com/apache/airflow/pulls?q=is%3Apr+is%3Amerged+label%3AAIP-49+milestone%3A%22Airflow+2.7.0%22))_
- _The trigger UI form is skipped in web UI if no parameters are defined
in a DAG ([Recidiviz/recidiviz-data#33351](apache/airflow#33351: not
sure if folks run ad-hoc dags using the UI instead of
`trigger_state_specific_calculation/ingest_dag` but we need to add
`show_trigger_form_if_no_params` if we want to still be able to trigger
via the UI

would love a second set of eyes around these warning logs that occur
after tests run, see
[here](https://github.com/Recidiviz/recidiviz-data/actions/runs/9071731702/job/24926024866?pr=29712#step:6:53).
also occurred locally but i couldn't isolate the cause of the issue
```
Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.11.8/x64/lib/python3.11/logging/__init__.py", line 1113, in emit
    stream.write(msg + self.terminator)
ValueError: I/O operation on closed file.
Call stack:
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 509, in <lambda>
    and _finalize_fairy(
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 791, in _finalize_fairy
    pool.logger.error(
Message: 'Exception during reset or similar'
Arguments: ()
--- Logging error ---
Traceback (most recent call last):
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 763, in _finalize_fairy
    fairy._reset(pool, transaction_was_reset)
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 1038, in _reset
    pool._dialect.do_rollback(self)
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 683, in do_rollback
    dbapi_connection.rollback()
psycopg2.OperationalError: server closed the connection unexpectedly
	This probably means the server terminated abnormally
	before or while processing the request.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/hostedtoolcache/Python/3.11.8/x64/lib/python3.11/logging/__init__.py", line 1113, in emit
    stream.write(msg + self.terminator)
ValueError: I/O operation on closed file.
Call stack:
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 509, in <lambda>
    and _finalize_fairy(
  File "/home/runner/.local/share/virtualenvs/airflow-jCIRjM3K/lib/python3.11/site-packages/sqlalchemy/pool/base.py", line 791, in _finalize_fairy
    pool.logger.error(
Message: 'Exception during reset or similar'
Arguments: ()
```

## Type of change

> All pull requests must have at least one of the following labels
applied (otherwise the PR will fail):

| Label | Description |
|-----------------------------
|-----------------------------------------------------------------------------------------------------------
|
| Type: Bug | non-breaking change that fixes an issue |
| Type: Feature | non-breaking change that adds functionality |
| Type: Breaking Change | fix or feature that would cause existing
functionality to not work as expected |
| Type: Non-breaking refactor | change addresses some tech debt item or
prepares for a later change, but does not change functionality |
| Type: Configuration Change | adjusts configuration to achieve some end
related to functionality, development, performance, or security |
| Type: Dependency Upgrade | upgrades a project dependency - these
changes are not included in release notes |

## Related issues

Closes Recidiviz/recidiviz-data#29554

## Checklists

### Development

**This box MUST be checked by the submitter prior to merging**:
- [x] **Double- and triple-checked that there is no Personally
Identifiable Information (PII) being mistakenly added in this pull
request**

These boxes should be checked by the submitter prior to merging:
- [ ] Tests have been written to cover the code changed/added as part of
this pull request

### Code review

These boxes should be checked by reviewers prior to merging:

- [x] This pull request has a descriptive title and information useful
to a reviewer
- [x] Potential security implications or infrastructural changes have
been considered, if relevant

---------

Co-authored-by: Helper Bot <[email protected]>
GitOrigin-RevId: 355d050265f490df1779ff773533f5aefd41de6b
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants