24
24
from google .cloud .aiplatform import base
25
25
from google .cloud .aiplatform .compat .types import (
26
26
entity_type as gca_entity_type ,
27
+ feature_selector as gca_feature_selector ,
27
28
featurestore_service as gca_featurestore_service ,
29
+ featurestore_online_service as gca_featurestore_online_service ,
28
30
io as gca_io ,
29
31
)
30
32
from google .cloud .aiplatform import featurestore
33
+ from google .cloud .aiplatform import initializer
31
34
from google .cloud .aiplatform import utils
32
35
from google .cloud .aiplatform .utils import featurestore_utils
33
36
37
+
34
38
# Module-level logger, created through base.Logger with this module's name.
_LOGGER = base .Logger (__name__ )
35
39
# Feature-ID wildcard: "*" requests the values of every feature.
_ALL_FEATURE_IDS = "*"
36
40
@@ -40,7 +44,6 @@ class EntityType(base.VertexAiResourceNounWithFutureManager):
40
44
41
45
client_class = utils .FeaturestoreClientWithOverride
42
46
43
- _is_client_prediction_client = False
44
47
_resource_noun = "entityTypes"
45
48
_getter_method = "get_entity_type"
46
49
_list_method = "list_entity_types"
@@ -114,6 +117,10 @@ def __init__(
114
117
else featurestore_id ,
115
118
)
116
119
120
+ self ._featurestore_online_client = self ._instantiate_featurestore_online_client (
121
+ location = self .location , credentials = credentials ,
122
+ )
123
+
117
124
@property
118
125
def featurestore_name (self ) -> str :
119
126
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
@@ -157,7 +164,7 @@ def update(
157
164
self ,
158
165
description : Optional [str ] = None ,
159
166
labels : Optional [Dict [str , str ]] = None ,
160
- request_metadata : Optional [ Sequence [Tuple [str , str ] ]] = (),
167
+ request_metadata : Sequence [Tuple [str , str ]] = (),
161
168
) -> "EntityType" :
162
169
"""Updates an existing managed entityType resource.
163
170
@@ -189,7 +196,7 @@ def update(
189
196
System reserved label keys are prefixed with
190
197
"aiplatform.googleapis.com/" and are immutable.
191
198
request_metadata (Sequence[Tuple[str, str]]):
192
- Optional . Strings which should be sent along with the request as metadata.
199
+ Required . Strings which should be sent along with the request as metadata.
193
200
Returns:
194
201
EntityType - The updated entityType resource object.
195
202
"""
@@ -1138,3 +1145,144 @@ def ingest_from_gcs(
1138
1145
import_feature_values_request = import_feature_values_request ,
1139
1146
request_metadata = request_metadata ,
1140
1147
)
1148
+
1149
@staticmethod
def _instantiate_featurestore_online_client(
    location: Optional[str] = None,
    credentials: Optional[auth_credentials.Credentials] = None,
) -> utils.FeaturestoreOnlineServingClientWithOverride:
    """Creates the featurestore online serving client.

    The client is built through the SDK-wide ``initializer.global_config``
    so that it is created the same way as the other service clients.

    Args:
        location (str):
            Optional. Location passed to the client factory as
            ``location_override``.
        credentials (google.auth.credentials.Credentials):
            Optional. Custom credentials to use when interacting with
            the featurestore online client.

    Returns:
        utils.FeaturestoreOnlineServingClientWithOverride:
            The featurestore online client created with the given overrides.
    """
    client_kwargs = {
        "client_class": utils.FeaturestoreOnlineServingClientWithOverride,
        "credentials": credentials,
        "location_override": location,
    }
    online_client = initializer.global_config.create_client(**client_kwargs)
    return online_client
1170
+
1171
def read(
    self,
    entity_ids: Union[str, List[str]],
    feature_ids: Union[str, List[str]] = "*",
    request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
) -> "pd.DataFrame":  # noqa: F821 - skip check for undefined name 'pd'
    """Reads feature values for given feature IDs of given entity IDs in this EntityType.

    A single entity ID (``str``) is served by one ``read_feature_values``
    call; a list of entity IDs is served by ``streaming_read_feature_values``.

    Args:
        entity_ids (Union[str, List[str]]):
            Required. ID for a specific entity, or a list of IDs of entities
            to read Feature values of. The maximum number of IDs is 100 if a list.
        feature_ids (Union[str, List[str]]):
            Optional. ID for a specific feature, or a list of IDs of Features in the
            EntityType for reading feature values. Defaults to "*", where the
            values of all features will be read.
        request_metadata (Sequence[Tuple[str, str]]):
            Optional. Strings which should be sent along with the request as metadata.

    Returns:
        pd.DataFrame: entities' feature values in DataFrame

    Raises:
        TypeError: If ``entity_ids`` is neither a ``str`` nor a ``list``.
    """
    # Fail fast with a clear message: without this check an unsupported type
    # (e.g. a tuple) skips both branches below and surfaces later as an
    # UnboundLocalError on `header`.
    if not isinstance(entity_ids, (str, list)):
        raise TypeError(
            "entity_ids must be a str or a list of str, "
            f"got {type(entity_ids).__name__}."
        )

    # The signature allows None for request_metadata; hand the client an
    # empty sequence in that case.
    metadata = request_metadata or ()

    if isinstance(feature_ids, str):
        feature_ids = [feature_ids]

    feature_selector = gca_feature_selector.FeatureSelector(
        id_matcher=gca_feature_selector.IdMatcher(ids=feature_ids)
    )

    if isinstance(entity_ids, str):
        read_feature_values_request = gca_featurestore_online_service.ReadFeatureValuesRequest(
            entity_type=self.resource_name,
            entity_id=entity_ids,
            feature_selector=feature_selector,
        )
        read_feature_values_response = self._featurestore_online_client.read_feature_values(
            request=read_feature_values_request, metadata=metadata
        )
        header = read_feature_values_response.header
        entity_views = [read_feature_values_response.entity_view]
    else:
        streaming_read_feature_values_request = gca_featurestore_online_service.StreamingReadFeatureValuesRequest(
            entity_type=self.resource_name,
            entity_ids=entity_ids,
            feature_selector=feature_selector,
        )
        streaming_read_feature_values_responses = list(
            self._featurestore_online_client.streaming_read_feature_values(
                request=streaming_read_feature_values_request,
                metadata=metadata,
            )
        )
        # The first streamed response supplies the header; the entity views
        # are taken from the responses that follow it.
        header = streaming_read_feature_values_responses[0].header
        entity_views = [
            response.entity_view
            for response in streaming_read_feature_values_responses[1:]
        ]

    # Use the feature IDs reported in the response header rather than the
    # requested ones, so the "*" selector becomes concrete column names.
    feature_ids = [
        feature_descriptor.id for feature_descriptor in header.feature_descriptors
    ]

    return EntityType._construct_dataframe(
        feature_ids=feature_ids, entity_views=entity_views,
    )
1237
+
1238
@staticmethod
def _construct_dataframe(
    feature_ids: List[str],
    entity_views: List[
        gca_featurestore_online_service.ReadFeatureValuesResponse.EntityView
    ],
) -> "pd.DataFrame":  # noqa: F821 - skip check for undefined name 'pd'
    """Constructs a dataframe from feature IDs and entity views.

    Each entity view becomes one row: an ``entity_id`` column followed by one
    column per feature ID. Feature IDs are paired positionally with the
    entries of ``entity_view.data``. A feature with no value set is stored
    as ``None``.

    Args:
        feature_ids (List[str]):
            Required. A list of feature ids corresponding to the feature values
            for each entity in entity_views.
        entity_views (List[gca_featurestore_online_service.ReadFeatureValuesResponse.EntityView]):
            Required. A list of Entity views with Feature values.
            For each Entity view, it may be
            the entity in the Featurestore if values for all
            Features were requested, or a projection of the
            entity in the Featurestore if values for only
            some Features were requested.

    Raises:
        ImportError: If pandas is not installed when using this method.

    Returns:
        pd.DataFrame - entities feature values in DataFrame
    """

    try:
        import pandas as pd
    except ImportError as err:
        # Chain the original error so the underlying import failure stays
        # visible in the traceback.
        raise ImportError(
            "Pandas is not installed. Please install pandas to use "
            f"{EntityType._construct_dataframe.__name__}"
        ) from err

    data = []
    for entity_view in entity_views:
        entity_data = {"entity_id": entity_view.entity_id}
        for feature_id, feature_data in zip(feature_ids, entity_view.data):
            feature_value = None
            if feature_data._pb.HasField("value"):
                value_type = feature_data.value._pb.WhichOneof("value")
                # WhichOneof returns None when the value message is present
                # but none of its fields is set; keep None in that case
                # instead of calling getattr with a None attribute name.
                if value_type is not None:
                    feature_value = getattr(feature_data.value, value_type)
                    # Values that expose a `values` attribute are unwrapped
                    # to it (presumably the array-typed feature values).
                    if hasattr(feature_value, "values"):
                        feature_value = feature_value.values
            entity_data[feature_id] = feature_value
        data.append(entity_data)

    return pd.DataFrame(data=data, columns=["entity_id"] + feature_ids)
0 commit comments