|
| 1 | +from typing import Any, Dict, List |
| 2 | + |
| 3 | +import pandas as pd |
| 4 | + |
| 5 | +from hamilton.function_modifiers import ( |
| 6 | + apply_to, |
| 7 | + extract_columns, |
| 8 | + extract_fields, |
| 9 | + mutate, |
| 10 | + source, |
| 11 | + value, |
| 12 | +) |
| 13 | + |
| 14 | + |
def data_1() -> pd.DataFrame:
    """Toy dataset: four rows, with one missing value in each column."""
    records = {
        "col_1": [3, 2, pd.NA, 0],
        "col_2": ["a", "b", pd.NA, "d"],
    }
    return pd.DataFrame(records)
| 18 | + |
| 19 | + |
def data_2() -> pd.DataFrame:
    """Toy dataset: five rows mapping letters (one missing) to numeric values."""
    records = {
        "col_1": ["a", "b", pd.NA, "d", "e"],
        "col_2": [150, 155, 145, 200, 5000],
    }
    return pd.DataFrame(records)
| 25 | + |
| 26 | + |
def data_3() -> pd.DataFrame:
    """Toy dataset: five fully populated numeric rows, no missing values."""
    records = {
        "col_1": [150, 155, 145, 200, 5000],
        "col_2": [10, 23, 32, 50, 0],
    }
    return pd.DataFrame(records)
| 30 | + |
| 31 | + |
@extract_fields({"field_1": pd.Series, "field_2": pd.Series})
def feat_A(data_1: pd.DataFrame, data_2: pd.DataFrame) -> Dict[str, pd.Series]:
    """Join data_1 onto data_2 and expose the two joined columns as fields.

    data_1 is keyed on its "col_2" and data_2 on its "col_1"; data_2's original
    positional index is preserved as "col_3" before the join. The joined frame's
    second and third columns are returned as "field_1" and "field_2".
    """
    left = data_1.set_index("col_2")
    right = data_2.reset_index(names=["col_3"]).set_index("col_1")
    joined = left.join(right).reset_index(names=["col_0"])
    return {"field_1": joined.iloc[:, 1], "field_2": joined.iloc[:, 2]}
| 38 | + |
| 39 | + |
@extract_columns("col_2", "col_3")
def feat_B(data_1: pd.DataFrame, data_2: pd.DataFrame) -> pd.DataFrame:
    """Same join as feat_A, but exposed column-wise via extract_columns.

    data_1 (keyed on "col_2") is joined against data_2 (keyed on "col_1", with
    its positional index kept as "col_3"); the join key comes back as "col_0".
    """
    left = data_1.set_index("col_2")
    right = data_2.reset_index(names=["col_3"]).set_index("col_1")
    return left.join(right).reset_index(names=["col_0"])
| 45 | + |
| 46 | + |
def feat_C(field_1: pd.Series, col_3: pd.Series) -> pd.DataFrame:
    """Assemble a two-column frame from field_1 (of feat_A) and col_3 (of feat_B)."""
    columns = [field_1, col_3]
    return pd.concat(columns, axis=1)
| 49 | + |
| 50 | + |
def feat_D(field_2: pd.Series, col_2: pd.Series) -> pd.DataFrame:
    """Assemble a two-column frame from field_2 (of feat_A) and col_2 (of feat_B)."""
    columns = [field_2, col_2]
    return pd.concat(columns, axis=1)
| 53 | + |
| 54 | + |
# data1 and data2
@mutate(apply_to(data_1).when_in(a=[1, 2, 3]), apply_to(data_2).when_not_in(a=[1, 2, 3]))
def _filter(some_data: pd.DataFrame) -> pd.DataFrame:
    """Drop every row that contains a missing value.

    Mutate accepts a `config.*` family conditional where we can choose when the
    transform will be applied onto the target function.
    """
    return some_data.dropna(axis="index", how="any")
| 64 | + |
| 65 | + |
# data 2
# this is for value
@mutate(apply_to(data_2), missing_row=value(["c", 145]))
def _add_missing_value(some_data: pd.DataFrame, missing_row: List[Any]) -> pd.DataFrame:
    """Append a hand-crafted replacement row to the dataframe.

    The functions decorated with mutate can be viewed as steps in pipe_output in the
    order they are implemented. This means that data_2 had a row removed with NAN and
    here we add back a row by hand that replaces that row.

    Fix: the original assigned `some_data.loc[-1] = missing_row` directly on the
    argument, mutating the caller's dataframe in place. We now build the row on a
    copy, so the input object is left untouched; the returned value is unchanged.
    """
    patched = some_data.copy()
    patched.loc[-1] = missing_row  # label -1: a fresh index label, distinct from the RangeIndex
    return patched
| 78 | + |
| 79 | + |
# data 2
# this is for source
@mutate(
    apply_to(data_2).named(name="", namespace="some_random_namespace"), other_data=source("data_3")
)
def join(some_data: pd.DataFrame, other_data: pd.DataFrame) -> pd.DataFrame:
    """Join two dataframes.

    We can use results from other nodes in the DAG by using the `source` functionality.
    Here we join data_2 table with another table - data_3 - that is the output of
    another node.

    In addition, mutate also support adding custom names to the nodes.
    """
    indexed_left = some_data.set_index("col_2")
    indexed_right = other_data.set_index("col_1")
    return indexed_left.join(indexed_right)
| 94 | + |
| 95 | + |
# data1 and data2
@mutate(apply_to(data_1), apply_to(data_2))
def sort(some_data: pd.DataFrame) -> pd.DataFrame:
    """Sort the dataframe ascending by its first column.

    This is the last step of our pipeline(s) and gets again applied to data_1 and
    data_2. We did some light pre-processing on data_1 by removing NANs and sorting
    and more elaborate pre-processing on data_2 where we added values and joined
    another table.
    """
    first_column = some_data.columns[0]
    return some_data.sort_values(by=first_column)
| 107 | + |
| 108 | + |
# we want to apply some adjustment coefficient to all the columns of feat_B, but only to field_1 of feat_A
@mutate(
    apply_to(feat_A, factor=value(100)).on_output("field_1").named("Europe"),
    apply_to(feat_B, factor=value(10)).named("US"),
)
def _adjustment_factor(some_data: pd.Series, factor: float) -> pd.Series:
    """Scale the series by a region-specific adjustment factor.

    You can imagine this step occurring later in time. We first constructed our DAG
    with features A and B only to realize that something is off. We are now
    experimenting post-hoc to improve and find the best possible features.

    We first split the features by columns of interest and then adjust them by a
    regional factor to combine them into improved features we can use further down
    the pipeline.
    """
    scaled = factor * some_data
    return scaled
0 commit comments