Skip to content

Commit e0fec36

Browse files
authored
feat: enable feature store batch serve to Pandas DataFrame; fix: read instances uri for batch serve (#983)
* feat: add batch_serve_to_df, add unit tests, add integration tests; update: setup.py; fix: read_instances bug for batch_serve_to_*
* fix: add self.wait() before self._parse_resource_name, add ignore_index=True for pd.concat and return pd.DataFrame upon empty frames
* fix: setup.py versioning conflict
* fix: optional_sync
1 parent 11d9af3 commit e0fec36

File tree

6 files changed

+565
-189
lines changed

6 files changed

+565
-189
lines changed

google/cloud/aiplatform/featurestore/entity_type.py

+37-10
Original file line numberDiff line numberDiff line change
@@ -123,16 +123,21 @@ def __init__(
123123
location=self.location, credentials=credentials,
124124
)
125125

126-
@property
127-
def featurestore_name(self) -> str:
128-
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
126+
def _get_featurestore_name(self) -> str:
127+
"""Gets full qualified resource name of the managed featurestore in which this EntityType is."""
129128
entity_type_name_components = self._parse_resource_name(self.resource_name)
130129
return featurestore.Featurestore._format_resource_name(
131130
project=entity_type_name_components["project"],
132131
location=entity_type_name_components["location"],
133132
featurestore=entity_type_name_components["featurestore"],
134133
)
135134

135+
@property
136+
def featurestore_name(self) -> str:
137+
"""Full qualified resource name of the managed featurestore in which this EntityType is."""
138+
self.wait()
139+
return self._get_featurestore_name()
140+
136141
def get_featurestore(self) -> "featurestore.Featurestore":
137142
"""Retrieves the managed featurestore in which this EntityType is.
138143
@@ -141,7 +146,7 @@ def get_featurestore(self) -> "featurestore.Featurestore":
141146
"""
142147
return featurestore.Featurestore(self.featurestore_name)
143148

144-
def get_feature(self, feature_id: str) -> "featurestore.Feature":
149+
def _get_feature(self, feature_id: str) -> "featurestore.Feature":
145150
"""Retrieves an existing managed feature in this EntityType.
146151
147152
Args:
@@ -151,7 +156,6 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature":
151156
featurestore.Feature - The managed feature resource object.
152157
"""
153158
entity_type_name_components = self._parse_resource_name(self.resource_name)
154-
155159
return featurestore.Feature(
156160
feature_name=featurestore.Feature._format_resource_name(
157161
project=entity_type_name_components["project"],
@@ -162,6 +166,18 @@ def get_feature(self, feature_id: str) -> "featurestore.Feature":
162166
)
163167
)
164168

169+
def get_feature(self, feature_id: str) -> "featurestore.Feature":
170+
"""Retrieves an existing managed feature in this EntityType.
171+
172+
Args:
173+
feature_id (str):
174+
Required. The managed feature resource ID in this EntityType.
175+
Returns:
176+
featurestore.Feature - The managed feature resource object.
177+
"""
178+
self.wait()
179+
return self._get_feature(feature_id=feature_id)
180+
165181
def update(
166182
self,
167183
description: Optional[str] = None,
@@ -202,6 +218,7 @@ def update(
202218
Returns:
203219
EntityType - The updated entityType resource object.
204220
"""
221+
self.wait()
205222
update_mask = list()
206223

207224
if description:
@@ -380,6 +397,7 @@ def list_features(
380397
Returns:
381398
List[featurestore.Feature] - A list of managed feature resource objects.
382399
"""
400+
self.wait()
383401
return featurestore.Feature.list(
384402
entity_type_name=self.resource_name, filter=filter, order_by=order_by,
385403
)
@@ -399,7 +417,7 @@ def delete_features(self, feature_ids: List[str], sync: bool = True,) -> None:
399417
"""
400418
features = []
401419
for feature_id in feature_ids:
402-
feature = self.get_feature(feature_id=feature_id)
420+
feature = self._get_feature(feature_id=feature_id)
403421
feature.delete(sync=False)
404422
features.append(feature)
405423

@@ -626,6 +644,7 @@ def create_feature(
626644
featurestore.Feature - feature resource object
627645
628646
"""
647+
self.wait()
629648
return featurestore.Feature.create(
630649
feature_id=feature_id,
631650
value_type=value_type,
@@ -761,8 +780,9 @@ def batch_create_features(
761780

762781
return self
763782

783+
@staticmethod
764784
def _validate_and_get_import_feature_values_request(
765-
self,
785+
entity_type_name: str,
766786
feature_ids: List[str],
767787
feature_time: Union[str, datetime.datetime],
768788
data_source: Union[gca_io.AvroSource, gca_io.BigQuerySource, gca_io.CsvSource],
@@ -773,6 +793,8 @@ def _validate_and_get_import_feature_values_request(
773793
) -> gca_featurestore_service.ImportFeatureValuesRequest:
774794
"""Validates and get import feature values request.
775795
Args:
796+
entity_type_name (str):
797+
Required. A fully-qualified entityType resource name.
776798
feature_ids (List[str]):
777799
Required. IDs of the Feature to import values
778800
of. The Features must exist in the target
@@ -840,7 +862,7 @@ def _validate_and_get_import_feature_values_request(
840862
]
841863

842864
import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest(
843-
entity_type=self.resource_name,
865+
entity_type=entity_type_name,
844866
feature_specs=feature_specs,
845867
entity_id_field=entity_id_field,
846868
disable_online_serving=disable_online_serving,
@@ -992,6 +1014,7 @@ def ingest_from_bq(
9921014
bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri)
9931015

9941016
import_feature_values_request = self._validate_and_get_import_feature_values_request(
1017+
entity_type_name=self.resource_name,
9951018
feature_ids=feature_ids,
9961019
feature_time=feature_time,
9971020
data_source=bigquery_source,
@@ -1114,6 +1137,7 @@ def ingest_from_gcs(
11141137
data_source = gca_io.AvroSource(gcs_source=gcs_source)
11151138

11161139
import_feature_values_request = self._validate_and_get_import_feature_values_request(
1140+
entity_type_name=self.resource_name,
11171141
feature_ids=feature_ids,
11181142
feature_time=feature_time,
11191143
data_source=data_source,
@@ -1213,6 +1237,7 @@ def ingest_from_df(
12131237
project=self.project, credentials=self.credentials
12141238
)
12151239

1240+
self.wait()
12161241
entity_type_name_components = self._parse_resource_name(self.resource_name)
12171242
featurestore_id, entity_type_id = (
12181243
entity_type_name_components["featurestore"],
@@ -1222,6 +1247,8 @@ def ingest_from_df(
12221247
temp_bq_dataset_name = f"temp_{featurestore_id}_{uuid.uuid4()}".replace(
12231248
"-", "_"
12241249
)
1250+
1251+
# TODO(b/216497263): Add support for the case where the resource project does not match initializer.global_config.project
12251252
temp_bq_dataset_id = f"{initializer.global_config.project}.{temp_bq_dataset_name}"[
12261253
:1024
12271254
]
@@ -1297,7 +1324,7 @@ def read(
12971324
Returns:
12981325
pd.DataFrame: entities' feature values in DataFrame
12991326
"""
1300-
1327+
self.wait()
13011328
if isinstance(feature_ids, str):
13021329
feature_ids = [feature_ids]
13031330

@@ -1339,7 +1366,7 @@ def read(
13391366
feature_descriptor.id for feature_descriptor in header.feature_descriptors
13401367
]
13411368

1342-
return EntityType._construct_dataframe(
1369+
return self._construct_dataframe(
13431370
feature_ids=feature_ids, entity_views=entity_views,
13441371
)
13451372

google/cloud/aiplatform/featurestore/feature.py

+17-8
Original file line numberDiff line numberDiff line change
@@ -122,17 +122,21 @@ def __init__(
122122
else featurestore_id,
123123
)
124124

125-
@property
126-
def featurestore_name(self) -> str:
127-
"""Full qualified resource name of the managed featurestore in which this Feature is."""
125+
def _get_featurestore_name(self) -> str:
126+
"""Gets full qualified resource name of the managed featurestore in which this Feature is."""
128127
feature_path_components = self._parse_resource_name(self.resource_name)
129-
130128
return featurestore.Featurestore._format_resource_name(
131129
project=feature_path_components["project"],
132130
location=feature_path_components["location"],
133131
featurestore=feature_path_components["featurestore"],
134132
)
135133

134+
@property
135+
def featurestore_name(self) -> str:
136+
"""Full qualified resource name of the managed featurestore in which this Feature is."""
137+
self.wait()
138+
return self._get_featurestore_name()
139+
136140
def get_featurestore(self) -> "featurestore.Featurestore":
137141
"""Retrieves the managed featurestore in which this Feature is.
138142
@@ -141,18 +145,22 @@ def get_featurestore(self) -> "featurestore.Featurestore":
141145
"""
142146
return featurestore.Featurestore(featurestore_name=self.featurestore_name)
143147

144-
@property
145-
def entity_type_name(self) -> str:
146-
"""Full qualified resource name of the managed entityType in which this Feature is."""
148+
def _get_entity_type_name(self) -> str:
149+
"""Gets full qualified resource name of the managed entityType in which this Feature is."""
147150
feature_path_components = self._parse_resource_name(self.resource_name)
148-
149151
return featurestore.EntityType._format_resource_name(
150152
project=feature_path_components["project"],
151153
location=feature_path_components["location"],
152154
featurestore=feature_path_components["featurestore"],
153155
entity_type=feature_path_components["entity_type"],
154156
)
155157

158+
@property
159+
def entity_type_name(self) -> str:
160+
"""Full qualified resource name of the managed entityType in which this Feature is."""
161+
self.wait()
162+
return self._get_entity_type_name()
163+
156164
def get_entity_type(self) -> "featurestore.EntityType":
157165
"""Retrieves the managed entityType in which this Feature is.
158166
@@ -203,6 +211,7 @@ def update(
203211
Returns:
204212
Feature - The updated feature resource object.
205213
"""
214+
self.wait()
206215
update_mask = list()
207216

208217
if description:

0 commit comments

Comments
 (0)