Map length determined incorrectly when mapping over an output of a multi-output task #51109

Open
1 of 2 tasks
Dev-iL opened this issue May 27, 2025 · 0 comments
Labels: area:core, area:dynamic-task-mapping (AIP-42), area:TaskGroup, kind:bug, needs-triage

Dev-iL commented May 27, 2025

Apache Airflow version

2.11.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The map length computed in expandinput is wrong: it equals the number of values returned by the multi-output task, rather than the number of elements in the sequence being mapped over (which is one of the values returned by that task).

Note how all_lengths is always 2 regardless of the contents of value (two screenshots in the original issue illustrate this).

Depending on how the computed map length compares with the actual one, the DAG either raises the error below (computed length larger than actual), works correctly (lengths equal), or ends up not processing the later elements (computed length smaller than actual).

.../.venv/lib64/python3.11/site-packages/airflow/models/expandinput.py", line 197, in _expand_mapped_field
    return value[found_index]
           ~~~~~^^^^^^^^^^^^^
IndexError: list index out of range
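
The three outcomes can be imitated outside Airflow with a toy model. This is only a sketch of the symptom, not Airflow's implementation: `expand_with_length` and `run_case` are hypothetical helpers, and the assumption is simply that the map length is taken from the number of returned keys instead of from the mapped list.

```python
# Toy model only: this is NOT Airflow's implementation.
def expand_with_length(values, assumed_length):
    """Index `values` once per assumed map index, as a mapped task would."""
    return [values[i] for i in range(assumed_length)]


def run_case(n_targets):
    # Same shape as the dict returned by determine_run_params() in the reproducer.
    outputs = {
        "targets": [f"target{n}" for n in range(n_targets)],
        "date": "2020-01-01",
    }
    assumed = len(outputs)            # number of returned keys (always 2 here)
    actual = len(outputs["targets"])  # number of elements actually mapped over
    try:
        return assumed, actual, expand_with_length(outputs["targets"], assumed)
    except IndexError:
        return assumed, actual, "IndexError"


for n in (1, 2, 3):
    print(n, run_case(n))
```

With one target the toy model raises IndexError, with two it returns both targets, and with three it drops the last one, which matches the three behaviours described above.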

What you think should happen instead?

all_lengths should correctly reflect the number of elements we're mapping on.

How to reproduce

from __future__ import annotations

import pendulum
from airflow.decorators import task, task_group
from airflow.models.dag import DAG

TYPE_CHECKING: bool = False
if TYPE_CHECKING:
    from datetime import datetime
    from typing import Any


@task.python(multiple_outputs=True)
def determine_run_params() -> dict[str, Any]:
    return {
        "targets": [f"target{n}" for n in range(1)],  # <<<<<<<<<< Modify for different results
        "date": pendulum.now(),
        # "str1": "1",  # Uncomment these for different results
        # "str2": "2",
        # "str3": "3",
    }


@task.python
def do_something(
    target: str,
    date: datetime,
) -> str:
    return f"All good for {target} on {date}!"


@task_group
def process(
    date: datetime,
    target: str,
) -> None:
    do_something(target, date)


with DAG(
    "mcve",
    start_date=pendulum.datetime(2020, 1, 1),
) as dag:
    run_params = determine_run_params()
    process.partial(date=run_params["date"]).expand(target=run_params["targets"])


if __name__ == "__main__":
    dag.test()

Workaround

Introduce an intermediate passthrough task:

@task.python
def unpack(something: list[str]) -> list[str]:
    return something

- process.partial(date=run_params["date"]).expand(target=run_params["targets"])
+ process.partial(date=run_params["date"]).expand(target=unpack(run_params["targets"]))

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

When the number of map elements (length of targets) is equal to the number of outputs from determine_run_params, the code works correctly. Perhaps Airflow's tests related to this functionality are hitting this exact sweet spot.
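
If that is the case, a regression test would need to cover combinations where the two counts differ. A minimal sketch of such a parameter grid follows; `mismatched_cases` is a hypothetical helper, not part of Airflow's test suite, and the bounds are arbitrary.

```python
from itertools import product


def mismatched_cases(max_targets=3, max_outputs=4):
    """Return (n_targets, n_outputs) pairs in which the two counts differ."""
    return [
        (n_targets, n_outputs)
        for n_targets, n_outputs in product(
            range(1, max_targets + 1), range(1, max_outputs + 1)
        )
        if n_targets != n_outputs  # skip the "sweet spot" where the bug is masked
    ]


cases = mismatched_cases()
print(cases)
```

Each pair could then drive a parametrized test that builds a multi-output task with `n_outputs` keys and maps over a list of `n_targets` elements.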

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Dev-iL added the kind:bug, area:core, and needs-triage labels on May 27, 2025
Dev-iL changed the title from "Map length of task_group determined incorrectly" to "Map length determined incorrectly when mapping over one of the outputs of a multi-output task" on May 27, 2025
Dev-iL changed the title from "Map length determined incorrectly when mapping over one of the outputs of a multi-output task" to "Map length determined incorrectly when mapping over an output of a multi-output task" on May 27, 2025