Skip to content

Commit a87c397

Browse files
nikhil-zla, coderabbitai[bot], and ezvz
authored
stub/part_1: physical graph for workflow submission + column lineage (#493)
## Summary Adding function stubs and thrift schemas for orchestration with column level lineage. ## Checklist - [ ] Added Unit Tests - [ ] Covered by existing CI - [ ] Integration tested - [ ] Documentation update <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Streamlined CLI operations with a simplified backfill command and removal of an obsolete synchronization command. - Enhanced configuration processing that now automatically compiles and returns results. - Expanded physical planning capabilities with new mechanisms for managing computation graphs. - Extended tracking of data transformations through improved column lineage support in metadata. - Introduction of new classes and methods for managing physical nodes and graphs, enhancing data processing workflows. - New interface for orchestrators to handle configuration fetching and uploading. - New structures for detailed column lineage tracking throughout data processing stages. - Added support for generating Python code from new Thrift definitions in the testing workflow. - **Bug Fixes** - Improved documentation for clarity on the functionality and logic of certain fields. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: ezvz <[email protected]>
1 parent 5b11006 commit a87c397

9 files changed

+303
-48
lines changed

api/py/ai/chronon/cli/entrypoint.py

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1+
from typing import Dict
12
import click
23
from datetime import datetime, timedelta
34

4-
from ai.chronon.cli.compile.compiler import Compiler
5+
from ai.chronon.api.common.ttypes import ConfigType
6+
from ai.chronon.cli.plan.physical_index import (
7+
PhysicalIndex,
8+
get_backfill_physical_graph,
9+
submit_physical_graph,
10+
)
11+
from ai.chronon.cli.compile.compiler import CompileResult, Compiler
512
from ai.chronon.cli.compile.compile_context import CompileContext
613
from ai.chronon.cli.git_utils import get_current_branch
714

@@ -12,14 +19,11 @@ def cli():
1219
pass
1320

1421

15-
@cli.command()
16-
@click.option("--branch", default=get_current_branch, help="Branch to sync")
17-
def sync(branch):
18-
"""Sync data for the specified branch"""
19-
click.echo(f"\nSyncing data for branch \u001b[32m{branch}\u001b[0m")
22+
def compile() -> Dict[ConfigType, CompileResult]:
2023
compile_context = CompileContext()
2124
compiler = Compiler(compile_context)
22-
compiler.compile(compile_context)
25+
# TODO(orc): add column lineage to objects
26+
return compiler.compile(compile_context)
2327

2428

2529
@cli.command()
@@ -34,17 +38,15 @@ def sync(branch):
3438
default=datetime.now().strftime("%Y-%m-%d"),
3539
help="End date for backfill (YYYY-MM-DD)",
3640
)
37-
@click.option(
38-
"--scope",
39-
type=click.Choice(["upstream", "self", "downstream"]),
40-
default="upstream",
41-
help="Scope of configs to backfill",
42-
)
43-
def backfill(conf: str, start_date: str, end_date: str, scope: str):
44-
"""Backfill data between start and end dates"""
45-
click.echo(
46-
f"Backfilling with scope {scope} for config {conf} from {start_date} to {end_date}"
41+
@click.option("--branch", default=get_current_branch, help="Branch to sync")
42+
def backfill(conf: str, start_date: str, end_date: str):
43+
# compile
44+
compile_result = compile()
45+
physical_index = PhysicalIndex.from_compiled_obj(compile_result)
46+
physical_graph = get_backfill_physical_graph(
47+
conf, physical_index, start_date, end_date
4748
)
49+
submit_physical_graph(physical_graph)
4850

4951

5052
@cli.command()
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Dict, List, Optional
3+
4+
from ai.chronon.cli.plan.physical_graph import PhysicalGraph
5+
from ai.chronon.cli.plan.physical_index import PhysicalNode
6+
7+
8+
class ControllerIface(ABC):
    """
    Abstraction layer that keeps the planner code agnostic to the concrete
    orchestrator backend; a mock implementation can be injected for tests.
    """

    @abstractmethod
    def fetch_missing_confs(self, node_to_hash: Dict[str, str]) -> List[str]:
        """Given a node-name -> conf-hash mapping, return the names whose
        confs the orchestrator does not yet have."""
        ...

    @abstractmethod
    def upload_conf(self, name: str, hash: str, content: str) -> None:
        """Upload one conf (identified by name and content hash) to the
        orchestrator."""
        ...

    @abstractmethod
    def create_workflow(
        self, physical_graph: PhysicalGraph, start_date: str, end_date: str
    ) -> str:
        """
        Submit a physical graph to the orchestrator and return the workflow id.
        """
        ...

    @abstractmethod
    def get_workflow_status(self, workflow_id: str) -> str:
        """
        Return the current status of the given workflow.
        """
        ...

    @abstractmethod
    def get_active_workflows(
        self, branch: Optional[str] = None, user: Optional[str] = None
    ) -> List[str]:
        """
        List all active workflows, optionally filtered by branch and/or user.
        """
        ...
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from dataclasses import dataclass
2+
from typing import Dict, List
3+
4+
from ai.chronon.cli.plan.physical_index import PhysicalNode
5+
6+
7+
@dataclass
class PhysicalGraph:
    """One physical node of work plus the sub-graphs it depends on, scoped to
    a [start_date, end_date] range."""

    node: PhysicalNode
    dependencies: List["PhysicalGraph"]
    start_date: str
    end_date: str

    def flatten(self) -> Dict[str, PhysicalNode]:
        """Collapse the graph into a flat mapping of node name -> PhysicalNode,
        covering this node and, recursively, every dependency."""
        flat = {self.node.name: self.node}
        for dep in self.dependencies:
            flat.update(dep.flatten())
        return flat
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from dataclasses import dataclass
2+
from typing import Dict, List
3+
4+
from ai.chronon.api.common.ttypes import ConfigType
5+
from ai.chronon.cli.compile.compiler import CompileResult
6+
from ai.chronon.cli.plan.controller_iface import ControllerIface
7+
from ai.chronon.cli.plan.physical_graph import PhysicalGraph
8+
from ai.chronon.lineage.ttypes import Column, ColumnLineage
9+
from ai.chronon.cli.plan.physical_node import PhysicalNode
10+
11+
12+
@dataclass
class PhysicalIndex:
    """
    Index over compiled physical nodes: maps output tables to their physical
    nodes, tracks column-level lineage, and talks to the orchestrator via
    ControllerIface.

    NOTE(review): the class pairs @dataclass field declarations with a
    hand-written __init__ (dataclass skips init generation when __init__ is
    already defined), and __init__ sets `branch` which is not declared as a
    field — confirm the @dataclass decorator is intentional.
    """

    table_to_physical: Dict[str, PhysicalNode]

    # TODO incorporate stage column lineage
    column_lineage: Dict[Column, ColumnLineage]
    controller: ControllerIface

    def __init__(
        self,
        physical_nodes: List[PhysicalNode],
        controller: ControllerIface,
        branch: str,
    ):
        self.controller = controller
        self.table_to_physical = {}
        self.column_lineage = {}
        self.branch = branch
        self.populate_index(physical_nodes)

    # TODO: populate index
    def populate_index(self, physical_nodes: List[PhysicalNode]):
        """Populate table_to_physical (and lineage) from the given nodes. Stub."""
        raise NotImplementedError("Method not yet implemented")

    @classmethod
    def from_compiled_obj(
        cls, compiled_obj: Dict[ConfigType, CompileResult]
    ) -> "PhysicalIndex":
        """Build a PhysicalIndex from compiler output. Stub."""
        raise NotImplementedError("Method not yet implemented")

    def get_backfill_physical_graph(
        self, conf_name: str, start_date: str, end_date: str
    ) -> PhysicalGraph:
        """Return the graph of work needed to backfill conf_name over the range. Stub."""
        raise NotImplementedError("Method not yet implemented")

    def get_deploy_physical_graph(self, conf_name: str, date: str) -> PhysicalGraph:
        """Return the graph of work needed to deploy conf_name as of date. Stub."""
        raise NotImplementedError("Method not yet implemented")

    def submit_physical_graph(self, physical_graph: PhysicalGraph) -> str:
        """
        Upload to the orchestrator any confs it is missing for the nodes in
        physical_graph.

        NOTE(review): declared to return a workflow id (str) but currently
        returns None — presumably a controller.create_workflow call is still
        to come; confirm before relying on the return value.
        """
        node_to_physical: Dict[str, PhysicalNode] = physical_graph.flatten()

        node_to_hash = {name: node.conf_hash for name, node in node_to_physical.items()}

        missing_conf_names = self.controller.fetch_missing_confs(node_to_hash)
        missing_physical_nodes = [
            node_to_physical[conf_name] for conf_name in missing_conf_names
        ]

        # upload missing confs
        for physical_node in missing_physical_nodes:
            # renamed locals: `hash` / `json` shadowed the builtins
            conf_hash = physical_node.conf_hash
            conf_json = physical_node.conf.tjson
            self.controller.upload_conf(physical_node.name, conf_hash, conf_json)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from dataclasses import dataclass
2+
from typing import List
3+
4+
from ai.chronon.api.common.ttypes import TableDependency
5+
from ai.chronon.api.ttypes import GroupBy, Join, Model, StagingQuery
6+
from ai.chronon.cli.compile.compile_context import CompiledObj
7+
from ai.chronon.orchestration.ttypes import PhysicalNode
8+
9+
10+
def from_group_by(cls, group_by: GroupBy) -> List[PhysicalNode]:
    """Expand a GroupBy into its physical execution nodes. Stub.

    NOTE(review): declared at module level yet takes `cls` — presumably
    intended to be a @classmethod; confirm the intended binding.
    """
    raise NotImplementedError("Method not yet implemented")
12+
13+
14+
def from_join(cls, join: Join) -> List[PhysicalNode]:
    """Expand a Join into its physical execution nodes. Stub.

    NOTE(review): declared at module level yet takes `cls` — presumably
    intended to be a @classmethod; confirm the intended binding.
    """
    raise NotImplementedError("Method not yet implemented")
16+
17+
18+
def from_staging_query(cls, staging_query: StagingQuery) -> List[PhysicalNode]:
    """Expand a StagingQuery into its physical execution nodes. Stub.

    NOTE(review): declared at module level yet takes `cls` — presumably
    intended to be a @classmethod; confirm the intended binding.
    """
    raise NotImplementedError("Method not yet implemented")
20+
21+
22+
def from_model(cls, model: Model) -> List[PhysicalNode]:
    """Expand a Model into its physical execution nodes. Stub.

    NOTE(review): declared at module level yet takes `cls` — presumably
    intended to be a @classmethod; confirm the intended binding.
    """
    raise NotImplementedError("Method not yet implemented")

api/thrift/api.thrift

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ namespace java ai.chronon.api
33

44
include "common.thrift"
55
include "observability.thrift"
6+
include "lineage.thrift"
67

78
// cd /path/to/chronon
89
// thrift --gen py -out api/py/ api/thrift/api.thrift
@@ -263,6 +264,10 @@ struct MetaData {
263264
// column -> tag_key -> tag_value
264265
21: optional map<string, map<string, string>> columnTags
265266

267+
// A stage is a "sub-transformation" of a given node. For example a `GroupBy` can consist of selects (with SQL expressions), filters (in the form of where clauses), followed by aggregations defined in the Zipline DSL.
268+
// Each of these is a `stage` with its own column-level lineage.
269+
8: optional list<lineage.StageWithLineage> stagesWithLineage
270+
266271
// marking this as true means that the conf can be served online
267272
// once marked online, a conf cannot be changed - compiling the conf won't be allowed
268273
100: optional bool online

api/thrift/common.thrift

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,19 @@ struct TableDependency {
6767
// fully qualified table name
6868
1: optional string table
6969

70-
// params to select the partitions of the table for any query range
71-
// logic is: [max(query.start - startOffset, startCutOff), min(query.end - endOffset, endCutOff)]
70+
// DEPENDENCY_RANGE_LOGIC
71+
// 1. get final start_partition, end_partition
72+
// 2. break into step ranges
73+
// 3. for each dependency
74+
// a. dependency_start: max(query.start - startOffset, startCutOff)
75+
// b. dependency_end: min(query.end - endOffset, endCutOff)
7276
2: optional Window startOffset
7377
3: optional Window endOffset
7478
4: optional string startCutOff
7579
5: optional string endCutOff
7680

7781
# if not present we will pull from defaults
82+
// needed to enumerate what partitions are in a range
7883
100: optional string partitionColumn
7984
101: optional string partitionFormat
8085
102: optional Window partitionInterval
@@ -116,6 +121,7 @@ struct ExecutionInfo {
116121
4: optional i64 healthCheckIntervalMillis
117122

118123
# relevant for batch jobs
124+
# temporal workflow nodes maintain their own cron schedule
119125
10: optional string scheduleCron
120126
11: optional i32 stepDays
121127
12: optional bool historicalBackfill

api/thrift/lineage.thrift

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
namespace py ai.chronon.lineage
2+
namespace java ai.chronon.lineage
3+
4+
// A fully-qualified column reference: the logical node it belongs to, the
// processing stage that produced it, and the column's name.
struct Column {
    1: optional string logicalNodeName
    2: optional ColumnLineageStage stageName
    3: optional string columnName
}

// The "sub-transformation" stages a logical node can run through.
enum ColumnLineageStage {
    SELECT,
    AGG,
    DERIVE,
    JOIN
}


/**
 * NOTE: a Staging Query will only have pure output columns:
 * stage = SELECT, output = col_name, expression = null, input_cols = null, type = DIRECT
 *
 * Example:
 *   select x+y as a, c, json_extract(payload, "$.meta.user") as user where b = 10
 *   group_by user
 *   agg - count(a, window=7d, bucket=c)
 *
 * Stage = SELECT, output = a, expression = x + y, input_columns = [x, y], type = DIRECT
 *                 expression = "b = 10", input_columns = [b], type = WHERE
 *                 output = user, expression = 'json_extract(payload, "$.meta.user")', input_columns = [payload], type = GROUP_BY_KEY
 *
 * Stage = AGG, output = a_count_7d_by_c, expression = 'count(a, window=7d, bucket=c)', input_columns = [a, c], type = DIRECT
 *              output = user, input_columns = [user], type = GROUP_BY_KEY
 **/
enum ColumnLineageType {
    WHERE,
    JOIN_KEY,
    GROUP_BY_KEY,
    DERIVE,
    TIMESTAMP,
    DIRECT
}

// How one output column was derived from its input columns.
struct ColumnLineage {
    1: optional list<Column> inputColumns
    2: optional string expression
    3: optional Column outputColumn
    4: optional ColumnLineageType lineageType // not present means direct
}

// A stage together with the lineage of every column it produces.
struct StageWithLineage {
    1: optional ColumnLineageStage stage
    2: optional list<ColumnLineage> lineage
}

0 commit comments

Comments
 (0)