Materialization improvements #264

Merged: 8 commits, Aug 15, 2023
63 changes: 63 additions & 0 deletions examples/materialization/README.md
@@ -0,0 +1,63 @@
# Materialization

Hamilton's driver allows for ad-hoc materialization. This enables you to take a DAG you already have,
and save your data to a set of custom locations/URLs.

Note that these materializers are _isomorphic_ in nature to the
[@save_to](https://hamilton.dagworks.io/en/latest/reference/decorators/save_to/)
decorator. Materializers inject additional nodes at runtime, modifying the
DAG to include data saver nodes, and return the metadata around materialization.

This framework is meant to be highly pluggable. While the set of available data savers is currently
limited, we expect folks to build their own materializers (and, hopefully, contribute them back to the community!).


## Example
In this example we take the scikit-learn iris_loader pipeline, and materialize outputs to specific
locations through a driver call. We demonstrate:

1. Saving model parameters to a JSON file (using the default JSON materializer)
2. Writing custom data adapters for:
   1. Pickling a model to an object file
   2. Saving confusion matrices to a CSV file

See [run.py](run.py) for the full example.


## `driver.materialize`

This will be a high-level overview. For more details,
see the [documentation](https://hamilton.dagworks.io/en/latest/reference/drivers/Driver/#hamilton.driver.Driver.materialize).

`driver.materialize()` does the following:
1. Processes a list of materializers to create a new DAG
2. Alters the output to include the materializer nodes
3. Processes a list of "additional variables" (for debugging) to return intermediary data
4. Executes the DAG, including the materializers
5. Returns a tuple of (`materialization metadata`, `additional variables`)
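
The five steps above can be illustrated with a toy stand-in. Everything below (`toy_materialize`, the DAG encoding) is invented for illustration and is *not* Hamilton's actual implementation; it only mimics the described flow and return shape:

```python
# Toy stand-in for driver.materialize() -- invented for illustration,
# not Hamilton's real implementation. It mimics the five steps above.

def toy_materialize(dag, materializers, additional_vars):
    """dag maps node name -> (list of dependency names, callable)."""
    # Steps 1 & 2: extend the DAG with one node per materializer.
    extended = dict(dag)
    for m_id, deps, saver in materializers:
        extended[m_id] = (deps, saver)

    # Step 4: execute nodes in (naive, recursive) dependency order.
    results = {}

    def compute(name):
        if name not in results:
            deps, fn = extended[name]
            results[name] = fn(*(compute(d) for d in deps))
        return results[name]

    metadata = {m_id: compute(m_id) for m_id, _, _ in materializers}
    # Step 3: pull out the requested intermediate ("additional") values.
    extras = {v: compute(v) for v in additional_vars}
    # Step 5: return (materialization metadata, additional variables).
    return metadata, extras


dag = {
    "a": ([], lambda: 2),
    "b": (["a"], lambda a: a * 10),
}
saver = ("save_b", ["b"], lambda b: {"path": "/tmp/b.json", "value": b})
meta, extras = toy_materialize(dag, [saver], ["a"])
```

The real `driver.materialize()` does considerably more (validation, lifecycle hooks, graph adapters), but the shape of the return value matches.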
Review discussion on this section:

> **Contributor:** why "additional variables"? Because that's what we use in execute? Should we rename it to "additional_outputs"?
>
> **Author:** Yep, to mirror "final_vars", but I like "outputs" better than vars.
>
> **Contributor:** I would just make it outputs personally. But 🤷 .
>
> **Author:** On second thought we already released this and I'm a stickler for semantic versioning...
>
> **Contributor:** your call.


Each materializer consumes:
1. A `dependencies` list of nodes to materialize
2. An (optional) `combine` parameter to combine the outputs of the dependencies
(this is required if there are multiple dependencies). This is a [ResultMixin](https://hamilton.dagworks.io/en/latest/concepts/customizing-execution/#result-builders) object
3. An `id` parameter to identify the materializer, which serves as the node name in the DAG

Materializers are referenced by the `to` object in `hamilton.io.materialization`, which utilizes
dynamic dispatch to create the appropriate materializer.

These refer to `DataSaver` classes, which are keyed by a string (e.g. `csv`).
Multiple data adapters can share the same key, each applying to a specific type
(e.g. pandas DataFrame, numpy matrix, polars DataFrame). New
data adapters are registered by calling `hamilton.registry.register_adapter`.
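
The key-plus-type dispatch scheme described above can be sketched without Hamilton. The names below (`register_adapter`, `resolve`, `_ADAPTERS`) are invented for this sketch; see `hamilton.registry.register_adapter` for the real API:

```python
# Minimal sketch of key + type based dynamic dispatch, mimicking how data
# savers are resolved. Invented for illustration, not Hamilton's registry.
from typing import Any, Dict, List, Tuple, Type

_ADAPTERS: Dict[str, List[Tuple[Type, Any]]] = {}

def register_adapter(key: str, applicable_type: Type, saver: Any) -> None:
    """Register a saver under a string key for one applicable type."""
    _ADAPTERS.setdefault(key, []).append((applicable_type, saver))

def resolve(key: str, data: Any) -> Any:
    """Pick the first registered saver whose type matches the data."""
    for applicable_type, saver in _ADAPTERS.get(key, []):
        if isinstance(data, applicable_type):
            return saver
    raise TypeError(f"no {key!r} adapter for {type(data).__name__}")

# Two adapters share the "csv" key but apply to different types.
register_adapter("csv", list, "list-to-csv saver")
register_adapter("csv", dict, "dict-to-csv saver")

list_saver = resolve("csv", [1, 2])
dict_saver = resolve("csv", {"a": 1})
```

This is why `to.csv(...)` can handle a pandas DataFrame, a numpy matrix, or a polars DataFrame: the key selects the family of savers, and the runtime type selects the concrete one.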

## Custom Materializers

To define a custom materializer, all you have to do is implement the `DataSaver` class
(which also enables use with `save_to`). This is demonstrated in [custom_materializers.py](custom_materializers.py).

## `driver.materialize` vs `@save_to`

`driver.materialize` is an ad-hoc form of `save_to`. Use it when you're developing and want to
materialize outputs on the fly. When you have a production ETL, you can choose between `save_to` and `materialize`.
If the save location/structure is unlikely to change, you might consider using `save_to`. Otherwise, `materialize`
is an idiomatic way of conducting materialization operations that cleanly separates side effects from transformations.
55 changes: 55 additions & 0 deletions examples/materialization/custom_materializers.py
@@ -0,0 +1,55 @@
import dataclasses
import pickle
from typing import Any, Collection, Dict, Type

import numpy as np
from sklearn import base

from hamilton import registry
from hamilton.io import utils
from hamilton.io.data_adapters import DataSaver

# TODO -- put this back in the standard library


@dataclasses.dataclass
class NumpyMatrixToCSV(DataSaver):
path: str
sep: str = ","

def __post_init__(self):
if not self.path.endswith(".csv"):
raise ValueError(f"CSV files must end with .csv, got {self.path}")

def save_data(self, data: np.ndarray) -> Dict[str, Any]:
np.savetxt(self.path, data, delimiter=self.sep)
return utils.get_file_metadata(self.path)

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [np.ndarray]

@classmethod
def name(cls) -> str:
return "csv"


@dataclasses.dataclass
class SKLearnPickler(DataSaver):
path: str

def save_data(self, data: base.ClassifierMixin) -> Dict[str, Any]:
pickle.dump(data, open(self.path, "wb"))
return utils.get_file_metadata(self.path)

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [base.ClassifierMixin]

@classmethod
def name(cls) -> str:
return "pickle"


for adapter in [NumpyMatrixToCSV, SKLearnPickler]:
registry.register_adapter(adapter)
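
A saver like `NumpyMatrixToCSV` above can be exercised on its own. The stand-in below drops the Hamilton `DataSaver` base class and replaces `utils.get_file_metadata` with a simple size lookup, so it runs without Hamilton installed; these substitutions are assumptions for the sketch:

```python
import dataclasses
import os
import tempfile
from typing import Any, Dict

import numpy as np

# Stand-in for NumpyMatrixToCSV above, minus the Hamilton DataSaver base
# class and with utils.get_file_metadata replaced by os.path.getsize,
# so it runs standalone.
@dataclasses.dataclass
class NumpyMatrixToCSVStandalone:
    path: str
    sep: str = ","

    def __post_init__(self):
        if not self.path.endswith(".csv"):
            raise ValueError(f"CSV files must end with .csv, got {self.path}")

    def save_data(self, data: np.ndarray) -> Dict[str, Any]:
        np.savetxt(self.path, data, delimiter=self.sep)
        # The real saver returns utils.get_file_metadata(self.path).
        return {"path": self.path, "size": os.path.getsize(self.path)}

path = os.path.join(tempfile.mkdtemp(), "matrix.csv")
metadata = NumpyMatrixToCSVStandalone(path).save_data(np.eye(2))
```

The metadata dict returned from `save_data` is what surfaces in the first element of the `driver.materialize()` return tuple.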
Binary file added examples/materialization/dag.pdf
30 changes: 30 additions & 0 deletions examples/materialization/data_loaders.py
@@ -0,0 +1,30 @@
import numpy as np
from sklearn import datasets, utils

from hamilton.function_modifiers import config

"""
Module to load iris/digits data.
"""


@config.when(data_loader="iris")
def data__iris() -> utils.Bunch:
    return datasets.load_iris()


@config.when(data_loader="digits")
def data__digits() -> utils.Bunch:
return datasets.load_digits()


def target(data: utils.Bunch) -> np.ndarray:
return data.target


def target_names(data: utils.Bunch) -> np.ndarray:
return data.target_names


def feature_matrix(data: utils.Bunch) -> np.ndarray:
return data.data
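
The `@config.when` resolution used above can be illustrated with a toy stand-in. `resolve_variant` and the predicate encoding are invented for illustration, not Hamilton's implementation: pick the function variant whose config predicate matches, mirroring how `data__iris` vs `data__digits` resolve to the `data` node:

```python
# Toy illustration of @config.when-style resolution -- invented for this
# sketch, not Hamilton's implementation.
def resolve_variant(variants, config):
    """Return the first function whose predicate matches the config."""
    for predicate, fn in variants:
        if all(config.get(k) == v for k, v in predicate.items()):
            return fn
    raise ValueError("no variant matches config")

variants = [
    ({"data_loader": "iris"}, lambda: "iris bunch"),
    ({"data_loader": "digits"}, lambda: "digits bunch"),
]
data = resolve_variant(variants, {"data_loader": "iris"})()
```

Hamilton does this resolution at graph-build time: only the matching `data__*` variant becomes the `data` node in the DAG.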
96 changes: 96 additions & 0 deletions examples/materialization/model_training.py
@@ -0,0 +1,96 @@
from typing import Dict

import numpy as np
from sklearn import base, linear_model, metrics, svm
from sklearn.model_selection import train_test_split

from hamilton import function_modifiers


@function_modifiers.config.when(clf="svm")
def prefit_clf__svm(gamma: float = 0.001) -> base.ClassifierMixin:
"""Returns an unfitted SVM classifier object.

:param gamma: ...
:return:
"""
return svm.SVC(gamma=gamma)


@function_modifiers.config.when(clf="logistic")
def prefit_clf__logreg(penalty: str) -> base.ClassifierMixin:
"""Returns an unfitted Logistic Regression classifier object.

:param penalty:
:return:
"""
    return linear_model.LogisticRegression(penalty=penalty)


@function_modifiers.extract_fields(
{"X_train": np.ndarray, "X_test": np.ndarray, "y_train": np.ndarray, "y_test": np.ndarray}
)
def train_test_split_func(
feature_matrix: np.ndarray,
target: np.ndarray,
test_size_fraction: float,
shuffle_train_test_split: bool,
) -> Dict[str, np.ndarray]:
"""Function that creates the training & test splits.

    This is then extracted into its constituent components and used downstream.

:param feature_matrix:
:param target:
:param test_size_fraction:
:param shuffle_train_test_split:
:return:
"""
X_train, X_test, y_train, y_test = train_test_split(
feature_matrix, target, test_size=test_size_fraction, shuffle=shuffle_train_test_split
)
return {"X_train": X_train, "X_test": X_test, "y_train": y_train, "y_test": y_test}


def y_test_with_labels(y_test: np.ndarray, target_names: np.ndarray) -> np.ndarray:
"""Adds labels to the target output."""
return np.array([target_names[idx] for idx in y_test])


def fit_clf(
prefit_clf: base.ClassifierMixin, X_train: np.ndarray, y_train: np.ndarray
) -> base.ClassifierMixin:
"""Calls fit on the classifier object; it mutates it."""
prefit_clf.fit(X_train, y_train)
return prefit_clf


def predicted_output(fit_clf: base.ClassifierMixin, X_test: np.ndarray) -> np.ndarray:
    """Exercises the fitted classifier to perform a prediction."""
return fit_clf.predict(X_test)


def predicted_output_with_labels(
predicted_output: np.ndarray, target_names: np.ndarray
) -> np.ndarray:
"""Replaces the predictions with the desired labels."""
return np.array([target_names[idx] for idx in predicted_output])


def classification_report(
predicted_output_with_labels: np.ndarray, y_test_with_labels: np.ndarray
) -> str:
"""Returns a classification report."""
return metrics.classification_report(y_test_with_labels, predicted_output_with_labels)


def confusion_matrix(
predicted_output_with_labels: np.ndarray, y_test_with_labels: np.ndarray
) -> np.ndarray:
"""Returns a confusion matrix report."""
return metrics.confusion_matrix(y_test_with_labels, predicted_output_with_labels)


def model_parameters(fit_clf: base.ClassifierMixin) -> dict:
"""Returns a dictionary of model parameters."""
return fit_clf.get_params()