
Commit d534e72
WIP
1 parent 2d1a929

8 files changed (+325, -8 lines)


examples/materialization/README.md

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
# Materialization

Hamilton's driver allows for ad-hoc materialization. This enables you to take a DAG you already have
and save your data to a set of custom locations/URLs.

Note that these materializers are _isomorphic_ in nature to the
[@save_to](https://hamilton.dagworks.io/en/latest/reference/decorators/save_to/)
decorator. Materializers inject an additional node at runtime, modifying the
DAG to include a data saver node, and return the metadata around materialization.
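
For intuition, the following is a rough sketch of the decorator-based equivalent of one
materialization in this example. The exact keyword arguments of `@save_to` are an
assumption here; see the linked docs for the authoritative API:

```python
from hamilton.function_modifiers import save_to, value


# Hypothetical decorator-based equivalent of the to.json(...) materializer used in run.py
@save_to.json(path=value("./params.json"), output_name_="model_params_to_json")
def model_parameters(fit_clf) -> dict:
    """Returns a dictionary of model parameters; saved to JSON via the decorator."""
    return fit_clf.get_params()
```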

This framework is meant to be highly pluggable. While the set of available data savers is currently
limited, we expect folks to build their own materializers (and, hopefully, contribute them back to the community!).

## Example

In this example we take the scikit-learn iris_loader pipeline and materialize outputs to specific
locations through a driver call. We demonstrate:

1. Saving model parameters to a JSON file (using the default JSON materializer)
2. Writing custom data adapters for:
   1. Pickling a model to an object file
   2. Saving confusion matrices to a CSV file

See [run.py](run.py) for the full example.

## `driver.materialize`

This will be a high-level overview. For more details,
see the [documentation](https://hamilton.dagworks.io/en/latest/reference/drivers/Driver/#hamilton.driver.Driver.materialize).

`driver.materialize()` does the following (a condensed sketch follows this list):

1. Processes a list of materializers to create a new DAG
2. Alters the output to include the materializer nodes
3. Processes a list of "additional variables" (for debugging) to return intermediary data
4. Executes the DAG, including the materializers
5. Returns a tuple of (`materialization metadata`, `additional variables`)
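
The sketch below condenses the call in [run.py](run.py); `dag_config` is assumed to be
defined as in that script:

```python
from hamilton import driver
from hamilton.io.materialization import to

# dr is a driver built over the example modules (see run.py for the full setup)
dr = driver.Builder().with_config(dag_config).with_modules(model_training).build()

metadata, additional_vars = dr.materialize(
    to.json(
        id="model_params_to_json",          # name of the injected saver node
        dependencies=["model_parameters"],  # upstream node whose output is saved
        path="./params.json",
    ),
    additional_vars=["classification_report"],  # intermediary data to return
)
```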

Materializers each consume:

1. A `dependencies` list to materialize
2. An (optional) `combine` parameter to combine the outputs of the dependencies
   (this is required if there are multiple dependencies). This is a [ResultMixin](https://hamilton.dagworks.io/en/latest/concepts/customizing-execution/#result-builders) object (see the sketch after this list)
3. An `id` parameter to identify the materializer, which serves as the node name in the DAG
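
A minimal sketch of `combine` with multiple dependencies. The node names here are
hypothetical, and `PandasDataFrameResult` is one `ResultMixin` that joins outputs into a
dataframe:

```python
from hamilton import base
from hamilton.io.materialization import to

joined_saver = to.csv(
    id="joined_outputs_to_csv",              # node name in the DAG
    dependencies=["predictions", "labels"],  # hypothetical upstream nodes
    combine=base.PandasDataFrameResult(),    # joins the two outputs into one frame
    path="./joined_outputs.csv",
)
```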

Materializers are referenced by the `to` object in `hamilton.io.materialization`, which utilizes
dynamic dispatch to create the appropriate materializer.

These refer to `DataSaver` classes, which are keyed by a string (e.g., `csv`).
Multiple data adapters can share the same key, with each applying to a specific type
(e.g., a pandas dataframe, a numpy matrix, or a polars dataframe). New
data adapters are registered by calling `hamilton.registry.register_adapter`.

## Custom Materializers

To define a custom materializer, all you have to do is implement the `DataSaver` class
(which allows use in `save_to` as well). A minimal sketch, modeled on the adapters in this example:
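
```python
import dataclasses
from typing import Any, Collection, Dict, Type

from hamilton import registry
from hamilton.io import utils
from hamilton.io.data_adapters import DataSaver


@dataclasses.dataclass
class TextSaver(DataSaver):
    """Hypothetical adapter that writes a string to a text file."""

    path: str

    def save_data(self, data: str) -> Dict[str, Any]:
        with open(self.path, "w") as f:
            f.write(data)
        return utils.get_file_metadata(self.path)

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [str]

    @classmethod
    def name(cls) -> str:
        return "text"  # would be referenced as to.text(...)


registry.register_adapter(TextSaver)
```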
## `driver.materialize` vs `@save_to`
Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
import dataclasses
import pickle
from typing import Any, Collection, Dict, Type

import numpy as np
from sklearn import base

from hamilton import registry
from hamilton.io import utils
from hamilton.io.data_adapters import DataSaver

# TODO -- put this back in the standard library


@dataclasses.dataclass
class NumpyMatrixToCSV(DataSaver):
    """Saves a numpy matrix to a CSV file at the given path."""

    path: str
    sep: str = ","

    def __post_init__(self):
        if not self.path.endswith(".csv"):
            raise ValueError(f"CSV files must end with .csv, got {self.path}")

    def save_data(self, data: np.ndarray) -> Dict[str, Any]:
        np.savetxt(self.path, data, delimiter=self.sep)
        return utils.get_file_metadata(self.path)

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [np.ndarray]

    @classmethod
    def name(cls) -> str:
        return "csv"


@dataclasses.dataclass
class SKLearnPickler(DataSaver):
    """Pickles a fit scikit-learn classifier to the given path."""

    path: str

    def save_data(self, data: base.ClassifierMixin) -> Dict[str, Any]:
        with open(self.path, "wb") as f:  # context manager so the handle is closed
            pickle.dump(data, f)
        return utils.get_file_metadata(self.path)

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [base.ClassifierMixin]

    @classmethod
    def name(cls) -> str:
        return "pickle"


for adapter in [NumpyMatrixToCSV, SKLearnPickler]:
    registry.register_adapter(adapter)
examples/materialization/data_loaders.py

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
"""
Module to load iris/digit data.
"""

import numpy as np
from sklearn import datasets, utils

from hamilton.function_modifiers import config


@config.when(data_loader="iris")
def data__iris() -> utils.Bunch:
    return datasets.load_iris()


@config.when(data_loader="digits")
def data__digits() -> utils.Bunch:
    return datasets.load_digits()


def target(data: utils.Bunch) -> np.ndarray:
    return data.target


def target_names(data: utils.Bunch) -> np.ndarray:
    return data.target_names


def feature_matrix(data: utils.Bunch) -> np.ndarray:
    return data.data
examples/materialization/model_training.py

Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
from typing import Dict

import numpy as np
from sklearn import base, linear_model, metrics, svm
from sklearn.model_selection import train_test_split

from hamilton import function_modifiers


@function_modifiers.config.when(clf="svm")
def prefit_clf__svm(gamma: float = 0.001) -> base.ClassifierMixin:
    """Returns an unfitted SVM classifier object.

    :param gamma: kernel coefficient for the SVM
    :return: unfitted SVM classifier
    """
    return svm.SVC(gamma=gamma)


@function_modifiers.config.when(clf="logistic")
def prefit_clf__logreg(penalty: str) -> base.ClassifierMixin:
    """Returns an unfitted Logistic Regression classifier object.

    :param penalty: regularization penalty to use (e.g. "l2")
    :return: unfitted logistic regression classifier
    """
    return linear_model.LogisticRegression(penalty)


@function_modifiers.extract_fields(
    {"X_train": np.ndarray, "X_test": np.ndarray, "y_train": np.ndarray, "y_test": np.ndarray}
)
def train_test_split_func(
    feature_matrix: np.ndarray,
    target: np.ndarray,
    test_size_fraction: float,
    shuffle_train_test_split: bool,
) -> Dict[str, np.ndarray]:
    """Function that creates the training & test splits.

    This is then extracted out into constituent components and used downstream.

    :param feature_matrix: feature matrix to split
    :param target: target vector to split
    :param test_size_fraction: fraction of the data to hold out for testing
    :param shuffle_train_test_split: whether to shuffle before splitting
    :return: dictionary of the four splits
    """
    X_train, X_test, y_train, y_test = train_test_split(
        feature_matrix, target, test_size=test_size_fraction, shuffle=shuffle_train_test_split
    )
    return {"X_train": X_train, "X_test": X_test, "y_train": y_train, "y_test": y_test}


def y_test_with_labels(y_test: np.ndarray, target_names: np.ndarray) -> np.ndarray:
    """Adds labels to the target output."""
    return np.array([target_names[idx] for idx in y_test])


def fit_clf(
    prefit_clf: base.ClassifierMixin, X_train: np.ndarray, y_train: np.ndarray
) -> base.ClassifierMixin:
    """Calls fit on the classifier object; it mutates it."""
    prefit_clf.fit(X_train, y_train)
    return prefit_clf


def predicted_output(fit_clf: base.ClassifierMixin, X_test: np.ndarray) -> np.ndarray:
    """Exercises the fit classifier to perform a prediction."""
    return fit_clf.predict(X_test)


def predicted_output_with_labels(
    predicted_output: np.ndarray, target_names: np.ndarray
) -> np.ndarray:
    """Replaces the predictions with the desired labels."""
    return np.array([target_names[idx] for idx in predicted_output])


def classification_report(
    predicted_output_with_labels: np.ndarray, y_test_with_labels: np.ndarray
) -> str:
    """Returns a classification report."""
    return metrics.classification_report(y_test_with_labels, predicted_output_with_labels)


def confusion_matrix(
    predicted_output_with_labels: np.ndarray, y_test_with_labels: np.ndarray
) -> np.ndarray:
    """Returns a confusion matrix report."""
    return metrics.confusion_matrix(y_test_with_labels, predicted_output_with_labels)


def model_parameters(fit_clf: base.ClassifierMixin) -> dict:
    """Returns a dictionary of model parameters."""
    return fit_clf.get_params()
examples/materialization/requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
scikit-learn
sf-hamilton

examples/materialization/run.py

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
"""
Example script showing how one might set up a generic model training pipeline that is quickly configurable.
"""

# Required import to register adapters
import data_loaders
import model_training

from hamilton import base, driver
from hamilton.io.materialization import to


def get_model_config(model_type: str) -> dict:
    """Returns model type specific configuration"""
    if model_type == "svm":
        return {"clf": "svm", "gamma": 0.001}
    elif model_type == "logistic":
        return {"clf": "logistic", "penalty": "l2"}
    else:
        raise ValueError(f"Unsupported model {model_type}.")


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 3:
        print("Error: required arguments are [iris|digits] [svm|logistic]")
        sys.exit(1)
    _data_set = sys.argv[1]  # the data set to load
    _model_type = sys.argv[2]  # the model type to fit and evaluate with

    dag_config = {
        "test_size_fraction": 0.5,
        "shuffle_train_test_split": True,
    }
    # augment config
    dag_config.update(get_model_config(_model_type))
    dag_config["data_loader"] = _data_set
    dr = (
        driver.Builder()
        .with_adapter(base.DefaultAdapter())
        .with_config(dag_config)
        .with_modules(data_loaders, model_training)
        .build()
    )
    materializers = [
        # materialize model parameters to json
        to.json(dependencies=["model_parameters"], id="model_params_to_json", path="./params.json"),
        # classification report to .txt file
        to.file(
            dependencies=["classification_report"],
            id="classification_report_to_txt",
            path="./classification_report.txt",
        ),
        # materialize the model to a pickle file
        to.pickle(dependencies=["prefit_clf"], id="prefit_clf_to_pickle", path="./prefit_clf.pkl"),
        # materialize the predictions we made to a csv file
        to.csv(
            dependencies=["predicted_output_with_labels"],
            id="predicted_output_with_labels_to_csv",
            path="./predicted_output_with_labels.csv",
        ),
    ]
    dr.visualize_materialization(
        *materializers,
        additional_vars=["classification_report"],
        output_file_path="./dag",
        render_kwargs={},
    )
    materialization_results, additional_vars = dr.materialize(
        *materializers,
        additional_vars=["classification_report"],
    )
    print(additional_vars["classification_report"])
    print(materialization_results)
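
Usage, assuming the requirements above are installed: run `python run.py digits svm` (or any combination of `[iris|digits]` and `[svm|logistic]`) from this directory to materialize the outputs and print the classification report.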

hamilton/function_modifiers/dependencies.py

Lines changed: 2 additions & 2 deletions
@@ -29,15 +29,15 @@ class SingleDependency(ParametrizedDependency, abc.ABC):


 @dataclasses.dataclass
-class LiteralDependency(ParametrizedDependency):
+class LiteralDependency(SingleDependency):
     value: Any

     def get_dependency_type(self) -> ParametrizedDependencySource:
         return ParametrizedDependencySource.LITERAL


 @dataclasses.dataclass
-class UpstreamDependency(ParametrizedDependency):
+class UpstreamDependency(SingleDependency):
     source: str

     def get_dependency_type(self) -> ParametrizedDependencySource:

hamilton/io/materialization.py

Lines changed: 7 additions & 6 deletions
@@ -7,7 +7,7 @@
 from hamilton.function_modifiers.dependencies import SingleDependency, value
 from hamilton.graph import FunctionGraph
 from hamilton.io.data_adapters import DataSaver
-from hamilton.registry import LOADER_REGISTRY
+from hamilton.registry import SAVER_REGISTRY


 class materialization_meta__(type):
@@ -19,17 +19,17 @@ class in registry, or make it a function that just proxies to the decorator. We
     """

     def __getattr__(cls, item: str):
-        if item in LOADER_REGISTRY:
-            potential_loaders = LOADER_REGISTRY[item]
+        if item in SAVER_REGISTRY:
+            potential_loaders = SAVER_REGISTRY[item]
             savers = [loader for loader in potential_loaders if issubclass(loader, DataSaver)]
             if len(savers) > 0:
-                return Materialize.partial(LOADER_REGISTRY[item])
+                return Materialize.partial(SAVER_REGISTRY[item])
         try:
             return super().__getattribute__(item)
         except AttributeError as e:
             raise AttributeError(
-                f"No loader named: {item} available for {cls.__name__}. "
-                f"Available loaders are: {LOADER_REGISTRY.keys()}. "
+                f"No data materializer named: {item}. "
+                f"Available materializers are: {SAVER_REGISTRY.keys()}. "
                 f"If you've gotten to this point, you either (1) spelled the "
                 f"loader name wrong, (2) are trying to use a loader that does"
                 f"not exist (yet)"
@@ -76,6 +76,7 @@ def _process_kwargs(
         """
         processed_kwargs = {}
         for kwarg, kwarg_val in data_saver_kwargs.items():
+
             if not isinstance(kwarg_val, SingleDependency):
                 processed_kwargs[kwarg] = value(kwarg_val)
             else: