Skip to content

Commit 9289f2d

Browse files
authored
feat: enable ingest from pd.DataFrame (#977)
* feat: enable ingest from pd.DataFrame * fix: remove bq create_dataset, docstrings, mocks * fix: e2e_base project * fix: delete two optional args, add note for temp bq dataset, revert deleting bq dataset create, add featurestore_extra_require, update ic tests to use online read to validate feature value ingestion from df * fix: add a comment of call complete upon ingestion, update unit tests
1 parent c840728 commit 9289f2d

File tree

4 files changed

+471
-58
lines changed

4 files changed

+471
-58
lines changed

google/cloud/aiplatform/featurestore/entity_type.py

+150-42
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import datetime
1919
from typing import Dict, List, Optional, Sequence, Tuple, Union
20+
import uuid
2021

2122
from google.auth import credentials as auth_credentials
2223
from google.protobuf import field_mask_pb2
@@ -34,6 +35,7 @@
3435
from google.cloud.aiplatform import utils
3536
from google.cloud.aiplatform.utils import featurestore_utils
3637

38+
from google.cloud import bigquery
3739

3840
_LOGGER = base.Logger(__name__)
3941
_ALL_FEATURE_IDS = "*"
@@ -795,23 +797,16 @@ def _validate_and_get_import_feature_values_request(
795797
If not provided, the source column need to be the same as the Feature ID.
796798
797799
Example:
800+
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
798801
799-
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
800-
801-
In case all features' source field and ID match:
802-
feature_source_fields = None or {}
803-
804-
In case all features' source field and ID do not match:
805-
feature_source_fields = {
802+
feature_source_fields = {
806803
'my_feature_id_1': 'my_feature_id_1_source_field',
807-
'my_feature_id_2': 'my_feature_id_2_source_field',
808-
'my_feature_id_3': 'my_feature_id_3_source_field',
809-
}
804+
}
805+
806+
Note:
807+
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field',
808+
The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'.
810809
811-
In case some features' source field and ID do not match:
812-
feature_source_fields = {
813-
'my_feature_id_1': 'my_feature_id_1_source_field',
814-
}
815810
entity_id_field (str):
816811
Optional. Source column that holds entity IDs. If not provided, entity
817812
IDs are extracted from the column named ``entity_id``.
@@ -954,23 +949,16 @@ def ingest_from_bq(
954949
If not provided, the source column need to be the same as the Feature ID.
955950
956951
Example:
952+
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
957953
958-
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
959-
960-
In case all features' source field and ID match:
961-
feature_source_fields = None or {}
962-
963-
In case all features' source field and ID do not match:
964-
feature_source_fields = {
954+
feature_source_fields = {
965955
'my_feature_id_1': 'my_feature_id_1_source_field',
966-
'my_feature_id_2': 'my_feature_id_2_source_field',
967-
'my_feature_id_3': 'my_feature_id_3_source_field',
968-
}
956+
}
957+
958+
Note:
959+
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field',
960+
The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'.
969961
970-
In case some features' source field and ID do not match:
971-
feature_source_fields = {
972-
'my_feature_id_1': 'my_feature_id_1_source_field',
973-
}
974962
entity_id_field (str):
975963
Optional. Source column that holds entity IDs. If not provided, entity
976964
IDs are extracted from the column named ``entity_id``.
@@ -1000,6 +988,7 @@ def ingest_from_bq(
1000988
EntityType - The entityType resource object with feature values imported.
1001989
1002990
"""
991+
1003992
bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)
1004993

1005994
import_feature_values_request = self._validate_and_get_import_feature_values_request(
@@ -1065,23 +1054,16 @@ def ingest_from_gcs(
10651054
If not provided, the source column need to be the same as the Feature ID.
10661055
10671056
Example:
1057+
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
10681058
1069-
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
1070-
1071-
In case all features' source field and ID match:
1072-
feature_source_fields = None or {}
1073-
1074-
In case all features' source field and ID do not match:
1075-
feature_source_fields = {
1059+
feature_source_fields = {
10761060
'my_feature_id_1': 'my_feature_id_1_source_field',
1077-
'my_feature_id_2': 'my_feature_id_2_source_field',
1078-
'my_feature_id_3': 'my_feature_id_3_source_field',
1079-
}
1061+
}
1062+
1063+
Note:
1064+
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field',
1065+
The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'.
10801066
1081-
In case some features' source field and ID do not match:
1082-
feature_source_fields = {
1083-
'my_feature_id_1': 'my_feature_id_1_source_field',
1084-
}
10851067
entity_id_field (str):
10861068
Optional. Source column that holds entity IDs. If not provided, entity
10871069
IDs are extracted from the column named ``entity_id``.
@@ -1146,6 +1128,132 @@ def ingest_from_gcs(
11461128
request_metadata=request_metadata,
11471129
)
11481130

1131+
def ingest_from_df(
1132+
self,
1133+
feature_ids: List[str],
1134+
feature_time: Union[str, datetime.datetime],
1135+
df_source: "pd.DataFrame", # noqa: F821 - skip check for undefined name 'pd'
1136+
feature_source_fields: Optional[Dict[str, str]] = None,
1137+
entity_id_field: Optional[str] = None,
1138+
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
1139+
) -> "EntityType":
1140+
"""Ingest feature values from DataFrame.
1141+
1142+
Note:
1143+
Calling this method will automatically create and delete a temporary
1144+
bigquery dataset in the same GCP project, which will be used
1145+
as the intermediary storage for ingesting feature values
1146+
from dataframe to featurestore.
1147+
1148+
The call will return upon ingestion completes, where the
1149+
feature values will be ingested into the entity_type.
1150+
1151+
Args:
1152+
feature_ids (List[str]):
1153+
Required. IDs of the Feature to import values
1154+
of. The Features must exist in the target
1155+
EntityType, or the request will fail.
1156+
feature_time (Union[str, datetime.datetime]):
1157+
Required. The feature_time can be one of:
1158+
- The source column that holds the Feature
1159+
timestamp for all Feature values in each entity.
1160+
1161+
Note:
1162+
The dtype of the source column should be `datetime64`.
1163+
1164+
- A single Feature timestamp for all entities
1165+
being imported. The timestamp must not have
1166+
higher than millisecond precision.
1167+
1168+
Example:
1169+
feature_time = datetime.datetime(year=2022, month=1, day=1, hour=11, minute=59, second=59)
1170+
or
1171+
feature_time_str = datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
1172+
feature_time = datetime.datetime.strptime(feature_time_str, "%Y-%m-%d %H:%M:%S.%f")
1173+
1174+
df_source (pd.DataFrame):
1175+
Required. Pandas DataFrame containing the source data for ingestion.
1176+
feature_source_fields (Dict[str, str]):
1177+
Optional. User defined dictionary to map ID of the Feature for importing values
1178+
of to the source column for getting the Feature values from.
1179+
1180+
Specify the features whose ID and source column are not the same.
1181+
If not provided, the source column need to be the same as the Feature ID.
1182+
1183+
Example:
1184+
feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3']
1185+
1186+
feature_source_fields = {
1187+
'my_feature_id_1': 'my_feature_id_1_source_field',
1188+
}
1189+
1190+
Note:
1191+
The source column of 'my_feature_id_1' is 'my_feature_id_1_source_field',
1192+
The source column of 'my_feature_id_2' is the ID of the feature, same for 'my_feature_id_3'.
1193+
1194+
entity_id_field (str):
1195+
Optional. Source column that holds entity IDs. If not provided, entity
1196+
IDs are extracted from the column named ``entity_id``.
1197+
request_metadata (Sequence[Tuple[str, str]]):
1198+
Optional. Strings which should be sent along with the request as metadata.
1199+
1200+
Returns:
1201+
EntityType - The entityType resource object with feature values imported.
1202+
1203+
"""
1204+
try:
1205+
import pyarrow # noqa: F401 - skip check for 'pyarrow' which is required when using 'google.cloud.bigquery'
1206+
except ImportError:
1207+
raise ImportError(
1208+
f"Pyarrow is not installed. Please install pyarrow to use "
1209+
f"{self.ingest_from_df.__name__}"
1210+
)
1211+
1212+
bigquery_client = bigquery.Client(
1213+
project=self.project, credentials=self.credentials
1214+
)
1215+
1216+
entity_type_name_components = self._parse_resource_name(self.resource_name)
1217+
featurestore_id, entity_type_id = (
1218+
entity_type_name_components["featurestore"],
1219+
entity_type_name_components["entity_type"],
1220+
)
1221+
1222+
temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
1223+
"-", "_"
1224+
)
1225+
temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[
1226+
:1024
1227+
]
1228+
temp_bq_table_id = f"{temp_bq_dataset_id}.{entity_type_id}"
1229+
1230+
temp_bq_dataset = bigquery.Dataset(dataset_ref=temp_bq_dataset_id)
1231+
temp_bq_dataset.location = self.location
1232+
1233+
temp_bq_dataset = bigquery_client.create_dataset(temp_bq_dataset)
1234+
1235+
try:
1236+
job = bigquery_client.load_table_from_dataframe(
1237+
dataframe=df_source, destination=temp_bq_table_id
1238+
)
1239+
job.result()
1240+
1241+
entity_type_obj = self.ingest_from_bq(
1242+
feature_ids=feature_ids,
1243+
feature_time=feature_time,
1244+
bq_source_uri=f"bq://{temp_bq_table_id}",
1245+
feature_source_fields=feature_source_fields,
1246+
entity_id_field=entity_id_field,
1247+
request_metadata=request_metadata,
1248+
)
1249+
1250+
finally:
1251+
bigquery_client.delete_dataset(
1252+
dataset=temp_bq_dataset.dataset_id, delete_contents=True,
1253+
)
1254+
1255+
return entity_type_obj
1256+
11491257
@staticmethod
11501258
def _instantiate_featurestore_online_client(
11511259
location: Optional[str] = None,

setup.py

+2
Original file line numberDiff line numberDiff line change
@@ -47,13 +47,15 @@
4747
"werkzeug >= 2.0.0",
4848
"tensorflow >=2.4.0",
4949
]
50+
featurestore_extra_require = ["pandas >= 1.0.0", "pyarrow >= 6.0.1"]
5051

5152
full_extra_require = list(
5253
set(
5354
tensorboard_extra_require
5455
+ metadata_extra_require
5556
+ xai_extra_require
5657
+ lit_extra_require
58+
+ featurestore_extra_require
5759
)
5860
)
5961
testing_extra_require = (

0 commit comments

Comments
 (0)