Skip to content

Commit d165191

Browse files
Join and Group Bys for POC (#30)
## Summary Creates a join and a few group_bys for the Proof of Concept project. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced aggregation functionalities for user purchases, transaction events, and merchant data. - Introduced new grouping operations for user and merchant transactions, allowing for detailed time-based analysis. - Configured structured joins to facilitate comprehensive risk management analysis across various datasets. - **Documentation** - Added a new "risk" section in the JSON structure for clarity on its purpose and namespace. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Chewy Shaw <[email protected]>
1 parent 6d3cf13 commit d165191

File tree

8 files changed

+526
-0
lines changed

8 files changed

+526
-0
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from ai.chronon.api.ttypes import Source, EntitySource
2+
from ai.chronon.query import Query, select
3+
from ai.chronon.group_by import (
4+
GroupBy,
5+
Aggregation,
6+
Operation,
7+
Window,
8+
TimeUnit
9+
)
10+
11+
"""
12+
This GroupBy exposes merchant attributes from the merchants snapshot table, keyed by merchant_id.
13+
"""
14+
15+
# Merchant attribute source: the selected columns are read from the
# `data.merchants` snapshot table.
source_merchants = Source(
    entities=EntitySource(
        snapshotTable="data.merchants",
        query=Query(
            # Key column first, followed by the merchant attributes to expose.
            selects=select(
                "merchant_id",
                "account_age",
                "zipcode",
                "is_big_merchant",
                "country",
                "account_type",
                "preferred_language",
            ),
        ),
    )
)

# Keyed by merchant_id; no aggregations are configured (aggregations=None).
merchant_group_by = GroupBy(
    sources=[source_merchants],
    keys=["merchant_id"],
    aggregations=None,
)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from ai.chronon.api.ttypes import Source, EventSource
2+
from ai.chronon.query import Query, select
3+
from ai.chronon.group_by import (
4+
GroupBy,
5+
Aggregation,
6+
Operation,
7+
Window,
8+
TimeUnit
9+
)
10+
11+
"""
12+
These GroupBys aggregate transaction counts and amounts, per user and per merchant, in various windows.
13+
"""
14+
15+
def create_transaction_source(key_field):
    """Build an event Source over `data.txn_events` for the given key column.

    Args:
        key_field: Name of the column selected alongside
            `transaction_amount` and `transaction_type`.

    Returns:
        A `Source` wrapping an `EventSource` with `topic=None` that uses
        `transaction_time` as its time column.
    """
    txn_query = Query(
        selects=select(key_field, "transaction_amount", "transaction_type"),
        time_column="transaction_time",
    )
    txn_events = EventSource(
        table="data.txn_events",
        topic=None,
        query=txn_query,
    )
    return Source(events=txn_events)
26+
27+
# Lookback windows used by the windowed count aggregation:
# 1 hour, 1 day, 30 days and 365 days.
window_sizes = [
    Window(length=1, timeUnit=TimeUnit.HOURS),
    Window(length=1, timeUnit=TimeUnit.DAYS),
    Window(length=30, timeUnit=TimeUnit.DAYS),
    Window(length=365, timeUnit=TimeUnit.DAYS),
]
28+
29+
def create_txn_group_by(source, key):
    """Build an online GroupBy over transaction amounts for a single key.

    Args:
        source: The source to aggregate over; used as the only entry in
            `sources`.
        key: Name of the key column; used as the only entry in `keys`.

    Returns:
        A `GroupBy` with `online=True` and two aggregations on
        `transaction_amount`: a COUNT over every window in `window_sizes`
        and a SUM over a one-hour window.
    """
    amount_column = "transaction_amount"

    # Transaction count across all shared windows.
    txn_count = Aggregation(
        input_column=amount_column,
        operation=Operation.COUNT,
        windows=window_sizes,
    )
    # Total transacted amount, restricted to the last hour only.
    hourly_sum = Aggregation(
        input_column=amount_column,
        operation=Operation.SUM,
        windows=[Window(length=1, timeUnit=TimeUnit.HOURS)],
    )

    return GroupBy(
        sources=[source],
        keys=[key],
        online=True,
        aggregations=[txn_count, hourly_sum],
    )
47+
48+
# Transaction aggregates keyed by user_id.
source_user_transactions = create_transaction_source(key_field="user_id")
txn_group_by_user = create_txn_group_by(
    source=source_user_transactions,
    key="user_id",
)

# The same aggregates keyed by merchant_id.
source_merchant_transactions = create_transaction_source(key_field="merchant_id")
txn_group_by_merchant = create_txn_group_by(
    source=source_merchant_transactions,
    key="merchant_id",
)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from ai.chronon.api.ttypes import Source, EntitySource
2+
from ai.chronon.query import Query, select
3+
from ai.chronon.group_by import (
4+
GroupBy,
5+
Aggregation,
6+
Operation,
7+
Window,
8+
TimeUnit
9+
)
10+
11+
"""
12+
This GroupBy exposes user attributes from the users snapshot table, keyed by user_id.
13+
"""
14+
15+
# User attribute source: the selected columns are read from the
# `data.users` snapshot table. No time column is set on the query.
source_users = Source(
    entities=EntitySource(
        snapshotTable="data.users",
        query=Query(
            # Key column first, followed by the user attributes to expose.
            selects=select(
                "user_id",
                "account_age",
                "account_balance",
                "credit_score",
                "number_of_devices",
                "country",
                "account_type",
                "preferred_language",
            ),
        ),
    )
)

# Keyed by user_id; no aggregations are configured (aggregations=None).
user_group_by = GroupBy(
    sources=[source_users],
    keys=["user_id"],
    aggregations=None,
)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from ai.chronon.api.ttypes import Source, EventSource
2+
from ai.chronon.join import Join, JoinPart
3+
from ai.chronon.query import Query, select
4+
from group_bys.risk.transaction_events import txn_group_by_user, txn_group_by_merchant
5+
from group_bys.risk.user_data import user_group_by
6+
from group_bys.risk.merchant_data import merchant_group_by
7+
8+
# Left side of the join: `user_id` rows from `data.users`, using `ts` as the
# time column.
# NOTE(review): two of the join parts below are keyed by `merchant_id`, but
# only `user_id` is selected here — confirm how the merchant key is supplied.
source_users = Source(
    events=EventSource(
        table="data.users",
        query=Query(
            selects=select("user_id"),
            time_column="ts",
        ),
    )
)

# Joins the transaction aggregates and the attribute GroupBys onto the left
# source; each part is prefixed with the entity ("user" / "merchant") it
# describes.
txn_join = Join(
    left=source_users,
    right_parts=[
        JoinPart(group_by=txn_group_by_user, prefix="user"),
        JoinPart(group_by=txn_group_by_merchant, prefix="merchant"),
        JoinPart(group_by=user_group_by, prefix="user"),
        JoinPart(group_by=merchant_group_by, prefix="merchant"),
    ],
)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
{
2+
"metaData": {
3+
"name": "risk.transaction_events.txn_group_by_merchant",
4+
"online": 1,
5+
"customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}",
6+
"dependencies": [
7+
"{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}"
8+
],
9+
"tableProperties": {
10+
"source": "chronon"
11+
},
12+
"outputNamespace": "default",
13+
"team": "risk",
14+
"offlineSchedule": "@daily"
15+
},
16+
"sources": [
17+
{
18+
"events": {
19+
"table": "data.txn_events",
20+
"query": {
21+
"selects": {
22+
"merchant_id": "merchant_id",
23+
"transaction_amount": "transaction_amount",
24+
"transaction_type": "transaction_type"
25+
},
26+
"timeColumn": "transaction_time",
27+
"setups": []
28+
}
29+
}
30+
}
31+
],
32+
"keyColumns": [
33+
"merchant_id"
34+
],
35+
"aggregations": [
36+
{
37+
"inputColumn": "transaction_amount",
38+
"operation": 6,
39+
"argMap": {},
40+
"windows": [
41+
{
42+
"length": 1,
43+
"timeUnit": 0
44+
},
45+
{
46+
"length": 1,
47+
"timeUnit": 1
48+
},
49+
{
50+
"length": 30,
51+
"timeUnit": 1
52+
},
53+
{
54+
"length": 365,
55+
"timeUnit": 1
56+
}
57+
]
58+
},
59+
{
60+
"inputColumn": "transaction_amount",
61+
"operation": 7,
62+
"argMap": {},
63+
"windows": [
64+
{
65+
"length": 1,
66+
"timeUnit": 0
67+
}
68+
]
69+
}
70+
]
71+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
{
2+
"metaData": {
3+
"name": "risk.transaction_events.txn_group_by_user",
4+
"online": 1,
5+
"customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}",
6+
"dependencies": [
7+
"{\"name\": \"wait_for_data.txn_events_ds\", \"spec\": \"data.txn_events/ds={{ ds }}\", \"start\": null, \"end\": null}"
8+
],
9+
"tableProperties": {
10+
"source": "chronon"
11+
},
12+
"outputNamespace": "default",
13+
"team": "risk",
14+
"offlineSchedule": "@daily"
15+
},
16+
"sources": [
17+
{
18+
"events": {
19+
"table": "data.txn_events",
20+
"query": {
21+
"selects": {
22+
"user_id": "user_id",
23+
"transaction_amount": "transaction_amount",
24+
"transaction_type": "transaction_type"
25+
},
26+
"timeColumn": "transaction_time",
27+
"setups": []
28+
}
29+
}
30+
}
31+
],
32+
"keyColumns": [
33+
"user_id"
34+
],
35+
"aggregations": [
36+
{
37+
"inputColumn": "transaction_amount",
38+
"operation": 6,
39+
"argMap": {},
40+
"windows": [
41+
{
42+
"length": 1,
43+
"timeUnit": 0
44+
},
45+
{
46+
"length": 1,
47+
"timeUnit": 1
48+
},
49+
{
50+
"length": 30,
51+
"timeUnit": 1
52+
},
53+
{
54+
"length": 365,
55+
"timeUnit": 1
56+
}
57+
]
58+
},
59+
{
60+
"inputColumn": "transaction_amount",
61+
"operation": 7,
62+
"argMap": {},
63+
"windows": [
64+
{
65+
"length": 1,
66+
"timeUnit": 0
67+
}
68+
]
69+
}
70+
]
71+
}

0 commit comments

Comments
 (0)