Skip to content

Commit 894dc5c

Browse files
committed
stub: physical graph for workflow submission + column lineage
1 parent 7da358c commit 894dc5c

File tree

4 files changed

+141
-17
lines changed

4 files changed

+141
-17
lines changed

api/py/ai/chronon/cli/entrypoint.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
1+
from dataclasses import dataclass
2+
from typing import Dict, List
13
import click
24
from datetime import datetime, timedelta
35

4-
from ai.chronon.cli.compile.compiler import Compiler
6+
from ai.chronon.api.common.ttypes import ConfigType
7+
from ai.chronon.cli.plan.physical_index import (
8+
PhysicalIndex,
9+
get_backfill_physical_graph,
10+
submit_physical_graph,
11+
)
12+
from ai.chronon.cli.compile.compiler import CompileResult, Compiler
513
from ai.chronon.cli.compile.compile_context import CompileContext
614
from ai.chronon.cli.git_utils import get_current_branch
715

@@ -12,14 +20,11 @@ def cli():
1220
pass
1321

1422

15-
@cli.command()
16-
@click.option("--branch", default=get_current_branch, help="Branch to sync")
17-
def sync(branch):
18-
"""Sync data for the specified branch"""
19-
click.echo(f"\nSyncing data for branch \u001b[32m{branch}\u001b[0m")
23+
def compile() -> Dict[ConfigType, CompileResult]:
2024
compile_context = CompileContext()
2125
compiler = Compiler(compile_context)
22-
compiler.compile(compile_context)
26+
# TODO(orc): add column lineage to objects
27+
return compiler.compile(compile_context)
2328

2429

2530
@cli.command()
@@ -34,17 +39,15 @@ def sync(branch):
3439
default=datetime.now().strftime("%Y-%m-%d"),
3540
help="End date for backfill (YYYY-MM-DD)",
3641
)
37-
@click.option(
38-
"--scope",
39-
type=click.Choice(["upstream", "self", "downstream"]),
40-
default="upstream",
41-
help="Scope of configs to backfill",
42-
)
43-
def backfill(conf: str, start_date: str, end_date: str, scope: str):
44-
"""Backfill data between start and end dates"""
45-
click.echo(
46-
f"Backfilling with scope {scope} for config {conf} from {start_date} to {end_date}"
42+
@click.option("--branch", default=get_current_branch, help="Branch to sync")
43+
def backfill(conf: str, start_date: str, end_date: str):
44+
# compile
45+
compile_result = compile()
46+
physical_index = PhysicalIndex.from_compiled_obj(compile_result)
47+
physical_graph = get_backfill_physical_graph(
48+
conf, physical_index, start_date, end_date
4749
)
50+
submit_physical_graph(physical_graph)
4851

4952

5053
@cli.command()
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from dataclasses import dataclass
2+
from typing import Dict, List
3+
from ai.chronon.api.common.ttypes import ConfigType
4+
from ai.chronon.cli.compile.compile_context import CompiledObj
5+
from ai.chronon.cli.compile.compiler import CompileResult
6+
from ai.chronon.lineage.ttypes import Column, ColumnLineage
7+
from ai.chronon.types import TableDependency
8+
9+
10+
@dataclass
11+
class PhysicalNode:
12+
name: str
13+
node_type: str # NodeType
14+
conf: CompiledObj
15+
table_dependencies: List[TableDependency]
16+
output_columns: List[str]
17+
output_table: str
18+
# TODO: online deps
19+
20+
def construct_lineage(self):
21+
pass
22+
23+
24+
def get_physical_nodes(obj: CompiledObj) -> List[PhysicalNode]:
25+
pass
26+
27+
28+
@dataclass
29+
class PhysicalGraph:
30+
node: PhysicalNode
31+
dependencies: List["PhysicalGraph"]
32+
start_date: str
33+
end_date: str
34+
35+
36+
@dataclass
37+
class PhysicalIndex:
38+
table_to_physical: Dict[str, PhysicalNode]
39+
40+
# TODO incorporate stage column lineage
41+
column_lineage: Dict[Column, ColumnLineage]
42+
43+
@classmethod
44+
def from_compiled_obj(
45+
cls, compiled_obj: Dict[ConfigType, CompileResult]
46+
) -> "PhysicalIndex":
47+
pass
48+
49+
50+
def get_backfill_physical_graph(
51+
conf: str, index: PhysicalIndex, start_date: str, end_date: str
52+
) -> PhysicalGraph:
53+
pass
54+
55+
56+
def get_deploy_physical_graph(conf: str, index: PhysicalIndex) -> PhysicalGraph:
57+
pass
58+
59+
60+
def submit_physical_graph(physical_graph: PhysicalGraph) -> str:
61+
# 1. get physical nodes to submit
62+
# 2. find missing nodes from server
63+
# 3.
64+
pass

api/thrift/api.thrift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ namespace java ai.chronon.api
33

44
include "common.thrift"
55
include "observability.thrift"
6+
include "lineage.thrift"
67

78
// cd /path/to/chronon
89
// thrift --gen py -out api/py/ api/thrift/api.thrift
@@ -257,6 +258,8 @@ struct MetaData {
257258
// column -> tag_key -> tag_value
258259
7: optional map<string, map<string, string>> columnTags
259260

261+
8: optional list<lineage.StageColumnLineage> stageColumnLineages
262+
260263
// marking this as true means that the conf can be served online
261264
// once marked online, a conf cannot be changed - compiling the conf won't be allowed
262265
100: optional bool online

api/thrift/lineage.thrift

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
namespace py ai.chronon.lineage
2+
namespace java ai.chronon.lineage
3+
4+
struct Column {
5+
1: optional string logicalNodeName
6+
2: optional string stageName
7+
3: optional string columnName
8+
}
9+
10+
enum ColumnLineageStage {
11+
SELECT,
12+
AGG,
13+
DERIVE,
14+
JOIN
15+
}
16+
17+
18+
/**
19+
* NOTE: Staging Query will only be purely output columns.
20+
* stage = SELECT, output = col_name, expression = null, input_cols = null, type = DIRECT
21+
**/
22+
enum ColumnLineageType {
23+
WHERE,
24+
JOIN_KEY,
25+
GROUP_BY_KEY,
26+
DERIVE,
27+
TIMESTAMP,
28+
DIRECT
29+
30+
/**
31+
* select x+y as a, c, json_extract(payload, "$.meta.user") as user where b = 10
32+
* group_by user
33+
* agg - count(a, window=7d, bucket=c)
34+
*
35+
* Stage = SELECT, output = a, expression = x + y, input_columns = [x, y], type = DIRECT
36+
* expression = "b = 10", input_columns = [b], type = WHERE
37+
* output=user, expression = 'json_extract(payload, "$.meta.user")', input_columns = [payload], type = GROUP_BY_KEY
38+
*
39+
* Stage = AGG, output = a_count_7d_by_c, expression = 'count(a, window=7d, bucket=c)', input_columns = [a, c], type = DIRECT
40+
* output=user, input_columns = [user], type = GROUP_BY_KEY
41+
**/
42+
}
43+
44+
struct ColumnLineage {
45+
1: optional list<Column> inputColumns
46+
2: optional string expression
47+
3: optional Column outputColumn
48+
4: optional ColumnLineageType lineageType // not present means direct
49+
}
50+
51+
struct StageColumnLineage {
52+
1: optional ColumnLineageStage stage
53+
2: optional list<ColumnLineage> lineage
54+
}

0 commit comments

Comments
 (0)