Materialization improvements #264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Commits (8, by elijahbenizzy):

- `c203dfc` Fixes materialization viz function
- `2be6fcd` Fixes class hierarchy in dependencies.py
- `6f6ed2d` Fixes duplication of node dependencies
- `af2e6ef` Fixes materialize bug
- `ae6636e` Adds examples for materializers
- `846b3a6` Sets the default executor to multithreading
- `eefe8ae` Fixes issue with name clashes on data loader parameters
- `be724aa` Adds documentation/examples for data adapters
New file (+63 lines):
# Materialization

Hamilton's driver allows for ad-hoc materialization. This enables you to take a DAG you already have,
and save your data to a set of custom locations/URLs.

Note that these materializers are _isomorphic_ in nature to the
[@save_to](https://hamilton.dagworks.io/en/latest/reference/decorators/save_to/)
decorator. Materializers inject the additional node at runtime, modifying the
DAG to include a data saver node, and returning the metadata around materialization.

This framework is meant to be highly pluggable. While the set of available data savers is currently
limited, we expect folks to build their own materializers (and, hopefully, contribute them back to the community!).

## Example

In this example we take the scikit-learn iris_loader pipeline and materialize outputs to specific
locations through a driver call. We demonstrate:

1. Saving model parameters to a JSON file (using the default JSON materializer)
2. Writing custom data adapters for:
   1. Pickling a model to an object file
   2. Saving confusion matrices to a CSV file

See [run.py](run.py) for the full example.

## `driver.materialize`

This is a high-level overview. For more details,
see the [documentation](https://hamilton.dagworks.io/en/latest/reference/drivers/Driver/#hamilton.driver.Driver.materialize).

`driver.materialize()` does the following:
1. Processes a list of materializers to create a new DAG
2. Alters the output to include the materializer nodes
3. Processes a list of "additional variables" (for debugging) to return intermediary data
4. Executes the DAG, including the materializers
5. Returns a tuple of (`materialization metadata`, `additional variables`)
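The steps above can be sketched with a toy, Hamilton-free driver. Everything here (`MiniDriver`, `Materializer`, the dict "sink") is a hypothetical illustration of the flow, not Hamilton's actual internals:

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple


class Materializer:
    """Toy stand-in for a materializer: saves one node's output to a sink."""

    def __init__(self, id: str, dependency: str, sink: Dict[str, Any]):
        self.id = id
        self.dependency = dependency
        self.sink = sink  # stands in for a file/DB destination

    def save(self, value: Any) -> Dict[str, Any]:
        self.sink[self.id] = value
        return {"id": self.id, "saved": True}  # "materialization metadata"


class MiniDriver:
    def __init__(self, nodes: Dict[str, Callable[[], Any]]):
        self.nodes = nodes

    def materialize(
        self, *materializers: Materializer, additional_vars: Sequence[str] = ()
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        # 1-2. extend the set of requested outputs with the materializer nodes
        needed = {m.dependency for m in materializers} | set(additional_vars)
        # 4. execute the (trivial, dependency-free) DAG
        results = {name: self.nodes[name]() for name in needed}
        # run the materializers, collecting their metadata
        metadata = [m.save(results[m.dependency]) for m in materializers]
        # 3/5. return (materialization metadata, additional variables)
        return metadata, {v: results[v] for v in additional_vars}


sink: Dict[str, Any] = {}
driver = MiniDriver({"params": lambda: {"gamma": 0.001}, "report": lambda: "ok"})
meta, extra = driver.materialize(
    Materializer("params_to_json", "params", sink), additional_vars=["report"]
)
print(meta, extra)
```

The real driver resolves an actual dependency graph; the point here is only the shape of the call and the `(metadata, additional variables)` return value.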
Each materializer consumes:
1. A `dependencies` list to materialize
2. An (optional) `combine` parameter to combine the outputs of the dependencies
(this is required if there are multiple dependencies). This is a [ResultMixin](https://hamilton.dagworks.io/en/latest/concepts/customizing-execution/#result-builders) object
3. An `id` parameter to identify the materializer, which serves as the node name in the DAG
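Those three parameters can be sketched as a small container. The `MaterializerSpec` name and its validation rule are illustrative only, not Hamilton's API:

```python
import dataclasses
from typing import Callable, List, Optional


@dataclasses.dataclass
class MaterializerSpec:
    """Illustrative container for the three parameters a materializer takes."""

    id: str  # node name the materializer gets in the DAG
    dependencies: List[str]  # upstream nodes whose outputs get saved
    combine: Optional[Callable] = None  # result builder; stands in for ResultMixin

    def __post_init__(self):
        # combine is only required when there are multiple dependencies
        if len(self.dependencies) > 1 and self.combine is None:
            raise ValueError(
                f"Materializer {self.id!r} has multiple dependencies; "
                "a `combine` parameter is required."
            )


ok = MaterializerSpec(id="save_params", dependencies=["model_parameters"])

try:
    MaterializerSpec(id="save_all", dependencies=["a", "b"])
except ValueError as e:
    print(e)  # multiple dependencies without combine is rejected
```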
Materializers are referenced by the `to` object in `hamilton.io.materialization`, which uses
dynamic dispatch to create the appropriate materializer.

These refer to a `DataSaver`, each of which is keyed by a string (e.g. `csv`).
Multiple data adapters can share the same key, each applying to a specific type
(e.g. pandas dataframe, numpy matrix, polars dataframe). New
data adapters are registered by calling `hamilton.registry.register_adapter`.
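A minimal sketch of that key-plus-type dispatch follows. The registry here is a toy stand-in, not `hamilton.registry`:

```python
from typing import Any, Dict, List, Tuple, Type

# toy registry: a saver key like "csv" maps to a list of (type, saver) pairs
_REGISTRY: Dict[str, List[Tuple[Type, Any]]] = {}


def register_adapter(key: str, applicable_type: Type, saver: Any) -> None:
    """Register a saver under a key, for one applicable data type."""
    _REGISTRY.setdefault(key, []).append((applicable_type, saver))


def resolve(key: str, data: Any) -> Any:
    """Dynamic dispatch: pick the saver registered for this key and data type."""
    for applicable_type, saver in _REGISTRY.get(key, []):
        if isinstance(data, applicable_type):
            return saver
    raise TypeError(f"No {key!r} adapter registered for {type(data).__name__}")


# two adapters sharing the key "csv", each handling a different type
register_adapter("csv", list, "list_to_csv_saver")
register_adapter("csv", dict, "dict_to_csv_saver")

print(resolve("csv", [[1, 2]]))  # dispatches to the list-based saver
print(resolve("csv", {"a": 1}))  # dispatches to the dict-based saver
```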
## Custom Materializers

To define a custom materializer, all you have to do is implement the `DataSaver` class
(which allows use in `save_to` as well). This is demonstrated in [custom_materializers.py](custom_materializers.py).

## `driver.materialize` vs `@save_to`

`driver.materialize` is an ad-hoc form of `save_to`. You want to use this when you're developing and
want to do ad-hoc materialization. When you have a production ETL, you can choose between `save_to` and `materialize`.
If the save location/structure is unlikely to change, then you might consider using `save_to`. Otherwise, `materialize`
is an idiomatic way of conducting materialization operations that cleanly separates side effects from transformations.
`custom_materializers.py` (new file, +55 lines):
```python
import dataclasses
import pickle
from typing import Any, Collection, Dict, Type

import numpy as np
from sklearn import base

from hamilton import registry
from hamilton.io import utils
from hamilton.io.data_adapters import DataSaver

# TODO -- put this back in the standard library


@dataclasses.dataclass
class NumpyMatrixToCSV(DataSaver):
    path: str
    sep: str = ","

    def __post_init__(self):
        if not self.path.endswith(".csv"):
            raise ValueError(f"CSV files must end with .csv, got {self.path}")

    def save_data(self, data: np.ndarray) -> Dict[str, Any]:
        np.savetxt(self.path, data, delimiter=self.sep)
        return utils.get_file_metadata(self.path)

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [np.ndarray]

    @classmethod
    def name(cls) -> str:
        return "csv"


@dataclasses.dataclass
class SKLearnPickler(DataSaver):
    path: str

    def save_data(self, data: base.ClassifierMixin) -> Dict[str, Any]:
        with open(self.path, "wb") as f:  # close the handle rather than leak it
            pickle.dump(data, f)
        return utils.get_file_metadata(self.path)

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [base.ClassifierMixin]

    @classmethod
    def name(cls) -> str:
        return "pickle"


for adapter in [NumpyMatrixToCSV, SKLearnPickler]:
    registry.register_adapter(adapter)
```
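The save-and-return-metadata contract above can be exercised with only the standard library. `PicklerSketch` here is a hedged stand-in that mirrors `SKLearnPickler` while avoiding the sklearn and Hamilton imports; its metadata dict merely approximates `hamilton.io.utils.get_file_metadata`:

```python
import dataclasses
import os
import pickle
import tempfile
from typing import Any, Dict


@dataclasses.dataclass
class PicklerSketch:
    """Stand-in for SKLearnPickler: pickle any object, return file metadata."""

    path: str

    def save_data(self, data: Any) -> Dict[str, Any]:
        with open(self.path, "wb") as f:
            pickle.dump(data, f)
        # approximates hamilton.io.utils.get_file_metadata
        return {"path": self.path, "size": os.path.getsize(self.path)}


path = os.path.join(tempfile.mkdtemp(), "model.pkl")
metadata = PicklerSketch(path).save_data({"gamma": 0.001})
print(metadata)

with open(path, "rb") as f:
    print(pickle.load(f))  # the saved object round-trips
```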
Binary file not shown.
New file (+30 lines):
```python
import numpy as np
from sklearn import datasets, utils

from hamilton.function_modifiers import config

"""
Module to load digit/iris data.
"""


@config.when(data_loader="iris")
def data__iris() -> utils.Bunch:
    return datasets.load_iris()


@config.when(data_loader="digits")
def data__digits() -> utils.Bunch:
    return datasets.load_digits()


def target(data: utils.Bunch) -> np.ndarray:
    return data.target


def target_names(data: utils.Bunch) -> np.ndarray:
    return data.target_names


def feature_matrix(data: utils.Bunch) -> np.ndarray:
    return data.data
```
New file (+96 lines):
```python
from typing import Dict

import numpy as np
from sklearn import base, linear_model, metrics, svm
from sklearn.model_selection import train_test_split

from hamilton import function_modifiers


@function_modifiers.config.when(clf="svm")
def prefit_clf__svm(gamma: float = 0.001) -> base.ClassifierMixin:
    """Returns an unfitted SVM classifier object.

    :param gamma: ...
    :return:
    """
    return svm.SVC(gamma=gamma)


@function_modifiers.config.when(clf="logistic")
def prefit_clf__logreg(penalty: str) -> base.ClassifierMixin:
    """Returns an unfitted Logistic Regression classifier object.

    :param penalty:
    :return:
    """
    return linear_model.LogisticRegression(penalty=penalty)


@function_modifiers.extract_fields(
    {"X_train": np.ndarray, "X_test": np.ndarray, "y_train": np.ndarray, "y_test": np.ndarray}
)
def train_test_split_func(
    feature_matrix: np.ndarray,
    target: np.ndarray,
    test_size_fraction: float,
    shuffle_train_test_split: bool,
) -> Dict[str, np.ndarray]:
    """Function that creates the training & test splits.

    It is then extracted out into constituent components and used downstream.

    :param feature_matrix:
    :param target:
    :param test_size_fraction:
    :param shuffle_train_test_split:
    :return:
    """
    X_train, X_test, y_train, y_test = train_test_split(
        feature_matrix, target, test_size=test_size_fraction, shuffle=shuffle_train_test_split
    )
    return {"X_train": X_train, "X_test": X_test, "y_train": y_train, "y_test": y_test}


def y_test_with_labels(y_test: np.ndarray, target_names: np.ndarray) -> np.ndarray:
    """Adds labels to the target output."""
    return np.array([target_names[idx] for idx in y_test])


def fit_clf(
    prefit_clf: base.ClassifierMixin, X_train: np.ndarray, y_train: np.ndarray
) -> base.ClassifierMixin:
    """Calls fit on the classifier object; it mutates it."""
    prefit_clf.fit(X_train, y_train)
    return prefit_clf


def predicted_output(fit_clf: base.ClassifierMixin, X_test: np.ndarray) -> np.ndarray:
    """Exercises the fitted classifier to perform a prediction."""
    return fit_clf.predict(X_test)


def predicted_output_with_labels(
    predicted_output: np.ndarray, target_names: np.ndarray
) -> np.ndarray:
    """Replaces the predictions with the desired labels."""
    return np.array([target_names[idx] for idx in predicted_output])


def classification_report(
    predicted_output_with_labels: np.ndarray, y_test_with_labels: np.ndarray
) -> str:
    """Returns a classification report."""
    return metrics.classification_report(y_test_with_labels, predicted_output_with_labels)


def confusion_matrix(
    predicted_output_with_labels: np.ndarray, y_test_with_labels: np.ndarray
) -> np.ndarray:
    """Returns a confusion matrix."""
    return metrics.confusion_matrix(y_test_with_labels, predicted_output_with_labels)


def model_parameters(fit_clf: base.ClassifierMixin) -> dict:
    """Returns a dictionary of model parameters."""
    return fit_clf.get_params()
```
Review discussion:

> why "additional variables"? Because that's what we use in execute? Should we rename it to "additional_outputs"?

> Yep to mirror "final_vars", but I like "outputs" better than vars

> I would just make it outputs personally. But 🤷 .

> On second thought we already released this and I'm a stickler for semantic versioning...

> your call.