Skip to content

feat: add ml.preprocessing.MinMaxScaler #64

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bigframes/ml/compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
preprocessing.OneHotEncoder,
preprocessing.StandardScaler,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.LabelEncoder,
]

Expand Down
12 changes: 11 additions & 1 deletion bigframes/ml/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, steps: List[Tuple[str, base.BaseEstimator]]):
preprocessing.StandardScaler,
preprocessing.OneHotEncoder,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.LabelEncoder,
),
):
Expand Down Expand Up @@ -149,6 +150,7 @@ def _extract_as_column_transformer(
preprocessing.OneHotEncoder,
preprocessing.StandardScaler,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.LabelEncoder,
],
Union[str, List[str]],
Expand Down Expand Up @@ -177,10 +179,17 @@ def _extract_as_column_transformer(
elif transform_sql.startswith("ML.MAX_ABS_SCALER"):
transformers.append(
(
"max_abs_encoder",
"max_abs_scaler",
*preprocessing.MaxAbsScaler._parse_from_sql(transform_sql),
)
)
elif transform_sql.startswith("ML.MIN_MAX_SCALER"):
transformers.append(
(
"min_max_scaler",
*preprocessing.MinMaxScaler._parse_from_sql(transform_sql),
)
)
elif transform_sql.startswith("ML.LABEL_ENCODER"):
transformers.append(
(
Expand All @@ -203,6 +212,7 @@ def _merge_column_transformer(
preprocessing.StandardScaler,
preprocessing.OneHotEncoder,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.LabelEncoder,
]:
"""Try to merge the column transformer to a simple transformer."""
Expand Down
84 changes: 82 additions & 2 deletions bigframes/ml/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,13 @@ def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:

@classmethod
def _parse_from_sql(cls, sql: str) -> tuple[MaxAbsScaler, str]:
    """Parse SQL to tuple(MaxAbsScaler, column_label).

    Args:
        sql: SQL string of format "ML.MAX_ABS_SCALER({col_label}) OVER()"

    Returns:
        tuple(MaxAbsScaler, column_label)"""
    # The column label is the text between the first "(" and the first ")".
    col_label = sql[sql.find("(") + 1 : sql.find(")")]
    return cls(), col_label

Expand Down Expand Up @@ -187,6 +187,86 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
)


class MinMaxScaler(
    base.Transformer,
    third_party.bigframes_vendored.sklearn.preprocessing._data.MinMaxScaler,
):
    __doc__ = (
        third_party.bigframes_vendored.sklearn.preprocessing._data.MinMaxScaler.__doc__
    )

    def __init__(self):
        # TRANSFORM-only BQML model; stays None until fit() is called.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()
        self._base_sql_generator = globals.base_sql_generator()

    # TODO(garrettwu): implement __hash__
    def __eq__(self, other: Any) -> bool:
        # Equal only to another MinMaxScaler backed by the same BQML model.
        return type(other) is MinMaxScaler and self._bqml_model == other._bqml_model

    def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:
        """Compile this transformer to a list of SQL expressions that can be included in
        a BQML TRANSFORM clause

        Args:
            columns: a list of column names to transform

        Returns: a list of tuples of (sql_expression, output_name)"""
        expressions: List[Tuple[str, str]] = []
        for column in columns:
            output_name = f"min_max_scaled_{column}"
            expressions.append(
                (
                    self._base_sql_generator.ml_min_max_scaler(column, output_name),
                    output_name,
                )
            )
        return expressions

    @classmethod
    def _parse_from_sql(cls, sql: str) -> tuple[MinMaxScaler, str]:
        """Parse SQL to tuple(MinMaxScaler, column_label).

        Args:
            sql: SQL string of format "ML.MIN_MAX_SCALER({col_label}) OVER()"

        Returns:
            tuple(MinMaxScaler, column_label)"""
        # The column label is the text between the first "(" and the first ")".
        start = sql.find("(") + 1
        end = sql.find(")")
        return cls(), sql[start:end]

    def fit(
        self,
        X: Union[bpd.DataFrame, bpd.Series],
        y=None,  # ignored
    ) -> MinMaxScaler:
        (X,) = utils.convert_to_dataframe(X)

        compiled = self._compile_to_sql(X.columns.tolist())

        self._bqml_model = self._bqml_model_factory.create_model(
            X,
            options={"model_type": "transform_only"},
            transforms=[transform_sql for transform_sql, _ in compiled],
        )

        # The schema of TRANSFORM output is not available in the model API, so save it during fitting
        self._output_names = [name for _, name in compiled]
        return self

    def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
        if not self._bqml_model:
            raise RuntimeError("Must be fitted before transform")

        (X,) = utils.convert_to_dataframe(X)

        transformed = self._bqml_model.transform(X)
        return typing.cast(
            bpd.DataFrame,
            transformed[self._output_names],
        )


class OneHotEncoder(
base.Transformer,
third_party.bigframes_vendored.sklearn.preprocessing._encoder.OneHotEncoder,
Expand Down
4 changes: 4 additions & 0 deletions bigframes/ml/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def ml_max_abs_scaler(self, numeric_expr_sql: str, name: str) -> str:
"""Encode ML.MAX_ABS_SCALER for BQML"""
return f"""ML.MAX_ABS_SCALER({numeric_expr_sql}) OVER() AS {name}"""

def ml_min_max_scaler(self, numeric_expr_sql: str, name: str) -> str:
    """Encode ML.MIN_MAX_SCALER for BQML"""
    # Produces e.g.: ML.MIN_MAX_SCALER(col) OVER() AS min_max_scaled_col
    return "ML.MIN_MAX_SCALER({}) OVER() AS {}".format(numeric_expr_sql, name)

def ml_one_hot_encoder(
self,
numeric_expr_sql: str,
Expand Down
46 changes: 43 additions & 3 deletions tests/system/large/ml/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,11 @@ def test_pipeline_columntransformer_fit_predict(session, penguins_df_default_ind
preprocessing.MaxAbsScaler(),
["culmen_length_mm", "flipper_length_mm"],
),
(
"min_max_scale",
preprocessing.MinMaxScaler(),
["culmen_length_mm", "flipper_length_mm"],
),
(
"label",
preprocessing.LabelEncoder(),
Expand Down Expand Up @@ -647,6 +652,11 @@ def test_pipeline_columntransformer_to_gbq(penguins_df_default_index, dataset_id
preprocessing.MaxAbsScaler(),
["culmen_length_mm", "flipper_length_mm"],
),
(
"min_max_scale",
preprocessing.MinMaxScaler(),
["culmen_length_mm", "flipper_length_mm"],
),
(
"label",
preprocessing.LabelEncoder(),
Expand Down Expand Up @@ -684,9 +694,11 @@ def test_pipeline_columntransformer_to_gbq(penguins_df_default_index, dataset_id
"species",
),
("standard_scaler", preprocessing.StandardScaler(), "culmen_length_mm"),
("max_abs_encoder", preprocessing.MaxAbsScaler(), "culmen_length_mm"),
("max_abs_scaler", preprocessing.MaxAbsScaler(), "culmen_length_mm"),
("min_max_scaler", preprocessing.MinMaxScaler(), "culmen_length_mm"),
("standard_scaler", preprocessing.StandardScaler(), "flipper_length_mm"),
("max_abs_encoder", preprocessing.MaxAbsScaler(), "flipper_length_mm"),
("max_abs_scaler", preprocessing.MaxAbsScaler(), "flipper_length_mm"),
("min_max_scaler", preprocessing.MinMaxScaler(), "flipper_length_mm"),
]

assert transformers == expected
Expand Down Expand Up @@ -743,14 +755,42 @@ def test_pipeline_max_abs_scaler_to_gbq(penguins_df_default_index, dataset_id):
pl.fit(X_train, y_train)

pl_loaded = pl.to_gbq(
f"{dataset_id}.test_penguins_pipeline_standard_scaler", replace=True
f"{dataset_id}.test_penguins_pipeline_min_max_scaler", replace=True
)
assert isinstance(pl_loaded._transform, preprocessing.MaxAbsScaler)

assert isinstance(pl_loaded._estimator, linear_model.LinearRegression)
assert pl_loaded._estimator.fit_intercept is False


def test_pipeline_min_max_scaler_to_gbq(penguins_df_default_index, dataset_id):
    """Round-trip a MinMaxScaler + LinearRegression pipeline through to_gbq."""
    pl = pipeline.Pipeline(
        [
            ("transform", preprocessing.MinMaxScaler()),
            ("estimator", linear_model.LinearRegression(fit_intercept=False)),
        ]
    )

    train = penguins_df_default_index.dropna()
    feature_columns = [
        "culmen_length_mm",
        "culmen_depth_mm",
        "flipper_length_mm",
    ]
    pl.fit(train[feature_columns], train[["body_mass_g"]])

    pl_loaded = pl.to_gbq(
        f"{dataset_id}.test_penguins_pipeline_min_max_scaler", replace=True
    )

    # The reloaded pipeline must preserve both step types and hyperparameters.
    assert isinstance(pl_loaded._transform, preprocessing.MinMaxScaler)
    assert isinstance(pl_loaded._estimator, linear_model.LinearRegression)
    assert pl_loaded._estimator.fit_intercept is False


def test_pipeline_one_hot_encoder_to_gbq(penguins_df_default_index, dataset_id):
pl = pipeline.Pipeline(
[
Expand Down
93 changes: 93 additions & 0 deletions tests/system/small/ml/test_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,99 @@ def test_max_abs_scaler_series_normalizes(penguins_df_default_index, new_penguin
pd.testing.assert_frame_equal(result, expected, rtol=1e-3)


def test_min_max_scaler_normalizeds_fit_transform(new_penguins_df):
    """fit_transform on three numeric columns rescales each into [0, 1]."""
    feature_columns = ["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]
    scaler = bigframes.ml.preprocessing.MinMaxScaler()
    result = scaler.fit_transform(new_penguins_df[feature_columns]).to_pandas()

    # TODO: bug? feature columns seem to come back in nondeterministic random
    # order. Workaround: sort columns by name. Can't repro it in pantheon, so
    # could be a bigframes issue...
    result = result.reindex(sorted(result.columns), axis=1)

    expected = pd.DataFrame(
        {
            "min_max_scaled_culmen_depth_mm": [1.0, 0.0, 0.5625],
            "min_max_scaled_culmen_length_mm": [1.0, 0.375, 0.0],
            "min_max_scaled_flipper_length_mm": [1.0, 0.0, 0.466667],
        },
        dtype="Float64",
        index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
    )

    pd.testing.assert_frame_equal(result, expected, rtol=1e-3)


def test_min_max_scaler_series_normalizes(penguins_df_default_index, new_penguins_df):
    """Fitting on a single Series scales that column's training data to [0, 1]."""
    scaler = bigframes.ml.preprocessing.MinMaxScaler()
    scaler.fit(penguins_df_default_index["culmen_length_mm"])

    train_result = scaler.transform(
        penguins_df_default_index["culmen_length_mm"]
    ).to_pandas()

    # A correct min-max scaling of the training data spans exactly [0, 1].
    for column in train_result.columns:
        assert math.isclose(train_result[column].max(), 1.0, abs_tol=1e-3)
        assert math.isclose(train_result[column].min(), 0.0, abs_tol=1e-3)

    result = scaler.transform(new_penguins_df).to_pandas()

    # TODO: bug? feature columns seem to come back in nondeterministic random
    # order. Workaround: sort columns by name. Can't repro it in pantheon, so
    # could be a bigframes issue...
    result = result.reindex(sorted(result.columns), axis=1)

    expected = pd.DataFrame(
        {
            "min_max_scaled_culmen_length_mm": [0.269091, 0.232727, 0.210909],
        },
        dtype="Float64",
        index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
    )

    pd.testing.assert_frame_equal(result, expected, rtol=1e-3)


def test_min_max_scaler_normalizes(penguins_df_default_index, new_penguins_df):
    """Fitting on a DataFrame scales every training column into [0, 1]."""
    # TODO(http://b/292431644): add a second test that compares output to
    # sklearn.preprocessing.MinMaxScaler, when BQML's change is in prod.
    feature_columns = ["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm"]
    scaler = bigframes.ml.preprocessing.MinMaxScaler()
    scaler.fit(penguins_df_default_index[feature_columns])

    train_result = scaler.transform(
        penguins_df_default_index[feature_columns]
    ).to_pandas()

    # A correct min-max scaling of the training data spans exactly [0, 1].
    for column in train_result.columns:
        assert math.isclose(train_result[column].max(), 1.0, abs_tol=1e-3)
        assert math.isclose(train_result[column].min(), 0.0, abs_tol=1e-3)

    result = scaler.transform(new_penguins_df).to_pandas()

    # TODO: bug? feature columns seem to come back in nondeterministic random
    # order. Workaround: sort columns by name. Can't repro it in pantheon, so
    # could be a bigframes issue...
    result = result.reindex(sorted(result.columns), axis=1)

    expected = pd.DataFrame(
        {
            "min_max_scaled_culmen_depth_mm": [0.678571, 0.4880952, 0.595238],
            "min_max_scaled_culmen_length_mm": [0.269091, 0.232727, 0.210909],
            "min_max_scaled_flipper_length_mm": [0.40678, 0.152542, 0.271186],
        },
        dtype="Float64",
        index=pd.Index([1633, 1672, 1690], name="tag_number", dtype="Int64"),
    )

    pd.testing.assert_frame_equal(result, expected, rtol=1e-3)


def test_one_hot_encoder_default_params(new_penguins_df):
encoder = bigframes.ml.preprocessing.OneHotEncoder()
encoder.fit(new_penguins_df[["species", "sex"]])
Expand Down
Loading