Skip to content

Commit 4b0722c

Browse files
vertex-sdk-botcopybara-github
authored andcommitted
fix: Support timestamp in Vertex SDK write_feature_values()
PiperOrigin-RevId: 525289489
1 parent 9a5c4be commit 4b0722c

File tree

2 files changed

+372
-15
lines changed

2 files changed

+372
-15
lines changed

google/cloud/aiplatform/featurestore/_entity_type.py

+138-10
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import datetime
1919
from typing import Dict, List, Optional, Sequence, Tuple, Union
2020
import uuid
21+
from google.protobuf import timestamp_pb2
2122

2223
from google.auth import credentials as auth_credentials
2324
from google.protobuf import field_mask_pb2
@@ -1575,6 +1576,7 @@ def write_feature_values(
15751576
],
15761577
"pd.DataFrame", # type: ignore # noqa: F821 - skip check for undefined name 'pd'
15771578
],
1579+
feature_time: Union[str, datetime.datetime] = None,
15781580
) -> "EntityType": # noqa: F821
15791581
"""Streaming ingestion. Write feature values directly to Feature Store.
15801582
@@ -1584,7 +1586,8 @@ def write_feature_values(
15841586
featurestore_id="my_featurestore_id",
15851587
)
15861588
1587-
# writing feature values from a pandas DataFrame
1589+
# writing feature values from a pandas DataFrame without feature timestamp column.
1590+
# In this case, current timestamp will be applied to all data.
15881591
my_dataframe = pd.DataFrame(
15891592
data = [
15901593
{"entity_id": "movie_01", "average_rating": 4.9}
@@ -1597,7 +1600,40 @@ def write_feature_values(
15971600
instances=my_df
15981601
)
15991602
1600-
# writing feature values from a Python dict
1603+
# writing feature values from a pandas DataFrame with feature timestamp column
1604+
# Example of datetime creation.
1605+
feature_time = datetime.datetime(year=2022, month=1, day=1, hour=11, minute=59, second=59)
1606+
or
1607+
feature_time_str = datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
1608+
feature_time = datetime.datetime.strptime(feature_time_str, "%Y-%m-%d %H:%M:%S.%f")
1609+
1610+
my_dataframe = pd.DataFrame(
1611+
data = [
1612+
{"entity_id": "movie_01", "average_rating": 4.9,
1613+
"feature_timestamp": feature_time}
1614+
],
1615+
columns=["entity_id", "average_rating", "feature_timestamp"],
1616+
)
1617+
1618+
my_dataframe = my_df.set_index("entity_id")
1619+
my_entity_type.write_feature_values(
1620+
instances=my_df, feature_time="feature_timestamp"
1621+
)
1622+
1623+
# writing feature values with a timestamp. The timestamp will be applied to the entire Dataframe.
1624+
my_dataframe = pd.DataFrame(
1625+
data = [
1626+
{"entity_id": "movie_01", "average_rating": 4.9}
1627+
],
1628+
columns=["entity_id", "average_rating"],
1629+
)
1630+
my_dataframe = my_df.set_index("entity_id")
1631+
my_entity_type.write_feature_values(
1632+
instances=my_df, feature_time=feature_time
1633+
)
1634+
1635+
# writing feature values from a Python dict without timestamp column.
1636+
# In this case, current timestamp will be applied to all data.
16011637
my_data_dict = {
16021638
"movie_02" : {"average_rating": 3.7}
16031639
}
@@ -1606,16 +1642,40 @@ def write_feature_values(
16061642
instances=my_data_dict
16071643
)
16081644
1645+
# writing feature values from a Python dict with timestamp column
1646+
my_data_dict = {
1647+
"movie_02" : {"average_rating": 3.7, "feature_timestamp": timestmap}}
1648+
}
1649+
1650+
my_entity_type.write_feature_values(
1651+
instances=my_data_dict, feature_time="feature_timestamp"
1652+
)
1653+
1654+
# writing feature values from a Python dict and apply the same Feature_Timestamp
1655+
my_data_dict = {
1656+
"movie_02" : {"average_rating": 3.7}
1657+
}
1658+
1659+
my_entity_type.write_feature_values(
1660+
instances=my_data_dict, feature_time=feature_time
1661+
)
1662+
16091663
# writing feature values from a list of WriteFeatureValuesPayload objects
16101664
payloads = [
16111665
gca_featurestore_online_service.WriteFeatureValuesPayload(
16121666
entity_id="movie_03",
1613-
feature_values=gca_featurestore_online_service.FeatureValue(
1614-
double_value=4.9
1615-
)
1667+
feature_values={
1668+
"average_rating": featurestore_online_service.FeatureValue(
1669+
string_value="test",
1670+
metadata=featurestore_online_service.FeatureValue.Metadata(
1671+
generate_time=timestmap
1672+
)
1673+
}
1674+
}
16161675
)
16171676
]
1618-
1677+
# when instance is WriteFeatureValuesPayload,
1678+
# feature_time param of write_feature_values() is ignored.
16191679
my_entity_type.write_feature_values(
16201680
instances=payloads
16211681
)
@@ -1641,18 +1701,27 @@ def write_feature_values(
16411701
in the pandas Dataframe represents an entity, which has an entity ID
16421702
and its associated feature values. Currently, a single payload can be
16431703
written in a single request.
1704+
feature_time Union[str, datetime.datetime]:
1705+
Optional. Either column name in DataFrame or Dict which contains timestamp value,
1706+
or datetime to apply to the entire DataFrame or Dict.
1707+
Timestamp will be applied to generate_timestmap in all FeatureValue.
1708+
If not provided, curreent timestamp is used. This param is not used
1709+
when instances is List[WriteFeatureValuesPayload].
16441710
16451711
Returns:
16461712
EntityType - The updated EntityType object.
16471713
"""
1648-
16491714
if isinstance(instances, Dict):
1650-
payloads = self._generate_payloads(instances=instances)
1715+
payloads = self._generate_payloads(
1716+
instances=instances, feature_time=feature_time
1717+
)
16511718
elif isinstance(instances, List):
16521719
payloads = instances
16531720
else:
16541721
instances_dict = instances.to_dict(orient="index")
1655-
payloads = self._generate_payloads(instances=instances_dict)
1722+
payloads = self._generate_payloads(
1723+
instances=instances_dict, feature_time=feature_time
1724+
)
16561725

