Commit 9b9bf89

Final touches
1 parent 0515c7e commit 9b9bf89

File tree

8 files changed: +114 -173 lines changed


examples/pandas/with_columns/notebook.ipynb

Lines changed: 15 additions & 6 deletions
@@ -598,16 +598,25 @@
     },
     {
      "cell_type": "code",
-     "execution_count": 4,
+     "execution_count": 1,
      "metadata": {},
-     "outputs": [],
+     "outputs": [
+      {
+       "name": "stderr",
+       "output_type": "stream",
+       "text": [
+        "/Users/jernejfrank/miniconda3/envs/hamilton/lib/python3.10/site-packages/pyspark/pandas/__init__.py:50: UserWarning: 'PYARROW_IGNORE_TIMEZONE' environment variable was not set. It is required to set this environment variable to '1' in both driver and executor sides if you use pyarrow>=2.0.0. pandas-on-Spark will set it for you but it does not work if there is a Spark context already launched.\n",
+        " warnings.warn(\n"
+       ]
+      }
+     ],
      "source": [
       "%reload_ext hamilton.plugins.jupyter_magic"
      ]
     },
     {
      "cell_type": "code",
-     "execution_count": 7,
+     "execution_count": 4,
      "metadata": {},
      "outputs": [],
      "source": [
@@ -632,11 +641,11 @@
     "\n",
     "\n",
     "async def mean_b(b: pd.Series) -> pd.Series:\n",
-    "    await asyncio.sleep(0.0001)\n",
+    "    await asyncio.sleep(5)\n",
     "    return b.mean()\n",
     "\n",
     "async def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:\n",
-    "    await asyncio.sleep(0.0001)\n",
+    "    await asyncio.sleep(1)\n",
     "    return a + b\n",
     "\n",
     "async def multiply_a_plus_mean_b(multiply_a: pd.Series, mean_b: pd.Series) -> pd.Series:\n",
@@ -654,7 +663,7 @@
     },
     {
      "cell_type": "code",
-     "execution_count": 8,
+     "execution_count": 5,
      "metadata": {},
      "outputs": [
      {
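For reference, the updated sleep durations matter because independent async nodes can run concurrently. Below is a minimal standalone sketch of the same pattern using plain asyncio (not Hamilton's async driver, whose API is not shown in this diff):

    import asyncio

    import pandas as pd

    async def mean_b(b: pd.Series) -> float:
        await asyncio.sleep(5)  # simulated slow work
        return float(b.mean())

    async def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:
        await asyncio.sleep(1)  # simulated faster work
        return a + b

    async def main() -> None:
        a = pd.Series([1, 2, 3])
        b = pd.Series([4, 5, 6])
        # The two awaits overlap, so the total wait is ~5s rather than ~6s.
        mb, ab = await asyncio.gather(mean_b(b), a_plus_b(a, b))
        print(mb, ab.tolist())

    asyncio.run(main())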

hamilton/function_modifiers/README

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+# with_columns_base
+
+This documents the current design flow of the `with_columns` decorator, which belongs to the `NodeInjector` lifecycle.
+
+`with_columns` consists of three parts, represented by the corresponding three abstract methods on `with_columns_base`:
+
+1. `get_initial_nodes` -- Input node(s): either a single dataframe node if `pass_dataframe_as` is used, or nodes holding the extracted columns if `columns_to_pass` is used. This step is library specific.
+2. `get_subdag_nodes` -- Subdag nodes: creating the `subdag` is delegated to `recursive.subdag`, with flexibility left for pre- and post-processing since some libraries need it (see h_spark).
+3. `create_merge_node` -- Merge node: appending the selected columns to the original dataframe is library specific.
+
+Each plugin library that implements `with_columns` should subclass this base class and implement the three abstract methods (four, since `validate()` is also abstract). The child
+classes need to override `__init__`, call the parent `__init__`, and pass in `dataframe_type`, which is registered in the corresponding `extensions` module and carries the information about which
+column types are permitted for the given dataframe type.
+
+For now this is kept loosely coupled to the `registry` and detached from `ResultBuilder`. The API is private; should we want to switch to `registry`, the refactoring is straightforward and shouldn't get us into trouble down the road.
+
+## NOTE
+The handling of scalars and dataframe types varies from library to library. We decided that such mixing should not be permitted, so every selected column that is to be
+appended to the original dataframe must have the matching column type that is registered in the `registry` and set in the library extension modules.
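To make the contract concrete, here is a minimal, hypothetical sketch of a plugin subclass. `MyDataFrame` is an illustrative stand-in and the `__init__` keyword arguments are abbreviated, so treat this as a shape, not a real extension:

    from hamilton.function_modifiers.recursive import with_columns_base


    class MyDataFrame:  # placeholder for the library's registered dataframe type
        ...


    class with_columns(with_columns_base):
        def __init__(self, *load_from, **kwargs):
            # Forward dataframe_type so the base class knows which column
            # types may be appended to this dataframe type.
            super().__init__(*load_from, dataframe_type=MyDataFrame, **kwargs)

        def get_initial_nodes(self, fn, params):
            # Input node(s): the dataframe itself (pass_dataframe_as) or
            # extracted column nodes (columns_to_pass).
            ...

        def get_subdag_nodes(self, config):
            # Delegate to recursive.subdag; pre-/post-process here if the
            # library needs it (see h_spark).
            ...

        def create_merge_node(self, fn, inject_parameter):
            # Append the selected columns onto the original dataframe.
            ...

        def validate(self, fn):
            # Library-specific validation of the decorated function.
            ...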

hamilton/function_modifiers/recursive.py

Lines changed: 10 additions & 83 deletions
@@ -630,96 +630,23 @@ def prune_nodes(nodes: List[node.Node], select: Optional[List[str]] = None) -> L
     return output


-class with_columns_factory(base.NodeInjector, abc.ABC):
+class with_columns_base(base.NodeInjector, abc.ABC):
     """Factory for with_columns operation on a dataframe. This is used when you want to extract some
     columns out of the dataframe, perform operations on them and then append to the original dataframe.

-    Here's an example of calling it on a pandas dataframe -- if you've seen ``@subdag``, you should be familiar with
-    the concepts:
-
-    .. code-block:: python
-
-        # my_module.py
-        def a(a_from_df: pd.Series) -> pd.Series:
-            return _process(a)
-
-        def b(b_from_df: pd.Series) -> pd.Series:
-            return _process(b)
-
-        def a_b_average(a_from_df: pd.Series, b_from_df: pd.Series) -> pd.Series:
-            return (a_from_df + b_from_df) / 2
-
-
-    .. code-block:: python
-
-        # with_columns_module.py
-        def a_plus_b(a: pd.Series, b: pd.Series) -> pd.Series:
-            return a + b
-
-
-        # the with_columns call
-        @with_columns(
-            *[my_module],  # Load from any module
-            *[a_plus_b],  # or list operations directly
-            columns_to_pass=["a_from_df", "b_from_df"],  # The columns to pass from the dataframe to
-            # the subdag
-            select=["a", "b", "a_plus_b", "a_b_average"],  # The columns to select from the dataframe
-        )
-        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
-            # process, or just return unprocessed
-            ...
-
-    In this instance the ``initial_df`` would get two columns added: ``a_plus_b`` and ``a_b_average``.
-
-    The operations are applied in topological order. This allows you to
-    express the operations individually, making it easy to unit-test and reuse.
-
-    Note that the operation is "append", meaning that the columns that are selected are appended
-    onto the dataframe.
-
-    If the function takes multiple dataframes, the dataframe input to process will always be
-    the first argument. This will be passed to the subdag, transformed, and passed back to the function.
-    This follows the hamilton rule of reference by parameter name. To demonstarte this, in the code
-    above, the dataframe that is passed to the subdag is `initial_df`. That is transformed
-    by the subdag, and then returned as the final dataframe.
-
-    You can read it as:
-
-    "final_df is a function that transforms the upstream dataframe initial_df, running the transformations
-    from my_module. It starts with the columns a_from_df and b_from_df, and then adds the columns
-    a, b, and a_plus_b to the dataframe. It then returns the dataframe, and does some processing on it."
-
-    In case you need more flexibility you can alternatively use ``pass_dataframe_as``, for example,
-
-    .. code-block:: python
-
-        # with_columns_module.py
-        def a_from_df(initial_df: pd.Series) -> pd.Series:
-            return initial_df["a_from_df"] / 100
-
-        def b_from_df(initial_df: pd.Series) -> pd.Series:
-            return initial_df["b_from_df"] / 100
-
-
-        # the with_columns call
-        @with_columns(
-            *[my_module],
-            *[a_from_df],
-            columns_to_pass=["a_from_df", "b_from_df"],
-            select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"],
-        )
-        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
-            # process, or just return unprocessed
-            ...
-
-    the above would output a dataframe where the two columns ``a_from_df`` and ``b_from_df`` get
-    overwritten.
+    This is an internal class that is meant to be extended by each individual dataframe library implementing
+    the following abstract methods:
+
+    - get_initial_nodes
+    - get_subdag_nodes
+    - create_merge_node
+    - validate
     """

     # TODO: if we rename the column nodes into something smarter this can be avoided and
     # can also modify columns in place
     @staticmethod
-    def _check_for_duplicates(nodes_: List[node.Node]) -> bool:
+    def contains_duplicates(nodes_: List[node.Node]) -> bool:
         """Ensures that we don't run into name clashing of columns and group operations.

         In the case when we extract columns for the user, because ``columns_to_pass`` was used, we want
@@ -748,7 +675,7 @@ def validate_dataframe(
                 f"It might not be compatible with some other decorators."
             )

-        if input_types[inject_parameter] != required_type:
+        if not issubclass(input_types[inject_parameter], required_type):
             raise InvalidDecoratorException(
                 "The selected dataframe parameter is not the correct dataframe type. "
                 f"You selected a parameter of type {input_types[inject_parameter]}, but we expect to get {required_type}"
@@ -865,7 +792,7 @@ def inject_nodes(
         # pass the dataframe and extract them himself. If we add namespace to initial nodes and rewire the
         # initial node names with the ongoing ones that have a column argument, we can also allow in place
         # changes when using columns_to_pass
-        if with_columns_factory._check_for_duplicates(initial_nodes + subdag_nodes):
+        if with_columns_base.contains_duplicates(initial_nodes + subdag_nodes):
             raise ValueError(
                 "You can only specify columns once. You used `columns_to_pass` and we "
                 "extract the columns for you. In this case they cannot be overwritten -- only new columns get "

hamilton/plugins/h_pandas.py

Lines changed: 27 additions & 41 deletions
@@ -13,11 +13,11 @@

 from hamilton import node, registry
 from hamilton.function_modifiers.expanders import extract_columns
-from hamilton.function_modifiers.recursive import subdag, with_columns_factory
+from hamilton.function_modifiers.recursive import subdag, with_columns_base
 from hamilton.plugins.pandas_extensions import DATAFRAME_TYPE


-class with_columns(with_columns_factory):
+class with_columns(with_columns_base):
     """Initializes a with_columns decorator for pandas. This allows you to efficiently run groups of map operations on a dataframe.

     Here's an example of calling it -- if you've seen ``@subdag``, you should be familiar with
@@ -79,24 +79,24 @@ def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:

     .. code-block:: python

-    # with_columns_module.py
-    def a_from_df(initial_df: pd.Series) -> pd.Series:
-        return initial_df["a_from_df"] / 100
+        # with_columns_module.py
+        def a_from_df(initial_df: pd.Series) -> pd.Series:
+            return initial_df["a_from_df"] / 100

         def b_from_df(initial_df: pd.Series) -> pd.Series:
-        return initial_df["b_from_df"] / 100
+            return initial_df["b_from_df"] / 100


-    # the with_columns call
-    @with_columns(
-        *[my_module],
-        *[a_from_df],
-        columns_to_pass=["a_from_df", "b_from_df"],
-        select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"],
-    )
-    def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
-        # process, or just return unprocessed
-        ...
+        # the with_columns call
+        @with_columns(
+            *[my_module],
+            *[a_from_df],
+            columns_to_pass=["a_from_df", "b_from_df"],
+            select=["a_from_df", "b_from_df", "a", "b", "a_plus_b", "a_b_average"],
+        )
+        def final_df(initial_df: pd.DataFrame) -> pd.DataFrame:
+            # process, or just return unprocessed
+            ...

     the above would output a dataframe where the two columns ``a_from_df`` and ``b_from_df`` get
     overwritten.
@@ -122,7 +122,8 @@ def __init__(
             If you pass this in, you are responsible for extracting columns out. If not provided, you have
             to pass columns_to_pass in, and we will extract the columns out for you.
         :param select: The end nodes that represent columns to be appended to the original dataframe
-            via with_columns. Existing columns will be overridden.
+            via with_columns. Existing columns will be overridden. The selected nodes need to have the
+            corresponding column type, in this case pd.Series, to be appended to the original dataframe.
         :param namespace: The namespace of the nodes, so they don't clash with the global namespace
             and so this can be reused. If its left out, there will be no namespace (in which case you'll want
             to be careful about repeating it/reusing the nodes in other parts of the DAG.)
@@ -145,14 +146,8 @@ def _create_column_nodes(
     ) -> List[node.Node]:
         output_type = params[inject_parameter]

-        if inspect.iscoroutinefunction(fn):
-
-            async def temp_fn(**kwargs) -> Any:
-                return kwargs[inject_parameter]
-        else:
-
-            def temp_fn(**kwargs) -> Any:
-                return kwargs[inject_parameter]
+        def temp_fn(**kwargs) -> Any:
+            return kwargs[inject_parameter]

         # We recreate the df node to use extract columns
         temp_node = node.Node(
@@ -180,7 +175,7 @@ def get_initial_nodes(
         # If we don't have a specified dataframe we assume it's the first argument
         inject_parameter = list(sig.parameters.values())[0].name

-        with_columns_factory.validate_dataframe(
+        with_columns_base.validate_dataframe(
             fn=fn,
             inject_parameter=inject_parameter,
             params=params,
@@ -200,23 +195,14 @@ def get_subdag_nodes(self, config: Dict[str, Any]) -> Collection[node.Node]:

     def create_merge_node(self, fn: Callable, inject_parameter: str) -> node.Node:
         "Node that adds to / overrides columns for the original dataframe based on selected output."
-        if inspect.iscoroutinefunction(fn):
-
-            async def new_callable(**kwargs) -> Any:
-                df = kwargs[inject_parameter]
-                columns_to_append = {}
-                for column in self.select:
-                    columns_to_append[column] = kwargs[column]
-                return df.assign(**columns_to_append)
-        else:

-            def new_callable(**kwargs) -> Any:
-                df = kwargs[inject_parameter]
-                columns_to_append = {}
-                for column in self.select:
-                    columns_to_append[column] = kwargs[column]
+        def new_callable(**kwargs) -> Any:
+            df = kwargs[inject_parameter]
+            columns_to_append = {}
+            for column in self.select:
+                columns_to_append[column] = kwargs[column]

-            return df.assign(**columns_to_append)
+            return df.assign(**columns_to_append)

         column_type = registry.get_column_type_from_df_type(self.dataframe_type)
         input_map = {column: column_type for column in self.select}
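The merge node relies on ``pandas.DataFrame.assign``, which both appends new columns and overwrites existing ones. A quick standalone illustration:

    import pandas as pd

    df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

    # assign() returns a new dataframe: "b" is overwritten, "a_plus_b" is
    # appended; the expressions are computed from the original columns.
    merged = df.assign(b=df["b"] * 10, a_plus_b=df["a"] + df["b"])
    print(merged)
    #    a   b  a_plus_b
    # 0  1  30         4
    # 1  2  40         6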

hamilton/plugins/h_polars.py

Lines changed: 19 additions & 19 deletions
@@ -17,7 +17,7 @@

 # TODO -- determine the best place to put this code
 from hamilton import base, node, registry
 from hamilton.function_modifiers.expanders import extract_columns
-from hamilton.function_modifiers.recursive import subdag, with_columns_factory
+from hamilton.function_modifiers.recursive import subdag, with_columns_base
 from hamilton.plugins.polars_extensions import DATAFRAME_TYPE


@@ -73,7 +73,7 @@ def output_type(self) -> Type:


 # Do we need this here?
-class with_columns(with_columns_factory):
+class with_columns(with_columns_base):
     """Initializes a with_columns decorator for polars.

     This allows you to efficiently run groups of map operations on a dataframe. We support
@@ -130,23 +130,23 @@ def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:

     .. code-block:: python

-    # with_columns_module.py
-    def a_from_df() -> pl.Expr:
-        return pl.col(a).alias("a") / 100
+        # with_columns_module.py
+        def a_from_df() -> pl.Expr:
+            return pl.col(a).alias("a") / 100

-    def b_from_df() -> pl.Expr:
-        return pl.col(b).alias("b") / 100
+        def b_from_df() -> pl.Expr:
+            return pl.col(b).alias("b") / 100


-    # the with_columns call
-    @with_columns(
-        *[my_module],
-        pass_dataframe_as="initial_df",
-        select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
-    )
-    def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
-        # process, or just return unprocessed
-        ...
+        # the with_columns call
+        @with_columns(
+            *[my_module],
+            pass_dataframe_as="initial_df",
+            select=["a_from_df", "b_from_df", "a_plus_b", "a_b_average"],
+        )
+        def final_df(initial_df: pl.DataFrame) -> pl.DataFrame:
+            # process, or just return unprocessed
+            ...

     the above would output a dataframe where the two columns ``a`` and ``b`` get
     overwritten.
@@ -172,8 +172,8 @@ def __init__(
             If you pass this in, you are responsible for extracting columns out. If not provided, you have
             to pass columns_to_pass in, and we will extract the columns out for you.
         :param select: The end nodes that represent columns to be appended to the original dataframe
-            via with_columns. The length of each column has to match the original dataframe length.
-            Existing columns will be overridden.
+            via with_columns. Existing columns will be overridden. The selected nodes need to have the
+            corresponding column type, in this case pl.Series, to be appended to the original dataframe.
         :param namespace: The namespace of the nodes, so they don't clash with the global namespace
             and so this can be reused. If its left out, there will be no namespace (in which case you'll want
             to be careful about repeating it/reusing the nodes in other parts of the DAG.)
@@ -225,7 +225,7 @@ def get_initial_nodes(
         # If we don't have a specified dataframe we assume it's the first argument
         inject_parameter = list(sig.parameters.values())[0].name

-        with_columns_factory.validate_dataframe(
+        with_columns_base.validate_dataframe(
             fn=fn,
             inject_parameter=inject_parameter,
             params=params,
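The polars merge node builds on the same append/overwrite semantics that polars' native ``DataFrame.with_columns`` provides. A quick standalone illustration:

    import polars as pl

    df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})

    # with_columns() returns a new dataframe: "b" is overwritten,
    # "a_plus_b" is appended; expressions see the *original* columns.
    merged = df.with_columns(
        (pl.col("b") * 10).alias("b"),
        (pl.col("a") + pl.col("b")).alias("a_plus_b"),
    )
    print(merged)  # b -> [30, 40], a_plus_b -> [4, 6]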
