Skip to content

Let users disable datasets renaming for namespaced pipelines #4700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 21, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 39 additions & 13 deletions kedro/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def __init__( # noqa: PLR0913
parameters: str | set[str] | dict[str, str] | None = None,
tags: str | Iterable[str] | None = None,
namespace: str | None = None,
rename: bool = True,
):
"""Initialise ``Pipeline`` with a list of ``Node`` instances.

Expand Down Expand Up @@ -232,6 +233,7 @@ def __init__( # noqa: PLR0913
parameters=parameters,
tags=tags,
namespace=namespace,
rename=rename,
)

if nodes is None:
Expand Down Expand Up @@ -1015,7 +1017,9 @@ def to_json(self) -> str:

return json.dumps(pipeline_versioned)

def _rename(self, name: str, mapping: dict, namespace: str | None) -> str:
def _rename(
self, name: str, mapping: dict, namespace: str | None, rename: bool
) -> str:
def _prefix_dataset(name: str) -> str:
return f"{namespace}.{name}"

Expand All @@ -1040,12 +1044,22 @@ def _map_transcode_base(name: str) -> str:
(_is_all_parameters, lambda n: n),
# if transcode base is mapped to a new name, update with new base
(_is_transcode_base_in_mapping, _map_transcode_base),
# if name refers to a single parameter and a namespace is given, apply prefix
(lambda n: bool(namespace) and _is_single_parameter(n), _prefix_param),
# if namespace given for a dataset, prefix name using that namespace
(lambda n: bool(namespace), _prefix_dataset),
]

# Add rules for prefixing only if rename is True
if rename:
rules.extend(
[
# if name refers to a single parameter and a namespace is given, apply prefix
(
lambda n: bool(namespace) and _is_single_parameter(n),
_prefix_param,
),
# if namespace given for a dataset, prefix name using that namespace
(lambda n: bool(namespace), _prefix_dataset),
]
)

for predicate, processor in rules:
if predicate(name): # type: ignore[no-untyped-call]
processor_name: str = processor(name) # type: ignore[no-untyped-call]
def _process_dataset_names(
    self,
    datasets: str | list[str] | dict[str, str] | None,
    mapping: dict,
    namespace: str | None,
    rename: bool = True,
) -> str | list[str] | dict[str, str] | None:
    """Apply dataset-name remapping (and optional namespace prefixing)
    to ``datasets``, preserving the container shape of the input.

    Args:
        datasets: A single dataset name, a list of names, a dict of
            ``{key: name}``, or ``None``.
        mapping: Explicit old-name -> new-name replacements.
        namespace: Namespace used by ``_rename`` to prefix unmapped
            names, if any.
        rename: When ``False``, ``_rename`` skips automatic namespace
            prefixing and only applies the explicit ``mapping``.

    Returns:
        The renamed name(s), in the same container type as ``datasets``,
        or ``None`` if ``datasets`` is ``None``.

    Raises:
        ValueError: If ``datasets`` is of an unsupported type.
    """
    if datasets is None:
        return None
    if isinstance(datasets, str):
        return self._rename(datasets, mapping, namespace, rename)
    if isinstance(datasets, list):
        return [self._rename(name, mapping, namespace, rename) for name in datasets]
    if isinstance(datasets, dict):
        # Only the values (actual dataset names) are renamed; keys stay.
        return {
            key: self._rename(value, mapping, namespace, rename)
            for key, value in datasets.items()
        }
    raise ValueError(
        f"Unexpected input {datasets} of type {type(datasets)}"
    )  # pragma: no cover

def _copy_node(
    self, node: Node, mapping: dict, namespace: str | None, rename: bool = True
) -> Node:
    """Return a copy of ``node`` with remapped dataset names and namespace.

    Args:
        node: The node to copy.
        mapping: Explicit old-name -> new-name dataset replacements.
        namespace: Namespace to nest the node under, if any.
        rename: When ``False``, dataset names are not auto-prefixed with
            the namespace; only explicit ``mapping`` entries apply.

    Returns:
        A new ``Node`` with updated inputs, outputs, confirms and
        namespace.
    """
    # Nest the node's existing namespace under the new one when both exist;
    # otherwise whichever of the two is set wins.
    new_namespace = node.namespace
    if namespace:
        new_namespace = (
            f"{namespace}.{node.namespace}" if node.namespace else namespace
        )
    return node._copy(
        inputs=self._process_dataset_names(node._inputs, mapping, namespace, rename),
        outputs=self._process_dataset_names(node._outputs, mapping, namespace, rename),
        namespace=new_namespace,
        confirms=self._process_dataset_names(node._confirms, mapping, namespace, rename),
    )

def _map_nodes( # noqa: PLR0913
Expand All @@ -1096,6 +1119,7 @@ def _map_nodes( # noqa: PLR0913
parameters: str | set[str] | dict[str, str] | None = None,
tags: str | Iterable[str] | None = None,
namespace: str | None = None,
rename: bool = True,
) -> list[Node]:
"""Map namespace to the inputs, outputs, parameters and nodes of the pipeline."""
if isinstance(pipe, Pipeline):
Expand All @@ -1112,7 +1136,7 @@ def _map_nodes( # noqa: PLR0913
_validate_inputs_outputs(inputs.keys(), outputs.keys(), pipe)

mapping = {**inputs, **outputs, **parameters}
new_nodes = [self._copy_node(n, mapping, namespace) for n in pipe.nodes]
new_nodes = [self._copy_node(n, mapping, namespace, rename) for n in pipe.nodes]
return new_nodes


Expand All @@ -1124,6 +1148,7 @@ def pipeline( # noqa: PLR0913
parameters: str | set[str] | dict[str, str] | None = None,
tags: str | Iterable[str] | None = None,
namespace: str | None = None,
rename: bool = True,
) -> Pipeline:
r"""Create a ``Pipeline`` from a collection of nodes and/or ``Pipeline``\s.

Expand Down Expand Up @@ -1177,6 +1202,7 @@ def pipeline( # noqa: PLR0913
parameters=parameters,
tags=tags,
namespace=namespace,
rename=rename,
)


Expand Down