Commit 1856d9c: Reimplement within the registry framework

1 parent: c373b5d

21 files changed: +1291 / -1185 lines

dag_example_module.png (binary file changed, -42 Bytes)

docs/reference/decorators/with_columns.rst (4 additions, 14 deletions)

@@ -2,27 +2,17 @@
 with_columns
 =======================

-Pandas
---------------
+Pandas and Polars
+-----------------------

-We have a ``with_columns`` option to run operations on columns of a Pandas dataframe and append the results as new columns.
+We have a ``with_columns`` option to run operations on columns of a Pandas / Polars dataframe and append the results as new columns.

 **Reference Documentation**

-.. autoclass:: hamilton.plugins.h_pandas.with_columns
+.. autoclass:: hamilton.function_modifiers.with_columns
    :special-members: __init__


-Polars
---------------
-
-We have a ``with_columns`` decorator to run operations on columns of a Polars dataframe or lazyframe and append the results as new columns.
-
-**Reference Documentation**
-
-.. autoclass:: hamilton.plugins.h_polars.with_columns
-   :special-members: __init__
-
 PySpark
 --------------
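Since both backends now share one decorator, a minimal sketch of the same pattern on a Polars dataframe might look like the following (function names are hypothetical; it assumes the ``hamilton.function_modifiers.with_columns`` import path documented above):

    import polars as pl

    from hamilton.function_modifiers import with_columns


    def a_doubled(a: pl.Series) -> pl.Series:
        # column-level transform that runs inside the with_columns subdag
        return a * 2


    @with_columns(
        a_doubled,  # operations can be listed directly or loaded from modules
        columns_to_pass=["a"],  # columns extracted from the dataframe for the subdag
        select=["a_doubled"],  # subdag outputs appended back onto the dataframe
    )
    def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
        return initial_df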

examples/pandas/with_columns/notebook.ipynb (311 additions, 322 deletions)

Large diffs are not rendered by default.

examples/polars/with_columns/notebook.ipynb (645 additions, 649 deletions)

Large diffs are not rendered by default.

hamilton/function_modifiers/__init__.py (1 addition, 0 deletions)

@@ -88,6 +88,7 @@

 subdag = recursive.subdag
 parameterized_subdag = recursive.parameterized_subdag
+with_columns = recursive.with_columns

 # resolve/meta stuff -- power user features
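This re-export is what wires up the new documented import path. A quick sanity-check sketch, assuming the assignment in the diff is the only wiring involved:

    # Both names should now resolve to the same decorator class.
    from hamilton import function_modifiers
    from hamilton.function_modifiers import recursive

    assert function_modifiers.with_columns is recursive.with_columns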

hamilton/function_modifiers/recursive.py (126 additions, 50 deletions)

@@ -14,10 +14,6 @@
 else:
     from typing import NotRequired

-from pandas import DataFrame as PandasDataFrame
-from polars import DataFrame as PolarsDataFrame
-from polars import LazyFrame as PolarsLazyFrame
-
 # Copied this over from function_graph
 # TODO -- determine the best place to put this code
 from hamilton import graph_utils, node, registry

@@ -635,24 +631,96 @@ def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> L
     return output


-SUPPORTED_DATAFAME_TYPES = [PandasDataFrame, PolarsDataFrame, PolarsLazyFrame]
-
-
-class with_columns_factory(base.NodeInjector, abc.ABC):
-    """Performs with_columns operation on a dataframe. This is a special case of NodeInjector
-    that applies only to dataframes. For now can be used with:
+class with_columns(base.NodeInjector, abc.ABC):
+    """Performs with_columns operation on a dataframe. This is used when you want to extract some
+    columns out of the dataframe, perform operations on them, and then append them to the original
+    dataframe. For now it can be used with:

     - Pandas
     - Polars

-    This is used when you want to extract some columns out of the dataframe, perform operations
-    on them and then append to the original dataframe.
-
-    def processed_data(data: pd.DataFrame) -> pd.DataFrame:
-        ...
-
-    In this case we would build a subdag out of the node ``data`` and append selected nodes back to
-    the original dataframe before feeding it into ``processed_data``.
+    Here's an example of calling it on a pandas dataframe -- if you've seen ``@subdag``, you should
+    be familiar with the concepts:
+
+    .. code-block:: python
+
+        # my_module.py
+        def a(a_from_df: pd.Series) -> pd.Series:
+            return _process(a_from_df)
+
+        def b(b_from_df: pd.Series) -> pd.Series:
+            return _process(b_from_df)
+
+        def a_b_average(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series:
+            return (a_from_df + b_from_df) / 2
+
+
+    .. code-block:: python
+
+        # with_columns_module.py
+        def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:
+            return a + b
+
+
+        # the with_columns call
+        @with_columns(
+            *[my_module],  # load from any module
+            *[a_plus_b],  # or list operations directly
+            columns_to_pass=["a_from_df", "b_from_df"],  # the columns to pass from the dataframe into the subdag
+            select=["a", "b", "a_plus_b", "a_b_average"],  # the subdag outputs to append to the dataframe
+        )
+        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
+            # process, or just return unprocessed
+            ...
+
+    In this instance ``initial_df`` would get four new columns appended: ``a``, ``b``, ``a_plus_b``,
+    and ``a_b_average``.
+
+    The operations are applied in topological order. This allows you to express the operations
+    individually, making them easy to unit-test and reuse.
+
+    Note that the operation is "append", meaning that the selected columns are appended
+    onto the dataframe.
+
+    If the function takes multiple dataframes, the dataframe input to process will always be
+    the first argument. This will be passed to the subdag, transformed, and passed back to the function.
+    This follows the Hamilton rule of reference by parameter name. To demonstrate this: in the code
+    above, the dataframe that is passed to the subdag is ``initial_df``. That is transformed
+    by the subdag, and then returned as the final dataframe.
+
+    You can read it as:
+
+    "final_df is a function that transforms the upstream dataframe initial_df, running the
+    transformations from my_module. It starts with the columns a_from_df and b_from_df, appends
+    the columns a, b, a_plus_b, and a_b_average to the dataframe, and then returns the dataframe
+    after some final processing."
+
+    In case you need more flexibility you can alternatively use ``pass_dataframe_as``, for example:
+
+    .. code-block:: python
+
+        # with_columns_module.py
+        def a_from_df(initial_df: pd.DataFrame) -> pd.Series:
+            return initial_df["a_from_df"] / 100
+
+        def b_from_df(initial_df: pd.DataFrame) -> pd.Series:
+            return initial_df["b_from_df"] / 100
+
+
+        # the with_columns call
+        @with_columns(
+            *[my_module],
+            *[a_from_df],
+            pass_dataframe_as="initial_df",
+            select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"],
+        )
+        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
+            # process, or just return unprocessed
+            ...
+
+    The above would output a dataframe where the two columns ``a_from_df`` and ``b_from_df`` get
+    overwritten.
     """

     # TODO: if we rename the column nodes into something smarter this can be avoided and
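The docstring example above can be driven end to end with the standard Hamilton driver. A minimal sketch, assuming ``my_module.py`` and ``with_columns_module.py`` exist as shown in the docstring (both file names are the docstring's hypotheticals):

    # run.py
    import pandas as pd

    from hamilton import driver

    import with_columns_module  # the module containing the decorated final_df

    dr = driver.Builder().with_modules(with_columns_module).build()
    df = pd.DataFrame({"a_from_df": [1.0, 2.0], "b_from_df": [3.0, 4.0]})
    result = dr.execute(["final_df"], inputs={"initial_df": df})
    print(result["final_df"])  # original columns plus a, b, a_plus_b, a_b_average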
@@ -674,14 +742,6 @@ def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
         return True
     return False

-    def validate_dataframe_type(self):
-        if not set(self.allowed_dataframe_types).issubset(list(SUPPORTED_DATAFAME_TYPES)):
-            raise InvalidDecoratorException(
-                f"The provided dataframe types: {self.allowed_dataframe_types} are currently not supported "
-                "to be used in `with_columns`. Please reach out if you need it. "
-                f"We currently only support: {SUPPORTED_DATAFAME_TYPES}."
-            )
-
     def __init__(
         self,
         *load_from: Union[Callable, ModuleType],

@@ -690,7 +750,6 @@ def __init__(
         select: List[str] = None,
         namespace: str = None,
         config_required: List[str] = None,
-        dataframe_types: Collection[Type] = None,
     ):
         """Instantiates a ``@with_columns`` decorator.

@@ -711,14 +770,6 @@ def __init__(
         if you want the functions/modules to have access to all possible config.
         """

-        if dataframe_types is None:
-            raise ValueError("You need to specify which dataframe types it will be applied to.")
-        else:
-            if isinstance(dataframe_types, Type):
-                dataframe_types = [dataframe_types]
-            self.allowed_dataframe_types = dataframe_types
-            self.validate_dataframe_type()
-
         self.subdag_functions = subdag.collect_functions(load_from)
         self.select = select

@@ -796,44 +847,67 @@ def _get_inital_nodes(
             f"It might not be compatible with some other decorators."
         )

-        if input_types[inject_parameter] not in self.allowed_dataframe_types:
-            raise ValueError(f"Dataframe has to be a {self.allowed_dataframe_types} DataFrame.")
-        else:
-            self.dataframe_type = input_types[inject_parameter]
-
+        dataframe_type = input_types[inject_parameter]
         initial_nodes = (
             []
             if self.dataframe_subdag_param is not None
             else self._create_column_nodes(inject_parameter=inject_parameter, params=params)
         )

-        return inject_parameter, initial_nodes
-
-    @abc.abstractmethod
-    def create_merge_node(self, upstream_node: str, node_name: str) -> node.Node:
-        """Should create a node that merges the results back into the original dataframe.
-
-        Node that adds to / overrides columns for the original dataframe based on selected output.
-
-        This will be platform specific, see Pandas and Polars plugins for implementation.
-        """
-        pass
+        return inject_parameter, initial_nodes, dataframe_type
+
+    def create_merge_node(
+        self, upstream_node: str, node_name: str, dataframe_type: Type
+    ) -> node.Node:
+        """Node that adds to / overrides columns for the original dataframe based on selected output."""
+        if self.is_async:
+
+            async def new_callable(**kwargs) -> Any:
+                df = kwargs[upstream_node]
+                columns_to_append = {}
+                for column in self.select:
+                    columns_to_append[column] = kwargs[column]
+                new_df = registry.with_columns(df, columns_to_append)
+                return new_df
+
+        else:
+
+            def new_callable(**kwargs) -> Any:
+                df = kwargs[upstream_node]
+                columns_to_append = {}
+                for column in self.select:
+                    columns_to_append[column] = kwargs[column]
+                new_df = registry.with_columns(df, columns_to_append)
+                return new_df
+
+        column_type = registry.get_column_type_from_df_type(dataframe_type)
+        input_map = {column: column_type for column in self.select}
+        input_map[upstream_node] = dataframe_type
+
+        return node.Node(
+            name=node_name,
+            typ=dataframe_type,
+            callabl=new_callable,
+            input_types=input_map,
+        )

     def inject_nodes(
         self, params: Dict[str, Type[Type]], config: Dict[str, Any], fn: Callable
     ) -> Tuple[List[node.Node], Dict[str, str]]:
         self.is_async = inspect.iscoroutinefunction(fn)
         namespace = fn.__name__ if self.namespace is None else self.namespace

-        inject_parameter, initial_nodes = self._get_inital_nodes(fn=fn, params=params)
+        inject_parameter, initial_nodes, dataframe_type = self._get_inital_nodes(
+            fn=fn, params=params
+        )

         subdag_nodes = subdag.collect_nodes(config, self.subdag_functions)

         # TODO: for now we restrict that if user wants to change columns that already exist, he needs to
         # pass the dataframe and extract them himself. If we add namespace to initial nodes and rewire the
         # initial node names with the ongoing ones that have a column argument, we can also allow in place
         # changes when using columns_to_pass
-        if with_columns_factory._check_for_duplicates(initial_nodes + subdag_nodes):
+        if with_columns._check_for_duplicates(initial_nodes + subdag_nodes):
             raise ValueError(
                 "You can only specify columns once. You used `columns_to_pass` and we "
                 "extract the columns for you. In this case they cannot be overwritten -- only new columns get "

@@ -853,14 +927,16 @@ def inject_nodes(
         self.select = [
             sink_node.name
             for sink_node in pruned_nodes
-            if sink_node.type == registry.get_column_type_from_df_type(self.dataframe_type)
+            if sink_node.type == registry.get_column_type_from_df_type(dataframe_type)
         ]

-        merge_node = self.create_merge_node(inject_parameter, node_name="__append")
+        merge_node = self.create_merge_node(
+            inject_parameter, node_name="__append", dataframe_type=dataframe_type
+        )

         output_nodes = initial_nodes + pruned_nodes + [merge_node]
         output_nodes = subdag.add_namespace(output_nodes, namespace)
         return output_nodes, {inject_parameter: assign_namespace(merge_node.name, namespace)}

     def validate(self, fn: Callable):
-        self.validate_dataframe_type()
+        pass
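The new ``create_merge_node`` delegates the actual column append to ``registry.with_columns``, and the plugin hunks below register backend-specific implementations against it. That registration style matches ``functools.singledispatch``; here is a minimal sketch of the idea under that assumption (illustrative only, not Hamilton's actual registry code):

    import functools
    from typing import Any, Dict

    import pandas as pd


    @functools.singledispatch
    def with_columns(df: Any, columns: Dict[str, Any]) -> Any:
        # Fallback when no backend has registered an implementation for type(df).
        raise NotImplementedError(f"with_columns is not implemented for {type(df)}.")


    @with_columns.register(pd.DataFrame)
    def _with_columns_pandas(df: pd.DataFrame, columns: Dict[str, pd.Series]) -> pd.DataFrame:
        # assign() returns a copy with the selected columns appended or overwritten,
        # matching the decorator's documented "append" semantics.
        return df.assign(**columns)


    df = pd.DataFrame({"a": [1, 2]})
    print(with_columns(df, {"b": df["a"] + 1}))  # appends column "b"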

hamilton/plugins/dask_extensions.py (7 additions, 0 deletions)

@@ -22,6 +22,13 @@ def fill_with_scalar_dask(df: dd.DataFrame, column_name: str, value: Any) -> dd.
     return df


+@registry.with_columns.register(dd.DataFrame)
+def with_columns_dask(df: dd.DataFrame, columns: dd.Series) -> dd.DataFrame:
+    raise NotImplementedError(
+        "As of Hamilton version 1.83.1, with_columns for Dask isn't supported."
+    )
+
+
 def register_types():
     """Function to register the types for this extension."""
     registry.register_types("dask", DATAFRAME_TYPE, COLUMN_TYPE)

hamilton/plugins/geopandas_extensions.py (7 additions, 0 deletions)

@@ -24,6 +24,13 @@ def fill_with_scalar_geopandas(
     return df


+@registry.with_columns.register(gpd.GeoDataFrame)
+def with_columns_geopandas(df: gpd.GeoDataFrame, columns: gpd.GeoSeries) -> gpd.GeoDataFrame:
+    raise NotImplementedError(
+        "As of Hamilton version 1.83.1, with_columns for geopandas isn't supported."
+    )
+
+
 def register_types():
     """Function to register the types for this extension."""
     registry.register_types("geopandas", DATAFRAME_TYPE, COLUMN_TYPE)