16571726
_LOGGER.log_action_start_against_resource(
16581727
"Writing",
@@ -1688,6 +1757,7 @@ def _generate_payloads(
16881757
],
16891758
],
16901759
],
1760+
feature_time: Union[str, datetime.datetime] = None,
16911761
) -> List[gca_featurestore_online_service.WriteFeatureValuesPayload]:
16921762
"""Helper method used to generate GAPIC WriteFeatureValuesPayloads from
16931763
a Python dict.
@@ -1696,18 +1766,39 @@ def _generate_payloads(
16961766
instances (Dict[str, Dict[str, Union[int, str, float, bool, bytes,
16971767
List[int], List[str], List[float], List[bool]]]]):
16981768
Required. Dict mapping entity IDs to their corresponding features.
1699-
1769+
feature_time Union[str, datetime.datetime]:
1770+
Optional. Either string representing column name which stores
1771+
feature timestamp, or timestamp to apply to entire DataFrame or
1772+
Dict.
17001773
Returns:
17011774
List[gca_featurestore_online_service.WriteFeatureValuesPayload] -
17021775
A list of WriteFeatureValuesPayload objects ready to be written to the Feature Store.
17031776
"""
17041777
payloads = []
1778+
timestamp_to_all_field = None
1779+
if feature_time and cls._is_timestamp(feature_time):
1780+
# timestamp_to_all_field will be applied to all FeatureValues.
1781+
timestamp_to_all_field = feature_time
1782+
17051783
for entity_id, features in instances.items():
17061784
feature_values = {}
17071785
for feature_id, value in features.items():
1786+
if feature_id == feature_time:
1787+
continue
17081788
feature_value = cls._convert_value_to_gapic_feature_value(
17091789
feature_id=feature_id, value=value
17101790
)
1791+
# Create a FeatureValue Metadata with generate_time if
1792+
# valid feature_time param is provided.
1793+
timestamp = cls._apply_feature_timestamp(
1794+
cls, features, timestamp_to_all_field, feature_time
1795+
)
1796+
if timestamp:
1797+
feature_value.metadata = (
1798+
gca_featurestore_online_service.FeatureValue.Metadata(
1799+
generate_time=timestamp
1800+
)
1801+
)
17111802
feature_values[feature_id] = feature_value
17121803
payload = gca_featurestore_online_service.WriteFeatureValuesPayload(
17131804
entity_id=entity_id, feature_values=feature_values
@@ -1716,6 +1807,43 @@ def _generate_payloads(
17161807

17171808
return payloads
17181809

1810+
@staticmethod
1811+
def _apply_feature_timestamp(
1812+
cls,
1813+
features: Union[
1814+
int,
1815+
str,
1816+
float,
1817+
bool,
1818+
bytes,
1819+
List[int],
1820+
List[str],
1821+
List[float],
1822+
List[bool],
1823+
],
1824+
timestamp_to_all_field: datetime.datetime = None,
1825+
feature_time: str = None,
1826+
) -> Union[datetime.datetime, timestamp_pb2.Timestamp]:
1827+
if feature_time is None:
1828+
return None
1829+
if timestamp_to_all_field:
1830+
return timestamp_to_all_field
1831+
1832+
# Return a timestamp in Dict or Dataframe if it is valid.
1833+
if feature_time in features.keys() and cls._is_timestamp(
1834+
features[feature_time]
1835+
):
1836+
return features[feature_time]
1837+
return None
1838+
1839+
@staticmethod
1840+
def _is_timestamp(
1841+
timestamp: Union[datetime.datetime, timestamp_pb2.Timestamp]
1842+
) -> bool:
1843+
return isinstance(timestamp, datetime.datetime) or isinstance(
1844+
timestamp, timestamp_pb2.Timestamp
1845+
)
1846+
17191847
@classmethod
17201848
def _convert_value_to_gapic_feature_value(
17211849
cls,

0 commit comments

Comments
 (0)