
Commit 26f7c90

feat: enriched staging query macros + unified import + source wrapping (#438)
## Summary

1. Added offset and bound support to staging query macros. `{{ start_date }}` is valid as before; now `{{ start_date(offset=-10, lower_bound='2023-01-01') }}` is also valid.
2. Previously we required users to put quotes around the macro themselves. This PR removes the need for that: `{{ start_date }}` used to become `2023-01-01`; it now becomes `'2023-01-01'`.
3. Added a unified top-level module `ai.chronon.types` that contains everything users need.
4. Added wrappers on source subtypes to directly return sources:

```py
ttypes.Source(events=ttypes.EventSource(...))  # now becomes EventSource(...)
```

## Checklist

- [ ] Added Unit Tests
- [ ] Covered by existing CI
- [ ] Integration tested
- [ ] Documentation update

## Summary by CodeRabbit

- **New Features**
  - Added new functions for creating event, entity, and join data sources.
  - Introduced enhanced date macro utilities to enable flexible SQL query substitutions.
- **Refactor**
  - Streamlined naming conventions and standardized parameter formatting.
  - Consolidated and simplified import structures for improved consistency.
  - Updated method signatures and calls from `select` to `selects` across various components.
  - Removed reliance on `ttypes` for source definitions and standardized parameter naming conventions.
  - Simplified macro substitution logic in the `StagingQuery` object.
- **Tests**
  - Implemented comprehensive tests for date manipulation features to ensure robust behavior.
  - Updated existing tests to reflect changes in method names and query formatting.
  - Adjusted data generation parameters in tests to increase transaction volumes.
- **Documentation**
  - Updated configuration descriptions to clearly illustrate new date template options and parameter adjustments.
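To illustrate the macro changes, here is a minimal sketch of a staging query using the enriched macros. The table, column, and metadata names are hypothetical, and `{{ end_date }}` is assumed to accept the same forms as `{{ start_date }}`; `MetaData` field names follow the thrift struct re-exported by `ai.chronon.types`.

```python
from ai.chronon.types import MetaData, StagingQuery

# Hypothetical staging query exercising the new macro arguments.
# Macros now substitute with quotes included, so the SQL needs none.
v1 = StagingQuery(
    metaData=MetaData(name="sample_staging_query", outputNamespace="sample_namespace"),
    query="""
        SELECT user_id, purchase_price, ds
        FROM data.purchases
        -- start_date shifted back 10 days, but never before 2023-01-01
        WHERE ds BETWEEN {{ start_date(offset=-10, lower_bound='2023-01-01') }} AND {{ end_date }}
    """,
)
```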
1 parent 5de8218 commit 26f7c90


43 files changed: +735 −415 lines

api/py/ai/chronon/group_by.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -242,8 +242,8 @@ def normalize(w: Union[common.Window, str]) -> common.Window:
     return agg
 
 
-def Window(length: int, timeUnit: common.TimeUnit) -> common.Window:
-    return common.Window(length, timeUnit)
+def Window(length: int, time_unit: common.TimeUnit) -> common.Window:
+    return common.Window(length, time_unit)
 
 
 def Derivation(name: str, expression: str) -> ttypes.Derivation:
```
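Only the keyword changed here, from camelCase to snake_case. A one-line usage sketch, assuming the `TimeUnit` enum re-exported by `ai.chronon.types`:

```python
from ai.chronon.types import TimeUnit, Window

week = Window(length=7, time_unit=TimeUnit.DAYS)  # 7-day window with the new snake_case keyword
```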

api/py/ai/chronon/query.py

Lines changed: 18 additions & 1 deletion
```diff
@@ -83,6 +83,23 @@ def Query(
     )
 
 
-def select(*args, **kwargs):
+def selects(*args, **kwargs):
+    """
+    Create a dictionary required for the selects parameter of Query.
+
+    .. code-block:: python
+
+        selects(
+            "event_id",
+            user_id="user_id",
+        )
+
+    creates the following dictionary:
+
+    .. code-block:: python
+
+        {
+            "event_id": "event_id",
+            "user_id": "user_id"
+        }
+    """
     args = {x: x for x in args}
     return {**args, **kwargs}
```
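A brief usage sketch of the renamed helper inside a `Query` (the column names and the `viewer_id` rename are placeholders):

```python
from ai.chronon.query import Query, selects

q = Query(
    # positional args map a column to itself; kwargs express renames/derived expressions
    selects=selects("event_id", user_id="viewer_id"),
    # -> {"event_id": "event_id", "user_id": "viewer_id"}
    time_column="ts",
)
```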

api/py/ai/chronon/source.py

Lines changed: 88 additions & 0 deletions
```diff
@@ -0,0 +1,88 @@
+"""
+Wrappers to directly create Source objects.
+"""
+
+import ai.chronon.api.ttypes as ttypes
+
+
+def EventSource(
+    table: str,
+    query: ttypes.Query,
+    topic: str = None,
+    is_cumulative: bool = None,
+) -> ttypes.Source:
+    """
+    Event Sources represent data that gets generated over time.
+    Typically, but not necessarily, logged to message buses like Kafka, Kinesis, or Google Pub/Sub.
+    Fact (fct) tables are also event-source worthy.
+
+    Attributes:
+
+    - table: Table currently needs to be a 'ds' (date string - yyyy-MM-dd) partitioned hive table.
+             Table names can contain subpartition specs, example db.table/system=mobile/currency=USD
+    - topic: The kafka topic. The table contains all the events that historically came through this topic.
+    - query: The logic used to scan both the table and the topic. Contains row level transformations
+             and filtering expressed as Spark SQL statements.
+    - is_cumulative: If each new hive partition contains not just the current day's events but the entire set
+                     of events since the beginning. The key property is that the events are not mutated
+                     across partitions.
+    """
+    return ttypes.Source(
+        events=ttypes.EventSource(
+            table=table, topic=topic, query=query, isCumulative=is_cumulative
+        )
+    )
+
+
+def EntitySource(
+    snapshot_table: str,
+    query: ttypes.Query,
+    mutation_table: str = None,
+    mutation_topic: str = None,
+) -> ttypes.Source:
+    """
+    Entity Sources represent data that gets mutated over time, at row level. This is a group of three data elements:
+    snapshotTable, mutationTable and mutationTopic. mutationTable and mutationTopic are only necessary if we are trying
+    to create realtime or point-in-time aggregations over these sources. Entity sources usually map 1:1 to database
+    tables in your OLTP store that typically serve live application traffic. When mutation data is absent they map 1:1
+    to `dim` tables in a star schema.
+
+    Attributes:
+    - snapshot_table: Snapshot table currently needs to be a 'ds' (date string - yyyy-MM-dd) partitioned hive table.
+    - mutation_table: Table containing all the mutation events that historically came through the mutation topic.
+                      We need all the fields present in the snapshot table, PLUS two additional fields:
+                      `mutation_time` - milliseconds since epoch of type Long that represents the time of the mutation
+                      `is_before` - a boolean flag that represents whether
+                      this row contains values before or after the mutation.
+    - mutation_topic: The kafka topic that the mutation events stream through in real time.
+    - query: The logic used to scan both the tables and the topic. Contains row level transformations
+             and filtering expressed as Spark SQL statements.
+    """
+    return ttypes.Source(
+        entities=ttypes.EntitySource(
+            snapshotTable=snapshot_table,
+            mutationTable=mutation_table,
+            mutationTopic=mutation_topic,
+            query=query,
+        )
+    )
+
+
+def JoinSource(join: ttypes.Join, query: ttypes.Query) -> ttypes.Source:
+    """
+    The output of a join can be used as a source for a `GroupBy`.
+    Useful for expressing complex computation in chronon.
+
+    Offline this simply means that we will compute the necessary date ranges of the join
+    before we start computing the `GroupBy`.
+
+    Online we will:
+    1. enrich the stream/topic of `join.left` with all the columns defined by the join
+    2. apply the selects & wheres defined in the `query`
+    3. perform aggregations defined in the *downstream* `GroupBy`
+    4. write the result to the kv store.
+    """
+    return ttypes.Source(joinSource=ttypes.JoinSource(join=join, query=query))
```
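In use, the wrapper removes one level of nesting; a sketch based on the `returns` sample source updated elsewhere in this commit:

```python
from ai.chronon.query import Query, selects
from ai.chronon.source import EventSource

# Previously: ttypes.Source(events=ttypes.EventSource(...))
# Now the wrapper returns the wrapped ttypes.Source directly.
returns_source = EventSource(
    table="data.returns",
    topic=None,
    query=Query(selects=selects("user_id", "refund_amt"), time_column="ts"),
)
```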

api/py/ai/chronon/types.py

Lines changed: 48 additions & 0 deletions
```diff
@@ -0,0 +1,48 @@
+"""
+Importing ai.chronon.types brings in all the APIs needed to create any chronon object.
+"""
+
+import ai.chronon.group_by as group_by
+import ai.chronon.query as query
+import ai.chronon.join as join
+import ai.chronon.source as source
+import ai.chronon.api.ttypes as ttypes
+
+
+# source related concepts
+Query = query.Query
+selects = query.selects
+
+
+EventSource = source.EventSource
+EntitySource = source.EntitySource
+JoinSource = source.JoinSource
+
+# Aggregation / GroupBy related concepts
+GroupBy = group_by.GroupBy
+Aggregation = group_by.Aggregation
+Operation = group_by.Operation
+Window = group_by.Window
+TimeUnit = group_by.TimeUnit
+DefaultAggregation = group_by.DefaultAggregation
+
+Accuracy = ttypes.Accuracy
+TEMPORAL = ttypes.Accuracy.TEMPORAL
+SNAPSHOT = ttypes.Accuracy.SNAPSHOT
+
+Derivation = group_by.Derivation
+
+# join related concepts
+Join = join.Join
+JoinPart = join.JoinPart
+BootstrapPart = join.BootstrapPart
+LabelParts = join.LabelParts
+ContextualSource = join.ContextualSource
+ExternalPart = join.ExternalPart
+ExternalSource = join.ExternalSource
+DataType = join.DataType
+
+
+# Staging Query related concepts
+StagingQuery = ttypes.StagingQuery
+MetaData = ttypes.MetaData
```
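With the unified module, the scattered per-file imports seen in the samples below collapse into a single line; for example:

```python
# One import replaces ai.chronon.api.ttypes, ai.chronon.query, and ai.chronon.group_by
from ai.chronon.types import Aggregation, EventSource, GroupBy, Operation, Query, selects
```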

api/py/test/sample/group_bys/kaggle/clicks.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from ai.chronon.api.ttypes import Source, EventSource
-from ai.chronon.query import Query, select
+from ai.chronon.query import Query, selects
 from ai.chronon.group_by import (
     GroupBy,
     Aggregation,
@@ -46,7 +46,7 @@
             base_table
         ),  # Here we use the staging query output table because it has the necessary fields, but for a true streaming source we would likely use a log table
         topic="some_topic",  # You would set your streaming source topic here
-        query=Query(selects=select("ad_id", "clicked"), time_column="ts"),
+        query=Query(selects=selects("ad_id", "clicked"), time_column="ts"),
     )
 )
```

api/py/test/sample/group_bys/quickstart/purchases.py

Lines changed: 3 additions & 5 deletions
```diff
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from ai.chronon.api.ttypes import Source, EventSource
-from ai.chronon.query import Query, select
+from ai.chronon.query import Query, selects
 from ai.chronon.group_by import GroupBy, Aggregation, Operation
 
 """
@@ -26,10 +26,8 @@
         table="data.purchases",  # This points to the log table in the warehouse with historical purchase events, updated in batch daily
         topic=None,  # See the 'returns' GroupBy for an example that has a streaming source configured. In this case, this would be the streaming source topic that can be listened to for realtime events
         query=Query(
-            selects=select(
-                "user_id",
-                "purchase_price",
-                bucket_rand="'1'"
+            selects=selects(
+                "user_id", "purchase_price", bucket_rand="'1'"
             ),  # Select the fields we care about
             time_column="ts",
         ),  # The event time
```

api/py/test/sample/group_bys/quickstart/returns.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 from ai.chronon.api.ttypes import Source, EventSource
-from ai.chronon.query import Query, select
+from ai.chronon.query import Query, selects
 from ai.chronon.group_by import (
     GroupBy,
     Aggregation,
@@ -29,7 +29,7 @@
         table="data.returns",  # This points to the log table with historical return events
         topic="events.returns/fields=ts,return_id,user_id,product_id,refund_amt/host=kafka/port=9092",
         query=Query(
-            selects=select("user_id", "refund_amt"),  # Select the fields we care about
+            selects=selects("user_id", "refund_amt"),  # Select the fields we care about
             time_column="ts",
         ),  # The event time
     )
```
Lines changed: 5 additions & 10 deletions
```diff
@@ -1,15 +1,15 @@
 from ai.chronon.group_by import GroupBy, Aggregation, Operation
 from ai.chronon.api.ttypes import Source, EventSource
-from ai.chronon.query import Query, select
+from ai.chronon.query import Query, selects
 
 
 logging_schema_source = Source(
     events=EventSource(
         table="default.chronon_log_table",
         query=Query(
-            selects=select(
+            selects=selects(
                 schema_hash="decode(unbase64(key_base64), 'utf-8')",
-                schema_value="decode(unbase64(value_base64), 'utf-8')"
+                schema_value="decode(unbase64(value_base64), 'utf-8')",
             ),
             wheres=["name='SCHEMA_PUBLISH_EVENT'"],
             time_column="ts_millis",
@@ -20,12 +20,7 @@
 v1 = GroupBy(
     keys=["schema_hash"],
     sources=logging_schema_source,
-    aggregations=[
-        Aggregation(
-            input_column="schema_value",
-            operation=Operation.LAST
-        )
-    ],
+    aggregations=[Aggregation(input_column="schema_value", operation=Operation.LAST)],
     online=False,
-    backfill_start_date="2023-04-09"
+    backfill_start_date="2023-04-09",
 )
```

api/py/test/sample/group_bys/quickstart/users.py

Lines changed: 11 additions & 9 deletions
```diff
@@ -1,4 +1,3 @@
-
 # Copyright (C) 2023 The Chronon Authors.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,7 +13,7 @@
 # limitations under the License.
 
 from ai.chronon.api.ttypes import Source, EntitySource
-from ai.chronon.query import Query, select
+from ai.chronon.query import Query, selects
 from ai.chronon.group_by import (
     GroupBy,
 )
@@ -26,15 +25,18 @@
 
 source = Source(
     entities=EntitySource(
-        snapshotTable="data.users", # This points to a table that contains daily snapshots of the entire product catalog
+        snapshotTable="data.users",  # This points to a table that contains daily snapshots of the entire product catalog
         query=Query(
-            selects=select("user_id","account_created_ds","email_verified"), # Select the fields we care about
-        )
-    ))
+            selects=selects(
+                "user_id", "account_created_ds", "email_verified"
+            ),  # Select the fields we care about
+        ),
+    )
+)
 
 v1 = GroupBy(
     sources=[source],
-    keys=["user_id"], # Primary key is the same as the primary key for the source table
-    aggregations=None, # In this case, there are no aggregations or windows to define
+    keys=["user_id"],  # Primary key is the same as the primary key for the source table
+    aggregations=None,  # In this case, there are no aggregations or windows to define
     online=True,
-)
+)
```
Lines changed: 15 additions & 11 deletions
```diff
@@ -1,8 +1,6 @@
 from ai.chronon.api.ttypes import Source, EntitySource
-from ai.chronon.query import Query, select
-from ai.chronon.group_by import (
-    GroupBy
-)
+from ai.chronon.query import Query, selects
+from ai.chronon.group_by import GroupBy
 
 """
 This GroupBy aggregates metrics about a user's previous purchases in various windows.
@@ -11,15 +9,21 @@
 # This source is raw purchase events. Every time a user makes a purchase, it will be one entry in this source.
 source_merchants = Source(
     entities=EntitySource(
-        snapshotTable="data.merchants", # This points to the log table in the warehouse with historical purchase events, updated in batch daily
+        snapshotTable="data.merchants",  # This points to the log table in the warehouse with historical purchase events, updated in batch daily
         query=Query(
-            selects=select("merchant_id","account_age", "zipcode", "is_big_merchant", "country", "account_type", "preferred_language"), # Select the fields we care about
-        )
+            selects=selects(
+                "merchant_id",
+                "account_age",
+                "zipcode",
+                "is_big_merchant",
+                "country",
+                "account_type",
+                "preferred_language",
+            ),  # Select the fields we care about
+        ),
     )
 )
 
 merchant_group_by = GroupBy(
-    sources=[source_merchants],
-    keys=["merchant_id"],
-    aggregations=None
-)
+    sources=[source_merchants], keys=["merchant_id"], aggregations=None
+)
```

api/py/test/sample/group_bys/risk/transaction_events.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -1,5 +1,5 @@
 from ai.chronon.api.ttypes import Source, EventSource
-from ai.chronon.query import Query, select
+from ai.chronon.query import Query, selects
 from ai.chronon.group_by import GroupBy, Aggregation, Operation
 
 """
@@ -13,7 +13,7 @@ def create_transaction_source(key_field):
         table="data.txn_events",  # Points to the historical purchase events table
         topic=None,
         query=Query(
-            selects=select(key_field, "transaction_amount", "transaction_type"),
+            selects=selects(key_field, "transaction_amount", "transaction_type"),
             time_column="transaction_time",
         ),
     )
```
