Skip to content

Commit b5297f9

Browse files
rey-esptswast
andauthored
feat: add support for creating a Matrix Factorization model (#1330)
* docs: update title of pypi notebook example to reflect use of the PyPI public dataset In response to feedback on internal change 662899733. * feat: add support for creating a Matrix Factorization model * feat: add support for creating a Matrix Factorization model * Update bigframes/ml/decomposition.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update bigframes/ml/decomposition.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update bigframes/ml/decomposition.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * rating_col * (nearly) complete class * removem print() * adding recommend * remove hyper parameter runing references * swap predict in _mf for recommend * recommend -> predict * update predict doc string * Merge branch 'main' into b338873783-matrix-factorization * preparing test files * add test data * new error: to_gbq column names need to be changed? * Merge branch 'main' into b338873783-matrix-factorization * Merge branch 'main' into b338873783-matrix-factorization * Delete demo.ipynb * passing system test * preparing to add unit tests * 2 out of 3 (so far) passing unit tests * attempted mocking * fix tests * new test file for model creation unit tests * add unit tests for num_factors, user_col, and item_col * Update tests/unit/ml/test_matrix_factorization.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update tests/unit/ml/test_matrix_factorization.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * uncomment one test * uncomment test * uncomment test * uncomment test * nearly all tests * tests complete and passing * seeing if test causes kokoro failure * uncomment test-kokoro still failing * remove comment * fix test * test kokoro * test_decomposition.py failing and now feedback_type attr does not exist * passing tests * Update bigframes/ml/decomposition.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update tests/system/large/ml/test_decomposition.py Co-authored-by: Tim Sweña (Swast) <[email 
protected]> * doc attempt - _mf.py example * feedback_type case ignore * Update _mf.py - remove global_explain() * fit * W * fix docs (maybe) * Update test_matrix_factorization.py with updated error messages * ilnt * Update test_matrix_factorization.py - add 'f' * improve errors and update tests * Update tests/system/large/ml/test_decomposition.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update bigframes/ml/decomposition.py - num_factors error messsage Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update bigframes/ml/decomposition.py - user_col error message Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update bigframes/ml/decomposition.py - rating_col error message Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update bigframes/ml/decomposition.py - l2_reg error msg Co-authored-by: Tim Sweña (Swast) <[email protected]> * fix tests to match updated error messages * Update third_party/bigframes_vendored/sklearn/decomposition/_mf.py - docs df Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update third_party/bigframes_vendored/sklearn/decomposition/_mf.py - docs model Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update third_party/bigframes_vendored/sklearn/decomposition/_mf.py - docs fit Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update third_party/bigframes_vendored/sklearn/decomposition/_mf.py * remove errors and tests * Update bigframes/ml/decomposition.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update bigframes/ml/decomposition.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * Update bigframes/ml/decomposition.py Co-authored-by: Tim Sweña (Swast) <[email protected]> * passing system test * E AssertionError: expected call not found. 
E Expected: read_gbq('SELECT * FROM ML.RECOMMEND(MODEL ..,\n (input_X_sql))', trial_id=['index_column_id']) E Actual: read_gbq('SELECT * FROM ML.RECOMMEND(MODEL ..,\n (input_X_sql))', index_col=['index_column_id']) * same # of elements in each * attempt * doc fix * doc fix --------- Co-authored-by: Tim Sweña (Swast) <[email protected]>
1 parent 7ea1fc3 commit b5297f9

File tree

11 files changed

+607
-2
lines changed

11 files changed

+607
-2
lines changed

bigframes/ml/core.py

+6
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ def model(self) -> bigquery.Model:
117117
"""Get the BQML model associated with this wrapper"""
118118
return self._model
119119

120+
def recommend(self, input_data: bpd.DataFrame) -> bpd.DataFrame:
    """Apply the ML.RECOMMEND table-valued function to ``input_data``.

    Delegates to ``self._apply_ml_tvf``, handing it the SQL generator's
    ``ml_recommend`` method as the SQL builder.

    Args:
        input_data: DataFrame forwarded unchanged to ``_apply_ml_tvf``.

    Returns:
        Whatever ``_apply_ml_tvf`` returns for this input and SQL builder.
    """
    return self._apply_ml_tvf(
        input_data,
        self._model_manipulation_sql_generator.ml_recommend,
    )
125+
120126
def predict(self, input_data: bpd.DataFrame) -> bpd.DataFrame:
121127
return self._apply_ml_tvf(
122128
input_data,

bigframes/ml/decomposition.py

+166-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from typing import List, Literal, Optional, Union
2121

22+
import bigframes_vendored.sklearn.decomposition._mf
2223
import bigframes_vendored.sklearn.decomposition._pca
2324
from google.cloud import bigquery
2425

@@ -27,7 +28,15 @@
2728
import bigframes.pandas as bpd
2829
import bigframes.session
2930

30-
_BQML_PARAMS_MAPPING = {"svd_solver": "pcaSolver"}
31+
# Maps Python-side parameter/attribute names (keys) to camelCase names
# (values). It is passed to ``utils.retrieve_params_from_bq_model`` when an
# estimator is rebuilt from an existing BigQuery model.
# NOTE(review): the values are presumably the field names BigQuery reports in
# a model's training options -- confirm against the helper's implementation.
_BQML_PARAMS_MAPPING = {
    "svd_solver": "pcaSolver",
    "feedback_type": "feedbackType",
    "num_factors": "numFactors",
    "user_col": "userColumn",
    "item_col": "itemColumn",
    # MatrixFactorization keeps ``rating_col`` in this private list attribute.
    "_input_label_columns": "inputLabelColumns",
    "l2_reg": "l2Regularization",
}
3140

3241

3342
@log_adapter.class_logger
@@ -197,3 +206,159 @@ def score(
197206

198207
# TODO(b/291973741): X param is ignored. Update BQML supports input in ML.EVALUATE.
199208
return self._bqml_model.evaluate()
209+
210+
211+
@log_adapter.class_logger
class MatrixFactorization(
    base.UnsupervisedTrainablePredictor,
    bigframes_vendored.sklearn.decomposition._mf.MatrixFactorization,
):
    __doc__ = bigframes_vendored.sklearn.decomposition._mf.MatrixFactorization.__doc__

    def __init__(
        self,
        *,
        feedback_type: Literal["explicit", "implicit"] = "explicit",
        num_factors: int,
        user_col: str,
        item_col: str,
        rating_col: str = "rating",
        # TODO: Add support for hyperparameter tuning.
        l2_reg: float = 1.0,
    ):
        """Validate and store the model configuration.

        All arguments are keyword-only. Each value is later forwarded to BQML
        under the option of the same name (see ``_bqml_options``).

        Args:
            feedback_type: ``"explicit"`` or ``"implicit"``; matched
                case-insensitively and stored lower-cased.
            num_factors: A strictly positive ``int``.
            user_col: Name of the user column.
            item_col: Name of the item column.
            rating_col: Name of the rating column.
            l2_reg: A ``float`` or ``int``.

        Raises:
            TypeError: If an argument has the wrong type.
            ValueError: If ``feedback_type`` is not one of the two accepted
                values, or ``num_factors`` is not positive.
        """
        # Check the type before calling ``.lower()`` so that a non-string
        # value raises a TypeError like the other parameters, not an
        # AttributeError.
        if not isinstance(feedback_type, str):
            raise TypeError(
                f"Expected feedback_type to be a str, but got {type(feedback_type)}."
            )

        feedback_type = feedback_type.lower()  # type: ignore
        if feedback_type not in ("explicit", "implicit"):
            raise ValueError("Expected feedback_type to be `explicit` or `implicit`.")

        self.feedback_type = feedback_type

        if not isinstance(num_factors, int):
            raise TypeError(
                f"Expected num_factors to be an int, but got {type(num_factors)}."
            )

        # Zero is rejected as well: the message promises a *positive* integer.
        if num_factors <= 0:
            raise ValueError(
                f"Expected num_factors to be a positive integer, but got {num_factors}."
            )

        self.num_factors = num_factors

        if not isinstance(user_col, str):
            raise TypeError(f"Expected user_col to be a str, but got {type(user_col)}.")

        self.user_col = user_col

        if not isinstance(item_col, str):
            raise TypeError(f"Expected item_col to be STR, but got {type(item_col)}.")

        self.item_col = item_col

        if not isinstance(rating_col, str):
            raise TypeError(
                f"Expected rating_col to be a str, but got {type(rating_col)}."
            )

        # Stored as a one-element list under the attribute name that
        # _BQML_PARAMS_MAPPING associates with "inputLabelColumns"; exposed
        # to callers through the ``rating_col`` property.
        self._input_label_columns = [rating_col]

        if not isinstance(l2_reg, (float, int)):
            raise TypeError(
                f"Expected l2_reg to be a float or int, but got {type(l2_reg)}."
            )

        self.l2_reg = l2_reg
        # No BQML model exists until ``_fit`` or ``_from_bq`` sets one.
        self._bqml_model: Optional[core.BqmlModel] = None
        self._bqml_model_factory = globals.bqml_model_factory()

    @property
    def rating_col(self) -> str:
        """str: The rating column name. Defaults to 'rating'."""
        return self._input_label_columns[0]

    @classmethod
    def _from_bq(
        cls, session: bigframes.session.Session, bq_model: bigquery.Model
    ) -> MatrixFactorization:
        """Build an estimator around an existing BigQuery model.

        Constructor arguments are recovered from ``bq_model`` with
        ``utils.retrieve_params_from_bq_model`` and ``_BQML_PARAMS_MAPPING``;
        the model is then attached as a ``core.BqmlModel`` bound to
        ``session``.
        """
        assert bq_model.model_type == "MATRIX_FACTORIZATION"

        kwargs = utils.retrieve_params_from_bq_model(
            cls, bq_model, _BQML_PARAMS_MAPPING
        )

        model = cls(**kwargs)
        model._bqml_model = core.BqmlModel(session, bq_model)
        return model

    @property
    def _bqml_options(self) -> dict:
        """The model options as they will be set for BQML"""
        options: dict = {
            "model_type": "matrix_factorization",
            "feedback_type": self.feedback_type,
            "user_col": self.user_col,
            "item_col": self.item_col,
            "rating_col": self.rating_col,
            "l2_reg": self.l2_reg,
        }

        # ``__init__`` always stores an int here; the guard only matters if
        # the attribute is reassigned to None after construction.
        if self.num_factors is not None:
            options["num_factors"] = self.num_factors

        return options

    def _fit(
        self,
        X: utils.ArrayType,
        y=None,
        transforms: Optional[List[str]] = None,
    ) -> MatrixFactorization:
        """Create the BQML model from ``X`` using ``_bqml_options``.

        Args:
            X: Training data; converted with
                ``utils.batch_convert_to_dataframe``.
            y: Must be ``None``.
            transforms: Forwarded unchanged to the model factory.

        Returns:
            This estimator, with ``_bqml_model`` set.

        Raises:
            ValueError: If ``y`` is not ``None``.
        """
        if y is not None:
            raise ValueError(
                "Label column not supported for Matrix Factorization model but y was not `None`"
            )

        (X,) = utils.batch_convert_to_dataframe(X)

        self._bqml_model = self._bqml_model_factory.create_model(
            X_train=X,
            transforms=transforms,
            options=self._bqml_options,
        )
        return self

    def predict(self, X: utils.ArrayType) -> bpd.DataFrame:
        """Return the fitted model's ``recommend`` output for ``X``.

        Args:
            X: Input data; converted to a DataFrame in the model's session.

        Raises:
            RuntimeError: If no model has been fitted or loaded yet.
        """
        if not self._bqml_model:
            # The message names this method; it previously said "recommend",
            # a leftover from before the method was renamed.
            raise RuntimeError("A model must be fitted before predict")

        (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)

        return self._bqml_model.recommend(X)

    def to_gbq(self, model_name: str, replace: bool = False) -> MatrixFactorization:
        """Save the model to BigQuery.

        Args:
            model_name (str):
                The name of the model.
            replace (bool, default False):
                Determine whether to replace if the model already exists. Default to False.

        Returns:
            MatrixFactorization: Saved model.

        Raises:
            RuntimeError: If no model has been fitted or loaded yet.
        """
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before it can be saved")

        new_model = self._bqml_model.copy(model_name, replace)
        return new_model.session.read_gbq_model(model_name)

    def score(
        self,
        X=None,
        y=None,
    ) -> bpd.DataFrame:
        """Return the result of the fitted model's ``evaluate()``.

        Args:
            X: Currently ignored (see the TODO below).
            y: Not used by this method.

        Raises:
            RuntimeError: If no model has been fitted or loaded yet.
        """
        if not self._bqml_model:
            raise RuntimeError("A model must be fitted before score")

        # TODO(b/291973741): X param is ignored. Update BQML supports input in ML.EVALUATE.
        return self._bqml_model.evaluate()

bigframes/ml/loader.py

+2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
"LINEAR_REGRESSION": linear_model.LinearRegression,
4343
"LOGISTIC_REGRESSION": linear_model.LogisticRegression,
4444
"KMEANS": cluster.KMeans,
45+
"MATRIX_FACTORIZATION": decomposition.MatrixFactorization,
4546
"PCA": decomposition.PCA,
4647
"BOOSTED_TREE_REGRESSOR": ensemble.XGBRegressor,
4748
"BOOSTED_TREE_CLASSIFIER": ensemble.XGBClassifier,
@@ -80,6 +81,7 @@
8081
def from_bq(
8182
session: bigframes.session.Session, bq_model: bigquery.Model
8283
) -> Union[
84+
decomposition.MatrixFactorization,
8385
decomposition.PCA,
8486
cluster.KMeans,
8587
linear_model.LinearRegression,

bigframes/ml/sql.py

+5
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,11 @@ def alter_model(
299299
return "\n".join(parts)
300300

301301
# ML prediction TVFs
302+
def ml_recommend(self, source_sql: str) -> str:
    """Encode ML.RECOMMEND for BQML.

    Args:
        source_sql: SQL text placed, parenthesised, as the second argument
            of ``ML.RECOMMEND``.

    Returns:
        A ``SELECT * FROM ML.RECOMMEND(...)`` statement that references this
        generator's model via ``self._model_ref_sql()``.
    """
    # NOTE(review): the second line of this literal is rebuilt with a
    # two-space indent; the exact leading whitespace should be confirmed
    # against the sibling ``ml_predict`` template.
    return f"""SELECT * FROM ML.RECOMMEND(MODEL {self._model_ref_sql()},
  ({source_sql}))"""
306+
302307
def ml_predict(self, source_sql: str) -> str:
303308
"""Encode ML.PREDICT for BQML"""
304309
return f"""SELECT * FROM ML.PREDICT(MODEL {self._model_ref_sql()},

tests/data/ratings.jsonl

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{"user_id": 1, "item_id": 2, "rating": 4.0}
2+
{"user_id": 1, "item_id": 5, "rating": 3.0}
3+
{"user_id": 2, "item_id": 1, "rating": 5.0}
4+
{"user_id": 2, "item_id": 3, "rating": 2.0}
5+
{"user_id": 3, "item_id": 4, "rating": 4.5}
6+
{"user_id": 3, "item_id": 7, "rating": 3.5}
7+
{"user_id": 4, "item_id": 2, "rating": 1.0}
8+
{"user_id": 4, "item_id": 8, "rating": 5.0}
9+
{"user_id": 5, "item_id": 3, "rating": 4.0}
10+
{"user_id": 5, "item_id": 9, "rating": 2.5}
11+
{"user_id": 6, "item_id": 1, "rating": 3.0}
12+
{"user_id": 6, "item_id": 6, "rating": 4.5}
13+
{"user_id": 7, "item_id": 5, "rating": 5.0}
14+
{"user_id": 7, "item_id": 10, "rating": 1.5}
15+
{"user_id": 8, "item_id": 4, "rating": 2.0}
16+
{"user_id": 8, "item_id": 7, "rating": 4.0}
17+
{"user_id": 9, "item_id": 2, "rating": 3.5}
18+
{"user_id": 9, "item_id": 9, "rating": 5.0}
19+
{"user_id": 10, "item_id": 3, "rating": 4.5}
20+
{"user_id": 10, "item_id": 8, "rating": 2.5}

tests/data/ratings_schema.json

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
[
2+
{
3+
"mode": "NULLABLE",
4+
"name": "user_id",
5+
"type": "STRING"
6+
},
7+
{
8+
"mode": "NULLABLE",
9+
"name": "item_id",
10+
"type": "INT64"
11+
},
12+
{
13+
"mode": "NULLABLE",
14+
"name": "rating",
15+
"type": "FLOAT"
16+
}
17+
]

tests/system/conftest.py

+14
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ def load_test_data_tables(
320320
("repeated", "repeated_schema.json", "repeated.jsonl"),
321321
("json", "json_schema.json", "json.jsonl"),
322322
("penguins", "penguins_schema.json", "penguins.jsonl"),
323+
("ratings", "ratings_schema.json", "ratings.jsonl"),
323324
("time_series", "time_series_schema.json", "time_series.jsonl"),
324325
("hockey_players", "hockey_players.json", "hockey_players.jsonl"),
325326
("matrix_2by3", "matrix_2by3.json", "matrix_2by3.jsonl"),
@@ -416,6 +417,11 @@ def penguins_table_id(test_data_tables) -> str:
416417
return test_data_tables["penguins"]
417418

418419

420+
@pytest.fixture(scope="session")
def ratings_table_id(test_data_tables) -> str:
    """Return the ``"ratings"`` entry of the ``test_data_tables`` fixture."""
    return test_data_tables["ratings"]
423+
424+
419425
@pytest.fixture(scope="session")
def urban_areas_table_id(test_data_tables) -> str:
    """Return the ``"urban_areas"`` entry of the ``test_data_tables`` fixture."""
    return test_data_tables["urban_areas"]
@@ -769,6 +775,14 @@ def penguins_df_null_index(
769775
return unordered_session.read_gbq(penguins_table_id)
770776

771777

778+
@pytest.fixture(scope="session")
def ratings_df_default_index(
    ratings_table_id: str, session: bigframes.Session
) -> bigframes.dataframe.DataFrame:
    """DataFrame pointing at test data."""
    return session.read_gbq(ratings_table_id)
784+
785+
772786
@pytest.fixture(scope="session")
773787
def time_series_df_default_index(
774788
time_series_table_id: str, session: bigframes.Session

tests/system/large/ml/test_decomposition.py

+46
Original file line numberDiff line numberDiff line change
@@ -163,3 +163,49 @@ def test_decomposition_configure_fit_load_none_component(
163163
in reloaded_model._bqml_model.model_name
164164
)
165165
assert reloaded_model.n_components == 7
166+
167+
168+
def test_decomposition_mf_configure_fit_load(
    session, ratings_df_default_index, dataset_id
):
    """Fit a configured MatrixFactorization model, save it, and reload it.

    Checks that the model returned by ``to_gbq`` can score and predict, and
    that it reports the same hyperparameters it was configured with.
    """
    model = decomposition.MatrixFactorization(
        num_factors=6,
        feedback_type="explicit",
        user_col="user_id",
        item_col="item_id",
        rating_col="rating",
        l2_reg=9.83,
    )

    model.fit(ratings_df_default_index)

    reloaded_model = model.to_gbq(
        f"{dataset_id}.temp_configured_mf_model", replace=True
    )

    new_ratings = session.read_pandas(
        pd.DataFrame(
            {
                "user_id": ["11", "12", "13"],
                "item_id": [1, 2, 3],
                "rating": [1.0, 2.0, 3.0],
            }
        )
    )

    # Only checks that scoring runs; the returned metrics are not inspected.
    reloaded_model.score(new_ratings)

    result = reloaded_model.predict(new_ratings).to_pandas()

    assert reloaded_model._bqml_model is not None
    assert (
        f"{dataset_id}.temp_configured_mf_model"
        in reloaded_model._bqml_model.model_name
    )
    assert result is not None
    # The configuration must survive the save/reload round trip.
    assert reloaded_model.feedback_type == "explicit"
    assert reloaded_model.num_factors == 6
    assert reloaded_model.user_col == "user_id"
    assert reloaded_model.item_col == "item_id"
    assert reloaded_model.rating_col == "rating"
    assert reloaded_model.l2_reg == 9.83

0 commit comments

Comments
 (0)