Commit 69c94cc

jernejfrank authored and elijahbenizzy committed
Add tests and docs, refactor mutate
Adds unit tests for the individual pieces; the examples gain a new smoke test / abstract example showing how to use mutate. That example also covers edge cases and how they are handled (or whether they are supported at all). Added a more user-friendly example by converting the existing pipe_output example, and replaced the pipe_output implementation with mutate. Light refactoring, improved naming, and added TODOs.
1 parent b51d75d commit 69c94cc

File tree

16 files changed (+4090, -97 lines)

dag_example_module.png

26.7 KB

docs/reference/decorators/pipe.rst

Lines changed: 23 additions & 2 deletions
@@ -1,8 +1,24 @@
 =======================
-pipe
+pipe family
 =======================

-We have a family of decorators that can help with transforming the input and output of a node in the DAG. For a hands-on example have a look at https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/scikit-learn/species_distribution_modeling
+We have a family of decorators that represent a chained set of transformations. This specifically solves the "node redefinition"
+problem, and is meant to represent a pipeline of chaining/redefinitions. This is similar to (and can happily be
+used in conjunction with) ``pipe`` in pandas. In PySpark this is akin to the common operation of redefining a dataframe
+with new columns.
+
+For some examples have a look at: https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/scikit-learn/species_distribution_modeling
+
+While it is generally reasonable to contain constructs within a node's function,
+you should consider the pipe family for any of the following reasons:
+
+1. You want the transformations to display as nodes in the DAG, with the possibility of storing or visualizing
+   the result.
+2. You want to pull in functions from an external repository, and build the DAG a little more procedurally.
+3. You want to use the same function multiple times, but with different parameters -- while ``@does`` / ``@parameterize`` can
+   do this, this presents an easier way to do so, especially in a chain.

 --------------

@@ -24,3 +40,8 @@ pipe_output
 ----------------
 .. autoclass:: hamilton.function_modifiers.macros.pipe_output
    :special-members: __init__
+
+mutate
+----------------
+.. autoclass:: hamilton.function_modifiers.macros.mutate
+   :special-members: __init__
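The ``pipe`` in pandas analogy mentioned in the doc text can be sketched with a toy example (the helper names `_add_one` and `_double` are hypothetical, not part of the commit):

```python
import pandas as pd


# Each .pipe step redefines the dataframe, much like the pipe family
# chains transformations/redefinitions on a node in the DAG.
def _add_one(df: pd.DataFrame) -> pd.DataFrame:
    return df + 1


def _double(df: pd.DataFrame) -> pd.DataFrame:
    return df * 2


df = pd.DataFrame({"a": [1, 2]})
out = df.pipe(_add_one).pipe(_double)  # equivalent to _double(_add_one(df))
```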
Lines changed: 31 additions & 0 deletions

# Mutate

We give some application suggestions for mutating the outputs of functions in a distributed manner with `@mutate`.
As you scroll through the notebook, we build examples from the most straightforward applications up to more complex logic that showcases the flexibility you get with this decorator.

Mutate gives you the ability to apply the same transformation to the output of multiple functions in the DAG. It can be particularly useful in the following scenarios:

1. Loading data and applying a pre-cleaning step.
2. Feature engineering via joining, filtering, sorting, applying adjustment factors on a per-column basis, etc.
3. Experimenting with different transformations across nodes by selectively turning transformations on / off.

It effectively replaces:

1. Having to use unique names and re-wire the DAG whenever you want to add / remove / replace something.
2. Enabling more verb-like names on functions.
3. Potentially simpler "reuse" of transform functions across DAG paths.

# Modules

The same modules can be viewed and executed in `notebook.ipynb`.

We have six modules:

1. `procedural.py`: basic example without using Hamilton
2. `pipe_output.py`: how the above would be implemented using `pipe_output` from Hamilton
3. `mutate.py`: how the above would be implemented using `mutate`
4. `pipe_output_on_output.py`: functionality that allows applying `pipe_output` to user-selected nodes (comes in handy with `extract_columns` / `extract_fields`)
5. `mutate_on_output.py`: same as above, but implemented using `mutate`
6. `mutate_twice_the_same.py`: how you would apply the same transformation to a node twice using `mutate`

They demonstrate the same behavior achieved without Hamilton, with `pipe_output`, or with `mutate`, and should give you some idea of the potential applications.

![image info](./DAG.png)
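As a rough procedural sketch (plain pandas, no Hamilton) of what the chained steps on `data_2` amount to, with the join step omitted for brevity:

```python
import pandas as pd

# data_2 as defined in the example modules
df = pd.DataFrame({"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]})

df = df.dropna()                       # _filter: drop the NaN row
df.loc[-1] = ["c", 145]                # _add_missing_value: add the replacement row by hand
df = df.sort_values(by=df.columns[0])  # _sort: sort by the first column
```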
Lines changed: 78 additions & 0 deletions

`mutate.py` (new file):

from typing import Any, List

import pandas as pd

from hamilton.function_modifiers import mutate, source, value


def data_1() -> pd.DataFrame:
    df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]})
    return df


def data_2() -> pd.DataFrame:
    df = pd.DataFrame.from_dict(
        {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]}
    )
    return df


def data_3() -> pd.DataFrame:
    df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]})
    return df


# data_1 and data_2
@mutate(data_1, data_2)
def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
    """Remove NaN values.

    Decorated with mutate, this will be applied to both data_1 and data_2.
    """
    return some_data.dropna()


# data_2
# this one uses value
@mutate(data_2, missing_row=value(["c", 145]))
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
    """Add a row to the dataframe.

    The functions decorated with mutate can be viewed as steps of pipe_output in the order they
    are implemented. This means that data_2 had a NaN row removed, and here we add back a
    hand-crafted row that replaces it.
    """
    some_data.loc[-1] = missing_row
    return some_data


# data_2
# this one uses source
@mutate(data_2, other_data=source("data_3"))
def _join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
    """Join two dataframes.

    We can use results from other nodes in the DAG via the `source` functionality. Here we join
    the data_2 table with another table, data_3, that is the output of another node.
    """
    return some_data.set_index("col_2").join(other_data.set_index("col_1"))


# data_1 and data_2
@mutate(data_1, data_2)
def _sort(some_data: pd.DataFrame) -> pd.DataFrame:
    """Sort dataframes by their first column.

    This is the last step of our pipeline(s) and again gets applied to data_1 and data_2. We did
    some light pre-processing on data_1 by removing NaNs and sorting, and more elaborate
    pre-processing on data_2, where we added values and joined another table.
    """
    columns = some_data.columns
    return some_data.sort_values(by=columns[0])


def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
    """Combine two raw dataframes to create a feature."""
    return (
        data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
    ).reset_index(names=["col_0"])
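The `_join` step above can be sketched procedurally in plain pandas. Here `data_2` is written out as it would look after the earlier mutate steps (NaN row dropped, `["c", 145]` added back); this standalone version is illustrative, not the Hamilton execution itself:

```python
import pandas as pd

data_2 = pd.DataFrame({"col_1": ["a", "b", "c", "d", "e"], "col_2": [150, 155, 145, 200, 5000]})
data_3 = pd.DataFrame({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]})

# the body of _join: match data_2's col_2 values against data_3's col_1 values
joined = data_2.set_index("col_2").join(data_3.set_index("col_1"))
```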
Lines changed: 124 additions & 0 deletions

`mutate_on_output.py` (new file):

from typing import Any, Dict, List

import pandas as pd

from hamilton.function_modifiers import (
    apply_to,
    extract_columns,
    extract_fields,
    mutate,
    source,
    value,
)


def data_1() -> pd.DataFrame:
    df = pd.DataFrame.from_dict({"col_1": [3, 2, pd.NA, 0], "col_2": ["a", "b", pd.NA, "d"]})
    return df


def data_2() -> pd.DataFrame:
    df = pd.DataFrame.from_dict(
        {"col_1": ["a", "b", pd.NA, "d", "e"], "col_2": [150, 155, 145, 200, 5000]}
    )
    return df


def data_3() -> pd.DataFrame:
    df = pd.DataFrame.from_dict({"col_1": [150, 155, 145, 200, 5000], "col_2": [10, 23, 32, 50, 0]})
    return df


@extract_fields({"field_1": pd.Series, "field_2": pd.Series})
def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> Dict[str, pd.Series]:
    df = (
        data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
    ).reset_index(names=["col_0"])
    return {"field_1": df.iloc[:, 1], "field_2": df.iloc[:, 2]}


@extract_columns("col_2", "col_3")
def feat_B(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
    return (
        data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
    ).reset_index(names=["col_0"])


def feat_C(field_1: pd.Series, col_3: pd.Series) -> pd.DataFrame:
    return pd.concat([field_1, col_3], axis=1)


def feat_D(field_2: pd.Series, col_2: pd.Series) -> pd.DataFrame:
    return pd.concat([field_2, col_2], axis=1)


# data_1 and data_2
@mutate(apply_to(data_1).when_in(a=[1, 2, 3]), apply_to(data_2).when_not_in(a=[1, 2, 3]))
def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
    """Remove NaN values.

    Mutate accepts `config.*`-family conditionals with which we can choose when the transform
    will be applied to the target function.
    """
    return some_data.dropna()


# data_2
# this one uses value
@mutate(apply_to(data_2), missing_row=value(["c", 145]))
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
    """Add a row to the dataframe.

    The functions decorated with mutate can be viewed as steps of pipe_output in the order they
    are implemented. This means that data_2 had a NaN row removed, and here we add back a
    hand-crafted row that replaces it.
    """
    some_data.loc[-1] = missing_row
    return some_data


# data_2
# this one uses source
@mutate(
    apply_to(data_2).named(name="", namespace="some_random_namespace"), other_data=source("data_3")
)
def join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
    """Join two dataframes.

    We can use results from other nodes in the DAG via the `source` functionality. Here we join
    the data_2 table with another table, data_3, that is the output of another node.

    In addition, mutate also supports adding custom names to the nodes.
    """
    return some_data.set_index("col_2").join(other_data.set_index("col_1"))


# data_1 and data_2
@mutate(apply_to(data_1), apply_to(data_2))
def sort(some_data: pd.DataFrame) -> pd.DataFrame:
    """Sort dataframes by their first column.

    This is the last step of our pipeline(s) and again gets applied to data_1 and data_2. We did
    some light pre-processing on data_1 by removing NaNs and sorting, and more elaborate
    pre-processing on data_2, where we added values and joined another table.
    """
    columns = some_data.columns
    return some_data.sort_values(by=columns[0])


# We want to apply an adjustment coefficient to all the columns of feat_B, but only to field_1 of feat_A.
@mutate(
    apply_to(feat_A, factor=value(100)).on_output("field_1").named("Europe"),
    apply_to(feat_B, factor=value(10)).named("US"),
)
def _adjustment_factor(some_data: pd.Series, factor: float) -> pd.Series:
    """Adjust the value by some factor.

    You can imagine this step occurring later in time. We first constructed our DAG with features A
    and B, only to realize that something is off. We are now experimenting post hoc to improve and
    find the best possible features.

    We first split the features by columns of interest and then adjust them by a regional factor to
    combine them into improved features we can use further down the pipeline.
    """
    return some_data * factor
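The join-and-extract mechanics that `feat_A` relies on can be sketched in plain pandas (requires pandas >= 1.5 for the `names` argument to `reset_index`). The inputs are simplified: `data_1` is written with its NaN row already dropped, and the upstream mutate steps on `data_2` are ignored for brevity:

```python
import pandas as pd

data_1 = pd.DataFrame({"col_1": [3, 2, 0], "col_2": ["a", "b", "d"]})
data_2 = pd.DataFrame({"col_1": ["a", "b", "c", "d", "e"], "col_2": [150, 155, 145, 200, 5000]})

# the body of feat_A: join on data_1.col_2 vs data_2.col_1, keeping data_2's
# old integer index as a new column col_3
df = (
    data_1.set_index("col_2").join(data_2.reset_index(names=["col_3"]).set_index("col_1"))
).reset_index(names=["col_0"])

# what @extract_fields exposes as separate nodes
fields = {"field_1": df.iloc[:, 1], "field_2": df.iloc[:, 2]}
```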
Lines changed: 20 additions & 0 deletions

`mutate_twice_the_same.py` (new file):

from hamilton.function_modifiers import mutate


def data_1() -> int:
    return 10


@mutate(data_1)
def add_something(user_input: int) -> int:
    return user_input + 100


@mutate(data_1)
def add_something_more(user_input: int) -> int:
    return user_input + 1000


# deliberately redefined: this applies the same transformation to data_1 a second time
@mutate(data_1)
def add_something(user_input: int) -> int:  # noqa
    return user_input + 100
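Since the docstrings above state that mutate applies the decorated functions in the order they are implemented, the final value of the `data_1` node should be equivalent to the plain-Python chain below (an interpretation of the semantics, not the Hamilton machinery itself):

```python
def data_1() -> int:
    return 10


def add_something(user_input: int) -> int:
    return user_input + 100


def add_something_more(user_input: int) -> int:
    return user_input + 1000


# the three @mutate steps, applied in definition order:
# 10 + 100 + 1000 + 100 = 1210
result = add_something(add_something_more(add_something(data_1())))
```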
