Commit ae6636e

Adds examples for materializers
We do the ML model example, and add some custom materializers. Hopefully this gets people started. We have an easy script to run, plus a notebook.
1 parent af2e6ef commit ae6636e

File tree

8 files changed: +868 -0 lines changed

examples/materialization/README.md

Lines changed: 63 additions & 0 deletions
# Materialization

Hamilton's driver allows for ad-hoc materialization. This enables you to take a DAG you already have
and save your data to a set of custom locations/URLs.

Note that these materializers are _isomorphic_ in nature to the
[@save_to](https://hamilton.dagworks.io/en/latest/reference/decorators/save_to/)
decorator. Materializers inject additional nodes at runtime, modifying the
DAG to include data saver nodes and returning the metadata around materialization.

This framework is meant to be highly pluggable. While the set of available data savers is currently
limited, we expect folks to build their own materializers (and, hopefully, contribute them back to the community!).

## Example

In this example we take the scikit-learn iris_loader pipeline and materialize outputs to specific
locations through a driver call. We demonstrate:

1. Saving model parameters to a JSON file (using the default JSON materializer)
2. Writing custom data adapters for:
   1. Pickling a model to an object file
   2. Saving confusion matrices to a CSV file

See [run.py](run.py) for the full example.

## `driver.materialize`

This will be a high-level overview. For more details,
see the [documentation](https://hamilton.dagworks.io/en/latest/reference/drivers/Driver/#hamilton.driver.Driver.materialize).

`driver.materialize()` does the following:

1. Processes a list of materializers to create a new DAG
2. Alters the output to include the materializer nodes
3. Processes a list of "additional variables" (for debugging) to return intermediary data
4. Executes the DAG, including the materializers
5. Returns a tuple of (`materialization metadata`, `additional variables`)
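
For instance, a call might look like the following — a minimal sketch, assuming a DAG module `my_module` with a dict-typed `model_parameters` node and a `training_accuracy` node (all names here are illustrative):

```python
from hamilton import driver
from hamilton.io.materialization import to

import my_module  # hypothetical module containing your DAG functions

dr = driver.Driver({}, my_module)
# Injects a JSON data-saver node, executes the DAG up to it, and
# returns (materialization metadata, additional variables).
metadata, additional = dr.materialize(
    to.json(
        id="model_params_to_json",          # node name in the DAG
        dependencies=["model_parameters"],  # hypothetical upstream node
        path="./model_params.json",
    ),
    additional_vars=["training_accuracy"],  # hypothetical debugging output
)
```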

Materializers each consume:

1. A `dependencies` list to materialize
2. An (optional) `combine` parameter to combine the outputs of the dependencies
   (this is required if there are multiple dependencies). This is a [ResultMixin](https://hamilton.dagworks.io/en/latest/concepts/customizing-execution/#result-builders) object
3. An `id` parameter to identify the materializer, which serves as the node name in the DAG

Materializers are referenced by the `to` object in `hamilton.io.materialization`, which utilizes
dynamic dispatch to create the appropriate materializer.

These refer to `DataSaver`s, which are keyed by a string (e.g. `csv`).
Multiple data adapters can share the same key, each of which applies to a specific type
(e.g. pandas dataframe, numpy matrix, polars dataframe). New
data adapters are registered by calling `hamilton.registry.register_adapter`. The sketch below
shows how these pieces fit together.
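
When a materializer has multiple dependencies, the `combine` result builder is required — a sketch assuming two pandas-dataframe-typed nodes (`node_a` and `node_b` are illustrative names):

```python
from hamilton import base
from hamilton.io.materialization import to

# `to.csv` dispatches on the dependencies' types to find a DataSaver
# registered under the "csv" key.
combined_csv = to.csv(
    id="combined_data_to_csv",             # node name injected into the DAG
    dependencies=["node_a", "node_b"],     # hypothetical upstream nodes
    combine=base.PandasDataFrameResult(),  # required: more than one dependency
    path="./combined_data.csv",
)
```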
## Custom Materializers

To define a custom materializer, all you have to do is implement the `DataSaver` class
(which will allow use in `@save_to` as well). This is demonstrated in [custom_materializers.py](custom_materializers.py).

## `driver.materialize` vs `@save_to`

`driver.materialize` is an ad-hoc form of `@save_to`. You want to use this when you're developing and
want to do ad-hoc materialization. When you have a production ETL, you can choose between `@save_to` and `materialize`.
If the save location/structure is unlikely to change, then you might consider using `@save_to`. Otherwise, `materialize`
is an idiomatic way of conducting the materialization operations that cleanly separates side effects from transformations.
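
For comparison, here is what the decorator side of that tradeoff might look like — a sketch using `@save_to` with illustrative names:

```python
from hamilton.function_modifiers import save_to, value


# The saver node is part of the DAG statically; requesting the
# "save_model_params" output triggers the write.
@save_to.json(path=value("./model_params.json"), output_name_="save_model_params")
def model_parameters(gamma: float) -> dict:
    """Illustrative node whose output is saved to JSON."""
    return {"gamma": gamma}
```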
examples/materialization/custom_materializers.py

Lines changed: 55 additions & 0 deletions
```python
import dataclasses
import pickle
from typing import Any, Collection, Dict, Type

import numpy as np
from sklearn import base

from hamilton import registry
from hamilton.io import utils
from hamilton.io.data_adapters import DataSaver

# TODO -- put this back in the standard library


@dataclasses.dataclass
class NumpyMatrixToCSV(DataSaver):
    """Saves a numpy matrix to a CSV file, keyed by "csv"."""

    path: str
    sep: str = ","

    def __post_init__(self):
        if not self.path.endswith(".csv"):
            raise ValueError(f"CSV files must end with .csv, got {self.path}")

    def save_data(self, data: np.ndarray) -> Dict[str, Any]:
        np.savetxt(self.path, data, delimiter=self.sep)
        return utils.get_file_metadata(self.path)

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [np.ndarray]

    @classmethod
    def name(cls) -> str:
        return "csv"


@dataclasses.dataclass
class SKLearnPickler(DataSaver):
    """Pickles a fit scikit-learn classifier, keyed by "pickle"."""

    path: str

    def save_data(self, data: base.ClassifierMixin) -> Dict[str, Any]:
        # Use a context manager so the file handle is closed after writing.
        with open(self.path, "wb") as f:
            pickle.dump(data, f)
        return utils.get_file_metadata(self.path)

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [base.ClassifierMixin]

    @classmethod
    def name(cls) -> str:
        return "pickle"


# Register the adapters so the `to` constructors can dispatch to them.
for adapter in [NumpyMatrixToCSV, SKLearnPickler]:
    registry.register_adapter(adapter)
```
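
With these adapters registered, the `to` constructors can dispatch to them based on each dependency's type — a sketch using node names from the model training module later in this diff:

```python
from hamilton.io.materialization import to

# Dispatches to NumpyMatrixToCSV: confusion_matrix is an np.ndarray.
save_matrix = to.csv(
    id="confusion_matrix_to_csv",
    dependencies=["confusion_matrix"],
    path="./confusion_matrix.csv",
)

# Dispatches to SKLearnPickler: fit_clf is a base.ClassifierMixin.
save_model = to.pickle(
    id="fit_clf_to_pickle",
    dependencies=["fit_clf"],
    path="./model.pkl",
)
```

These would then be passed to `dr.materialize(save_matrix, save_model)`.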

examples/materialization/dag.pdf

27.9 KB
Binary file not shown.
Lines changed: 30 additions & 0 deletions
```python
"""Module to load iris/digits data."""

import numpy as np
from sklearn import datasets, utils

from hamilton.function_modifiers import config


@config.when(data_loader="iris")
def data__iris() -> utils.Bunch:
    return datasets.load_iris()


@config.when(data_loader="digits")
def data__digits() -> utils.Bunch:
    return datasets.load_digits()


def target(data: utils.Bunch) -> np.ndarray:
    return data.target


def target_names(data: utils.Bunch) -> np.ndarray:
    return data.target_names


def feature_matrix(data: utils.Bunch) -> np.ndarray:
    return data.data
```
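
The `data__iris`/`data__digits` pair resolves to a single `data` node at DAG construction time, chosen by the `data_loader` config value — a sketch, assuming this module is importable as `iris_loader` (a hypothetical name):

```python
from hamilton import driver

import iris_loader  # hypothetical name for the module above

# config.when selects data__iris and exposes it as the `data` node.
dr = driver.Driver({"data_loader": "iris"}, iris_loader)
```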
Lines changed: 96 additions & 0 deletions
```python
from typing import Dict

import numpy as np
from sklearn import base, linear_model, metrics, svm
from sklearn.model_selection import train_test_split

from hamilton import function_modifiers


@function_modifiers.config.when(clf="svm")
def prefit_clf__svm(gamma: float = 0.001) -> base.ClassifierMixin:
    """Returns an unfitted SVM classifier object.

    :param gamma: ...
    :return:
    """
    return svm.SVC(gamma=gamma)


@function_modifiers.config.when(clf="logistic")
def prefit_clf__logreg(penalty: str) -> base.ClassifierMixin:
    """Returns an unfitted Logistic Regression classifier object.

    :param penalty:
    :return:
    """
    return linear_model.LogisticRegression(penalty=penalty)


@function_modifiers.extract_fields(
    {"X_train": np.ndarray, "X_test": np.ndarray, "y_train": np.ndarray, "y_test": np.ndarray}
)
def train_test_split_func(
    feature_matrix: np.ndarray,
    target: np.ndarray,
    test_size_fraction: float,
    shuffle_train_test_split: bool,
) -> Dict[str, np.ndarray]:
    """Function that creates the training & test splits.

    This is then extracted out into constituent components and used downstream.

    :param feature_matrix:
    :param target:
    :param test_size_fraction:
    :param shuffle_train_test_split:
    :return:
    """
    X_train, X_test, y_train, y_test = train_test_split(
        feature_matrix, target, test_size=test_size_fraction, shuffle=shuffle_train_test_split
    )
    return {"X_train": X_train, "X_test": X_test, "y_train": y_train, "y_test": y_test}


def y_test_with_labels(y_test: np.ndarray, target_names: np.ndarray) -> np.ndarray:
    """Adds labels to the target output."""
    return np.array([target_names[idx] for idx in y_test])


def fit_clf(
    prefit_clf: base.ClassifierMixin, X_train: np.ndarray, y_train: np.ndarray
) -> base.ClassifierMixin:
    """Calls fit on the classifier object; it mutates it."""
    prefit_clf.fit(X_train, y_train)
    return prefit_clf


def predicted_output(fit_clf: base.ClassifierMixin, X_test: np.ndarray) -> np.ndarray:
    """Exercises the fit classifier to perform a prediction."""
    return fit_clf.predict(X_test)


def predicted_output_with_labels(
    predicted_output: np.ndarray, target_names: np.ndarray
) -> np.ndarray:
    """Replaces the predictions with the desired labels."""
    return np.array([target_names[idx] for idx in predicted_output])


def classification_report(
    predicted_output_with_labels: np.ndarray, y_test_with_labels: np.ndarray
) -> str:
    """Returns a classification report."""
    return metrics.classification_report(y_test_with_labels, predicted_output_with_labels)


def confusion_matrix(
    predicted_output_with_labels: np.ndarray, y_test_with_labels: np.ndarray
) -> np.ndarray:
    """Returns a confusion matrix."""
    return metrics.confusion_matrix(y_test_with_labels, predicted_output_with_labels)


def model_parameters(fit_clf: base.ClassifierMixin) -> dict:
    """Returns a dictionary of model parameters."""
    return fit_clf.get_params()
```
